use std::cell::RefCell;
use std::collections::{BTreeMap, BTreeSet};
use std::rc::Rc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use futures::{pin_mut, StreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::broadcast;
use tokio::sync::{Mutex as AsyncMutex, Notify};
use tracing::Instrument as _;
use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
use crate::llm::vm_value_to_json;
use crate::orchestration::{
append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
};
use crate::stdlib::json_to_vm_value;
use crate::trust_graph::{
append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
};
use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
use crate::vm::Vm;
use self::uri::DispatchUri;
use super::registry::{
binding_autonomy_budget_would_exceed, matching_bindings, note_autonomous_decision,
TriggerBinding, TriggerBudgetExhaustionStrategy, TriggerHandlerSpec,
};
use super::{
begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_OUTBOX_TOPIC,
};
use circuits::{
destination_circuit_key, dlq_node_metadata, DestinationCircuitProbe,
DestinationCircuitRegistry, DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
};
use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
use predicate_eval::predicate_node_metadata;
mod circuits;
mod flow_control;
mod predicate_eval;
pub mod retry;
pub mod uri;
pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
thread_local! {
    // Per-thread slots that let VM builtins reach the dispatcher state / dispatch
    // context that is currently driving execution on this thread.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test hook: fires once when the run loop dequeues an inbox envelope.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
tokio::task_local! {
    // Scoped per dispatch task (see `dispatch_with_replay`); true when the
    // dispatch is a replay of a previously logged event.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
/// Snapshot of the dispatch currently executing on this thread, exposed to
/// handler code via `current_dispatch_context`.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    pub trigger_event: TriggerEvent,
    // Set when this dispatch replays an earlier event (original event id).
    pub replay_of_event_id: Option<String>,
    pub binding_id: String,
    pub binding_version: u32,
    pub agent_id: String,
    pub action: String,
    pub autonomy_tier: AutonomyTier,
}
/// RAII guard: pops the execution policy pushed for a dispatch when dropped,
/// so the policy stack unwinds even on early return or panic.
struct DispatchExecutionPolicyGuard;
impl Drop for DispatchExecutionPolicyGuard {
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
// Reviewer recorded on approval requests raised when an autonomy budget is exceeded.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
/// Returns a clone of the dispatch context active on this thread, if any.
pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
    // Clone out of the thread-local slot so the RefCell borrow ends immediately.
    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().as_ref().cloned())
}
/// True when the current task is executing a replay dispatch.
pub(crate) fn current_dispatch_is_replay() -> bool {
    // Outside a dispatch task scope `try_with` fails; treat that as "not a replay".
    matches!(ACTIVE_DISPATCH_IS_REPLAY.try_with(|flag| *flag), Ok(true))
}
/// Returns a clone of the wait lease for the dispatch active on this thread, if any.
pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().as_ref().cloned())
}
/// Routes trigger events to bound handlers, journaling every step to the event log.
/// Cloning is cheap: shared runtime state lives behind `Arc`s.
#[derive(Clone)]
pub struct Dispatcher {
    base_vm: Rc<Vm>,
    event_log: Arc<AnyEventLog>,
    // Broadcast fan-out used by `shutdown` to wake the `run` loop.
    cancel_tx: broadcast::Sender<()>,
    state: Arc<DispatcherRuntimeState>,
    metrics: Option<Arc<crate::MetricsRegistry>>,
    a2a_client: Arc<dyn crate::a2a::A2aClient>,
}
/// Mutable runtime state shared across dispatcher clones and threads.
#[derive(Debug)]
struct DispatcherRuntimeState {
    in_flight: AtomicU64,
    retry_queue_depth: AtomicU64,
    dlq: Mutex<Vec<DlqEntry>>,
    // Per-dispatch cancel flags; all flipped to true by `shutdown`.
    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
    shutting_down: std::sync::atomic::AtomicBool,
    // Signalled when in-flight work completes so `drain` can re-check.
    idle_notify: Notify,
    flow_control: FlowControlManager,
    destination_circuits: DestinationCircuitRegistry,
}
impl DispatcherRuntimeState {
    /// Builds a fresh runtime state; the event log is handed to flow control
    /// so gate transitions can be journaled.
    fn new(event_log: Arc<AnyEventLog>) -> Self {
        let flow_control = FlowControlManager::new(event_log);
        Self {
            in_flight: AtomicU64::new(0),
            retry_queue_depth: AtomicU64::new(0),
            dlq: Mutex::default(),
            cancel_tokens: Mutex::default(),
            shutting_down: std::sync::atomic::AtomicBool::new(false),
            idle_notify: Notify::new(),
            flow_control,
            destination_circuits: DestinationCircuitRegistry::default(),
        }
    }
}
/// Point-in-time gauge readout of dispatcher queues; advisory, not a barrier.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    pub in_flight: u64,
    pub retry_queue_depth: u64,
    pub dlq_depth: u64,
}
/// Terminal (or parked) state of a single dispatch attempt sequence.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    Succeeded,
    Failed,
    Dlq,
    Skipped,
    Waiting,
    Cancelled,
}
impl DispatchStatus {
    /// Stable string form; matches the `snake_case` serde representation.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Succeeded => "succeeded",
            Self::Failed => "failed",
            Self::Dlq => "dlq",
            Self::Skipped => "skipped",
            Self::Waiting => "waiting",
            Self::Cancelled => "cancelled",
        }
    }
}
/// Result of dispatching one trigger event through one binding.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    pub attempt_count: u32,
    pub status: DispatchStatus,
    pub handler_kind: String,
    pub target_uri: String,
    // Original event id when this outcome came from a replay dispatch.
    pub replay_of_event_id: Option<String>,
    pub result: Option<serde_json::Value>,
    pub error: Option<String>,
}
/// Payload appended to the trigger inbox topic; optionally targets a specific binding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    pub trigger_id: Option<String>,
    pub binding_version: Option<u32>,
    pub event: TriggerEvent,
}
/// Outcome of `Dispatcher::drain`: whether the queues emptied before the deadline.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    pub drained: bool,
    pub in_flight: u64,
    pub retry_queue_depth: u64,
    pub dlq_depth: u64,
}
impl Default for DispatchOutcome {
    /// Manual impl because `DispatchStatus` has no `Default`; an outcome
    /// defaults to `Failed` with every descriptive field empty.
    fn default() -> Self {
        Self {
            status: DispatchStatus::Failed,
            trigger_id: String::default(),
            binding_key: String::default(),
            event_id: String::default(),
            handler_kind: String::default(),
            target_uri: String::default(),
            attempt_count: 0,
            replay_of_event_id: None,
            result: None,
            error: None,
        }
    }
}
/// Audit row for a single handler attempt; appended to the attempts topic.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    pub attempt: u32,
    pub handler_kind: String,
    pub started_at: String,
    pub completed_at: String,
    pub outcome: String,
    pub error_msg: Option<String>,
}
/// Operator request to cancel an in-flight or pending dispatch, keyed by
/// binding key + event id.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    pub binding_key: String,
    pub event_id: String,
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    #[serde(default)]
    pub requested_by: Option<String>,
    #[serde(default)]
    pub audit_id: Option<String>,
}
/// Dead-letter entry retained after all retry attempts are exhausted.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    pub trigger_id: String,
    pub binding_key: String,
    pub event: TriggerEvent,
    pub attempt_count: u32,
    pub final_error: String,
    // Defaults to "unknown" for entries persisted before this field existed.
    #[serde(default = "default_dlq_error_class")]
    pub error_class: String,
    pub attempts: Vec<DispatchAttemptRecord>,
}
/// Fallback DLQ `error_class` used when a persisted entry predates the field.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
/// A held (or releasable) singleton gate acquired via flow control.
#[derive(Clone, Debug)]
struct SingletonLease {
    gate: String,
    // False while suspended (gate released so other work can run).
    held: bool,
}
/// A concurrency slot acquired via flow control; `permit` is None while suspended.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    gate: String,
    max: u32,
    priority_rank: usize,
    permit: Option<ConcurrencyPermit>,
}
/// Flow-control resources acquired for one dispatch (either may be absent).
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    singleton: Option<SingletonLease>,
    concurrency: Option<ConcurrencyLease>,
}
/// Handle that lets a long-waiting dispatch temporarily give back its
/// flow-control resources (`suspend`) and take them again (`resume`).
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    state: Arc<DispatcherRuntimeState>,
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    // Idempotence flag so repeated suspend/resume calls are no-ops.
    suspended: Arc<AtomicBool>,
}
impl DispatchWaitLease {
    fn new(
        state: Arc<DispatcherRuntimeState>,
        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    ) -> Self {
        Self {
            state,
            acquired,
            suspended: Arc::new(AtomicBool::new(false)),
        }
    }
    /// Releases held flow-control resources so other dispatches can run while
    /// this one waits. Idempotent: a second call is a no-op.
    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
        if self.suspended.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // Take what we hold under the lock, then release OUTSIDE the lock to
        // avoid awaiting flow control while the lease mutex is held.
        let (singleton_gate, concurrency_permit) = {
            let mut acquired = self.acquired.lock().await;
            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
                if lease.held {
                    lease.held = false;
                    Some(lease.gate.clone())
                } else {
                    None
                }
            });
            let concurrency_permit = acquired
                .concurrency
                .as_mut()
                .and_then(|lease| lease.permit.take());
            (singleton_gate, concurrency_permit)
        };
        if let Some(gate) = singleton_gate {
            self.state
                .flow_control
                .release_singleton(&gate)
                .await
                .map_err(DispatchError::from)?;
        }
        if let Some(permit) = concurrency_permit {
            self.state
                .flow_control
                .release_concurrency(permit)
                .await
                .map_err(DispatchError::from)?;
        }
        Ok(())
    }
    /// Re-acquires whatever `suspend` released. Idempotent: a no-op unless a
    /// prior `suspend` flipped the flag.
    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
        if !self.suspended.swap(false, Ordering::SeqCst) {
            return Ok(());
        }
        // For each resource: inspect under the lock, acquire outside the lock,
        // then re-lock to record the acquisition.
        let singleton_gate = {
            let acquired = self.acquired.lock().await;
            acquired.singleton.as_ref().and_then(|lease| {
                if lease.held {
                    None
                } else {
                    Some(lease.gate.clone())
                }
            })
        };
        if let Some(gate) = singleton_gate {
            self.state
                .flow_control
                .acquire_singleton(&gate)
                .await
                .map_err(DispatchError::from)?;
            let mut acquired = self.acquired.lock().await;
            if let Some(lease) = acquired.singleton.as_mut() {
                lease.held = true;
            }
        }
        let concurrency_spec = {
            let acquired = self.acquired.lock().await;
            acquired.concurrency.as_ref().and_then(|lease| {
                if lease.permit.is_some() {
                    None
                } else {
                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
                }
            })
        };
        if let Some((gate, max, priority_rank)) = concurrency_spec {
            let permit = self
                .state
                .flow_control
                .acquire_concurrency(&gate, max, priority_rank)
                .await
                .map_err(DispatchError::from)?;
            let mut acquired = self.acquired.lock().await;
            if let Some(lease) = acquired.concurrency.as_mut() {
                lease.permit = Some(permit);
            }
        }
        Ok(())
    }
}
/// Decision returned by flow-control admission: proceed with (possibly
/// batched/rewritten) event and the acquired leases, or skip entirely.
enum FlowControlOutcome {
    Dispatch {
        // Boxed: TriggerEvent is large relative to the Skip variant.
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    Skip {
        reason: String,
    },
}
/// Which pipeline stage decided to skip a dispatch (for outbox labeling).
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    Predicate,
    FlowControl,
}
/// Errors surfaced by the dispatch pipeline; each variant wraps a message
/// describing the failing subsystem.
#[derive(Debug)]
pub enum DispatchError {
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}
impl std::fmt::Display for DispatchError {
    /// Every variant carries a human-readable message; display it verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::EventLog(message)
            | Self::Registry(message)
            | Self::Serde(message)
            | Self::Local(message)
            | Self::A2a(message)
            | Self::Denied(message)
            | Self::Timeout(message)
            | Self::Waiting(message)
            | Self::Cancelled(message)
            | Self::NotImplemented(message) => message,
        };
        f.write_str(message)
    }
}
impl std::error::Error for DispatchError {}
impl DispatchError {
    /// Whether the retry machinery may re-attempt after this error.
    /// Cancellation, denial, unimplemented routes, and wait states are terminal.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            _ => true,
        }
    }
}
impl DispatchSkipStage {
    /// Stable label used in skip outbox events.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Predicate => "predicate",
            Self::FlowControl => "flow_control",
        }
    }
}
/// Event-log failures map onto the `EventLog` error variant.
impl From<LogError> for DispatchError {
    fn from(value: LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}
/// Journals an operator cancel request to the cancel-requests topic.
/// Returns the sequence number of the appended log event.
pub async fn append_dispatch_cancel_request(
    event_log: &Arc<AnyEventLog>,
    request: &DispatchCancelRequest,
) -> Result<u64, DispatchError> {
    // The topic name is a compile-time constant, so construction cannot fail.
    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
        .expect("static trigger cancel topic should always be valid");
    event_log
        .append(
            &topic,
            LogEvent::new(
                "dispatch_cancel_requested",
                serde_json::to_value(request)
                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
            ),
        )
        .await
        .map_err(DispatchError::from)
}
impl Dispatcher {
/// Shared handle to the dispatcher's event log.
pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
    // Reference-counted clone; the log itself is shared, not copied.
    Arc::clone(&self.event_log)
}
/// Builds a dispatcher bound to the ambient event log.
///
/// # Errors
/// Returns `DispatchError::EventLog` when no event log is active.
pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
    match active_event_log() {
        Some(event_log) => Ok(Self::with_event_log(base_vm, event_log)),
        None => Err(DispatchError::EventLog(
            "dispatcher requires an active event log".to_string(),
        )),
    }
}
/// Builds a dispatcher on an explicit event log, without metrics.
pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
    Self::with_event_log_and_metrics(base_vm, event_log, None)
}
/// Builds a dispatcher on an explicit event log, optionally wired to metrics.
/// Side effect: registers the new runtime state in this thread's
/// `ACTIVE_DISPATCHER_STATE` slot, replacing any previous registration.
pub fn with_event_log_and_metrics(
    base_vm: Vm,
    event_log: Arc<AnyEventLog>,
    metrics: Option<Arc<crate::MetricsRegistry>>,
) -> Self {
    let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
    ACTIVE_DISPATCHER_STATE.with(|slot| {
        *slot.borrow_mut() = Some(state.clone());
    });
    // Receiver is dropped here; `run` subscribes its own receiver later.
    let (cancel_tx, _) = broadcast::channel(32);
    Self {
        base_vm: Rc::new(base_vm),
        event_log,
        cancel_tx,
        state,
        metrics,
        a2a_client: Arc::new(crate::a2a::RealA2aClient),
    }
}
/// Test-only builder: swaps in a stub A2A client.
#[cfg(test)]
pub fn with_a2a_client(self, client: Arc<dyn crate::a2a::A2aClient>) -> Self {
    Self {
        a2a_client: client,
        ..self
    }
}
/// Advisory point-in-time readout of the dispatcher's queue depths.
pub fn snapshot(&self) -> DispatcherStatsSnapshot {
    // Relaxed loads are fine: the snapshot is informational, not a barrier.
    let dlq_depth = self
        .state
        .dlq
        .lock()
        .expect("dispatcher dlq poisoned")
        .len() as u64;
    DispatcherStatsSnapshot {
        in_flight: self.state.in_flight.load(Ordering::Relaxed),
        retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
        dlq_depth,
    }
}
/// Point-in-time copy of the dead-letter queue.
pub fn dlq_entries(&self) -> Vec<DlqEntry> {
    // Clone under the lock so callers get an immutable snapshot.
    let entries = self.state.dlq.lock().expect("dispatcher dlq poisoned");
    entries.clone()
}
/// Signals shutdown: flags the runtime state, trips every per-dispatch cancel
/// token, then wakes the `run` loop via the broadcast channel.
pub fn shutdown(&self) {
    // Flag first so new work observes shutdown before cancellation lands.
    self.state.shutting_down.store(true, Ordering::SeqCst);
    let tokens = self
        .state
        .cancel_tokens
        .lock()
        .expect("dispatcher cancel tokens poisoned");
    for token in tokens.iter() {
        token.store(true, Ordering::SeqCst);
    }
    drop(tokens);
    // Send may fail if no receiver is alive; that's fine on shutdown.
    let _ = self.cancel_tx.send(());
}
/// Appends an untargeted event to the inbox topic (binding resolved at dispatch).
pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
    self.enqueue_targeted(None, None, event).await
}
/// Appends an event addressed to a specific trigger binding (and optional version).
pub async fn enqueue_targeted(
    &self,
    trigger_id: Option<String>,
    binding_version: Option<u32>,
    event: TriggerEvent,
) -> Result<u64, DispatchError> {
    self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
        .await
}
/// Appends an event to the inbox topic, propagating parent headers and
/// stamping accepted/queued timestamps; records queue-latency metrics when a
/// trigger id is known. Returns the appended log sequence number.
pub async fn enqueue_targeted_with_headers(
    &self,
    trigger_id: Option<String>,
    binding_version: Option<u32>,
    event: TriggerEvent,
    parent_headers: Option<&BTreeMap<String, String>>,
) -> Result<u64, DispatchError> {
    let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
    let trigger_id_for_metrics = trigger_id.clone();
    // Event-derived headers win over inherited parent headers on key collision.
    let mut headers = parent_headers.cloned().unwrap_or_default();
    headers.extend(event_headers(&event, None, None, None));
    if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
        headers.insert("trigger_id".to_string(), trigger_id.clone());
        headers.insert(
            "binding_key".to_string(),
            binding_key_from_parts(trigger_id, binding_version),
        );
    }
    // Stamp acceptance time only if an upstream hop has not already done so.
    headers
        .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
        .or_insert_with(|| unix_ms(event.received_at).to_string());
    let payload = serde_json::to_value(InboxEnvelope {
        trigger_id,
        binding_version,
        event: event.clone(),
    })
    .map_err(|error| DispatchError::Serde(error.to_string()))?;
    let mut log_event = LogEvent::new("event_ingested", payload);
    // Remember whether this is the first queue append: the accepted->queued
    // latency metric is only recorded on the first hop.
    let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
    let queue_appended_at_ms = headers
        .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
        .and_then(|value| value.parse::<i64>().ok())
        .unwrap_or(log_event.occurred_at_ms);
    headers
        .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
        .or_insert_with(|| log_event.occurred_at_ms.to_string());
    if let (Some(metrics), Some(trigger_id)) =
        (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
    {
        let binding_key = binding_key_from_parts(trigger_id, binding_version);
        let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
        if !had_queue_appended_at {
            metrics.record_trigger_accepted_to_queue_append(
                trigger_id,
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                "queued",
                duration_between_ms(queue_appended_at_ms, accepted_at_ms),
            );
        }
        // Track the event as pending until dispatch admission clears it.
        metrics.note_trigger_pending_event(
            event.id.0.as_str(),
            trigger_id,
            &binding_key,
            event.provider.as_str(),
            tenant_id(&event),
            accepted_at_ms,
            queue_appended_at_ms,
        );
    }
    log_event.headers = headers;
    self.event_log
        .append(&topic, log_event)
        .await
        .map_err(DispatchError::from)
}
/// Main dispatch loop: subscribes to the inbox topic starting AFTER the
/// current tail (pre-existing events are not replayed) and dispatches each
/// `event_ingested` envelope until the stream ends or `shutdown` is signalled.
pub async fn run(&self) -> Result<(), DispatchError> {
    let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
        .expect("static trigger inbox envelopes topic is valid");
    let start_from = self.event_log.latest(&topic).await?;
    let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
    pin_mut!(stream);
    let mut cancel_rx = self.cancel_tx.subscribe();
    loop {
        tokio::select! {
            received = stream.next() => {
                let Some(received) = received else {
                    break;
                };
                let (_, event) = received.map_err(DispatchError::from)?;
                if event.kind != "event_ingested" {
                    continue;
                }
                let parent_headers = event.headers.clone();
                let envelope: InboxEnvelope = serde_json::from_value(event.payload)
                    .map_err(|error| DispatchError::Serde(error.to_string()))?;
                notify_test_inbox_dequeued();
                // Best-effort: per-envelope dispatch failures are journaled by
                // the dispatch path itself and must not kill the loop.
                let _ = self
                    .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
                    .await;
            }
            _ = recv_cancel(&mut cancel_rx) => break,
        }
    }
    Ok(())
}
/// Waits (up to `timeout`) for in-flight and retry work to reach zero.
/// Never errors on timeout: the report's `drained` flag says whether the
/// queues actually emptied.
pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
    let deadline = tokio::time::Instant::now() + timeout;
    loop {
        let snapshot = self.snapshot();
        if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
            return Ok(DispatcherDrainReport {
                drained: true,
                in_flight: snapshot.in_flight,
                retry_queue_depth: snapshot.retry_queue_depth,
                dlq_depth: snapshot.dlq_depth,
            });
        }
        // Register for notification BEFORE re-checking the deadline so a
        // completion between the snapshot and the await is not lost.
        let notified = self.state.idle_notify.notified();
        let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
        if remaining.is_zero() {
            return Ok(DispatcherDrainReport {
                drained: false,
                in_flight: snapshot.in_flight,
                retry_queue_depth: snapshot.retry_queue_depth,
                dlq_depth: snapshot.dlq_depth,
            });
        }
        if tokio::time::timeout(remaining, notified).await.is_err() {
            // Timed out: report the freshest depths rather than the stale snapshot.
            let snapshot = self.snapshot();
            return Ok(DispatcherDrainReport {
                drained: false,
                in_flight: snapshot.in_flight,
                retry_queue_depth: snapshot.retry_queue_depth,
                dlq_depth: snapshot.dlq_depth,
            });
        }
    }
}
/// Dispatches an inbox envelope with no inherited headers.
pub async fn dispatch_inbox_envelope(
    &self,
    envelope: InboxEnvelope,
) -> Result<Vec<DispatchOutcome>, DispatchError> {
    self.dispatch_inbox_envelope_with_headers(envelope, None)
        .await
}
/// Dispatches an inbox envelope, propagating the producer's log-event headers
/// (trace/timing context) into the dispatch.
pub async fn dispatch_inbox_envelope_with_parent_headers(
    &self,
    envelope: InboxEnvelope,
    parent_headers: &BTreeMap<String, String>,
) -> Result<Vec<DispatchOutcome>, DispatchError> {
    self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
        .await
}
/// Routes one inbox envelope: an explicit `trigger_id` (or a cron id embedded
/// in the provider payload) dispatches to that one binding; otherwise the
/// event fans out to every matching binding.
async fn dispatch_inbox_envelope_with_headers(
    &self,
    envelope: InboxEnvelope,
    parent_headers: Option<&BTreeMap<String, String>>,
) -> Result<Vec<DispatchOutcome>, DispatchError> {
    // Explicit target wins; cron addressing is only consulted when absent.
    let targeted_trigger_id = envelope.trigger_id.clone().or_else(|| {
        match &envelope.event.provider_payload {
            crate::triggers::ProviderPayload::Known(
                crate::triggers::event::KnownProviderPayload::Cron(payload),
            ) => payload.cron_id.clone(),
            _ => None,
        }
    });
    let Some(trigger_id) = targeted_trigger_id else {
        return self
            .dispatch_event_with_headers(envelope.event, parent_headers)
            .await;
    };
    let binding =
        super::registry::resolve_live_trigger_binding(&trigger_id, envelope.binding_version)
            .map_err(|error| DispatchError::Registry(error.to_string()))?;
    let outcome = self
        .dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
        .await?;
    Ok(vec![outcome])
}
/// Fans an event out to all matching bindings, with no inherited headers.
pub async fn dispatch_event(
    &self,
    event: TriggerEvent,
) -> Result<Vec<DispatchOutcome>, DispatchError> {
    self.dispatch_event_with_headers(event, None).await
}
/// Dispatches the event sequentially to every binding whose filters match;
/// the first hard dispatch error aborts the remaining bindings.
async fn dispatch_event_with_headers(
    &self,
    event: TriggerEvent,
    parent_headers: Option<&BTreeMap<String, String>>,
) -> Result<Vec<DispatchOutcome>, DispatchError> {
    let bindings = matching_bindings(&event);
    let mut outcomes = Vec::with_capacity(bindings.len());
    for binding in &bindings {
        let outcome = self
            .dispatch_with_replay(binding, event.clone(), None, None, parent_headers)
            .await?;
        outcomes.push(outcome);
    }
    Ok(outcomes)
}
/// Dispatches one event through one binding (fresh dispatch, no replay context).
pub async fn dispatch(
    &self,
    binding: &TriggerBinding,
    event: TriggerEvent,
) -> Result<DispatchOutcome, DispatchError> {
    self.dispatch_with_replay(binding, event, None, None, None)
        .await
}
/// Dispatches a replay of a previously logged event; `replay_of_event_id` is
/// the original event's id and is threaded through the action graph and audit trail.
pub async fn dispatch_replay(
    &self,
    binding: &TriggerBinding,
    event: TriggerEvent,
    replay_of_event_id: String,
) -> Result<DispatchOutcome, DispatchError> {
    self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
        .await
}
/// Dispatches with an explicit parent tracing span id (for cross-service linkage).
pub async fn dispatch_with_parent_span_id(
    &self,
    binding: &TriggerBinding,
    event: TriggerEvent,
    parent_span_id: Option<String>,
) -> Result<DispatchOutcome, DispatchError> {
    self.dispatch_with_replay(binding, event, None, parent_span_id, None)
        .await
}
/// Outer dispatch wrapper: records admission metrics, sets up the tracing
/// span (with optional parent linkage), scopes the replay task-local around
/// the inner dispatch, and records outcome metrics / otel attributes after.
async fn dispatch_with_replay(
    &self,
    binding: &TriggerBinding,
    event: TriggerEvent,
    replay_of_event_id: Option<String>,
    parent_span_id: Option<String>,
    parent_headers: Option<&BTreeMap<String, String>>,
) -> Result<DispatchOutcome, DispatchError> {
    // Clone headers up front: the inner future takes them by value.
    let parent_headers_for_metrics = parent_headers.cloned();
    let admitted_at_ms = current_unix_ms();
    if let Some(metrics) = self.metrics.as_ref() {
        let binding_key = binding.binding_key();
        let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
        metrics.record_trigger_queue_age_at_dispatch_admission(
            binding.id.as_str(),
            &binding_key,
            event.provider.as_str(),
            tenant_id(&event),
            "admitted",
            duration_between_ms(admitted_at_ms, queue_appended_at_ms),
        );
        // The event is no longer pending once admitted.
        metrics.clear_trigger_pending_event(
            event.id.0.as_str(),
            binding.id.as_str(),
            &binding_key,
            event.provider.as_str(),
            tenant_id(&event),
            admitted_at_ms,
        );
    }
    let span = tracing::info_span!(
        "dispatch",
        trigger_id = %binding.id.as_str(),
        binding_version = binding.version,
        trace_id = %event.trace_id.0
    );
    #[cfg(feature = "otel")]
    let span_for_otel = span.clone();
    // Parent linkage is best-effort; a failure to set it must not block dispatch.
    let _ = if let Some(headers) = parent_headers {
        crate::observability::otel::set_span_parent_from_headers(
            &span,
            headers,
            &event.trace_id,
            parent_span_id.as_deref(),
        )
    } else {
        crate::observability::otel::set_span_parent(
            &span,
            &event.trace_id,
            parent_span_id.as_deref(),
        )
    };
    #[cfg(feature = "otel")]
    let started_at = Instant::now();
    let metrics = self.metrics.clone();
    // Scope the replay flag as a task-local so nested code can query
    // `current_dispatch_is_replay` without threading it through every call.
    let outcome = ACTIVE_DISPATCH_IS_REPLAY
        .scope(
            replay_of_event_id.is_some(),
            self.dispatch_with_replay_inner(
                binding,
                event,
                replay_of_event_id,
                parent_headers_for_metrics,
            )
            .instrument(span),
        )
        .await;
    if let Some(metrics) = metrics.as_ref() {
        // Skips count as successes; Waiting is neither success nor failure.
        match &outcome {
            Ok(dispatch_outcome) => match dispatch_outcome.status {
                DispatchStatus::Succeeded | DispatchStatus::Skipped => {
                    metrics.record_dispatch_succeeded();
                }
                DispatchStatus::Waiting => {}
                _ => metrics.record_dispatch_failed(),
            },
            Err(_) => metrics.record_dispatch_failed(),
        }
        let outcome_label = match &outcome {
            Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
            Err(DispatchError::Cancelled(_)) => "cancelled",
            Err(_) => "failed",
        };
        metrics.record_trigger_dispatched(
            binding.id.as_str(),
            binding.handler.kind(),
            outcome_label,
        );
        metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
    }
    #[cfg(feature = "otel")]
    {
        use tracing_opentelemetry::OpenTelemetrySpanExt as _;
        let duration_ms = started_at.elapsed().as_millis() as i64;
        let status = match &outcome {
            Ok(dispatch_outcome) => match dispatch_outcome.status {
                DispatchStatus::Succeeded => "succeeded",
                DispatchStatus::Skipped => "skipped",
                DispatchStatus::Waiting => "waiting",
                DispatchStatus::Cancelled => "cancelled",
                DispatchStatus::Failed => "failed",
                DispatchStatus::Dlq => "dlq",
            },
            Err(DispatchError::Cancelled(_)) => "cancelled",
            Err(_) => "failed",
        };
        span_for_otel.set_attribute("result.status", status);
        span_for_otel.set_attribute("result.duration_ms", duration_ms);
    }
    outcome
}
async fn dispatch_with_replay_inner(
&self,
binding: &TriggerBinding,
event: TriggerEvent,
replay_of_event_id: Option<String>,
parent_headers: Option<BTreeMap<String, String>>,
) -> Result<DispatchOutcome, DispatchError> {
let autonomy_tier = crate::resolve_agent_autonomy_tier(
&self.event_log,
binding.id.as_str(),
binding.autonomy_tier,
)
.await
.unwrap_or(binding.autonomy_tier);
let binding_key = binding.binding_key();
let route = DispatchUri::from(&binding.handler);
let trigger_id = binding.id.as_str().to_string();
let event_id = event.id.0.clone();
self.state.in_flight.fetch_add(1, Ordering::Relaxed);
let begin = if replay_of_event_id.is_some() {
super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
} else {
begin_in_flight(binding.id.as_str(), binding.version)
};
begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
let mut attempts = Vec::new();
let mut source_node_id = format!("trigger:{}", event.id.0);
let mut initial_nodes = Vec::new();
let mut initial_edges = Vec::new();
if let Some(original_event_id) = replay_of_event_id.as_ref() {
let original_node_id = format!("trigger:{original_event_id}");
initial_nodes.push(RunActionGraphNodeRecord {
id: original_node_id.clone(),
label: format!(
"{}:{} (original {})",
event.provider.as_str(),
event.kind,
original_event_id
),
kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
status: "historical".to_string(),
outcome: "replayed_from".to_string(),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: trigger_node_metadata(&event),
});
initial_edges.push(RunActionGraphEdgeRecord {
from_id: original_node_id,
to_id: source_node_id.clone(),
kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
label: Some("replay chain".to_string()),
});
}
initial_nodes.push(RunActionGraphNodeRecord {
id: source_node_id.clone(),
label: format!("{}:{}", event.provider.as_str(), event.kind),
kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
status: "received".to_string(),
outcome: "received".to_string(),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: trigger_node_metadata(&event),
});
self.emit_action_graph(
&event,
initial_nodes,
initial_edges,
serde_json::json!({
"source": "dispatcher",
"trigger_id": trigger_id,
"binding_key": binding_key,
"event_id": event_id,
"replay_of_event_id": replay_of_event_id,
}),
)
.await?;
if dispatch_cancel_requested(
&self.event_log,
&binding_key,
&event.id.0,
replay_of_event_id.as_ref(),
)
.await?
{
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Failed,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
decrement_in_flight(&self.state);
return Ok(cancelled_dispatch_outcome(
binding,
&route,
&event,
replay_of_event_id,
0,
"trigger cancel request cancelled dispatch before attempt 1".to_string(),
));
}
if let Some(predicate) = binding.when.as_ref() {
let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
let evaluation = self
.evaluate_predicate(
binding,
predicate,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
)
.await?;
let passed = evaluation.result;
self.emit_action_graph(
&event,
vec![RunActionGraphNodeRecord {
id: predicate_node_id.clone(),
label: predicate.raw.clone(),
kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
status: "completed".to_string(),
outcome: passed.to_string(),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
}],
vec![RunActionGraphEdgeRecord {
from_id: source_node_id.clone(),
to_id: predicate_node_id.clone(),
kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
label: None,
}],
serde_json::json!({
"source": "dispatcher",
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"event_id": event.id.0,
"predicate": predicate.raw,
"reason": evaluation.reason,
"cached": evaluation.cached,
"cost_usd": evaluation.cost_usd,
"tokens": evaluation.tokens,
"latency_ms": evaluation.latency_ms,
"replay_of_event_id": replay_of_event_id,
}),
)
.await?;
if !passed {
if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
let final_error = format!(
"trigger budget exhausted: {}",
evaluation.reason.as_deref().unwrap_or("budget_exhausted")
);
self.move_budget_exhausted_to_dlq(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
&final_error,
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dlq,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Failure,
"dlq",
0,
Some(final_error.clone()),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: 0,
status: DispatchStatus::Dlq,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: None,
error: Some(final_error),
});
}
if evaluation.exhaustion_strategy
== Some(TriggerBudgetExhaustionStrategy::RetryLater)
{
self.append_budget_deferred_event(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dispatched,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Denied,
"waiting",
0,
evaluation.reason.clone(),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: 0,
status: DispatchStatus::Waiting,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: Some(serde_json::json!({
"deferred": true,
"predicate": predicate.raw,
"reason": evaluation.reason,
})),
error: None,
});
}
self.append_skipped_outbox_event(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
DispatchSkipStage::Predicate,
serde_json::json!({
"predicate": predicate.raw,
"reason": evaluation.reason,
}),
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dispatched,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Denied,
"skipped",
0,
None,
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: 0,
status: DispatchStatus::Skipped,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: Some(serde_json::json!({
"skipped": true,
"predicate": predicate.raw,
"reason": evaluation.reason,
})),
error: None,
});
}
source_node_id = predicate_node_id;
}
if autonomy_tier == AutonomyTier::ActAuto {
if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
let request_id = self
.append_autonomy_budget_approval_request(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
reason,
)
.await?;
self.emit_autonomy_budget_approval_action_graph(
binding,
&route,
&event,
&source_node_id,
replay_of_event_id.as_ref(),
reason,
&request_id,
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dispatched,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
decrement_in_flight(&self.state);
self.append_tier_transition_trust_record(
binding,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
AutonomyTier::ActWithApproval,
reason,
&request_id,
)
.await?;
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Denied,
"waiting",
0,
Some(reason.to_string()),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: 0,
status: DispatchStatus::Waiting,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: Some(serde_json::json!({
"approval_required": true,
"request_id": request_id,
"reason": reason,
"reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
})),
error: None,
});
}
note_autonomous_decision(binding);
}
let (event, acquired_flow) = match self
.apply_flow_control(binding, &event, replay_of_event_id.as_ref())
.await?
{
FlowControlOutcome::Dispatch { event, acquired } => {
(*event, Arc::new(AsyncMutex::new(acquired)))
}
FlowControlOutcome::Skip { reason } => {
self.append_skipped_outbox_event(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
DispatchSkipStage::FlowControl,
serde_json::json!({
"flow_control": reason,
}),
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dispatched,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
decrement_in_flight(&self.state);
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: 0,
status: DispatchStatus::Skipped,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: Some(serde_json::json!({
"skipped": true,
"flow_control": reason,
})),
error: None,
});
}
};
let destination_key = destination_circuit_key(&route);
let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
DestinationCircuitProbe::Allow { half_open } => {
if half_open {
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_backpressure_event("circuit", "half_open_probe");
}
}
half_open
}
DestinationCircuitProbe::Block { retry_after } => {
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_backpressure_event("circuit", "fail_fast");
}
let final_error = format!(
"destination circuit open for {}; retry after {}s",
destination_key,
retry_after.as_secs().max(1)
);
self.move_circuit_open_to_dlq(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
&final_error,
&destination_key,
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dlq,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Failure,
"dlq",
0,
Some(final_error.clone()),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: 0,
status: DispatchStatus::Dlq,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: None,
error: Some(final_error),
});
}
};
let mut previous_retry_node = None;
let max_attempts = binding.retry.max_attempts();
for attempt in 1..=max_attempts {
if dispatch_cancel_requested(
&self.event_log,
&binding_key,
&event.id.0,
replay_of_event_id.as_ref(),
)
.await?
{
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Failed,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
decrement_in_flight(&self.state);
return Ok(cancelled_dispatch_outcome(
binding,
&route,
&event,
replay_of_event_id,
attempt.saturating_sub(1),
format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
));
}
maybe_fail_before_outbox();
let attempt_started_instant = Instant::now();
let attempt_started_at_ms = current_unix_ms();
let queue_age_at_start = duration_between_ms(
attempt_started_at_ms,
queue_appended_at_ms(parent_headers.as_ref(), &event),
);
if attempt == 1 {
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_trigger_queue_age_at_dispatch_start(
binding.id.as_str(),
&binding_key,
event.provider.as_str(),
tenant_id(&event),
"started",
queue_age_at_start,
);
}
}
tracing::info!(
component = "dispatcher",
lifecycle = "dispatch_started",
trigger_id = %binding.id.as_str(),
binding_key = %binding_key,
event_id = %event.id.0,
attempt,
queue_age_ms = queue_age_at_start.as_millis(),
trace_id = %event.trace_id.0
);
let started_at = now_rfc3339();
let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
self.append_lifecycle_event(
"DispatchStarted",
&event,
binding,
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.append_topic_event(
TRIGGER_OUTBOX_TOPIC,
"dispatch_started",
&event,
Some(binding),
Some(attempt),
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
let mut dispatch_edges = Vec::new();
if attempt == 1 {
dispatch_edges.push(RunActionGraphEdgeRecord {
from_id: source_node_id.clone(),
to_id: attempt_node_id.clone(),
kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
label: binding.when.as_ref().map(|_| "true".to_string()),
});
} else if let Some(retry_node_id) = previous_retry_node.take() {
dispatch_edges.push(RunActionGraphEdgeRecord {
from_id: retry_node_id,
to_id: attempt_node_id.clone(),
kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
label: Some(format!("attempt {attempt}")),
});
}
self.emit_action_graph(
&event,
vec![RunActionGraphNodeRecord {
id: attempt_node_id.clone(),
label: dispatch_node_label(&route),
kind: dispatch_node_kind(&route).to_string(),
status: "running".to_string(),
outcome: format!("attempt_{attempt}"),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: dispatch_node_metadata(&route, binding, &event, attempt),
}],
dispatch_edges,
serde_json::json!({
"source": "dispatcher",
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"event_id": event.id.0,
"attempt": attempt,
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"target_agent": dispatch_target_agent(&route),
"replay_of_event_id": replay_of_event_id,
}),
)
.await?;
let result = self
.dispatch_once(
binding,
&route,
&event,
autonomy_tier,
Some(DispatchWaitLease::new(
self.state.clone(),
acquired_flow.clone(),
)),
&mut self.cancel_tx.subscribe(),
)
.await;
let attempt_runtime = attempt_started_instant.elapsed();
let attempt_status = dispatch_result_status(&result);
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_trigger_dispatch_runtime(
binding.id.as_str(),
&binding_key,
event.provider.as_str(),
tenant_id(&event),
attempt_status,
attempt_runtime,
);
}
tracing::info!(
component = "dispatcher",
lifecycle = "handler_completed",
trigger_id = %binding.id.as_str(),
binding_key = %binding_key,
event_id = %event.id.0,
attempt,
status = attempt_status,
runtime_ms = attempt_runtime.as_millis(),
trace_id = %event.trace_id.0
);
let completed_at = now_rfc3339();
match result {
Ok(result) => {
let attempt_record = DispatchAttemptRecord {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0.clone(),
attempt,
handler_kind: route.kind().to_string(),
started_at,
completed_at,
outcome: "success".to_string(),
error_msg: None,
};
attempts.push(attempt_record.clone());
self.append_attempt_record(
&event,
binding,
&attempt_record,
replay_of_event_id.as_ref(),
)
.await?;
self.append_lifecycle_event(
"DispatchSucceeded",
&event,
binding,
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"result": result,
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.append_topic_event(
TRIGGER_OUTBOX_TOPIC,
"dispatch_succeeded",
&event,
Some(binding),
Some(attempt),
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"result": result,
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.emit_action_graph(
&event,
vec![RunActionGraphNodeRecord {
id: attempt_node_id.clone(),
label: dispatch_node_label(&route),
kind: dispatch_node_kind(&route).to_string(),
status: "completed".to_string(),
outcome: dispatch_success_outcome(&route, &result).to_string(),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: dispatch_success_metadata(
&route, binding, &event, attempt, &result,
),
}],
Vec::new(),
serde_json::json!({
"source": "dispatcher",
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"event_id": event.id.0,
"attempt": attempt,
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"result": result,
"replay_of_event_id": replay_of_event_id,
}),
)
.await?;
self.state
.destination_circuits
.record_success(&destination_key);
if half_open_probe {
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_backpressure_event("circuit", "closed");
}
}
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dispatched,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Success,
"succeeded",
attempt,
None,
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: attempt,
status: DispatchStatus::Succeeded,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: Some(result),
error: None,
});
}
Err(error) => {
let attempt_record = DispatchAttemptRecord {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0.clone(),
attempt,
handler_kind: route.kind().to_string(),
started_at,
completed_at,
outcome: dispatch_error_label(&error).to_string(),
error_msg: Some(error.to_string()),
};
attempts.push(attempt_record.clone());
self.append_attempt_record(
&event,
binding,
&attempt_record,
replay_of_event_id.as_ref(),
)
.await?;
if let DispatchError::Waiting(message) = &error {
self.append_lifecycle_event(
"DispatchWaiting",
&event,
binding,
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"message": message,
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.append_topic_event(
TRIGGER_OUTBOX_TOPIC,
"dispatch_waiting",
&event,
Some(binding),
Some(attempt),
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"message": message,
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dispatched,
)
.await
.map_err(|registry_error| {
DispatchError::Registry(registry_error.to_string())
})?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: attempt,
status: DispatchStatus::Waiting,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: Some(serde_json::json!({
"waiting": true,
"message": message,
})),
error: None,
});
}
self.append_lifecycle_event(
"DispatchFailed",
&event,
binding,
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"error": error.to_string(),
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.append_topic_event(
TRIGGER_OUTBOX_TOPIC,
"dispatch_failed",
&event,
Some(binding),
Some(attempt),
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt,
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"error": error.to_string(),
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.emit_action_graph(
&event,
vec![RunActionGraphNodeRecord {
id: attempt_node_id.clone(),
label: dispatch_node_label(&route),
kind: dispatch_node_kind(&route).to_string(),
status: if matches!(error, DispatchError::Cancelled(_)) {
"cancelled".to_string()
} else {
"failed".to_string()
},
outcome: dispatch_error_label(&error).to_string(),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: dispatch_error_metadata(
&route, binding, &event, attempt, &error,
),
}],
Vec::new(),
serde_json::json!({
"source": "dispatcher",
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"event_id": event.id.0,
"attempt": attempt,
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"error": error.to_string(),
"replay_of_event_id": replay_of_event_id,
}),
)
.await?;
let circuit_opened = if error.retryable() {
self.state
.destination_circuits
.record_failure(&destination_key)
} else {
false
};
if circuit_opened {
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_backpressure_event("circuit", "opened");
metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
metrics.record_trigger_accepted_to_dlq(
binding.id.as_str(),
&binding_key,
event.provider.as_str(),
tenant_id(&event),
"circuit_open",
duration_between_ms(
current_unix_ms(),
accepted_at_ms(parent_headers.as_ref(), &event),
),
);
}
let final_error = format!(
"destination circuit opened for {} after {} consecutive failures: {}",
destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
);
let dlq_entry = DlqEntry {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event: event.clone(),
attempt_count: attempt,
final_error: final_error.clone(),
error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
.to_string(),
attempts: attempts.clone(),
};
self.state
.dlq
.lock()
.expect("dispatcher dlq poisoned")
.push(dlq_entry.clone());
self.append_lifecycle_event(
"DlqMoved",
&event,
binding,
serde_json::json!({
"event_id": event.id.0,
"attempt_count": attempt,
"final_error": dlq_entry.final_error,
"reason": "circuit_open",
"destination": destination_key,
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.append_topic_event(
TRIGGER_DLQ_TOPIC,
"dlq_moved",
&event,
Some(binding),
Some(attempt),
serde_json::to_value(&dlq_entry).map_err(|serde_error| {
DispatchError::Serde(serde_error.to_string())
})?,
replay_of_event_id.as_ref(),
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dlq,
)
.await
.map_err(|registry_error| {
DispatchError::Registry(registry_error.to_string())
})?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Failure,
"dlq",
attempt,
Some(final_error.clone()),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: attempt,
status: DispatchStatus::Dlq,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: None,
error: Some(final_error),
});
}
if !error.retryable() {
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Failed,
)
.await
.map_err(|registry_error| {
DispatchError::Registry(registry_error.to_string())
})?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
let trust_outcome = match error {
DispatchError::Denied(_) => TrustOutcome::Denied,
DispatchError::Timeout(_) => TrustOutcome::Timeout,
_ => TrustOutcome::Failure,
};
let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
"cancelled"
} else {
"failed"
};
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
trust_outcome,
terminal_status,
attempt,
Some(error.to_string()),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: attempt,
status: if matches!(error, DispatchError::Cancelled(_)) {
DispatchStatus::Cancelled
} else {
DispatchStatus::Failed
},
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: None,
error: Some(error.to_string()),
});
}
if let Some(delay) = binding.retry.next_retry_delay(attempt) {
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_retry_scheduled();
metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
metrics.record_trigger_retry_delay(
binding.id.as_str(),
&binding_key,
event.provider.as_str(),
tenant_id(&event),
"scheduled",
delay,
);
}
tracing::info!(
component = "dispatcher",
lifecycle = "retry_scheduled",
trigger_id = %binding.id.as_str(),
binding_key = %binding_key,
event_id = %event.id.0,
attempt = attempt + 1,
delay_ms = delay.as_millis(),
trace_id = %event.trace_id.0
);
let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
previous_retry_node = Some(retry_node_id.clone());
self.emit_action_graph(
&event,
vec![RunActionGraphNodeRecord {
id: retry_node_id.clone(),
label: format!("retry in {}ms", delay.as_millis()),
kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
status: "scheduled".to_string(),
outcome: format!("attempt_{}", attempt + 1),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: retry_node_metadata(
binding,
&event,
attempt + 1,
delay,
&error,
),
}],
vec![RunActionGraphEdgeRecord {
from_id: attempt_node_id,
to_id: retry_node_id.clone(),
kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
label: Some(format!("attempt {}", attempt + 1)),
}],
serde_json::json!({
"source": "dispatcher",
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"event_id": event.id.0,
"attempt": attempt + 1,
"delay_ms": delay.as_millis(),
"replay_of_event_id": replay_of_event_id,
}),
)
.await?;
self.append_lifecycle_event(
"RetryScheduled",
&event,
binding,
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt + 1,
"delay_ms": delay.as_millis(),
"error": error.to_string(),
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.append_topic_event(
TRIGGER_ATTEMPTS_TOPIC,
"retry_scheduled",
&event,
Some(binding),
Some(attempt + 1),
serde_json::json!({
"event_id": event.id.0,
"attempt": attempt + 1,
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"delay_ms": delay.as_millis(),
"error": error.to_string(),
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
let sleep_result = sleep_or_cancel_or_request(
&self.event_log,
delay,
&binding_key,
&event.id.0,
replay_of_event_id.as_ref(),
&mut self.cancel_tx.subscribe(),
)
.await;
decrement_retry_queue_depth(&self.state);
if sleep_result.is_err() {
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Failed,
)
.await
.map_err(|registry_error| {
DispatchError::Registry(registry_error.to_string())
})?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Failure,
"cancelled",
attempt,
Some("dispatcher shutdown cancelled retry wait".to_string()),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: attempt,
status: DispatchStatus::Cancelled,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: None,
error: Some("dispatcher shutdown cancelled retry wait".to_string()),
});
}
continue;
}
let final_error = error.to_string();
let dlq_entry = DlqEntry {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event: event.clone(),
attempt_count: attempt,
final_error: final_error.clone(),
error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
.to_string(),
attempts: attempts.clone(),
};
self.state
.dlq
.lock()
.expect("dispatcher dlq poisoned")
.push(dlq_entry.clone());
if let Some(metrics) = self.metrics.as_ref() {
metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
metrics.record_trigger_accepted_to_dlq(
binding.id.as_str(),
&binding_key,
event.provider.as_str(),
tenant_id(&event),
"retry_exhausted",
duration_between_ms(
current_unix_ms(),
accepted_at_ms(parent_headers.as_ref(), &event),
),
);
}
tracing::info!(
component = "dispatcher",
lifecycle = "dlq_moved",
trigger_id = %binding.id.as_str(),
binding_key = %binding_key,
event_id = %event.id.0,
attempt_count = attempt,
reason = "retry_exhausted",
trace_id = %event.trace_id.0
);
self.emit_action_graph(
&event,
vec![RunActionGraphNodeRecord {
id: format!("dlq:{binding_key}:{}", event.id.0),
label: binding.id.as_str().to_string(),
kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
status: "queued".to_string(),
outcome: "retry_exhausted".to_string(),
trace_id: Some(event.trace_id.0.clone()),
stage_id: None,
node_id: None,
worker_id: None,
run_id: None,
run_path: None,
metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
}],
vec![RunActionGraphEdgeRecord {
from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
to_id: format!("dlq:{binding_key}:{}", event.id.0),
kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
label: Some(format!("{attempt} attempts")),
}],
serde_json::json!({
"source": "dispatcher",
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"event_id": event.id.0,
"attempt_count": attempt,
"final_error": final_error,
"replay_of_event_id": replay_of_event_id,
}),
)
.await?;
self.append_lifecycle_event(
"DlqMoved",
&event,
binding,
serde_json::json!({
"event_id": event.id.0,
"attempt_count": attempt,
"final_error": dlq_entry.final_error,
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id.as_ref(),
)
.await?;
self.append_topic_event(
TRIGGER_DLQ_TOPIC,
"dlq_moved",
&event,
Some(binding),
Some(attempt),
serde_json::to_value(&dlq_entry)
.map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
replay_of_event_id.as_ref(),
)
.await?;
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Dlq,
)
.await
.map_err(|registry_error| {
DispatchError::Registry(registry_error.to_string())
})?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Failure,
"dlq",
attempt,
Some(error.to_string()),
)
.await?;
return Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: attempt,
status: DispatchStatus::Dlq,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: None,
error: Some(error.to_string()),
});
}
}
}
finish_in_flight(
binding.id.as_str(),
binding.version,
TriggerDispatchOutcome::Failed,
)
.await
.map_err(|error| DispatchError::Registry(error.to_string()))?;
self.release_flow_control(&acquired_flow).await?;
decrement_in_flight(&self.state);
self.append_dispatch_trust_record(
binding,
&route,
&event,
replay_of_event_id.as_ref(),
autonomy_tier,
TrustOutcome::Failure,
"failed",
max_attempts,
Some("dispatch exhausted without terminal outcome".to_string()),
)
.await?;
Ok(DispatchOutcome {
trigger_id: binding.id.as_str().to_string(),
binding_key: binding.binding_key(),
event_id: event.id.0,
attempt_count: max_attempts,
status: DispatchStatus::Failed,
handler_kind: route.kind().to_string(),
target_uri: route.target_uri(),
replay_of_event_id,
result: None,
error: Some("dispatch exhausted without terminal outcome".to_string()),
})
}
    /// Performs exactly one dispatch attempt for `event` against the resolved
    /// `route`, returning the handler's JSON result or a `DispatchError`.
    ///
    /// Each `DispatchUri` variant takes a different path:
    /// - `Local`: invokes the binding's local VM closure and converts the
    ///   returned `VmValue` to JSON.
    /// - `A2a`: forwards the event through the A2A client; bails out with
    ///   `Cancelled` up front if the dispatcher is shutting down.
    /// - `Worker`: enqueues a job on the named worker queue and serializes the
    ///   enqueue receipt.
    /// - `Persona`: fires the persona trigger and serializes its receipt.
    async fn dispatch_once(
        &self,
        binding: &TriggerBinding,
        route: &DispatchUri,
        event: &TriggerEvent,
        autonomy_tier: AutonomyTier,
        wait_lease: Option<DispatchWaitLease>,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<serde_json::Value, DispatchError> {
        match route {
            DispatchUri::Local { .. } => {
                // A local URI is only valid when the binding actually carries a
                // local closure; anything else is a wiring bug surfaced as a
                // local dispatch error.
                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
                    return Err(DispatchError::Local(format!(
                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
                        binding.id.as_str()
                    )));
                };
                let value = self
                    .invoke_vm_callable(
                        closure,
                        &binding.binding_key(),
                        event,
                        None,
                        binding.id.as_str(),
                        &format!("{}.{}", event.provider.as_str(), event.kind),
                        autonomy_tier,
                        wait_lease,
                        cancel_rx,
                    )
                    .await?;
                Ok(vm_value_to_json(&value))
            }
            DispatchUri::A2a {
                target,
                allow_cleartext,
            } => {
                // Refuse to start a remote hop once shutdown has begun; the
                // attempt is reported as cancelled rather than failed.
                if self.state.shutting_down.load(Ordering::SeqCst) {
                    return Err(DispatchError::Cancelled(
                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
                    ));
                }
                let (_endpoint, ack) = self
                    .a2a_client
                    .dispatch(
                        target,
                        *allow_cleartext,
                        binding.id.as_str(),
                        &binding.binding_key(),
                        event,
                        cancel_rx,
                    )
                    .await
                    // Preserve cancellation as a distinct error class so the
                    // caller's retry/terminal-status handling can tell it apart
                    // from ordinary A2A failures.
                    .map_err(|error| match error {
                        crate::a2a::A2aClientError::Cancelled(message) => {
                            DispatchError::Cancelled(message)
                        }
                        other => DispatchError::A2a(other.to_string()),
                    })?;
                // Both inline results and pending-task handles count as a
                // successful dispatch; only the payload shape differs.
                match ack {
                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
                }
            }
            DispatchUri::Worker { queue } => {
                let receipt = crate::WorkerQueue::new(self.event_log.clone())
                    .enqueue(&crate::WorkerQueueJob {
                        queue: queue.clone(),
                        trigger_id: binding.id.as_str().to_string(),
                        binding_key: binding.binding_key(),
                        binding_version: binding.version,
                        event: event.clone(),
                        // Carry replay lineage (if any) from the ambient
                        // dispatch context into the queued job.
                        replay_of_event_id: current_dispatch_context()
                            .and_then(|context| context.replay_of_event_id),
                        priority: worker_queue_priority(binding, event),
                    })
                    .await
                    .map_err(DispatchError::from)?;
                Ok(serde_json::to_value(receipt)
                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
            }
            DispatchUri::Persona { .. } => {
                // Same wiring invariant as Local: persona URIs require a
                // persona binding on the handler spec.
                let TriggerHandlerSpec::Persona {
                    binding: persona_binding,
                } = &binding.handler
                else {
                    return Err(DispatchError::Local(format!(
                        "trigger '{}' resolved to a persona dispatch URI but does not carry a persona binding",
                        binding.id.as_str()
                    )));
                };
                let receipt = crate::fire_persona_trigger(
                    &self.event_log,
                    persona_binding,
                    event.provider.as_str(),
                    &event.kind,
                    trigger_event_persona_metadata(event),
                    crate::PersonaRunCost::default(),
                    crate::persona_now_ms(),
                )
                .await
                .map_err(DispatchError::Local)?;
                Ok(serde_json::to_value(receipt)
                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
            }
        }
    }
    /// Applies the binding's flow-control pipeline to `event` before dispatch.
    ///
    /// Stages run in a fixed order — batch, debounce, rate limit, throttle,
    /// singleton, concurrency — and any stage may short-circuit with
    /// `FlowControlOutcome::Skip` carrying a machine-readable reason. The
    /// order is load-bearing: batching may replace the event with a merged
    /// batch event that the later stages then see, and the singleton /
    /// concurrency leases acquired at the end are returned inside
    /// `AcquiredFlowControl` so the caller can release them after dispatch.
    async fn apply_flow_control(
        &self,
        binding: &TriggerBinding,
        event: &TriggerEvent,
        replay_of_event_id: Option<&String>,
    ) -> Result<FlowControlOutcome, DispatchError> {
        let flow = &binding.flow_control;
        let mut managed_event = event.clone();
        // Stage 1: batching. Either this event completes a batch (and is
        // replaced by the combined batch event) or it is merged into a pending
        // batch and this dispatch is skipped.
        if let Some(batch) = &flow.batch {
            let gate = self
                .resolve_flow_gate(
                    &binding.binding_key(),
                    batch.key.as_ref(),
                    &managed_event,
                    replay_of_event_id,
                )
                .await?;
            match self
                .state
                .flow_control
                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
                .await
                .map_err(DispatchError::from)?
            {
                BatchDecision::Dispatch(events) => {
                    managed_event = build_batched_event(events)?;
                }
                BatchDecision::Merged => {
                    return Ok(FlowControlOutcome::Skip {
                        reason: "batch_merged".to_string(),
                    })
                }
            }
        }
        // Stage 2: debounce. Only the latest event within the period proceeds.
        if let Some(debounce) = &flow.debounce {
            let gate = self
                .resolve_flow_gate(
                    &binding.binding_key(),
                    Some(&debounce.key),
                    &managed_event,
                    replay_of_event_id,
                )
                .await?;
            let latest = self
                .state
                .flow_control
                .debounce(&gate, debounce.period)
                .await
                .map_err(DispatchError::from)?;
            if !latest {
                return Ok(FlowControlOutcome::Skip {
                    reason: "debounced".to_string(),
                });
            }
        }
        // Stage 3: rate limit. Drops (skips) events over the per-period cap.
        if let Some(rate_limit) = &flow.rate_limit {
            let gate = self
                .resolve_flow_gate(
                    &binding.binding_key(),
                    rate_limit.key.as_ref(),
                    &managed_event,
                    replay_of_event_id,
                )
                .await?;
            let allowed = self
                .state
                .flow_control
                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
                .await
                .map_err(DispatchError::from)?;
            if !allowed {
                return Ok(FlowControlOutcome::Skip {
                    reason: "rate_limited".to_string(),
                });
            }
        }
        // Stage 4: throttle. Unlike rate limiting, this waits for capacity
        // instead of skipping.
        if let Some(throttle) = &flow.throttle {
            let gate = self
                .resolve_flow_gate(
                    &binding.binding_key(),
                    throttle.key.as_ref(),
                    &managed_event,
                    replay_of_event_id,
                )
                .await?;
            self.state
                .flow_control
                .wait_for_throttle(&gate, throttle.period, throttle.max)
                .await
                .map_err(DispatchError::from)?;
        }
        let mut acquired = AcquiredFlowControl::default();
        // Stage 5: singleton. Non-blocking acquire; if another dispatch holds
        // the gate, this one is skipped. The lease is recorded for release.
        if let Some(singleton) = &flow.singleton {
            let gate = self
                .resolve_flow_gate(
                    &binding.binding_key(),
                    singleton.key.as_ref(),
                    &managed_event,
                    replay_of_event_id,
                )
                .await?;
            let acquired_singleton = self
                .state
                .flow_control
                .try_acquire_singleton(&gate)
                .await
                .map_err(DispatchError::from)?;
            if !acquired_singleton {
                return Ok(FlowControlOutcome::Skip {
                    reason: "singleton_active".to_string(),
                });
            }
            acquired.singleton = Some(SingletonLease { gate, held: true });
        }
        // Stage 6: concurrency. Blocking acquire of a permit, ordered by the
        // binding's priority ranking; the permit is carried in the lease so
        // the caller can return it after dispatch.
        if let Some(concurrency) = &flow.concurrency {
            let gate = self
                .resolve_flow_gate(
                    &binding.binding_key(),
                    concurrency.key.as_ref(),
                    &managed_event,
                    replay_of_event_id,
                )
                .await?;
            let priority_rank = self
                .resolve_priority_rank(
                    &binding.binding_key(),
                    flow.priority.as_ref(),
                    &managed_event,
                    replay_of_event_id,
                )
                .await?;
            let permit = self
                .state
                .flow_control
                .acquire_concurrency(&gate, concurrency.max, priority_rank)
                .await
                .map_err(DispatchError::from)?;
            acquired.concurrency = Some(ConcurrencyLease {
                gate,
                max: concurrency.max,
                priority_rank,
                permit: Some(permit),
            });
        }
        Ok(FlowControlOutcome::Dispatch {
            event: Box::new(managed_event),
            acquired,
        })
    }
async fn release_flow_control(
&self,
acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
) -> Result<(), DispatchError> {
let (singleton_gate, concurrency_permit) = {
let mut acquired = acquired.lock().await;
let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
if lease.held {
lease.held = false;
Some(lease.gate.clone())
} else {
None
}
});
let concurrency_permit = acquired
.concurrency
.as_mut()
.and_then(|lease| lease.permit.take());
(singleton_gate, concurrency_permit)
};
if let Some(gate) = singleton_gate {
self.state
.flow_control
.release_singleton(&gate)
.await
.map_err(DispatchError::from)?;
}
if let Some(permit) = concurrency_permit {
self.state
.flow_control
.release_concurrency(permit)
.await
.map_err(DispatchError::from)?;
}
Ok(())
}
async fn resolve_flow_gate(
&self,
binding_key: &str,
expr: Option<&crate::triggers::TriggerExpressionSpec>,
event: &TriggerEvent,
replay_of_event_id: Option<&String>,
) -> Result<String, DispatchError> {
let key = match expr {
Some(expr) => {
self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
.await?
}
None => "_global".to_string(),
};
Ok(format!("{binding_key}:{key}"))
}
async fn resolve_priority_rank(
&self,
binding_key: &str,
priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
event: &TriggerEvent,
replay_of_event_id: Option<&String>,
) -> Result<usize, DispatchError> {
let Some(priority) = priority else {
return Ok(0);
};
let value = self
.evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
.await?;
Ok(priority
.order
.iter()
.position(|candidate| candidate == &value)
.unwrap_or(priority.order.len()))
}
async fn evaluate_flow_expression(
&self,
binding_key: &str,
expr: &crate::triggers::TriggerExpressionSpec,
event: &TriggerEvent,
replay_of_event_id: Option<&String>,
) -> Result<String, DispatchError> {
let value = self
.invoke_vm_callable(
&expr.closure,
binding_key,
event,
replay_of_event_id,
"",
"flow_control",
AutonomyTier::Suggest,
None,
&mut self.cancel_tx.subscribe(),
)
.await?;
Ok(json_value_to_gate(&vm_value_to_json(&value)))
}
/// Run a handler closure in a child VM with full dispatch plumbing.
///
/// Sets up, and afterwards unwinds, per-invocation state: a shared cancel
/// token (registered with the dispatcher so shutdown can flip it), the
/// effective execution policy (parent policy intersected with the autonomy
/// tier's policy), the thread-local dispatch context and wait lease, and a
/// reset HITL state.
///
/// While the closure runs, two cancellation sources are raced against it:
/// the shutdown broadcast channel and (every 100ms) persisted cancel
/// requests in the event log. Either one sets the VM cancel token; the loop
/// keeps polling the future so the VM can observe the token and unwind.
///
/// Returns the closure's value, or `DispatchError::Cancelled` naming which
/// source cancelled the call.
#[allow(clippy::too_many_arguments)]
async fn invoke_vm_callable(
    &self,
    closure: &crate::value::VmClosure,
    binding_key: &str,
    event: &TriggerEvent,
    replay_of_event_id: Option<&String>,
    agent_id: &str,
    action: &str,
    autonomy_tier: AutonomyTier,
    wait_lease: Option<DispatchWaitLease>,
    cancel_rx: &mut broadcast::Receiver<()>,
) -> Result<VmValue, DispatchError> {
    let mut vm = self.base_vm.child_vm();
    // Fresh cancel token per invocation; pre-set when shutdown has already
    // begun so the handler bails out on its first cancellation check.
    let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
    if self.state.shutting_down.load(Ordering::SeqCst) {
        cancel_token.store(true, Ordering::SeqCst);
    }
    // Register the token so a dispatcher-wide shutdown can flip it directly.
    self.state
        .cancel_tokens
        .lock()
        .expect("dispatcher cancel tokens poisoned")
        .push(cancel_token.clone());
    vm.install_cancel_token(cancel_token.clone());
    let arg = event_to_handler_value(event)?;
    let args = [arg];
    // Narrow the execution policy: never wider than the tier policy, and
    // never wider than any parent policy already in effect.
    let tier_policy = policy_for_autonomy_tier(autonomy_tier);
    let effective_policy = match crate::orchestration::current_execution_policy() {
        Some(parent) => parent
            .intersect(&tier_policy)
            .map_err(|error| DispatchError::Local(error.to_string()))?,
        None => tier_policy,
    };
    crate::orchestration::push_execution_policy(effective_policy);
    // Guard pops the pushed policy when this frame unwinds.
    let _policy_guard = DispatchExecutionPolicyGuard;
    let future = vm.call_closure_pub(closure, &args);
    pin_mut!(future);
    let (binding_id, binding_version) = split_binding_key(binding_key);
    // Install the dispatch context / wait lease for the duration of the call,
    // remembering prior values so nested dispatches restore correctly.
    let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
        slot.borrow_mut().replace(DispatchContext {
            trigger_event: event.clone(),
            replay_of_event_id: replay_of_event_id.cloned(),
            binding_id,
            binding_version,
            agent_id: agent_id.to_string(),
            action: action.to_string(),
            autonomy_tier,
        })
    });
    let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
        .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
    let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
    crate::stdlib::hitl::reset_hitl_state();
    let mut poll = tokio::time::interval(Duration::from_millis(100));
    let result = loop {
        tokio::select! {
            result = &mut future => break result,
            // Shutdown broadcast: flag the token, keep polling the future.
            _ = recv_cancel(cancel_rx) => {
                cancel_token.store(true, Ordering::SeqCst);
            }
            // Periodic check for persisted per-event cancel requests.
            _ = poll.tick() => {
                if dispatch_cancel_requested(
                    &self.event_log,
                    binding_key,
                    &event.id.0,
                    replay_of_event_id,
                )
                .await? {
                    cancel_token.store(true, Ordering::SeqCst);
                }
            }
        }
    };
    // Unwind thread-local state installed above.
    ACTIVE_DISPATCH_CONTEXT.with(|slot| {
        *slot.borrow_mut() = prior_context;
    });
    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
        *slot.borrow_mut() = prior_wait_lease;
    });
    crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
    // Deregister this invocation's cancel token.
    {
        let mut tokens = self
            .state
            .cancel_tokens
            .lock()
            .expect("dispatcher cancel tokens poisoned");
        tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
    }
    if cancel_token.load(Ordering::SeqCst) {
        // Attribute the cancellation: an explicit per-event cancel request
        // takes precedence over a dispatcher shutdown.
        if dispatch_cancel_requested(
            &self.event_log,
            binding_key,
            &event.id.0,
            replay_of_event_id,
        )
        .await?
        {
            Err(DispatchError::Cancelled(
                "trigger cancel request cancelled local handler".to_string(),
            ))
        } else {
            Err(DispatchError::Cancelled(
                "dispatcher shutdown cancelled local handler".to_string(),
            ))
        }
    } else {
        result.map_err(dispatch_error_from_vm_error)
    }
}
/// Invoke a VM closure, optionally bounding the call with a wall-clock timeout.
///
/// A `None` timeout awaits the handler with no deadline. On expiry the call
/// fails with a local "predicate evaluation timed out" error.
#[allow(clippy::too_many_arguments)]
async fn invoke_vm_callable_with_timeout(
    &self,
    closure: &crate::value::VmClosure,
    binding_key: &str,
    event: &TriggerEvent,
    replay_of_event_id: Option<&String>,
    agent_id: &str,
    action: &str,
    autonomy_tier: AutonomyTier,
    cancel_rx: &mut broadcast::Receiver<()>,
    timeout: Option<Duration>,
) -> Result<VmValue, DispatchError> {
    let invocation = self.invoke_vm_callable(
        closure,
        binding_key,
        event,
        replay_of_event_id,
        agent_id,
        action,
        autonomy_tier,
        None,
        cancel_rx,
    );
    match timeout {
        None => invocation.await,
        Some(limit) => match tokio::time::timeout(limit, invocation).await {
            Ok(result) => result,
            Err(_) => Err(DispatchError::Local(
                "predicate evaluation timed out".to_string(),
            )),
        },
    }
}
#[allow(clippy::too_many_arguments)]
async fn append_dispatch_trust_record(
&self,
binding: &TriggerBinding,
route: &DispatchUri,
event: &TriggerEvent,
replay_of_event_id: Option<&String>,
autonomy_tier: AutonomyTier,
outcome: TrustOutcome,
terminal_status: &str,
attempt_count: u32,
error: Option<String>,
) -> Result<(), DispatchError> {
let mut record = TrustRecord::new(
binding.id.as_str().to_string(),
format!("{}.{}", event.provider.as_str(), event.kind),
None,
outcome,
event.trace_id.0.clone(),
autonomy_tier,
);
record.metadata.insert(
"binding_key".to_string(),
serde_json::json!(binding.binding_key()),
);
record.metadata.insert(
"binding_version".to_string(),
serde_json::json!(binding.version),
);
record.metadata.insert(
"provider".to_string(),
serde_json::json!(event.provider.as_str()),
);
record
.metadata
.insert("event_kind".to_string(), serde_json::json!(event.kind));
record
.metadata
.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
record.metadata.insert(
"target_uri".to_string(),
serde_json::json!(route.target_uri()),
);
record.metadata.insert(
"terminal_status".to_string(),
serde_json::json!(terminal_status),
);
record.metadata.insert(
"attempt_count".to_string(),
serde_json::json!(attempt_count),
);
if let Some(replay_of_event_id) = replay_of_event_id {
record.metadata.insert(
"replay_of_event_id".to_string(),
serde_json::json!(replay_of_event_id),
);
}
if let Some(error) = error {
record
.metadata
.insert("error".to_string(), serde_json::json!(error));
}
append_trust_record(&self.event_log, &record)
.await
.map(|_| ())
.map_err(DispatchError::from)
}
async fn append_attempt_record(
&self,
event: &TriggerEvent,
binding: &TriggerBinding,
attempt: &DispatchAttemptRecord,
replay_of_event_id: Option<&String>,
) -> Result<(), DispatchError> {
self.append_topic_event(
TRIGGER_ATTEMPTS_TOPIC,
"attempt_recorded",
event,
Some(binding),
Some(attempt.attempt),
serde_json::to_value(attempt)
.map_err(|error| DispatchError::Serde(error.to_string()))?,
replay_of_event_id,
)
.await
}
/// Append a lifecycle entry of the given `kind` for this event/binding pair
/// to the triggers lifecycle topic (no attempt number attached).
async fn append_lifecycle_event(
    &self,
    kind: &str,
    event: &TriggerEvent,
    binding: &TriggerBinding,
    payload: serde_json::Value,
    replay_of_event_id: Option<&String>,
) -> Result<(), DispatchError> {
    self.append_topic_event(
        TRIGGERS_LIFECYCLE_TOPIC,
        kind,
        event,
        Some(binding),
        None,
        payload,
        replay_of_event_id,
    )
    .await
}
/// Open a HITL approval request after an autonomy budget was exhausted.
///
/// Appends an approval request (routed to the default autonomy-budget
/// reviewer) describing the blocked dispatch, then records a matching
/// `autonomy.budget_exceeded` lifecycle event referencing the request.
/// Returns the approval request id.
async fn append_autonomy_budget_approval_request(
    &self,
    binding: &TriggerBinding,
    route: &DispatchUri,
    event: &TriggerEvent,
    replay_of_event_id: Option<&String>,
    reason: &str,
) -> Result<String, DispatchError> {
    let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
    let detail = serde_json::json!({
        "trigger_id": binding.id.as_str(),
        "binding_key": binding.binding_key(),
        "event_id": event.id.0,
        "reason": reason,
        "from_tier": AutonomyTier::ActAuto.as_str(),
        "requested_tier": AutonomyTier::ActWithApproval.as_str(),
        "handler_kind": route.kind(),
        "target_uri": route.target_uri(),
        "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
        "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
        "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
        "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
        "replay_of_event_id": replay_of_event_id,
    });
    let request_id = crate::stdlib::hitl::append_approval_request_on(
        &self.event_log,
        binding.id.as_str().to_string(),
        event.trace_id.0.clone(),
        format!(
            "approve autonomous dispatch for trigger '{}' after {}",
            binding.id.as_str(),
            reason
        ),
        // `detail` is not used again, so move it instead of cloning.
        detail,
        reviewers.clone(),
    )
    .await
    .map_err(dispatch_error_from_vm_error)?;
    self.append_lifecycle_event(
        "autonomy.budget_exceeded",
        event,
        binding,
        serde_json::json!({
            "trigger_id": binding.id.as_str(),
            "event_id": event.id.0,
            "reason": reason,
            "request_id": request_id,
            "reviewers": reviewers,
            "from_tier": AutonomyTier::ActAuto.as_str(),
            "requested_tier": AutonomyTier::ActWithApproval.as_str(),
            "replay_of_event_id": replay_of_event_id,
        }),
        replay_of_event_id,
    )
    .await?;
    Ok(request_id)
}
/// Emit an action-graph `approval` node for a pending autonomy-budget
/// approval, plus an `approval_gate` edge from `source_node_id` to it.
#[allow(clippy::too_many_arguments)]
async fn emit_autonomy_budget_approval_action_graph(
    &self,
    binding: &TriggerBinding,
    route: &DispatchUri,
    event: &TriggerEvent,
    source_node_id: &str,
    replay_of_event_id: Option<&String>,
    reason: &str,
    request_id: &str,
) -> Result<(), DispatchError> {
    let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
    let mut metadata = BTreeMap::new();
    metadata.insert(
        "trigger_id".to_string(),
        serde_json::json!(binding.id.as_str()),
    );
    metadata.insert(
        "binding_key".to_string(),
        serde_json::json!(binding.binding_key()),
    );
    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
    metadata.insert("reason".to_string(), serde_json::json!(reason));
    metadata.insert("request_id".to_string(), serde_json::json!(request_id));
    metadata.insert(
        "reviewers".to_string(),
        serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
    );
    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
    metadata.insert(
        "target_uri".to_string(),
        serde_json::json!(route.target_uri()),
    );
    metadata.insert(
        "from_tier".to_string(),
        serde_json::json!(AutonomyTier::ActAuto.as_str()),
    );
    metadata.insert(
        "requested_tier".to_string(),
        serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
    );
    metadata.insert(
        "replay_of_event_id".to_string(),
        serde_json::json!(replay_of_event_id),
    );
    let node = RunActionGraphNodeRecord {
        id: approval_node_id.clone(),
        label: format!("approval required: {reason}"),
        kind: "approval".to_string(),
        status: "waiting".to_string(),
        outcome: "request_approval".to_string(),
        trace_id: Some(event.trace_id.0.clone()),
        stage_id: None,
        node_id: None,
        worker_id: None,
        run_id: None,
        run_path: None,
        metadata,
    };
    let edge = RunActionGraphEdgeRecord {
        from_id: source_node_id.to_string(),
        to_id: approval_node_id,
        kind: "approval_gate".to_string(),
        label: Some("autonomy budget".to_string()),
    };
    let context = serde_json::json!({
        "source": "dispatcher",
        "trigger_id": binding.id.as_str(),
        "binding_key": binding.binding_key(),
        "event_id": event.id.0,
        "reason": reason,
        "request_id": request_id,
        "replay_of_event_id": replay_of_event_id,
    });
    self.emit_action_graph(event, vec![node], vec![edge], context)
        .await
}
/// Record a denied autonomy tier transition in the trust log, tagged with
/// the approval request id that gated it.
#[allow(clippy::too_many_arguments)]
async fn append_tier_transition_trust_record(
    &self,
    binding: &TriggerBinding,
    event: &TriggerEvent,
    replay_of_event_id: Option<&String>,
    from_tier: AutonomyTier,
    to_tier: AutonomyTier,
    reason: &str,
    request_id: &str,
) -> Result<(), DispatchError> {
    let mut record = TrustRecord::new(
        binding.id.as_str().to_string(),
        "autonomy.tier_transition",
        Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
        TrustOutcome::Denied,
        event.trace_id.0.clone(),
        to_tier,
    );
    let details = [
        ("binding_key", serde_json::json!(binding.binding_key())),
        ("event_id", serde_json::json!(event.id.0)),
        ("from_tier", serde_json::json!(from_tier.as_str())),
        ("to_tier", serde_json::json!(to_tier.as_str())),
        ("reason", serde_json::json!(reason)),
        ("request_id", serde_json::json!(request_id)),
    ];
    record
        .metadata
        .extend(details.map(|(key, value)| (key.to_string(), value)));
    if let Some(replay_id) = replay_of_event_id {
        record.metadata.insert(
            "replay_of_event_id".to_string(),
            serde_json::json!(replay_id),
        );
    }
    append_trust_record(&self.event_log, &record)
        .await
        .map(|_| ())
        .map_err(DispatchError::from)
}
async fn append_budget_deferred_event(
&self,
binding: &TriggerBinding,
route: &DispatchUri,
event: &TriggerEvent,
replay_of_event_id: Option<&String>,
reason: &str,
) -> Result<(), DispatchError> {
self.append_topic_event(
TRIGGER_ATTEMPTS_TOPIC,
"budget_deferred",
event,
Some(binding),
None,
serde_json::json!({
"event_id": event.id.0,
"trigger_id": binding.id.as_str(),
"binding_key": binding.binding_key(),
"handler_kind": route.kind(),
"target_uri": route.target_uri(),
"reason": reason,
"retry_at": next_budget_reset_rfc3339(binding),
"replay_of_event_id": replay_of_event_id,
}),
replay_of_event_id,
)
.await?;
self.append_skipped_outbox_event(
binding,
route,
event,
replay_of_event_id,
DispatchSkipStage::Predicate,
serde_json::json!({
"deferred": true,
"reason": reason,
"retry_at": next_budget_reset_rfc3339(binding),
}),
)
.await
}
/// Append a `dispatch_skipped` entry to the outbox topic, noting which stage
/// skipped the dispatch and a stage-specific detail payload.
async fn append_skipped_outbox_event(
    &self,
    binding: &TriggerBinding,
    route: &DispatchUri,
    event: &TriggerEvent,
    replay_of_event_id: Option<&String>,
    stage: DispatchSkipStage,
    detail: serde_json::Value,
) -> Result<(), DispatchError> {
    let payload = serde_json::json!({
        "event_id": event.id.0,
        "trigger_id": binding.id.as_str(),
        "binding_key": binding.binding_key(),
        "handler_kind": route.kind(),
        "target_uri": route.target_uri(),
        "skip_stage": stage.as_str(),
        "detail": detail,
        "replay_of_event_id": replay_of_event_id,
    });
    self.append_topic_event(
        TRIGGER_OUTBOX_TOPIC,
        "dispatch_skipped",
        event,
        Some(binding),
        None,
        payload,
        replay_of_event_id,
    )
    .await
}
/// Append one event to a (possibly tenant-scoped) dispatcher topic, stamping
/// the standard trigger headers built by `event_headers`.
async fn append_topic_event(
    &self,
    topic_name: &str,
    kind: &str,
    event: &TriggerEvent,
    binding: Option<&TriggerBinding>,
    attempt: Option<u32>,
    payload: serde_json::Value,
    replay_of_event_id: Option<&String>,
) -> Result<(), DispatchError> {
    let topic = topic_for_event(event, topic_name)?;
    let headers = event_headers(event, binding, attempt, replay_of_event_id);
    let entry = LogEvent::new(kind, payload).with_headers(headers);
    self.event_log
        .append(&topic, entry)
        .await
        .map(|_| ())
        .map_err(DispatchError::from)
}
/// Publish an action-graph update for this event: wraps the given nodes and
/// edges in a v1 `RunObservabilityRecord` and tags the payload with
/// trace/event headers plus caller-supplied context.
async fn emit_action_graph(
    &self,
    event: &TriggerEvent,
    nodes: Vec<RunActionGraphNodeRecord>,
    edges: Vec<RunActionGraphEdgeRecord>,
    extra: serde_json::Value,
) -> Result<(), DispatchError> {
    let headers = BTreeMap::from([
        ("trace_id".to_string(), event.trace_id.0.clone()),
        ("event_id".to_string(), event.id.0.clone()),
    ]);
    let observability = RunObservabilityRecord {
        schema_version: 1,
        action_graph_nodes: nodes,
        action_graph_edges: edges,
        ..Default::default()
    };
    let payload = serde_json::json!({
        "source": "dispatcher",
        "trace_id": event.trace_id.0,
        "event_id": event.id.0,
        "observability": observability,
        "context": extra,
    });
    append_action_graph_update(headers, payload)
        .await
        .map_err(DispatchError::from)
}
}
/// Check whether a cancel request has been recorded for this binding/event.
///
/// Replays short-circuit to `false` (cancel requests reference original
/// event ids, not replay ids). Otherwise the cancel-requests topic is
/// scanned for a matching `dispatch_cancel_requested` entry.
async fn dispatch_cancel_requested(
    event_log: &Arc<AnyEventLog>,
    binding_key: &str,
    event_id: &str,
    replay_of_event_id: Option<&String>,
) -> Result<bool, DispatchError> {
    if replay_of_event_id.is_some() {
        return Ok(false);
    }
    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
        .expect("static trigger cancel topic should always be valid");
    let events = event_log.read_range(&topic, None, usize::MAX).await?;
    // Stream straight into `any` instead of collecting into an intermediate
    // BTreeSet: deduplication is pointless for an existence check, and this
    // short-circuits on the first match. This routine runs on a 100ms poll
    // interval, so avoiding the per-call allocation matters.
    Ok(events
        .into_iter()
        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
        .filter_map(|(_, event)| {
            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
        })
        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
}
/// Sleep for `delay`, aborting early if the dispatcher shuts down or a
/// persisted cancel request for this binding/event appears (checked every
/// 100ms).
///
/// Returns `Ok(())` when the full delay elapsed, or
/// `DispatchError::Cancelled` naming which source interrupted the wait.
async fn sleep_or_cancel_or_request(
    event_log: &Arc<AnyEventLog>,
    delay: Duration,
    binding_key: &str,
    event_id: &str,
    replay_of_event_id: Option<&String>,
    cancel_rx: &mut broadcast::Receiver<()>,
) -> Result<(), DispatchError> {
    let sleep = tokio::time::sleep(delay);
    pin_mut!(sleep);
    let mut poll = tokio::time::interval(Duration::from_millis(100));
    loop {
        tokio::select! {
            // Full delay elapsed: the wait completed normally.
            _ = &mut sleep => return Ok(()),
            // Shutdown broadcast fired.
            _ = recv_cancel(cancel_rx) => {
                return Err(DispatchError::Cancelled(
                    "dispatcher shutdown cancelled retry wait".to_string(),
                ));
            }
            // Periodic check for a persisted per-event cancel request.
            _ = poll.tick() => {
                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
                    return Err(DispatchError::Cancelled(
                        "trigger cancel request cancelled retry wait".to_string(),
                    ));
                }
            }
        }
    }
}
fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
let mut iter = events.into_iter();
let Some(mut root) = iter.next() else {
return Err(DispatchError::Registry(
"batch dispatch produced an empty event list".to_string(),
));
};
let mut batch = Vec::new();
batch.push(
serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
);
for event in iter {
batch.push(
serde_json::to_value(&event)
.map_err(|error| DispatchError::Serde(error.to_string()))?,
);
}
root.batch = Some(batch);
Ok(root)
}
fn json_value_to_gate(value: &serde_json::Value) -> String {
match value {
serde_json::Value::Null => "null".to_string(),
serde_json::Value::String(text) => text.clone(),
serde_json::Value::Bool(value) => value.to_string(),
serde_json::Value::Number(value) => value.to_string(),
other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
}
}
/// Convert a trigger event into the VM value handed to a handler closure.
///
/// The event is serialized to JSON and lifted into a VM value; when the
/// event carries a raw body and the conversion yields a dict, the bytes are
/// attached under the "raw_body" key.
fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
    let json =
        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
    match json_to_vm_value(&json) {
        VmValue::Dict(dict) if event.raw_body.is_some() => {
            let raw_body = event.raw_body.as_ref().expect("raw_body checked above");
            let mut fields = (*dict).clone();
            fields.insert(
                "raw_body".to_string(),
                VmValue::Bytes(Rc::new(raw_body.clone())),
            );
            Ok(VmValue::Dict(Rc::new(fields)))
        }
        other => Ok(other),
    }
}
/// Decrement the in-flight dispatch counter; when this was the last in-flight
/// dispatch and no retries are queued, wake idle waiters.
fn decrement_in_flight(state: &DispatcherRuntimeState) {
    let was_last = state.in_flight.fetch_sub(1, Ordering::Relaxed) == 1;
    if was_last && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
        state.idle_notify.notify_waiters();
    }
}
/// Decrement the retry-queue depth counter; when this was the last queued
/// retry and nothing is in flight, wake idle waiters.
fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
    let was_last = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed) == 1;
    if was_last && state.in_flight.load(Ordering::Relaxed) == 0 {
        state.idle_notify.notify_waiters();
    }
}
/// Test hook: arm a one-shot signal fired the next time an inbox entry is
/// dequeued (any previously armed sender is dropped).
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        slot.borrow_mut().replace(tx);
    });
}
/// No-op outside test builds; the `#[cfg(test)]` variant fires the installed
/// one-shot inbox-dequeued signal.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}
/// Test hook: fire (and disarm) the installed inbox-dequeued signal, if any.
/// Send failures (receiver dropped) are ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let armed = slot.borrow_mut().take();
        if let Some(tx) = armed {
            let _ = tx.send(());
        }
    });
}
/// Append a freshly ingested trigger event to the (possibly tenant-scoped)
/// inbox topic, returning the log offset of the appended entry.
pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
    event_log: &L,
    event: &TriggerEvent,
) -> Result<u64, DispatchError> {
    let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
    let payload =
        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
    let entry = LogEvent::new("event_ingested", payload)
        .with_headers(event_headers(event, None, None, None));
    event_log
        .append(&topic, entry)
        .await
        .map_err(DispatchError::from)
}
pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
ACTIVE_DISPATCHER_STATE.with(|slot| {
slot.borrow()
.as_ref()
.map(|state| DispatcherStatsSnapshot {
in_flight: state.in_flight.load(Ordering::Relaxed),
retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
})
.unwrap_or_default()
})
}
/// Drop the thread-local dispatcher state and any active dispatch wait lease.
pub fn clear_dispatcher_state() {
    ACTIVE_DISPATCHER_STATE.with(|slot| {
        slot.borrow_mut().take();
    });
    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
        slot.borrow_mut().take();
    });
}
fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
}
if is_cancelled_vm_error(&error) {
return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
}
if let VmError::Thrown(VmValue::String(message)) = &error {
return DispatchError::Local(message.to_string());
}
match error_to_category(&error) {
ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
ErrorCategory::Cancelled => {
DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
}
_ => DispatchError::Local(error.to_string()),
}
}
/// Short status label for a dispatch error, used in attempt/outbox records.
fn dispatch_error_label(error: &DispatchError) -> &'static str {
    match error {
        DispatchError::Timeout(_) => "timeout",
        DispatchError::Denied(_) => "denied",
        DispatchError::Waiting(_) => "waiting",
        DispatchError::Cancelled(_) => "cancelled",
        _ => "failed",
    }
}
/// Outcome label for a successful dispatch, by route kind.
/// A2A calls that returned a task handle are "pending"; completed A2A calls
/// are "completed"; workers are "enqueued", personas "recorded", local
/// handlers "success".
fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
    match route {
        DispatchUri::Worker { .. } => "enqueued",
        DispatchUri::Persona { .. } => "recorded",
        DispatchUri::Local { .. } => "success",
        DispatchUri::A2a { .. } => {
            let is_task_handle =
                result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle");
            if is_task_handle {
                "pending"
            } else {
                "completed"
            }
        }
    }
}
/// Build the action-graph node id for one dispatch attempt.
/// A2A hops use the "a2a" prefix; every other route uses "dispatch".
fn dispatch_node_id(
    route: &DispatchUri,
    binding_key: &str,
    event_id: &str,
    attempt: u32,
) -> String {
    let prefix = if matches!(route, DispatchUri::A2a { .. }) {
        "a2a"
    } else {
        "dispatch"
    };
    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
}
/// Action-graph node kind for a dispatch route.
fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
    if matches!(route, DispatchUri::A2a { .. }) {
        ACTION_GRAPH_NODE_KIND_A2A_HOP
    } else if matches!(route, DispatchUri::Worker { .. }) {
        ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE
    } else {
        ACTION_GRAPH_NODE_KIND_DISPATCH
    }
}
/// Human-readable node label: the target agent for A2A routes, otherwise the
/// route's target URI.
fn dispatch_node_label(route: &DispatchUri) -> String {
    if let DispatchUri::A2a { target, .. } = route {
        crate::a2a::target_agent_label(target)
    } else {
        route.target_uri()
    }
}
/// The resolved target agent label; present only for A2A routes.
fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
    if let DispatchUri::A2a { target, .. } = route {
        Some(crate::a2a::target_agent_label(target))
    } else {
        None
    }
}
/// Edge kind for the entry edge into a dispatch node: A2A routes always get
/// the A2A edge; otherwise predicate-gated bindings get the predicate-gate
/// edge, plain dispatches the trigger-dispatch edge.
fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
    if matches!(route, DispatchUri::A2a { .. }) {
        ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH
    } else if has_predicate {
        ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE
    } else {
        ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH
    }
}
fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
match status {
crate::triggers::SignatureStatus::Verified => "verified",
crate::triggers::SignatureStatus::Unsigned => "unsigned",
crate::triggers::SignatureStatus::Failed { .. } => "failed",
}
}
/// Action-graph metadata for a trigger node: provider, event kind, dedupe
/// key, and signature status.
fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
    BTreeMap::from([
        (
            "provider".to_string(),
            serde_json::json!(event.provider.as_str()),
        ),
        ("event_kind".to_string(), serde_json::json!(event.kind)),
        (
            "dedupe_key".to_string(),
            serde_json::json!(event.dedupe_key),
        ),
        (
            "signature_status".to_string(),
            serde_json::json!(signature_status_label(&event.signature_status)),
        ),
    ])
}
/// Flatten a trigger event into string metadata for persona dispatch.
///
/// Includes the event/trace/dedupe identifiers (the event id is recorded
/// under both "trigger_event_id" and "event_id"), the tenant id if present,
/// "header."-prefixed transport headers, and scalar fields from the provider
/// payload; contents of the payload's "raw" object are promoted to the top
/// level as well.
fn trigger_event_persona_metadata(event: &TriggerEvent) -> BTreeMap<String, String> {
    let mut metadata = BTreeMap::from([
        ("trigger_event_id".to_string(), event.id.0.clone()),
        ("event_id".to_string(), event.id.0.clone()),
        ("dedupe_key".to_string(), event.dedupe_key.clone()),
        ("trace_id".to_string(), event.trace_id.0.clone()),
    ]);
    if let Some(tenant_id) = &event.tenant_id {
        metadata.insert("tenant_id".to_string(), tenant_id.0.clone());
    }
    metadata.extend(
        event
            .headers
            .iter()
            .map(|(key, value)| (format!("header.{key}"), value.clone())),
    );
    if let Ok(payload) = serde_json::to_value(&event.provider_payload) {
        collect_persona_payload_metadata("", &payload, &mut metadata);
        if let Some(raw) = payload.get("raw") {
            collect_persona_payload_metadata("", raw, &mut metadata);
        }
    }
    metadata
}
/// Copy scalar members of a JSON object into `metadata` as dotted-path
/// strings.
///
/// Strings, numbers, and bools are recorded directly; nested objects are
/// descended into only from the top level (empty prefix), so paths are at
/// most two segments deep. Arrays and deeper objects are ignored.
fn collect_persona_payload_metadata(
    prefix: &str,
    value: &serde_json::Value,
    metadata: &mut BTreeMap<String, String>,
) {
    let serde_json::Value::Object(fields) = value else {
        return;
    };
    for (key, member) in fields {
        let path = if prefix.is_empty() {
            key.clone()
        } else {
            format!("{prefix}.{key}")
        };
        if let serde_json::Value::String(text) = member {
            metadata.insert(path, text.clone());
        } else if let serde_json::Value::Number(number) = member {
            metadata.insert(path, number.to_string());
        } else if let serde_json::Value::Bool(flag) = member {
            metadata.insert(path, flag.to_string());
        } else if member.is_object() && prefix.is_empty() {
            collect_persona_payload_metadata(&path, member, metadata);
        }
    }
}
/// Base action-graph metadata shared by dispatch success/error nodes:
/// route, attempt, trigger/event identity, plus route-specific extras.
fn dispatch_node_metadata(
    route: &DispatchUri,
    binding: &TriggerBinding,
    event: &TriggerEvent,
    attempt: u32,
) -> BTreeMap<String, serde_json::Value> {
    let mut metadata = BTreeMap::from([
        ("handler_kind".to_string(), serde_json::json!(route.kind())),
        (
            "target_uri".to_string(),
            serde_json::json!(route.target_uri()),
        ),
        ("attempt".to_string(), serde_json::json!(attempt)),
        (
            "trigger_id".to_string(),
            serde_json::json!(binding.id.as_str()),
        ),
        ("event_id".to_string(), serde_json::json!(event.id.0)),
    ]);
    if let Some(target_agent) = dispatch_target_agent(route) {
        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
    }
    match route {
        DispatchUri::Worker { queue } => {
            metadata.insert("queue_name".to_string(), serde_json::json!(queue));
        }
        DispatchUri::Persona { name } => {
            metadata.insert("persona".to_string(), serde_json::json!(name));
        }
        _ => {}
    }
    metadata
}
/// Action-graph metadata for a successful dispatch: the base dispatch
/// metadata plus route-specific details pulled from the handler result
/// (A2A task id/state, worker job/response topic, persona receipt/status).
fn dispatch_success_metadata(
    route: &DispatchUri,
    binding: &TriggerBinding,
    event: &TriggerEvent,
    attempt: u32,
    result: &serde_json::Value,
) -> BTreeMap<String, serde_json::Value> {
    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
    match route {
        DispatchUri::A2a { .. } => {
            let task_id = result
                .get("task_id")
                .or_else(|| result.get("id"))
                .and_then(serde_json::Value::as_str);
            if let Some(task_id) = task_id {
                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
            }
            if let Some(state) = result.get("state").and_then(serde_json::Value::as_str) {
                metadata.insert("state".to_string(), serde_json::json!(state));
            }
        }
        DispatchUri::Worker { .. } => {
            if let Some(job_event_id) =
                result.get("job_event_id").and_then(serde_json::Value::as_u64)
            {
                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
            }
            if let Some(response_topic) = result
                .get("response_topic")
                .and_then(serde_json::Value::as_str)
            {
                metadata.insert(
                    "response_topic".to_string(),
                    serde_json::json!(response_topic),
                );
            }
        }
        DispatchUri::Persona { .. } => {
            if let Some(receipt_id) =
                result.get("receipt_id").and_then(serde_json::Value::as_str)
            {
                metadata.insert("receipt_id".to_string(), serde_json::json!(receipt_id));
            }
            if let Some(status) = result.get("status").and_then(serde_json::Value::as_str) {
                metadata.insert("status".to_string(), serde_json::json!(status));
            }
        }
        DispatchUri::Local { .. } => {}
    }
    metadata
}
fn dispatch_error_metadata(
route: &DispatchUri,
binding: &TriggerBinding,
event: &TriggerEvent,
attempt: u32,
error: &DispatchError,
) -> BTreeMap<String, serde_json::Value> {
let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
metadata
}
fn retry_node_metadata(
binding: &TriggerBinding,
event: &TriggerEvent,
attempt: u32,
delay: Duration,
error: &DispatchError,
) -> BTreeMap<String, serde_json::Value> {
let mut metadata = BTreeMap::new();
metadata.insert(
"trigger_id".to_string(),
serde_json::json!(binding.id.as_str()),
);
metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
metadata.insert("attempt".to_string(), serde_json::json!(attempt));
metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
metadata
}
/// Split a `"{id}@v{version}"` binding key into its id and version parts.
///
/// Splits on the LAST "@v" occurrence; a key without an "@v" suffix yields
/// the whole key with version 0, and a non-numeric version also falls back
/// to 0.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    match binding_key.rsplit_once("@v") {
        Some((binding_id, suffix)) => {
            (binding_id.to_string(), suffix.parse::<u32>().unwrap_or(0))
        }
        None => (binding_key.to_string(), 0),
    }
}
/// Compose a binding key from a trigger id and optional version
/// ("{id}@v{n}"); without a version the id is the key.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    binding_version.map_or_else(
        || trigger_id.to_string(),
        |version| format!("{trigger_id}@v{version}"),
    )
}
/// The event's tenant id as a borrowed str, when the event is tenant-scoped.
fn tenant_id(event: &TriggerEvent) -> Option<&str> {
    event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
}
/// Current wall-clock time in unix milliseconds, via the mockable test clock.
fn current_unix_ms() -> i64 {
    unix_ms(crate::triggers::test_util::clock::now_utc())
}
/// Convert an `OffsetDateTime` to unix milliseconds, truncating
/// sub-millisecond precision (the i128 nanosecond value divided by 1e6 fits
/// comfortably in i64 for realistic timestamps).
fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
    (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
}
/// Unix-ms timestamp at which the event was accepted: prefers the lifecycle
/// header, falling back to the event's `received_at` time.
fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
    match lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER) {
        Some(ms) => ms,
        None => unix_ms(event.received_at),
    }
}
/// Unix-ms timestamp at which the event was appended to the queue: prefers
/// the lifecycle header, falling back to the accepted-at timestamp.
fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
    match lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER) {
        Some(ms) => ms,
        None => accepted_at_ms(headers, event),
    }
}
/// Parse an i64 millisecond value out of the named header, if the header map
/// exists, contains the key, and the value is numeric.
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let value = headers?.get(name)?;
    value.parse::<i64>().ok()
}
/// Duration from `earlier_ms` to `later_ms` in unix milliseconds, clamped to
/// zero when the ordering is reversed.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let delta = later_ms.saturating_sub(earlier_ms);
    Duration::from_millis(u64::try_from(delta).unwrap_or(0))
}
/// Terminal status label for a dispatch result, used in observability records.
fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
    let Err(error) = result else {
        return "succeeded";
    };
    match error {
        DispatchError::Waiting(_) => "waiting",
        DispatchError::Cancelled(_) => "cancelled",
        DispatchError::Denied(_) => "denied",
        DispatchError::Timeout(_) => "timeout",
        _ => "failed",
    }
}
fn is_cancelled_vm_error(error: &VmError) -> bool {
matches!(
error,
VmError::Thrown(VmValue::String(message))
if message.starts_with("kind:cancelled:")
) || matches!(error_to_category(error), ErrorCategory::Cancelled)
}
/// Build the standard header map stamped on every dispatcher log entry:
/// event/trace identity, provider and kind, plus optional replay, tenant,
/// binding, and attempt details.
fn event_headers(
    event: &TriggerEvent,
    binding: Option<&TriggerBinding>,
    attempt: Option<u32>,
    replay_of_event_id: Option<&String>,
) -> BTreeMap<String, String> {
    let mut headers = BTreeMap::from([
        ("event_id".to_string(), event.id.0.clone()),
        ("trace_id".to_string(), event.trace_id.0.clone()),
        ("provider".to_string(), event.provider.as_str().to_string()),
        ("kind".to_string(), event.kind.clone()),
    ]);
    if let Some(replay_id) = replay_of_event_id {
        headers.insert("replay_of_event_id".to_string(), replay_id.clone());
    }
    if let Some(tenant) = event.tenant_id.as_ref() {
        headers.insert("tenant_id".to_string(), tenant.0.clone());
    }
    if let Some(binding) = binding {
        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
        headers.insert("binding_key".to_string(), binding.binding_key());
        let handler_kind = DispatchUri::from(&binding.handler).kind().to_string();
        headers.insert("handler_kind".to_string(), handler_kind);
    }
    if let Some(attempt) = attempt {
        headers.insert("attempt".to_string(), attempt.to_string());
    }
    headers
}
/// Resolve a static dispatcher topic name, scoping it to the event's tenant
/// when one is set.
fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
    let base = Topic::new(topic_name)
        .expect("static trigger dispatcher topic names should always be valid");
    if let Some(tenant) = event.tenant_id.as_ref() {
        crate::tenant_topic(tenant, &base).map_err(DispatchError::from)
    } else {
        Ok(base)
    }
}
/// Queue priority for a worker dispatch: an explicit "priority" event header
/// ("high"/"low", trimmed and case-insensitive) overrides the binding's
/// configured default.
fn worker_queue_priority(
    binding: &super::registry::TriggerBinding,
    event: &TriggerEvent,
) -> crate::WorkerQueuePriority {
    let requested = event
        .headers
        .get("priority")
        .map(|value| value.trim().to_ascii_lowercase());
    match requested.as_deref() {
        Some("high") => crate::WorkerQueuePriority::High,
        Some("low") => crate::WorkerQueuePriority::Low,
        _ => binding.dispatch_priority,
    }
}
// Env var that, when set, makes the dispatcher abort before appending to the
// outbox — used to simulate a process dying mid-dispatch.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";
/// Crash injection point: hard-exit with code 86 before the outbox append
/// when the corresponding env var is present. No-op otherwise.
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some() {
        std::process::exit(86);
    }
}
/// Current time (via the mockable test clock) formatted as RFC 3339, falling
/// back to the unix-epoch string if formatting fails.
fn now_rfc3339() -> String {
    crate::triggers::test_util::clock::now_utc()
        .format(&time::format_description::well_known::Rfc3339)
        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
}
/// RFC 3339 timestamp of the binding's next budget window boundary: the top
/// of the next hour when an hourly cost budget is configured, otherwise the
/// next UTC day boundary.
// NOTE(review): the hourly/daily choice keys off `hourly_cost_usd`, yet this
// value is also reported for autonomous-decision budget deferrals — confirm
// whether `max_autonomous_decisions_per_hour` should also select the hourly
// window.
fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
    let now = crate::triggers::test_util::clock::now_utc();
    // Round up to the next whole hour/day in unix seconds; fall back to `now`
    // if the computed boundary is unrepresentable.
    let reset = if binding.hourly_cost_usd.is_some() {
        let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
        time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
    } else {
        let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
        time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
    };
    reset
        .format(&time::format_description::well_known::Rfc3339)
        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
}
fn now_unix_ms() -> i64 {
(crate::triggers::test_util::clock::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
}
/// Build the terminal `DispatchOutcome` for a cancelled dispatch: no result,
/// status `Cancelled`, and the supplied error message.
fn cancelled_dispatch_outcome(
    binding: &TriggerBinding,
    route: &DispatchUri,
    event: &TriggerEvent,
    replay_of_event_id: Option<String>,
    attempt_count: u32,
    error: String,
) -> DispatchOutcome {
    DispatchOutcome {
        status: DispatchStatus::Cancelled,
        trigger_id: binding.id.as_str().to_string(),
        binding_key: binding.binding_key(),
        event_id: event.id.0.clone(),
        handler_kind: route.kind().to_string(),
        target_uri: route.target_uri(),
        attempt_count,
        replay_of_event_id,
        result: None,
        error: Some(error),
    }
}
/// Await one message on the shutdown broadcast channel, ignoring lag/closed
/// errors — any wakeup is treated as a cancel signal by callers.
async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
    let _ = cancel_rx.recv().await;
}
// Unit tests live in the sibling `tests` module file, compiled only for test builds.
#[cfg(test)]
mod tests;