Skip to main content

harn_vm/triggers/dispatcher/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
16use crate::llm::vm_value_to_json;
17use crate::orchestration::{
18    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
19    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
20    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
21    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
22    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
23    ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
24    ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
25};
26use crate::stdlib::json_to_vm_value;
27use crate::trust_graph::{
28    append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
29};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::{
35    binding_autonomy_budget_would_exceed, binding_budget_would_exceed,
36    expected_predicate_cost_usd_micros, matching_bindings, micros_to_usd, note_autonomous_decision,
37    note_orchestrator_budget_cost, orchestrator_budget_would_exceed, record_predicate_cost_sample,
38    reset_binding_budget_windows, usd_to_micros, TriggerBinding, TriggerBudgetExhaustionStrategy,
39    TriggerHandlerSpec,
40};
41use super::{
42    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
43    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
44    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
45    TRIGGER_OUTBOX_TOPIC,
46};
47use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
48
49mod flow_control;
50pub mod retry;
51pub mod uri;
52
53pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
54
/// Header key under which the event's accepted-at time is stored as a
/// stringified millisecond timestamp.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Header key for the event's normalized-at time in milliseconds.
/// NOTE(review): no writer of this header is visible in this part of the file — confirm the producer.
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Header key under which the queue-append time is stored as a stringified
/// millisecond timestamp.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
/// Default failure count at which a destination circuit opens.
const DESTINATION_CIRCUIT_FAILURE_THRESHOLD: u32 = 5;
/// Default time an open destination circuit blocks before a probe is allowed.
const DESTINATION_CIRCUIT_BACKOFF: Duration = Duration::from_secs(60);
60
thread_local! {
    // Runtime state of the dispatcher most recently constructed on this thread.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Dispatch context exposed through `current_dispatch_context()`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Wait lease exposed through `current_dispatch_wait_lease()`.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only one-shot sender.
    // NOTE(review): presumably fired when `run` dequeues an inbox envelope — confirm in `notify_test_inbox_dequeued`.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}

tokio::task_local! {
    // Task-scoped flag read by `current_dispatch_is_replay()`; reads outside a
    // scope that set it fall back to `false`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
72
/// Per-dispatch context stored in the `ACTIVE_DISPATCH_CONTEXT` thread-local
/// and handed out as a clone by `current_dispatch_context()`.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// Trigger event associated with the dispatch.
    pub trigger_event: TriggerEvent,
    /// Id of the event this dispatch replays, if any.
    pub replay_of_event_id: Option<String>,
    /// Id of the trigger binding.
    pub binding_id: String,
    /// Version of the trigger binding.
    pub binding_version: u32,
    /// Agent id. NOTE(review): presumably the agent the handler runs as — confirm at the construction site.
    pub agent_id: String,
    /// Action name. NOTE(review): exact vocabulary is defined outside this file section.
    pub action: String,
    /// Autonomy tier in effect for the dispatch.
    pub autonomy_tier: AutonomyTier,
}
83
/// Guard that pops the current execution policy when it is dropped.
///
/// NOTE(review): presumably created right after a matching policy push so the
/// pop happens on every exit path — the push site is not visible here.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
91
/// Serializable record pairing a trigger id and event id with a list of
/// predicate cache entries.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    /// Trigger the cached entries belong to.
    trigger_id: String,
    /// Event the cached entries belong to.
    event_id: String,
    /// Cached predicate entries.
    entries: Vec<PredicateCacheEntry>,
}
98
/// Result and accounting data for one predicate evaluation.
///
/// `Default` yields `result: false`, zeroed numbers, `cached: false`, and `None`
/// for both optional fields.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Boolean verdict of the predicate.
    result: bool,
    /// Cost of the evaluation in US dollars.
    cost_usd: f64,
    /// Token count. NOTE(review): presumably LLM tokens consumed — confirm at the producer.
    tokens: u64,
    /// Evaluation latency in milliseconds.
    latency_ms: u64,
    /// Whether the result came from a cache.
    cached: bool,
    /// Optional explanatory reason.
    reason: Option<String>,
    /// Budget exhaustion strategy attached to the record, if any.
    exhaustion_strategy: Option<TriggerBudgetExhaustionStrategy>,
}
109
/// Default reviewer name for autonomy-budget handling.
/// NOTE(review): the use site is outside the visible part of this file — confirm.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
111
112pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
113    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
114}
115
116pub(crate) fn current_dispatch_is_replay() -> bool {
117    ACTIVE_DISPATCH_IS_REPLAY
118        .try_with(|is_replay| *is_replay)
119        .unwrap_or(false)
120}
121
122pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
123    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
124}
125
/// Trigger dispatcher handle.
///
/// Cloning is shallow: every field is reference-counted or a channel sender, so
/// clones share the same VM, event log, cancel channel and runtime state.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM the dispatcher was constructed with.
    base_vm: Rc<Vm>,
    /// Event log used for inbox appends and subscriptions.
    event_log: Arc<AnyEventLog>,
    /// Broadcast sender signalled by `shutdown`.
    cancel_tx: broadcast::Sender<()>,
    /// Shared counters, DLQ, cancel tokens and flow-control state.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink; metric recording is skipped when `None`.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
134
135#[derive(Debug)]
136struct DispatcherRuntimeState {
137    in_flight: AtomicU64,
138    retry_queue_depth: AtomicU64,
139    dlq: Mutex<Vec<DlqEntry>>,
140    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
141    shutting_down: std::sync::atomic::AtomicBool,
142    idle_notify: Notify,
143    flow_control: FlowControlManager,
144    destination_circuits: DestinationCircuitRegistry,
145}
146
147impl DispatcherRuntimeState {
148    fn new(event_log: Arc<AnyEventLog>) -> Self {
149        Self {
150            in_flight: AtomicU64::new(0),
151            retry_queue_depth: AtomicU64::new(0),
152            dlq: Mutex::new(Vec::new()),
153            cancel_tokens: Mutex::new(Vec::new()),
154            shutting_down: std::sync::atomic::AtomicBool::new(false),
155            idle_notify: Notify::new(),
156            flow_control: FlowControlManager::new(event_log),
157            destination_circuits: DestinationCircuitRegistry::default(),
158        }
159    }
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq)]
163enum DestinationCircuitProbe {
164    Allow { half_open: bool },
165    Block { retry_after: Duration },
166}
167
168#[derive(Debug)]
169struct DestinationCircuitRegistry {
170    threshold: u32,
171    backoff: Duration,
172    states: Mutex<BTreeMap<String, DestinationCircuitState>>,
173}
174
175#[derive(Clone, Debug)]
176struct DestinationCircuitState {
177    failures: u32,
178    opened_at: Option<Instant>,
179}
180
181impl Default for DestinationCircuitRegistry {
182    fn default() -> Self {
183        Self {
184            threshold: DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
185            backoff: DESTINATION_CIRCUIT_BACKOFF,
186            states: Mutex::new(BTreeMap::new()),
187        }
188    }
189}
190
191impl DestinationCircuitRegistry {
192    fn check(&self, destination: &str) -> DestinationCircuitProbe {
193        let mut states = self
194            .states
195            .lock()
196            .expect("destination circuit registry poisoned");
197        let Some(state) = states.get_mut(destination) else {
198            return DestinationCircuitProbe::Allow { half_open: false };
199        };
200        let Some(opened_at) = state.opened_at else {
201            return DestinationCircuitProbe::Allow { half_open: false };
202        };
203        let elapsed = opened_at.elapsed();
204        if elapsed >= self.backoff {
205            DestinationCircuitProbe::Allow { half_open: true }
206        } else {
207            DestinationCircuitProbe::Block {
208                retry_after: self.backoff.saturating_sub(elapsed),
209            }
210        }
211    }
212
213    fn record_success(&self, destination: &str) {
214        let mut states = self
215            .states
216            .lock()
217            .expect("destination circuit registry poisoned");
218        states.remove(destination);
219    }
220
221    fn record_failure(&self, destination: &str) -> bool {
222        let mut states = self
223            .states
224            .lock()
225            .expect("destination circuit registry poisoned");
226        let state = states
227            .entry(destination.to_string())
228            .or_insert(DestinationCircuitState {
229                failures: 0,
230                opened_at: None,
231            });
232        if state.opened_at.is_some() {
233            state.failures = self.threshold;
234            state.opened_at = Some(Instant::now());
235            return true;
236        }
237        state.failures = state.failures.saturating_add(1);
238        if state.failures >= self.threshold {
239            state.opened_at = Some(Instant::now());
240            true
241        } else {
242            false
243        }
244    }
245}
246
/// Point-in-time dispatcher counters. Fields missing during deserialization
/// default to zero (`#[serde(default)]`).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Value of the in-flight counter.
    pub in_flight: u64,
    /// Value of the retry-queue depth counter.
    pub retry_queue_depth: u64,
    /// Number of entries in the in-memory DLQ.
    pub dlq_depth: u64,
}
254
/// Final status of a dispatch; serialized in `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    Succeeded,
    Failed,
    Dlq,
    Skipped,
    Waiting,
    Cancelled,
}

impl DispatchStatus {
    /// Returns the lowercase name of the status; these strings match the
    /// `snake_case` names serde uses for the variants.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Succeeded => "succeeded",
            Self::Failed => "failed",
            Self::Dlq => "dlq",
            Self::Skipped => "skipped",
            Self::Waiting => "waiting",
            Self::Cancelled => "cancelled",
        }
    }
}
278
/// Result of dispatching one event to one binding. Fields missing during
/// deserialization are filled from the type's `Default` impl (`#[serde(default)]`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger.
    pub trigger_id: String,
    /// Binding key of the binding that handled the event.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Number of attempts recorded for the dispatch.
    pub attempt_count: u32,
    /// Final status.
    pub status: DispatchStatus,
    /// Kind of handler. NOTE(review): value vocabulary is defined outside this file section.
    pub handler_kind: String,
    /// URI of the dispatch target.
    pub target_uri: String,
    /// Id of the event this dispatch replayed, if any.
    pub replay_of_event_id: Option<String>,
    /// JSON result, if any.
    pub result: Option<serde_json::Value>,
    /// Error message, if any.
    pub error: Option<String>,
}
293
/// Payload of an `event_ingested` record on the trigger inbox topic.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to the binding resolved from this id.
    pub trigger_id: Option<String>,
    /// Binding version passed to binding resolution.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
300
/// Report returned by `Dispatcher::drain`. Fields missing during
/// deserialization take their `Default` values (`#[serde(default)]`).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// Whether the dispatcher was reported as drained.
    pub drained: bool,
    /// In-flight counter at report time.
    pub in_flight: u64,
    /// Retry-queue depth at report time.
    pub retry_queue_depth: u64,
    /// DLQ depth at report time.
    pub dlq_depth: u64,
}
309
impl Default for DispatchOutcome {
    /// Empty outcome: blank strings, zero attempts, no result or error, and a
    /// `Failed` status. Written by hand because `DispatchStatus` has no
    /// `Default` impl.
    fn default() -> Self {
        Self {
            trigger_id: String::new(),
            binding_key: String::new(),
            event_id: String::new(),
            attempt_count: 0,
            status: DispatchStatus::Failed,
            handler_kind: String::new(),
            target_uri: String::new(),
            replay_of_event_id: None,
            result: None,
            error: None,
        }
    }
}
326
/// Record of a single dispatch attempt. Fields missing during deserialization
/// take their `Default` values (`#[serde(default)]`).
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the trigger.
    pub trigger_id: String,
    /// Binding key of the binding.
    pub binding_key: String,
    /// Id of the event.
    pub event_id: String,
    /// Attempt number. NOTE(review): presumably 1-based — confirm at the producer.
    pub attempt: u32,
    /// Kind of handler invoked.
    pub handler_kind: String,
    /// Attempt start time as text. NOTE(review): format not visible here — confirm.
    pub started_at: String,
    /// Attempt completion time as text. NOTE(review): format not visible here — confirm.
    pub completed_at: String,
    /// Outcome label for the attempt.
    pub outcome: String,
    /// Error message, if the attempt produced one.
    pub error_msg: Option<String>,
}
340
/// Request to cancel the dispatch of one event for one binding.
///
/// The derived ordering compares fields in declaration order, so do not
/// reorder them casually.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Binding key of the dispatch to cancel.
    pub binding_key: String,
    /// Event id of the dispatch to cancel.
    pub event_id: String,
    /// When the cancellation was requested; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who requested the cancellation; defaults to `None` when absent.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Audit identifier; defaults to `None` when absent.
    #[serde(default)]
    pub audit_id: Option<String>,
}
352
/// Dead-letter entry kept in the dispatcher's in-memory DLQ.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the trigger.
    pub trigger_id: String,
    /// Binding key of the binding.
    pub binding_key: String,
    /// The event that was dead-lettered.
    pub event: TriggerEvent,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Final error message.
    pub final_error: String,
    /// Per-attempt records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
362
/// Bookkeeping for a singleton gate lease.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Name of the singleton gate.
    gate: String,
    /// Whether the gate is currently held.
    held: bool,
}

/// Bookkeeping for a concurrency gate lease.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    /// Name of the concurrency gate.
    gate: String,
    /// `max` argument used when acquiring from the gate.
    max: u32,
    /// Priority rank used when acquiring from the gate.
    priority_rank: usize,
    /// The permit while held; `None` after it has been taken out.
    permit: Option<ConcurrencyPermit>,
}

/// Flow-control leases tracked for one dispatch; both are optional.
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    singleton: Option<SingletonLease>,
    concurrency: Option<ConcurrencyLease>,
}
382
383#[derive(Clone)]
384pub(crate) struct DispatchWaitLease {
385    state: Arc<DispatcherRuntimeState>,
386    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
387    suspended: Arc<AtomicBool>,
388}
389
390impl DispatchWaitLease {
391    fn new(
392        state: Arc<DispatcherRuntimeState>,
393        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
394    ) -> Self {
395        Self {
396            state,
397            acquired,
398            suspended: Arc::new(AtomicBool::new(false)),
399        }
400    }
401
402    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
403        if self.suspended.swap(true, Ordering::SeqCst) {
404            return Ok(());
405        }
406        let (singleton_gate, concurrency_permit) = {
407            let mut acquired = self.acquired.lock().await;
408            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
409                if lease.held {
410                    lease.held = false;
411                    Some(lease.gate.clone())
412                } else {
413                    None
414                }
415            });
416            let concurrency_permit = acquired
417                .concurrency
418                .as_mut()
419                .and_then(|lease| lease.permit.take());
420            (singleton_gate, concurrency_permit)
421        };
422
423        if let Some(gate) = singleton_gate {
424            self.state
425                .flow_control
426                .release_singleton(&gate)
427                .await
428                .map_err(DispatchError::from)?;
429        }
430        if let Some(permit) = concurrency_permit {
431            self.state
432                .flow_control
433                .release_concurrency(permit)
434                .await
435                .map_err(DispatchError::from)?;
436        }
437        Ok(())
438    }
439
440    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
441        if !self.suspended.swap(false, Ordering::SeqCst) {
442            return Ok(());
443        }
444
445        let singleton_gate = {
446            let acquired = self.acquired.lock().await;
447            acquired.singleton.as_ref().and_then(|lease| {
448                if lease.held {
449                    None
450                } else {
451                    Some(lease.gate.clone())
452                }
453            })
454        };
455        if let Some(gate) = singleton_gate {
456            self.state
457                .flow_control
458                .acquire_singleton(&gate)
459                .await
460                .map_err(DispatchError::from)?;
461            let mut acquired = self.acquired.lock().await;
462            if let Some(lease) = acquired.singleton.as_mut() {
463                lease.held = true;
464            }
465        }
466
467        let concurrency_spec = {
468            let acquired = self.acquired.lock().await;
469            acquired.concurrency.as_ref().and_then(|lease| {
470                if lease.permit.is_some() {
471                    None
472                } else {
473                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
474                }
475            })
476        };
477        if let Some((gate, max, priority_rank)) = concurrency_spec {
478            let permit = self
479                .state
480                .flow_control
481                .acquire_concurrency(&gate, max, priority_rank)
482                .await
483                .map_err(DispatchError::from)?;
484            let mut acquired = self.acquired.lock().await;
485            if let Some(lease) = acquired.concurrency.as_mut() {
486                lease.permit = Some(permit);
487            }
488        }
489        Ok(())
490    }
491}
492
/// Result of running an event through flow control.
enum FlowControlOutcome {
    /// Proceed with `event` (boxed to keep the enum small), holding `acquired`.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; `reason` explains why.
    Skip {
        reason: String,
    },
}
502
/// Stage at which a dispatch was skipped.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    /// Skipped at the predicate stage.
    Predicate,
    /// Skipped at the flow-control stage.
    FlowControl,
}
508
/// Error raised by dispatcher operations. Every variant carries a
/// human-readable message, which is exactly what `Display` prints.
#[derive(Debug)]
pub enum DispatchError {
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl DispatchError {
    /// Whether the error is retryable. `Cancelled`, `Denied`,
    /// `NotImplemented` and `Waiting` are not; every other variant is.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_)
            | Self::Denied(_)
            | Self::NotImplemented(_)
            | Self::Waiting(_) => false,
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}

impl std::fmt::Display for DispatchError {
    /// Writes the carried message verbatim, with no variant prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = match self {
            Self::EventLog(text)
            | Self::Registry(text)
            | Self::Serde(text)
            | Self::Local(text)
            | Self::A2a(text)
            | Self::Denied(text)
            | Self::Timeout(text)
            | Self::Waiting(text)
            | Self::Cancelled(text)
            | Self::NotImplemented(text) => text,
        };
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}
550
impl DispatchSkipStage {
    /// Returns the `snake_case` label for the stage.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Predicate => "predicate",
            Self::FlowControl => "flow_control",
        }
    }
}
559
impl From<LogError> for DispatchError {
    /// Wraps an event-log error as `DispatchError::EventLog`, keeping only its
    /// display text.
    fn from(value: LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}
565
566pub async fn append_dispatch_cancel_request(
567    event_log: &Arc<AnyEventLog>,
568    request: &DispatchCancelRequest,
569) -> Result<u64, DispatchError> {
570    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
571        .expect("static trigger cancel topic should always be valid");
572    event_log
573        .append(
574            &topic,
575            LogEvent::new(
576                "dispatch_cancel_requested",
577                serde_json::to_value(request)
578                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
579            ),
580        )
581        .await
582        .map_err(DispatchError::from)
583}
584
585impl Dispatcher {
586    pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
587        self.event_log.clone()
588    }
589
590    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
591        let event_log = active_event_log().ok_or_else(|| {
592            DispatchError::EventLog("dispatcher requires an active event log".to_string())
593        })?;
594        Ok(Self::with_event_log(base_vm, event_log))
595    }
596
    /// Builds a dispatcher bound to `event_log`, with no metrics registry.
    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
        Self::with_event_log_and_metrics(base_vm, event_log, None)
    }
600
601    pub fn with_event_log_and_metrics(
602        base_vm: Vm,
603        event_log: Arc<AnyEventLog>,
604        metrics: Option<Arc<crate::MetricsRegistry>>,
605    ) -> Self {
606        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
607        ACTIVE_DISPATCHER_STATE.with(|slot| {
608            *slot.borrow_mut() = Some(state.clone());
609        });
610        let (cancel_tx, _) = broadcast::channel(32);
611        Self {
612            base_vm: Rc::new(base_vm),
613            event_log,
614            cancel_tx,
615            state,
616            metrics,
617        }
618    }
619
620    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
621        DispatcherStatsSnapshot {
622            in_flight: self.state.in_flight.load(Ordering::Relaxed),
623            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
624            dlq_depth: self
625                .state
626                .dlq
627                .lock()
628                .expect("dispatcher dlq poisoned")
629                .len() as u64,
630        }
631    }
632
633    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
634        self.state
635            .dlq
636            .lock()
637            .expect("dispatcher dlq poisoned")
638            .clone()
639    }
640
641    pub fn shutdown(&self) {
642        self.state.shutting_down.store(true, Ordering::SeqCst);
643        for token in self
644            .state
645            .cancel_tokens
646            .lock()
647            .expect("dispatcher cancel tokens poisoned")
648            .iter()
649        {
650            token.store(true, Ordering::SeqCst);
651        }
652        let _ = self.cancel_tx.send(());
653    }
654
    /// Enqueues `event` on the inbox without targeting a specific trigger or
    /// binding version. Returns the value produced by the event-log append.
    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
        self.enqueue_targeted(None, None, event).await
    }
658
    /// Enqueues `event` on the inbox, optionally targeted at `trigger_id` and
    /// `binding_version`, with no parent headers.
    pub async fn enqueue_targeted(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
    ) -> Result<u64, DispatchError> {
        self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
            .await
    }
668
    /// Wraps `event` in an `InboxEnvelope` and appends it to the trigger inbox
    /// envelopes topic as an `event_ingested` record.
    ///
    /// Headers start from `parent_headers` (when given), are overlaid with the
    /// event-derived headers, and then gain trigger/binding and timing entries.
    /// When both a metrics registry and a `trigger_id` are present, queue
    /// metrics are recorded before the append is attempted.
    ///
    /// # Errors
    ///
    /// `DispatchError::Serde` when the envelope cannot be converted to JSON;
    /// otherwise the append error converted through `DispatchError::from`.
    ///
    /// # Panics
    ///
    /// Panics if the static inbox topic name is rejected by `Topic::new`.
    pub async fn enqueue_targeted_with_headers(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
        parent_headers: Option<&BTreeMap<String, String>>,
    ) -> Result<u64, DispatchError> {
        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
            .expect("static trigger inbox envelopes topic is valid");
        // Kept separately because `trigger_id` is moved into the envelope below.
        let trigger_id_for_metrics = trigger_id.clone();
        let mut headers = parent_headers.cloned().unwrap_or_default();
        // `extend` overwrites, so event-derived headers win over parent headers.
        headers.extend(event_headers(&event, None, None, None));
        if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
            headers.insert("trigger_id".to_string(), trigger_id.clone());
            headers.insert(
                "binding_key".to_string(),
                binding_key_from_parts(trigger_id, binding_version),
            );
        }
        // An accepted-at value already in the headers is preserved.
        headers
            .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
            .or_insert_with(|| unix_ms(event.received_at).to_string());
        let payload = serde_json::to_value(InboxEnvelope {
            trigger_id,
            binding_version,
            event: event.clone(),
        })
        .map_err(|error| DispatchError::Serde(error.to_string()))?;
        let mut log_event = LogEvent::new("event_ingested", payload);
        // Remember whether the queue-append time was inherited: the
        // accepted-to-append metric is only recorded for a first append.
        let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
        // A missing or unparsable inherited value falls back to this record's
        // own timestamp.
        let queue_appended_at_ms = headers
            .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
            .and_then(|value| value.parse::<i64>().ok())
            .unwrap_or(log_event.occurred_at_ms);
        headers
            .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
            .or_insert_with(|| log_event.occurred_at_ms.to_string());
        if let (Some(metrics), Some(trigger_id)) =
            (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
        {
            let binding_key = binding_key_from_parts(trigger_id, binding_version);
            let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
            if !had_queue_appended_at {
                metrics.record_trigger_accepted_to_queue_append(
                    trigger_id,
                    &binding_key,
                    event.provider.as_str(),
                    tenant_id(&event),
                    "queued",
                    duration_between_ms(queue_appended_at_ms, accepted_at_ms),
                );
            }
            // NOTE(review): this runs before the append below, so a failed
            // append still leaves a pending-event note — confirm that is intended.
            metrics.note_trigger_pending_event(
                event.id.0.as_str(),
                trigger_id,
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                accepted_at_ms,
                queue_appended_at_ms,
            );
        }
        log_event.headers = headers;
        self.event_log
            .append(&topic, log_event)
            .await
            .map_err(DispatchError::from)
    }
737
738    pub async fn run(&self) -> Result<(), DispatchError> {
739        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
740            .expect("static trigger inbox envelopes topic is valid");
741        let start_from = self.event_log.latest(&topic).await?;
742        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
743        pin_mut!(stream);
744        let mut cancel_rx = self.cancel_tx.subscribe();
745
746        loop {
747            tokio::select! {
748                received = stream.next() => {
749                    let Some(received) = received else {
750                        break;
751                    };
752                    let (_, event) = received.map_err(DispatchError::from)?;
753                    if event.kind != "event_ingested" {
754                        continue;
755                    }
756                    let parent_headers = event.headers.clone();
757                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
758                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
759                    notify_test_inbox_dequeued();
760                    let _ = self
761                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
762                        .await;
763                }
764                _ = recv_cancel(&mut cancel_rx) => break,
765            }
766        }
767
768        Ok(())
769    }
770
771    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
772        let deadline = tokio::time::Instant::now() + timeout;
773        loop {
774            let snapshot = self.snapshot();
775            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
776                return Ok(DispatcherDrainReport {
777                    drained: true,
778                    in_flight: snapshot.in_flight,
779                    retry_queue_depth: snapshot.retry_queue_depth,
780                    dlq_depth: snapshot.dlq_depth,
781                });
782            }
783
784            let notified = self.state.idle_notify.notified();
785            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
786            if remaining.is_zero() {
787                return Ok(DispatcherDrainReport {
788                    drained: false,
789                    in_flight: snapshot.in_flight,
790                    retry_queue_depth: snapshot.retry_queue_depth,
791                    dlq_depth: snapshot.dlq_depth,
792                });
793            }
794            if tokio::time::timeout(remaining, notified).await.is_err() {
795                let snapshot = self.snapshot();
796                return Ok(DispatcherDrainReport {
797                    drained: false,
798                    in_flight: snapshot.in_flight,
799                    retry_queue_depth: snapshot.retry_queue_depth,
800                    dlq_depth: snapshot.dlq_depth,
801                });
802            }
803        }
804    }
805
    /// Dispatches an inbox envelope with no parent headers. Returns one
    /// outcome per dispatch performed.
    pub async fn dispatch_inbox_envelope(
        &self,
        envelope: InboxEnvelope,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_inbox_envelope_with_headers(envelope, None)
            .await
    }
813
    /// Dispatches an inbox envelope, forwarding `parent_headers` to the
    /// dispatch. Returns one outcome per dispatch performed.
    pub async fn dispatch_inbox_envelope_with_parent_headers(
        &self,
        envelope: InboxEnvelope,
        parent_headers: &BTreeMap<String, String>,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
            .await
    }
822
823    async fn dispatch_inbox_envelope_with_headers(
824        &self,
825        envelope: InboxEnvelope,
826        parent_headers: Option<&BTreeMap<String, String>>,
827    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
828        if let Some(trigger_id) = envelope.trigger_id {
829            let binding = super::registry::resolve_live_trigger_binding(
830                &trigger_id,
831                envelope.binding_version,
832            )
833            .map_err(|error| DispatchError::Registry(error.to_string()))?;
834            return Ok(vec![
835                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
836                    .await?,
837            ]);
838        }
839
840        let cron_target = match &envelope.event.provider_payload {
841            crate::triggers::ProviderPayload::Known(
842                crate::triggers::event::KnownProviderPayload::Cron(payload),
843            ) => payload.cron_id.clone(),
844            _ => None,
845        };
846        if let Some(trigger_id) = cron_target {
847            let binding = super::registry::resolve_live_trigger_binding(
848                &trigger_id,
849                envelope.binding_version,
850            )
851            .map_err(|error| DispatchError::Registry(error.to_string()))?;
852            return Ok(vec![
853                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
854                    .await?,
855            ]);
856        }
857
858        self.dispatch_event_with_headers(envelope.event, parent_headers)
859            .await
860    }
861
    /// Dispatches `event` to every binding that matches it, with no parent
    /// headers. Returns one outcome per binding.
    pub async fn dispatch_event(
        &self,
        event: TriggerEvent,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_event_with_headers(event, None).await
    }
868
869    async fn dispatch_event_with_headers(
870        &self,
871        event: TriggerEvent,
872        parent_headers: Option<&BTreeMap<String, String>>,
873    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
874        let bindings = matching_bindings(&event);
875        let mut outcomes = Vec::new();
876        for binding in bindings {
877            outcomes.push(
878                self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
879                    .await?,
880            );
881        }
882        Ok(outcomes)
883    }
884
    /// Dispatches `event` to `binding` as a fresh (non-replay) dispatch with no
    /// parent span id and no parent headers.
    pub async fn dispatch(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, None, None, None)
            .await
    }
893
    /// Dispatches `event` to `binding` as a replay of `replay_of_event_id`,
    /// with no parent span id and no parent headers.
    pub async fn dispatch_replay(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
        replay_of_event_id: String,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
            .await
    }
903
    /// Dispatches `event` to `binding` as a non-replay dispatch, passing
    /// `parent_span_id` through and supplying no parent headers.
    pub async fn dispatch_with_parent_span_id(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
        parent_span_id: Option<String>,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
            .await
    }
913
914    async fn dispatch_with_replay(
915        &self,
916        binding: &TriggerBinding,
917        event: TriggerEvent,
918        replay_of_event_id: Option<String>,
919        parent_span_id: Option<String>,
920        parent_headers: Option<&BTreeMap<String, String>>,
921    ) -> Result<DispatchOutcome, DispatchError> {
922        let parent_headers_for_metrics = parent_headers.cloned();
923        let admitted_at_ms = current_unix_ms();
924        if let Some(metrics) = self.metrics.as_ref() {
925            let binding_key = binding.binding_key();
926            let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
927            metrics.record_trigger_queue_age_at_dispatch_admission(
928                binding.id.as_str(),
929                &binding_key,
930                event.provider.as_str(),
931                tenant_id(&event),
932                "admitted",
933                duration_between_ms(admitted_at_ms, queue_appended_at_ms),
934            );
935            metrics.clear_trigger_pending_event(
936                event.id.0.as_str(),
937                binding.id.as_str(),
938                &binding_key,
939                event.provider.as_str(),
940                tenant_id(&event),
941                admitted_at_ms,
942            );
943        }
944        let span = tracing::info_span!(
945            "dispatch",
946            trigger_id = %binding.id.as_str(),
947            binding_version = binding.version,
948            trace_id = %event.trace_id.0
949        );
950        #[cfg(feature = "otel")]
951        let span_for_otel = span.clone();
952        let _ = if let Some(headers) = parent_headers {
953            crate::observability::otel::set_span_parent_from_headers(
954                &span,
955                headers,
956                &event.trace_id,
957                parent_span_id.as_deref(),
958            )
959        } else {
960            crate::observability::otel::set_span_parent(
961                &span,
962                &event.trace_id,
963                parent_span_id.as_deref(),
964            )
965        };
966        #[cfg(feature = "otel")]
967        let started_at = Instant::now();
968        let metrics = self.metrics.clone();
969        let outcome = ACTIVE_DISPATCH_IS_REPLAY
970            .scope(
971                replay_of_event_id.is_some(),
972                self.dispatch_with_replay_inner(
973                    binding,
974                    event,
975                    replay_of_event_id,
976                    parent_headers_for_metrics,
977                )
978                .instrument(span),
979            )
980            .await;
981        if let Some(metrics) = metrics.as_ref() {
982            match &outcome {
983                Ok(dispatch_outcome) => match dispatch_outcome.status {
984                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
985                        metrics.record_dispatch_succeeded();
986                    }
987                    DispatchStatus::Waiting => {}
988                    _ => metrics.record_dispatch_failed(),
989                },
990                Err(_) => metrics.record_dispatch_failed(),
991            }
992            let outcome_label = match &outcome {
993                Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
994                Err(DispatchError::Cancelled(_)) => "cancelled",
995                Err(_) => "failed",
996            };
997            metrics.record_trigger_dispatched(
998                binding.id.as_str(),
999                binding.handler.kind(),
1000                outcome_label,
1001            );
1002            metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
1003        }
1004        #[cfg(feature = "otel")]
1005        {
1006            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
1007
1008            let duration_ms = started_at.elapsed().as_millis() as i64;
1009            let status = match &outcome {
1010                Ok(dispatch_outcome) => match dispatch_outcome.status {
1011                    DispatchStatus::Succeeded => "succeeded",
1012                    DispatchStatus::Skipped => "skipped",
1013                    DispatchStatus::Waiting => "waiting",
1014                    DispatchStatus::Cancelled => "cancelled",
1015                    DispatchStatus::Failed => "failed",
1016                    DispatchStatus::Dlq => "dlq",
1017                },
1018                Err(DispatchError::Cancelled(_)) => "cancelled",
1019                Err(_) => "failed",
1020            };
1021            span_for_otel.set_attribute("result.status", status);
1022            span_for_otel.set_attribute("result.duration_ms", duration_ms);
1023        }
1024        outcome
1025    }
1026
1027    async fn dispatch_with_replay_inner(
1028        &self,
1029        binding: &TriggerBinding,
1030        event: TriggerEvent,
1031        replay_of_event_id: Option<String>,
1032        parent_headers: Option<BTreeMap<String, String>>,
1033    ) -> Result<DispatchOutcome, DispatchError> {
1034        let autonomy_tier = crate::resolve_agent_autonomy_tier(
1035            &self.event_log,
1036            binding.id.as_str(),
1037            binding.autonomy_tier,
1038        )
1039        .await
1040        .unwrap_or(binding.autonomy_tier);
1041        let binding_key = binding.binding_key();
1042        let route = DispatchUri::from(&binding.handler);
1043        let trigger_id = binding.id.as_str().to_string();
1044        let event_id = event.id.0.clone();
1045        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
1046        let begin = if replay_of_event_id.is_some() {
1047            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
1048        } else {
1049            begin_in_flight(binding.id.as_str(), binding.version)
1050        };
1051        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
1052
1053        let mut attempts = Vec::new();
1054        let mut source_node_id = format!("trigger:{}", event.id.0);
1055        let mut initial_nodes = Vec::new();
1056        let mut initial_edges = Vec::new();
1057        if let Some(original_event_id) = replay_of_event_id.as_ref() {
1058            let original_node_id = format!("trigger:{original_event_id}");
1059            initial_nodes.push(RunActionGraphNodeRecord {
1060                id: original_node_id.clone(),
1061                label: format!(
1062                    "{}:{} (original {})",
1063                    event.provider.as_str(),
1064                    event.kind,
1065                    original_event_id
1066                ),
1067                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1068                status: "historical".to_string(),
1069                outcome: "replayed_from".to_string(),
1070                trace_id: Some(event.trace_id.0.clone()),
1071                stage_id: None,
1072                node_id: None,
1073                worker_id: None,
1074                run_id: None,
1075                run_path: None,
1076                metadata: trigger_node_metadata(&event),
1077            });
1078            initial_edges.push(RunActionGraphEdgeRecord {
1079                from_id: original_node_id,
1080                to_id: source_node_id.clone(),
1081                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1082                label: Some("replay chain".to_string()),
1083            });
1084        }
1085        initial_nodes.push(RunActionGraphNodeRecord {
1086            id: source_node_id.clone(),
1087            label: format!("{}:{}", event.provider.as_str(), event.kind),
1088            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1089            status: "received".to_string(),
1090            outcome: "received".to_string(),
1091            trace_id: Some(event.trace_id.0.clone()),
1092            stage_id: None,
1093            node_id: None,
1094            worker_id: None,
1095            run_id: None,
1096            run_path: None,
1097            metadata: trigger_node_metadata(&event),
1098        });
1099        self.emit_action_graph(
1100            &event,
1101            initial_nodes,
1102            initial_edges,
1103            serde_json::json!({
1104                "source": "dispatcher",
1105                "trigger_id": trigger_id,
1106                "binding_key": binding_key,
1107                "event_id": event_id,
1108                "replay_of_event_id": replay_of_event_id,
1109            }),
1110        )
1111        .await?;
1112
1113        if dispatch_cancel_requested(
1114            &self.event_log,
1115            &binding_key,
1116            &event.id.0,
1117            replay_of_event_id.as_ref(),
1118        )
1119        .await?
1120        {
1121            finish_in_flight(
1122                binding.id.as_str(),
1123                binding.version,
1124                TriggerDispatchOutcome::Failed,
1125            )
1126            .await
1127            .map_err(|error| DispatchError::Registry(error.to_string()))?;
1128            decrement_in_flight(&self.state);
1129            return Ok(cancelled_dispatch_outcome(
1130                binding,
1131                &route,
1132                &event,
1133                replay_of_event_id,
1134                0,
1135                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1136            ));
1137        }
1138
1139        if let Some(predicate) = binding.when.as_ref() {
1140            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1141            let evaluation = self
1142                .evaluate_predicate(
1143                    binding,
1144                    predicate,
1145                    &event,
1146                    replay_of_event_id.as_ref(),
1147                    autonomy_tier,
1148                )
1149                .await?;
1150            let passed = evaluation.result;
1151            self.emit_action_graph(
1152                &event,
1153                vec![RunActionGraphNodeRecord {
1154                    id: predicate_node_id.clone(),
1155                    label: predicate.raw.clone(),
1156                    kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1157                    status: "completed".to_string(),
1158                    outcome: passed.to_string(),
1159                    trace_id: Some(event.trace_id.0.clone()),
1160                    stage_id: None,
1161                    node_id: None,
1162                    worker_id: None,
1163                    run_id: None,
1164                    run_path: None,
1165                    metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1166                }],
1167                vec![RunActionGraphEdgeRecord {
1168                    from_id: source_node_id.clone(),
1169                    to_id: predicate_node_id.clone(),
1170                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1171                    label: None,
1172                }],
1173                serde_json::json!({
1174                    "source": "dispatcher",
1175                    "trigger_id": binding.id.as_str(),
1176                    "binding_key": binding.binding_key(),
1177                    "event_id": event.id.0,
1178                    "predicate": predicate.raw,
1179                    "reason": evaluation.reason,
1180                    "cached": evaluation.cached,
1181                    "cost_usd": evaluation.cost_usd,
1182                    "tokens": evaluation.tokens,
1183                    "latency_ms": evaluation.latency_ms,
1184                    "replay_of_event_id": replay_of_event_id,
1185                }),
1186            )
1187            .await?;
1188
1189            if !passed {
1190                if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1191                    let final_error = format!(
1192                        "trigger budget exhausted: {}",
1193                        evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1194                    );
1195                    self.move_budget_exhausted_to_dlq(
1196                        binding,
1197                        &route,
1198                        &event,
1199                        replay_of_event_id.as_ref(),
1200                        &final_error,
1201                    )
1202                    .await?;
1203                    finish_in_flight(
1204                        binding.id.as_str(),
1205                        binding.version,
1206                        TriggerDispatchOutcome::Dlq,
1207                    )
1208                    .await
1209                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1210                    decrement_in_flight(&self.state);
1211                    self.append_dispatch_trust_record(
1212                        binding,
1213                        &route,
1214                        &event,
1215                        replay_of_event_id.as_ref(),
1216                        autonomy_tier,
1217                        TrustOutcome::Failure,
1218                        "dlq",
1219                        0,
1220                        Some(final_error.clone()),
1221                    )
1222                    .await?;
1223                    return Ok(DispatchOutcome {
1224                        trigger_id: binding.id.as_str().to_string(),
1225                        binding_key: binding.binding_key(),
1226                        event_id: event.id.0,
1227                        attempt_count: 0,
1228                        status: DispatchStatus::Dlq,
1229                        handler_kind: route.kind().to_string(),
1230                        target_uri: route.target_uri(),
1231                        replay_of_event_id,
1232                        result: None,
1233                        error: Some(final_error),
1234                    });
1235                }
1236
1237                if evaluation.exhaustion_strategy
1238                    == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1239                {
1240                    self.append_budget_deferred_event(
1241                        binding,
1242                        &route,
1243                        &event,
1244                        replay_of_event_id.as_ref(),
1245                        evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1246                    )
1247                    .await?;
1248                    finish_in_flight(
1249                        binding.id.as_str(),
1250                        binding.version,
1251                        TriggerDispatchOutcome::Dispatched,
1252                    )
1253                    .await
1254                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1255                    decrement_in_flight(&self.state);
1256                    self.append_dispatch_trust_record(
1257                        binding,
1258                        &route,
1259                        &event,
1260                        replay_of_event_id.as_ref(),
1261                        autonomy_tier,
1262                        TrustOutcome::Denied,
1263                        "waiting",
1264                        0,
1265                        evaluation.reason.clone(),
1266                    )
1267                    .await?;
1268                    return Ok(DispatchOutcome {
1269                        trigger_id: binding.id.as_str().to_string(),
1270                        binding_key: binding.binding_key(),
1271                        event_id: event.id.0,
1272                        attempt_count: 0,
1273                        status: DispatchStatus::Waiting,
1274                        handler_kind: route.kind().to_string(),
1275                        target_uri: route.target_uri(),
1276                        replay_of_event_id,
1277                        result: Some(serde_json::json!({
1278                            "deferred": true,
1279                            "predicate": predicate.raw,
1280                            "reason": evaluation.reason,
1281                        })),
1282                        error: None,
1283                    });
1284                }
1285
1286                self.append_skipped_outbox_event(
1287                    binding,
1288                    &route,
1289                    &event,
1290                    replay_of_event_id.as_ref(),
1291                    DispatchSkipStage::Predicate,
1292                    serde_json::json!({
1293                        "predicate": predicate.raw,
1294                        "reason": evaluation.reason,
1295                    }),
1296                )
1297                .await?;
1298                finish_in_flight(
1299                    binding.id.as_str(),
1300                    binding.version,
1301                    TriggerDispatchOutcome::Dispatched,
1302                )
1303                .await
1304                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1305                decrement_in_flight(&self.state);
1306                self.append_dispatch_trust_record(
1307                    binding,
1308                    &route,
1309                    &event,
1310                    replay_of_event_id.as_ref(),
1311                    autonomy_tier,
1312                    TrustOutcome::Denied,
1313                    "skipped",
1314                    0,
1315                    None,
1316                )
1317                .await?;
1318                return Ok(DispatchOutcome {
1319                    trigger_id: binding.id.as_str().to_string(),
1320                    binding_key: binding.binding_key(),
1321                    event_id: event.id.0,
1322                    attempt_count: 0,
1323                    status: DispatchStatus::Skipped,
1324                    handler_kind: route.kind().to_string(),
1325                    target_uri: route.target_uri(),
1326                    replay_of_event_id,
1327                    result: Some(serde_json::json!({
1328                        "skipped": true,
1329                        "predicate": predicate.raw,
1330                        "reason": evaluation.reason,
1331                    })),
1332                    error: None,
1333                });
1334            }
1335
1336            source_node_id = predicate_node_id;
1337        }
1338
1339        if autonomy_tier == AutonomyTier::ActAuto {
1340            if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1341                let request_id = self
1342                    .append_autonomy_budget_approval_request(
1343                        binding,
1344                        &route,
1345                        &event,
1346                        replay_of_event_id.as_ref(),
1347                        reason,
1348                    )
1349                    .await?;
1350                self.emit_autonomy_budget_approval_action_graph(
1351                    binding,
1352                    &route,
1353                    &event,
1354                    &source_node_id,
1355                    replay_of_event_id.as_ref(),
1356                    reason,
1357                    &request_id,
1358                )
1359                .await?;
1360                finish_in_flight(
1361                    binding.id.as_str(),
1362                    binding.version,
1363                    TriggerDispatchOutcome::Dispatched,
1364                )
1365                .await
1366                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1367                decrement_in_flight(&self.state);
1368                self.append_tier_transition_trust_record(
1369                    binding,
1370                    &event,
1371                    replay_of_event_id.as_ref(),
1372                    autonomy_tier,
1373                    AutonomyTier::ActWithApproval,
1374                    reason,
1375                    &request_id,
1376                )
1377                .await?;
1378                self.append_dispatch_trust_record(
1379                    binding,
1380                    &route,
1381                    &event,
1382                    replay_of_event_id.as_ref(),
1383                    autonomy_tier,
1384                    TrustOutcome::Denied,
1385                    "waiting",
1386                    0,
1387                    Some(reason.to_string()),
1388                )
1389                .await?;
1390                return Ok(DispatchOutcome {
1391                    trigger_id: binding.id.as_str().to_string(),
1392                    binding_key: binding.binding_key(),
1393                    event_id: event.id.0,
1394                    attempt_count: 0,
1395                    status: DispatchStatus::Waiting,
1396                    handler_kind: route.kind().to_string(),
1397                    target_uri: route.target_uri(),
1398                    replay_of_event_id,
1399                    result: Some(serde_json::json!({
1400                        "approval_required": true,
1401                        "request_id": request_id,
1402                        "reason": reason,
1403                        "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1404                    })),
1405                    error: None,
1406                });
1407            }
1408            note_autonomous_decision(binding);
1409        }
1410
1411        let (event, acquired_flow) = match self
1412            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1413            .await?
1414        {
1415            FlowControlOutcome::Dispatch { event, acquired } => {
1416                (*event, Arc::new(AsyncMutex::new(acquired)))
1417            }
1418            FlowControlOutcome::Skip { reason } => {
1419                self.append_skipped_outbox_event(
1420                    binding,
1421                    &route,
1422                    &event,
1423                    replay_of_event_id.as_ref(),
1424                    DispatchSkipStage::FlowControl,
1425                    serde_json::json!({
1426                        "flow_control": reason,
1427                    }),
1428                )
1429                .await?;
1430                finish_in_flight(
1431                    binding.id.as_str(),
1432                    binding.version,
1433                    TriggerDispatchOutcome::Dispatched,
1434                )
1435                .await
1436                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1437                decrement_in_flight(&self.state);
1438                return Ok(DispatchOutcome {
1439                    trigger_id: binding.id.as_str().to_string(),
1440                    binding_key: binding.binding_key(),
1441                    event_id: event.id.0,
1442                    attempt_count: 0,
1443                    status: DispatchStatus::Skipped,
1444                    handler_kind: route.kind().to_string(),
1445                    target_uri: route.target_uri(),
1446                    replay_of_event_id,
1447                    result: Some(serde_json::json!({
1448                        "skipped": true,
1449                        "flow_control": reason,
1450                    })),
1451                    error: None,
1452                });
1453            }
1454        };
1455
1456        let destination_key = destination_circuit_key(&route);
1457        let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1458            DestinationCircuitProbe::Allow { half_open } => {
1459                if half_open {
1460                    if let Some(metrics) = self.metrics.as_ref() {
1461                        metrics.record_backpressure_event("circuit", "half_open_probe");
1462                    }
1463                }
1464                half_open
1465            }
1466            DestinationCircuitProbe::Block { retry_after } => {
1467                if let Some(metrics) = self.metrics.as_ref() {
1468                    metrics.record_backpressure_event("circuit", "fail_fast");
1469                }
1470                let final_error = format!(
1471                    "destination circuit open for {}; retry after {}s",
1472                    destination_key,
1473                    retry_after.as_secs().max(1)
1474                );
1475                self.move_circuit_open_to_dlq(
1476                    binding,
1477                    &route,
1478                    &event,
1479                    replay_of_event_id.as_ref(),
1480                    &final_error,
1481                    &destination_key,
1482                )
1483                .await?;
1484                finish_in_flight(
1485                    binding.id.as_str(),
1486                    binding.version,
1487                    TriggerDispatchOutcome::Dlq,
1488                )
1489                .await
1490                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1491                self.release_flow_control(&acquired_flow).await?;
1492                decrement_in_flight(&self.state);
1493                self.append_dispatch_trust_record(
1494                    binding,
1495                    &route,
1496                    &event,
1497                    replay_of_event_id.as_ref(),
1498                    autonomy_tier,
1499                    TrustOutcome::Failure,
1500                    "dlq",
1501                    0,
1502                    Some(final_error.clone()),
1503                )
1504                .await?;
1505                return Ok(DispatchOutcome {
1506                    trigger_id: binding.id.as_str().to_string(),
1507                    binding_key: binding.binding_key(),
1508                    event_id: event.id.0,
1509                    attempt_count: 0,
1510                    status: DispatchStatus::Dlq,
1511                    handler_kind: route.kind().to_string(),
1512                    target_uri: route.target_uri(),
1513                    replay_of_event_id,
1514                    result: None,
1515                    error: Some(final_error),
1516                });
1517            }
1518        };
1519
1520        let mut previous_retry_node = None;
1521        let max_attempts = binding.retry.max_attempts();
1522        for attempt in 1..=max_attempts {
1523            if dispatch_cancel_requested(
1524                &self.event_log,
1525                &binding_key,
1526                &event.id.0,
1527                replay_of_event_id.as_ref(),
1528            )
1529            .await?
1530            {
1531                finish_in_flight(
1532                    binding.id.as_str(),
1533                    binding.version,
1534                    TriggerDispatchOutcome::Failed,
1535                )
1536                .await
1537                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1538                decrement_in_flight(&self.state);
1539                return Ok(cancelled_dispatch_outcome(
1540                    binding,
1541                    &route,
1542                    &event,
1543                    replay_of_event_id,
1544                    attempt.saturating_sub(1),
1545                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1546                ));
1547            }
1548            maybe_fail_before_outbox();
1549            let attempt_started_instant = Instant::now();
1550            let attempt_started_at_ms = current_unix_ms();
1551            let queue_age_at_start = duration_between_ms(
1552                attempt_started_at_ms,
1553                queue_appended_at_ms(parent_headers.as_ref(), &event),
1554            );
1555            if attempt == 1 {
1556                if let Some(metrics) = self.metrics.as_ref() {
1557                    metrics.record_trigger_queue_age_at_dispatch_start(
1558                        binding.id.as_str(),
1559                        &binding_key,
1560                        event.provider.as_str(),
1561                        tenant_id(&event),
1562                        "started",
1563                        queue_age_at_start,
1564                    );
1565                }
1566            }
1567            tracing::info!(
1568                component = "dispatcher",
1569                lifecycle = "dispatch_started",
1570                trigger_id = %binding.id.as_str(),
1571                binding_key = %binding_key,
1572                event_id = %event.id.0,
1573                attempt,
1574                queue_age_ms = queue_age_at_start.as_millis(),
1575                trace_id = %event.trace_id.0
1576            );
1577            let started_at = now_rfc3339();
1578            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1579            self.append_lifecycle_event(
1580                "DispatchStarted",
1581                &event,
1582                binding,
1583                serde_json::json!({
1584                    "event_id": event.id.0,
1585                    "attempt": attempt,
1586                    "handler_kind": route.kind(),
1587                    "target_uri": route.target_uri(),
1588                    "replay_of_event_id": replay_of_event_id,
1589                }),
1590                replay_of_event_id.as_ref(),
1591            )
1592            .await?;
1593            self.append_topic_event(
1594                TRIGGER_OUTBOX_TOPIC,
1595                "dispatch_started",
1596                &event,
1597                Some(binding),
1598                Some(attempt),
1599                serde_json::json!({
1600                    "event_id": event.id.0,
1601                    "attempt": attempt,
1602                    "trigger_id": binding.id.as_str(),
1603                    "binding_key": binding.binding_key(),
1604                    "handler_kind": route.kind(),
1605                    "target_uri": route.target_uri(),
1606                    "replay_of_event_id": replay_of_event_id,
1607                }),
1608                replay_of_event_id.as_ref(),
1609            )
1610            .await?;
1611
1612            let mut dispatch_edges = Vec::new();
1613            if attempt == 1 {
1614                dispatch_edges.push(RunActionGraphEdgeRecord {
1615                    from_id: source_node_id.clone(),
1616                    to_id: attempt_node_id.clone(),
1617                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1618                    label: binding.when.as_ref().map(|_| "true".to_string()),
1619                });
1620            } else if let Some(retry_node_id) = previous_retry_node.take() {
1621                dispatch_edges.push(RunActionGraphEdgeRecord {
1622                    from_id: retry_node_id,
1623                    to_id: attempt_node_id.clone(),
1624                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1625                    label: Some(format!("attempt {attempt}")),
1626                });
1627            }
1628
1629            self.emit_action_graph(
1630                &event,
1631                vec![RunActionGraphNodeRecord {
1632                    id: attempt_node_id.clone(),
1633                    label: dispatch_node_label(&route),
1634                    kind: dispatch_node_kind(&route).to_string(),
1635                    status: "running".to_string(),
1636                    outcome: format!("attempt_{attempt}"),
1637                    trace_id: Some(event.trace_id.0.clone()),
1638                    stage_id: None,
1639                    node_id: None,
1640                    worker_id: None,
1641                    run_id: None,
1642                    run_path: None,
1643                    metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1644                }],
1645                dispatch_edges,
1646                serde_json::json!({
1647                    "source": "dispatcher",
1648                    "trigger_id": binding.id.as_str(),
1649                    "binding_key": binding.binding_key(),
1650                    "event_id": event.id.0,
1651                    "attempt": attempt,
1652                    "handler_kind": route.kind(),
1653                    "target_uri": route.target_uri(),
1654                    "target_agent": dispatch_target_agent(&route),
1655                    "replay_of_event_id": replay_of_event_id,
1656                }),
1657            )
1658            .await?;
1659
1660            let result = self
1661                .dispatch_once(
1662                    binding,
1663                    &route,
1664                    &event,
1665                    autonomy_tier,
1666                    Some(DispatchWaitLease::new(
1667                        self.state.clone(),
1668                        acquired_flow.clone(),
1669                    )),
1670                    &mut self.cancel_tx.subscribe(),
1671                )
1672                .await;
1673            let attempt_runtime = attempt_started_instant.elapsed();
1674            let attempt_status = dispatch_result_status(&result);
1675            if let Some(metrics) = self.metrics.as_ref() {
1676                metrics.record_trigger_dispatch_runtime(
1677                    binding.id.as_str(),
1678                    &binding_key,
1679                    event.provider.as_str(),
1680                    tenant_id(&event),
1681                    attempt_status,
1682                    attempt_runtime,
1683                );
1684            }
1685            tracing::info!(
1686                component = "dispatcher",
1687                lifecycle = "handler_completed",
1688                trigger_id = %binding.id.as_str(),
1689                binding_key = %binding_key,
1690                event_id = %event.id.0,
1691                attempt,
1692                status = attempt_status,
1693                runtime_ms = attempt_runtime.as_millis(),
1694                trace_id = %event.trace_id.0
1695            );
1696            let completed_at = now_rfc3339();
1697
1698            match result {
1699                Ok(result) => {
1700                    let attempt_record = DispatchAttemptRecord {
1701                        trigger_id: binding.id.as_str().to_string(),
1702                        binding_key: binding.binding_key(),
1703                        event_id: event.id.0.clone(),
1704                        attempt,
1705                        handler_kind: route.kind().to_string(),
1706                        started_at,
1707                        completed_at,
1708                        outcome: "success".to_string(),
1709                        error_msg: None,
1710                    };
1711                    attempts.push(attempt_record.clone());
1712                    self.append_attempt_record(
1713                        &event,
1714                        binding,
1715                        &attempt_record,
1716                        replay_of_event_id.as_ref(),
1717                    )
1718                    .await?;
1719                    self.append_lifecycle_event(
1720                        "DispatchSucceeded",
1721                        &event,
1722                        binding,
1723                        serde_json::json!({
1724                            "event_id": event.id.0,
1725                            "attempt": attempt,
1726                            "handler_kind": route.kind(),
1727                            "target_uri": route.target_uri(),
1728                            "result": result,
1729                            "replay_of_event_id": replay_of_event_id,
1730                        }),
1731                        replay_of_event_id.as_ref(),
1732                    )
1733                    .await?;
1734                    self.append_topic_event(
1735                        TRIGGER_OUTBOX_TOPIC,
1736                        "dispatch_succeeded",
1737                        &event,
1738                        Some(binding),
1739                        Some(attempt),
1740                        serde_json::json!({
1741                            "event_id": event.id.0,
1742                            "attempt": attempt,
1743                            "trigger_id": binding.id.as_str(),
1744                            "binding_key": binding.binding_key(),
1745                            "handler_kind": route.kind(),
1746                            "target_uri": route.target_uri(),
1747                            "result": result,
1748                            "replay_of_event_id": replay_of_event_id,
1749                        }),
1750                        replay_of_event_id.as_ref(),
1751                    )
1752                    .await?;
1753                    self.emit_action_graph(
1754                        &event,
1755                        vec![RunActionGraphNodeRecord {
1756                            id: attempt_node_id.clone(),
1757                            label: dispatch_node_label(&route),
1758                            kind: dispatch_node_kind(&route).to_string(),
1759                            status: "completed".to_string(),
1760                            outcome: dispatch_success_outcome(&route, &result).to_string(),
1761                            trace_id: Some(event.trace_id.0.clone()),
1762                            stage_id: None,
1763                            node_id: None,
1764                            worker_id: None,
1765                            run_id: None,
1766                            run_path: None,
1767                            metadata: dispatch_success_metadata(
1768                                &route, binding, &event, attempt, &result,
1769                            ),
1770                        }],
1771                        Vec::new(),
1772                        serde_json::json!({
1773                            "source": "dispatcher",
1774                            "trigger_id": binding.id.as_str(),
1775                            "binding_key": binding.binding_key(),
1776                            "event_id": event.id.0,
1777                            "attempt": attempt,
1778                            "handler_kind": route.kind(),
1779                            "target_uri": route.target_uri(),
1780                            "result": result,
1781                            "replay_of_event_id": replay_of_event_id,
1782                        }),
1783                    )
1784                    .await?;
1785                    self.state
1786                        .destination_circuits
1787                        .record_success(&destination_key);
1788                    if half_open_probe {
1789                        if let Some(metrics) = self.metrics.as_ref() {
1790                            metrics.record_backpressure_event("circuit", "closed");
1791                        }
1792                    }
1793                    finish_in_flight(
1794                        binding.id.as_str(),
1795                        binding.version,
1796                        TriggerDispatchOutcome::Dispatched,
1797                    )
1798                    .await
1799                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1800                    self.release_flow_control(&acquired_flow).await?;
1801                    decrement_in_flight(&self.state);
1802                    self.append_dispatch_trust_record(
1803                        binding,
1804                        &route,
1805                        &event,
1806                        replay_of_event_id.as_ref(),
1807                        autonomy_tier,
1808                        TrustOutcome::Success,
1809                        "succeeded",
1810                        attempt,
1811                        None,
1812                    )
1813                    .await?;
1814                    return Ok(DispatchOutcome {
1815                        trigger_id: binding.id.as_str().to_string(),
1816                        binding_key: binding.binding_key(),
1817                        event_id: event.id.0,
1818                        attempt_count: attempt,
1819                        status: DispatchStatus::Succeeded,
1820                        handler_kind: route.kind().to_string(),
1821                        target_uri: route.target_uri(),
1822                        replay_of_event_id,
1823                        result: Some(result),
1824                        error: None,
1825                    });
1826                }
1827                Err(error) => {
1828                    let attempt_record = DispatchAttemptRecord {
1829                        trigger_id: binding.id.as_str().to_string(),
1830                        binding_key: binding.binding_key(),
1831                        event_id: event.id.0.clone(),
1832                        attempt,
1833                        handler_kind: route.kind().to_string(),
1834                        started_at,
1835                        completed_at,
1836                        outcome: dispatch_error_label(&error).to_string(),
1837                        error_msg: Some(error.to_string()),
1838                    };
1839                    attempts.push(attempt_record.clone());
1840                    self.append_attempt_record(
1841                        &event,
1842                        binding,
1843                        &attempt_record,
1844                        replay_of_event_id.as_ref(),
1845                    )
1846                    .await?;
1847                    if let DispatchError::Waiting(message) = &error {
1848                        self.append_lifecycle_event(
1849                            "DispatchWaiting",
1850                            &event,
1851                            binding,
1852                            serde_json::json!({
1853                                "event_id": event.id.0,
1854                                "attempt": attempt,
1855                                "handler_kind": route.kind(),
1856                                "target_uri": route.target_uri(),
1857                                "message": message,
1858                                "replay_of_event_id": replay_of_event_id,
1859                            }),
1860                            replay_of_event_id.as_ref(),
1861                        )
1862                        .await?;
1863                        self.append_topic_event(
1864                            TRIGGER_OUTBOX_TOPIC,
1865                            "dispatch_waiting",
1866                            &event,
1867                            Some(binding),
1868                            Some(attempt),
1869                            serde_json::json!({
1870                                "event_id": event.id.0,
1871                                "attempt": attempt,
1872                                "trigger_id": binding.id.as_str(),
1873                                "binding_key": binding.binding_key(),
1874                                "handler_kind": route.kind(),
1875                                "target_uri": route.target_uri(),
1876                                "message": message,
1877                                "replay_of_event_id": replay_of_event_id,
1878                            }),
1879                            replay_of_event_id.as_ref(),
1880                        )
1881                        .await?;
1882                        finish_in_flight(
1883                            binding.id.as_str(),
1884                            binding.version,
1885                            TriggerDispatchOutcome::Dispatched,
1886                        )
1887                        .await
1888                        .map_err(|registry_error| {
1889                            DispatchError::Registry(registry_error.to_string())
1890                        })?;
1891                        self.release_flow_control(&acquired_flow).await?;
1892                        decrement_in_flight(&self.state);
1893                        return Ok(DispatchOutcome {
1894                            trigger_id: binding.id.as_str().to_string(),
1895                            binding_key: binding.binding_key(),
1896                            event_id: event.id.0,
1897                            attempt_count: attempt,
1898                            status: DispatchStatus::Waiting,
1899                            handler_kind: route.kind().to_string(),
1900                            target_uri: route.target_uri(),
1901                            replay_of_event_id,
1902                            result: Some(serde_json::json!({
1903                                "waiting": true,
1904                                "message": message,
1905                            })),
1906                            error: None,
1907                        });
1908                    }
1909
1910                    self.append_lifecycle_event(
1911                        "DispatchFailed",
1912                        &event,
1913                        binding,
1914                        serde_json::json!({
1915                            "event_id": event.id.0,
1916                            "attempt": attempt,
1917                            "handler_kind": route.kind(),
1918                            "target_uri": route.target_uri(),
1919                            "error": error.to_string(),
1920                            "replay_of_event_id": replay_of_event_id,
1921                        }),
1922                        replay_of_event_id.as_ref(),
1923                    )
1924                    .await?;
1925                    self.append_topic_event(
1926                        TRIGGER_OUTBOX_TOPIC,
1927                        "dispatch_failed",
1928                        &event,
1929                        Some(binding),
1930                        Some(attempt),
1931                        serde_json::json!({
1932                            "event_id": event.id.0,
1933                            "attempt": attempt,
1934                            "trigger_id": binding.id.as_str(),
1935                            "binding_key": binding.binding_key(),
1936                            "handler_kind": route.kind(),
1937                            "target_uri": route.target_uri(),
1938                            "error": error.to_string(),
1939                            "replay_of_event_id": replay_of_event_id,
1940                        }),
1941                        replay_of_event_id.as_ref(),
1942                    )
1943                    .await?;
1944                    self.emit_action_graph(
1945                        &event,
1946                        vec![RunActionGraphNodeRecord {
1947                            id: attempt_node_id.clone(),
1948                            label: dispatch_node_label(&route),
1949                            kind: dispatch_node_kind(&route).to_string(),
1950                            status: if matches!(error, DispatchError::Cancelled(_)) {
1951                                "cancelled".to_string()
1952                            } else {
1953                                "failed".to_string()
1954                            },
1955                            outcome: dispatch_error_label(&error).to_string(),
1956                            trace_id: Some(event.trace_id.0.clone()),
1957                            stage_id: None,
1958                            node_id: None,
1959                            worker_id: None,
1960                            run_id: None,
1961                            run_path: None,
1962                            metadata: dispatch_error_metadata(
1963                                &route, binding, &event, attempt, &error,
1964                            ),
1965                        }],
1966                        Vec::new(),
1967                        serde_json::json!({
1968                            "source": "dispatcher",
1969                            "trigger_id": binding.id.as_str(),
1970                            "binding_key": binding.binding_key(),
1971                            "event_id": event.id.0,
1972                            "attempt": attempt,
1973                            "handler_kind": route.kind(),
1974                            "target_uri": route.target_uri(),
1975                            "error": error.to_string(),
1976                            "replay_of_event_id": replay_of_event_id,
1977                        }),
1978                    )
1979                    .await?;
1980
1981                    let circuit_opened = if error.retryable() {
1982                        self.state
1983                            .destination_circuits
1984                            .record_failure(&destination_key)
1985                    } else {
1986                        false
1987                    };
1988                    if circuit_opened {
1989                        if let Some(metrics) = self.metrics.as_ref() {
1990                            metrics.record_backpressure_event("circuit", "opened");
1991                            metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1992                            metrics.record_trigger_accepted_to_dlq(
1993                                binding.id.as_str(),
1994                                &binding_key,
1995                                event.provider.as_str(),
1996                                tenant_id(&event),
1997                                "circuit_open",
1998                                duration_between_ms(
1999                                    current_unix_ms(),
2000                                    accepted_at_ms(parent_headers.as_ref(), &event),
2001                                ),
2002                            );
2003                        }
2004                        let final_error = format!(
2005                            "destination circuit opened for {} after {} consecutive failures: {}",
2006                            destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
2007                        );
2008                        let dlq_entry = DlqEntry {
2009                            trigger_id: binding.id.as_str().to_string(),
2010                            binding_key: binding.binding_key(),
2011                            event: event.clone(),
2012                            attempt_count: attempt,
2013                            final_error: final_error.clone(),
2014                            attempts: attempts.clone(),
2015                        };
2016                        self.state
2017                            .dlq
2018                            .lock()
2019                            .expect("dispatcher dlq poisoned")
2020                            .push(dlq_entry.clone());
2021                        self.append_lifecycle_event(
2022                            "DlqMoved",
2023                            &event,
2024                            binding,
2025                            serde_json::json!({
2026                                "event_id": event.id.0,
2027                                "attempt_count": attempt,
2028                                "final_error": dlq_entry.final_error,
2029                                "reason": "circuit_open",
2030                                "destination": destination_key,
2031                                "replay_of_event_id": replay_of_event_id,
2032                            }),
2033                            replay_of_event_id.as_ref(),
2034                        )
2035                        .await?;
2036                        self.append_topic_event(
2037                            TRIGGER_DLQ_TOPIC,
2038                            "dlq_moved",
2039                            &event,
2040                            Some(binding),
2041                            Some(attempt),
2042                            serde_json::to_value(&dlq_entry).map_err(|serde_error| {
2043                                DispatchError::Serde(serde_error.to_string())
2044                            })?,
2045                            replay_of_event_id.as_ref(),
2046                        )
2047                        .await?;
2048                        finish_in_flight(
2049                            binding.id.as_str(),
2050                            binding.version,
2051                            TriggerDispatchOutcome::Dlq,
2052                        )
2053                        .await
2054                        .map_err(|registry_error| {
2055                            DispatchError::Registry(registry_error.to_string())
2056                        })?;
2057                        self.release_flow_control(&acquired_flow).await?;
2058                        decrement_in_flight(&self.state);
2059                        self.append_dispatch_trust_record(
2060                            binding,
2061                            &route,
2062                            &event,
2063                            replay_of_event_id.as_ref(),
2064                            autonomy_tier,
2065                            TrustOutcome::Failure,
2066                            "dlq",
2067                            attempt,
2068                            Some(final_error.clone()),
2069                        )
2070                        .await?;
2071                        return Ok(DispatchOutcome {
2072                            trigger_id: binding.id.as_str().to_string(),
2073                            binding_key: binding.binding_key(),
2074                            event_id: event.id.0,
2075                            attempt_count: attempt,
2076                            status: DispatchStatus::Dlq,
2077                            handler_kind: route.kind().to_string(),
2078                            target_uri: route.target_uri(),
2079                            replay_of_event_id,
2080                            result: None,
2081                            error: Some(final_error),
2082                        });
2083                    }
2084
2085                    if !error.retryable() {
2086                        finish_in_flight(
2087                            binding.id.as_str(),
2088                            binding.version,
2089                            TriggerDispatchOutcome::Failed,
2090                        )
2091                        .await
2092                        .map_err(|registry_error| {
2093                            DispatchError::Registry(registry_error.to_string())
2094                        })?;
2095                        self.release_flow_control(&acquired_flow).await?;
2096                        decrement_in_flight(&self.state);
2097                        let trust_outcome = match error {
2098                            DispatchError::Denied(_) => TrustOutcome::Denied,
2099                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
2100                            _ => TrustOutcome::Failure,
2101                        };
2102                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2103                            "cancelled"
2104                        } else {
2105                            "failed"
2106                        };
2107                        self.append_dispatch_trust_record(
2108                            binding,
2109                            &route,
2110                            &event,
2111                            replay_of_event_id.as_ref(),
2112                            autonomy_tier,
2113                            trust_outcome,
2114                            terminal_status,
2115                            attempt,
2116                            Some(error.to_string()),
2117                        )
2118                        .await?;
2119                        return Ok(DispatchOutcome {
2120                            trigger_id: binding.id.as_str().to_string(),
2121                            binding_key: binding.binding_key(),
2122                            event_id: event.id.0,
2123                            attempt_count: attempt,
2124                            status: if matches!(error, DispatchError::Cancelled(_)) {
2125                                DispatchStatus::Cancelled
2126                            } else {
2127                                DispatchStatus::Failed
2128                            },
2129                            handler_kind: route.kind().to_string(),
2130                            target_uri: route.target_uri(),
2131                            replay_of_event_id,
2132                            result: None,
2133                            error: Some(error.to_string()),
2134                        });
2135                    }
2136
2137                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2138                        if let Some(metrics) = self.metrics.as_ref() {
2139                            metrics.record_retry_scheduled();
2140                            metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2141                            metrics.record_trigger_retry_delay(
2142                                binding.id.as_str(),
2143                                &binding_key,
2144                                event.provider.as_str(),
2145                                tenant_id(&event),
2146                                "scheduled",
2147                                delay,
2148                            );
2149                        }
2150                        tracing::info!(
2151                            component = "dispatcher",
2152                            lifecycle = "retry_scheduled",
2153                            trigger_id = %binding.id.as_str(),
2154                            binding_key = %binding_key,
2155                            event_id = %event.id.0,
2156                            attempt = attempt + 1,
2157                            delay_ms = delay.as_millis(),
2158                            trace_id = %event.trace_id.0
2159                        );
2160                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2161                        previous_retry_node = Some(retry_node_id.clone());
2162                        self.emit_action_graph(
2163                            &event,
2164                            vec![RunActionGraphNodeRecord {
2165                                id: retry_node_id.clone(),
2166                                label: format!("retry in {}ms", delay.as_millis()),
2167                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2168                                status: "scheduled".to_string(),
2169                                outcome: format!("attempt_{}", attempt + 1),
2170                                trace_id: Some(event.trace_id.0.clone()),
2171                                stage_id: None,
2172                                node_id: None,
2173                                worker_id: None,
2174                                run_id: None,
2175                                run_path: None,
2176                                metadata: retry_node_metadata(
2177                                    binding,
2178                                    &event,
2179                                    attempt + 1,
2180                                    delay,
2181                                    &error,
2182                                ),
2183                            }],
2184                            vec![RunActionGraphEdgeRecord {
2185                                from_id: attempt_node_id,
2186                                to_id: retry_node_id.clone(),
2187                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2188                                label: Some(format!("attempt {}", attempt + 1)),
2189                            }],
2190                            serde_json::json!({
2191                                "source": "dispatcher",
2192                                "trigger_id": binding.id.as_str(),
2193                                "binding_key": binding.binding_key(),
2194                                "event_id": event.id.0,
2195                                "attempt": attempt + 1,
2196                                "delay_ms": delay.as_millis(),
2197                                "replay_of_event_id": replay_of_event_id,
2198                            }),
2199                        )
2200                        .await?;
2201                        self.append_lifecycle_event(
2202                            "RetryScheduled",
2203                            &event,
2204                            binding,
2205                            serde_json::json!({
2206                                "event_id": event.id.0,
2207                                "attempt": attempt + 1,
2208                                "delay_ms": delay.as_millis(),
2209                                "error": error.to_string(),
2210                                "replay_of_event_id": replay_of_event_id,
2211                            }),
2212                            replay_of_event_id.as_ref(),
2213                        )
2214                        .await?;
2215                        self.append_topic_event(
2216                            TRIGGER_ATTEMPTS_TOPIC,
2217                            "retry_scheduled",
2218                            &event,
2219                            Some(binding),
2220                            Some(attempt + 1),
2221                            serde_json::json!({
2222                                "event_id": event.id.0,
2223                                "attempt": attempt + 1,
2224                                "trigger_id": binding.id.as_str(),
2225                                "binding_key": binding.binding_key(),
2226                                "delay_ms": delay.as_millis(),
2227                                "error": error.to_string(),
2228                                "replay_of_event_id": replay_of_event_id,
2229                            }),
2230                            replay_of_event_id.as_ref(),
2231                        )
2232                        .await?;
2233                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2234                        let sleep_result = sleep_or_cancel_or_request(
2235                            &self.event_log,
2236                            delay,
2237                            &binding_key,
2238                            &event.id.0,
2239                            replay_of_event_id.as_ref(),
2240                            &mut self.cancel_tx.subscribe(),
2241                        )
2242                        .await;
2243                        decrement_retry_queue_depth(&self.state);
2244                        if sleep_result.is_err() {
2245                            finish_in_flight(
2246                                binding.id.as_str(),
2247                                binding.version,
2248                                TriggerDispatchOutcome::Failed,
2249                            )
2250                            .await
2251                            .map_err(|registry_error| {
2252                                DispatchError::Registry(registry_error.to_string())
2253                            })?;
2254                            self.release_flow_control(&acquired_flow).await?;
2255                            decrement_in_flight(&self.state);
2256                            self.append_dispatch_trust_record(
2257                                binding,
2258                                &route,
2259                                &event,
2260                                replay_of_event_id.as_ref(),
2261                                autonomy_tier,
2262                                TrustOutcome::Failure,
2263                                "cancelled",
2264                                attempt,
2265                                Some("dispatcher shutdown cancelled retry wait".to_string()),
2266                            )
2267                            .await?;
2268                            return Ok(DispatchOutcome {
2269                                trigger_id: binding.id.as_str().to_string(),
2270                                binding_key: binding.binding_key(),
2271                                event_id: event.id.0,
2272                                attempt_count: attempt,
2273                                status: DispatchStatus::Cancelled,
2274                                handler_kind: route.kind().to_string(),
2275                                target_uri: route.target_uri(),
2276                                replay_of_event_id,
2277                                result: None,
2278                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2279                            });
2280                        }
2281                        continue;
2282                    }
2283
2284                    let final_error = error.to_string();
2285                    let dlq_entry = DlqEntry {
2286                        trigger_id: binding.id.as_str().to_string(),
2287                        binding_key: binding.binding_key(),
2288                        event: event.clone(),
2289                        attempt_count: attempt,
2290                        final_error: final_error.clone(),
2291                        attempts: attempts.clone(),
2292                    };
2293                    self.state
2294                        .dlq
2295                        .lock()
2296                        .expect("dispatcher dlq poisoned")
2297                        .push(dlq_entry.clone());
2298                    if let Some(metrics) = self.metrics.as_ref() {
2299                        metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2300                        metrics.record_trigger_accepted_to_dlq(
2301                            binding.id.as_str(),
2302                            &binding_key,
2303                            event.provider.as_str(),
2304                            tenant_id(&event),
2305                            "retry_exhausted",
2306                            duration_between_ms(
2307                                current_unix_ms(),
2308                                accepted_at_ms(parent_headers.as_ref(), &event),
2309                            ),
2310                        );
2311                    }
2312                    tracing::info!(
2313                        component = "dispatcher",
2314                        lifecycle = "dlq_moved",
2315                        trigger_id = %binding.id.as_str(),
2316                        binding_key = %binding_key,
2317                        event_id = %event.id.0,
2318                        attempt_count = attempt,
2319                        reason = "retry_exhausted",
2320                        trace_id = %event.trace_id.0
2321                    );
2322                    self.emit_action_graph(
2323                        &event,
2324                        vec![RunActionGraphNodeRecord {
2325                            id: format!("dlq:{binding_key}:{}", event.id.0),
2326                            label: binding.id.as_str().to_string(),
2327                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2328                            status: "queued".to_string(),
2329                            outcome: "retry_exhausted".to_string(),
2330                            trace_id: Some(event.trace_id.0.clone()),
2331                            stage_id: None,
2332                            node_id: None,
2333                            worker_id: None,
2334                            run_id: None,
2335                            run_path: None,
2336                            metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2337                        }],
2338                        vec![RunActionGraphEdgeRecord {
2339                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2340                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
2341                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2342                            label: Some(format!("{attempt} attempts")),
2343                        }],
2344                        serde_json::json!({
2345                            "source": "dispatcher",
2346                            "trigger_id": binding.id.as_str(),
2347                            "binding_key": binding.binding_key(),
2348                            "event_id": event.id.0,
2349                            "attempt_count": attempt,
2350                            "final_error": final_error,
2351                            "replay_of_event_id": replay_of_event_id,
2352                        }),
2353                    )
2354                    .await?;
2355                    self.append_lifecycle_event(
2356                        "DlqMoved",
2357                        &event,
2358                        binding,
2359                        serde_json::json!({
2360                            "event_id": event.id.0,
2361                            "attempt_count": attempt,
2362                            "final_error": dlq_entry.final_error,
2363                            "replay_of_event_id": replay_of_event_id,
2364                        }),
2365                        replay_of_event_id.as_ref(),
2366                    )
2367                    .await?;
2368                    self.append_topic_event(
2369                        TRIGGER_DLQ_TOPIC,
2370                        "dlq_moved",
2371                        &event,
2372                        Some(binding),
2373                        Some(attempt),
2374                        serde_json::to_value(&dlq_entry)
2375                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2376                        replay_of_event_id.as_ref(),
2377                    )
2378                    .await?;
2379                    finish_in_flight(
2380                        binding.id.as_str(),
2381                        binding.version,
2382                        TriggerDispatchOutcome::Dlq,
2383                    )
2384                    .await
2385                    .map_err(|registry_error| {
2386                        DispatchError::Registry(registry_error.to_string())
2387                    })?;
2388                    self.release_flow_control(&acquired_flow).await?;
2389                    decrement_in_flight(&self.state);
2390                    self.append_dispatch_trust_record(
2391                        binding,
2392                        &route,
2393                        &event,
2394                        replay_of_event_id.as_ref(),
2395                        autonomy_tier,
2396                        TrustOutcome::Failure,
2397                        "dlq",
2398                        attempt,
2399                        Some(error.to_string()),
2400                    )
2401                    .await?;
2402                    return Ok(DispatchOutcome {
2403                        trigger_id: binding.id.as_str().to_string(),
2404                        binding_key: binding.binding_key(),
2405                        event_id: event.id.0,
2406                        attempt_count: attempt,
2407                        status: DispatchStatus::Dlq,
2408                        handler_kind: route.kind().to_string(),
2409                        target_uri: route.target_uri(),
2410                        replay_of_event_id,
2411                        result: None,
2412                        error: Some(error.to_string()),
2413                    });
2414                }
2415            }
2416        }
2417
2418        finish_in_flight(
2419            binding.id.as_str(),
2420            binding.version,
2421            TriggerDispatchOutcome::Failed,
2422        )
2423        .await
2424        .map_err(|error| DispatchError::Registry(error.to_string()))?;
2425        self.release_flow_control(&acquired_flow).await?;
2426        decrement_in_flight(&self.state);
2427        self.append_dispatch_trust_record(
2428            binding,
2429            &route,
2430            &event,
2431            replay_of_event_id.as_ref(),
2432            autonomy_tier,
2433            TrustOutcome::Failure,
2434            "failed",
2435            max_attempts,
2436            Some("dispatch exhausted without terminal outcome".to_string()),
2437        )
2438        .await?;
2439        Ok(DispatchOutcome {
2440            trigger_id: binding.id.as_str().to_string(),
2441            binding_key: binding.binding_key(),
2442            event_id: event.id.0,
2443            attempt_count: max_attempts,
2444            status: DispatchStatus::Failed,
2445            handler_kind: route.kind().to_string(),
2446            target_uri: route.target_uri(),
2447            replay_of_event_id,
2448            result: None,
2449            error: Some("dispatch exhausted without terminal outcome".to_string()),
2450        })
2451    }
2452
2453    async fn dispatch_once(
2454        &self,
2455        binding: &TriggerBinding,
2456        route: &DispatchUri,
2457        event: &TriggerEvent,
2458        autonomy_tier: AutonomyTier,
2459        wait_lease: Option<DispatchWaitLease>,
2460        cancel_rx: &mut broadcast::Receiver<()>,
2461    ) -> Result<serde_json::Value, DispatchError> {
2462        match route {
2463            DispatchUri::Local { .. } => {
2464                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2465                    return Err(DispatchError::Local(format!(
2466                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2467                        binding.id.as_str()
2468                    )));
2469                };
2470                let value = self
2471                    .invoke_vm_callable(
2472                        closure,
2473                        &binding.binding_key(),
2474                        event,
2475                        None,
2476                        binding.id.as_str(),
2477                        &format!("{}.{}", event.provider.as_str(), event.kind),
2478                        autonomy_tier,
2479                        wait_lease,
2480                        cancel_rx,
2481                    )
2482                    .await?;
2483                Ok(vm_value_to_json(&value))
2484            }
2485            DispatchUri::A2a {
2486                target,
2487                allow_cleartext,
2488            } => {
2489                if self.state.shutting_down.load(Ordering::SeqCst) {
2490                    return Err(DispatchError::Cancelled(
2491                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
2492                    ));
2493                }
2494                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2495                    target,
2496                    *allow_cleartext,
2497                    binding.id.as_str(),
2498                    &binding.binding_key(),
2499                    event,
2500                    cancel_rx,
2501                )
2502                .await
2503                .map_err(|error| match error {
2504                    crate::a2a::A2aClientError::Cancelled(message) => {
2505                        DispatchError::Cancelled(message)
2506                    }
2507                    other => DispatchError::A2a(other.to_string()),
2508                })?;
2509                match ack {
2510                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2511                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2512                }
2513            }
2514            DispatchUri::Worker { queue } => {
2515                let receipt = crate::WorkerQueue::new(self.event_log.clone())
2516                    .enqueue(&crate::WorkerQueueJob {
2517                        queue: queue.clone(),
2518                        trigger_id: binding.id.as_str().to_string(),
2519                        binding_key: binding.binding_key(),
2520                        binding_version: binding.version,
2521                        event: event.clone(),
2522                        replay_of_event_id: current_dispatch_context()
2523                            .and_then(|context| context.replay_of_event_id),
2524                        priority: worker_queue_priority(binding, event),
2525                    })
2526                    .await
2527                    .map_err(DispatchError::from)?;
2528                Ok(serde_json::to_value(receipt)
2529                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2530            }
2531        }
2532    }
2533
2534    async fn apply_flow_control(
2535        &self,
2536        binding: &TriggerBinding,
2537        event: &TriggerEvent,
2538        replay_of_event_id: Option<&String>,
2539    ) -> Result<FlowControlOutcome, DispatchError> {
2540        let flow = &binding.flow_control;
2541        let mut managed_event = event.clone();
2542
2543        if let Some(batch) = &flow.batch {
2544            let gate = self
2545                .resolve_flow_gate(
2546                    &binding.binding_key(),
2547                    batch.key.as_ref(),
2548                    &managed_event,
2549                    replay_of_event_id,
2550                )
2551                .await?;
2552            match self
2553                .state
2554                .flow_control
2555                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2556                .await
2557                .map_err(DispatchError::from)?
2558            {
2559                BatchDecision::Dispatch(events) => {
2560                    managed_event = build_batched_event(events)?;
2561                }
2562                BatchDecision::Merged => {
2563                    return Ok(FlowControlOutcome::Skip {
2564                        reason: "batch_merged".to_string(),
2565                    })
2566                }
2567            }
2568        }
2569
2570        if let Some(debounce) = &flow.debounce {
2571            let gate = self
2572                .resolve_flow_gate(
2573                    &binding.binding_key(),
2574                    Some(&debounce.key),
2575                    &managed_event,
2576                    replay_of_event_id,
2577                )
2578                .await?;
2579            let latest = self
2580                .state
2581                .flow_control
2582                .debounce(&gate, debounce.period)
2583                .await
2584                .map_err(DispatchError::from)?;
2585            if !latest {
2586                return Ok(FlowControlOutcome::Skip {
2587                    reason: "debounced".to_string(),
2588                });
2589            }
2590        }
2591
2592        if let Some(rate_limit) = &flow.rate_limit {
2593            let gate = self
2594                .resolve_flow_gate(
2595                    &binding.binding_key(),
2596                    rate_limit.key.as_ref(),
2597                    &managed_event,
2598                    replay_of_event_id,
2599                )
2600                .await?;
2601            let allowed = self
2602                .state
2603                .flow_control
2604                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2605                .await
2606                .map_err(DispatchError::from)?;
2607            if !allowed {
2608                return Ok(FlowControlOutcome::Skip {
2609                    reason: "rate_limited".to_string(),
2610                });
2611            }
2612        }
2613
2614        if let Some(throttle) = &flow.throttle {
2615            let gate = self
2616                .resolve_flow_gate(
2617                    &binding.binding_key(),
2618                    throttle.key.as_ref(),
2619                    &managed_event,
2620                    replay_of_event_id,
2621                )
2622                .await?;
2623            self.state
2624                .flow_control
2625                .wait_for_throttle(&gate, throttle.period, throttle.max)
2626                .await
2627                .map_err(DispatchError::from)?;
2628        }
2629
2630        let mut acquired = AcquiredFlowControl::default();
2631        if let Some(singleton) = &flow.singleton {
2632            let gate = self
2633                .resolve_flow_gate(
2634                    &binding.binding_key(),
2635                    singleton.key.as_ref(),
2636                    &managed_event,
2637                    replay_of_event_id,
2638                )
2639                .await?;
2640            let acquired_singleton = self
2641                .state
2642                .flow_control
2643                .try_acquire_singleton(&gate)
2644                .await
2645                .map_err(DispatchError::from)?;
2646            if !acquired_singleton {
2647                return Ok(FlowControlOutcome::Skip {
2648                    reason: "singleton_active".to_string(),
2649                });
2650            }
2651            acquired.singleton = Some(SingletonLease { gate, held: true });
2652        }
2653
2654        if let Some(concurrency) = &flow.concurrency {
2655            let gate = self
2656                .resolve_flow_gate(
2657                    &binding.binding_key(),
2658                    concurrency.key.as_ref(),
2659                    &managed_event,
2660                    replay_of_event_id,
2661                )
2662                .await?;
2663            let priority_rank = self
2664                .resolve_priority_rank(
2665                    &binding.binding_key(),
2666                    flow.priority.as_ref(),
2667                    &managed_event,
2668                    replay_of_event_id,
2669                )
2670                .await?;
2671            let permit = self
2672                .state
2673                .flow_control
2674                .acquire_concurrency(&gate, concurrency.max, priority_rank)
2675                .await
2676                .map_err(DispatchError::from)?;
2677            acquired.concurrency = Some(ConcurrencyLease {
2678                gate,
2679                max: concurrency.max,
2680                priority_rank,
2681                permit: Some(permit),
2682            });
2683        }
2684
2685        Ok(FlowControlOutcome::Dispatch {
2686            event: Box::new(managed_event),
2687            acquired,
2688        })
2689    }
2690
2691    async fn release_flow_control(
2692        &self,
2693        acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2694    ) -> Result<(), DispatchError> {
2695        let (singleton_gate, concurrency_permit) = {
2696            let mut acquired = acquired.lock().await;
2697            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2698                if lease.held {
2699                    lease.held = false;
2700                    Some(lease.gate.clone())
2701                } else {
2702                    None
2703                }
2704            });
2705            let concurrency_permit = acquired
2706                .concurrency
2707                .as_mut()
2708                .and_then(|lease| lease.permit.take());
2709            (singleton_gate, concurrency_permit)
2710        };
2711        if let Some(gate) = singleton_gate {
2712            self.state
2713                .flow_control
2714                .release_singleton(&gate)
2715                .await
2716                .map_err(DispatchError::from)?;
2717        }
2718        if let Some(permit) = concurrency_permit {
2719            self.state
2720                .flow_control
2721                .release_concurrency(permit)
2722                .await
2723                .map_err(DispatchError::from)?;
2724        }
2725        Ok(())
2726    }
2727
2728    async fn resolve_flow_gate(
2729        &self,
2730        binding_key: &str,
2731        expr: Option<&crate::triggers::TriggerExpressionSpec>,
2732        event: &TriggerEvent,
2733        replay_of_event_id: Option<&String>,
2734    ) -> Result<String, DispatchError> {
2735        let key = match expr {
2736            Some(expr) => {
2737                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2738                    .await?
2739            }
2740            None => "_global".to_string(),
2741        };
2742        Ok(format!("{binding_key}:{key}"))
2743    }
2744
2745    async fn resolve_priority_rank(
2746        &self,
2747        binding_key: &str,
2748        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2749        event: &TriggerEvent,
2750        replay_of_event_id: Option<&String>,
2751    ) -> Result<usize, DispatchError> {
2752        let Some(priority) = priority else {
2753            return Ok(0);
2754        };
2755        let value = self
2756            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2757            .await?;
2758        Ok(priority
2759            .order
2760            .iter()
2761            .position(|candidate| candidate == &value)
2762            .unwrap_or(priority.order.len()))
2763    }
2764
2765    async fn evaluate_flow_expression(
2766        &self,
2767        binding_key: &str,
2768        expr: &crate::triggers::TriggerExpressionSpec,
2769        event: &TriggerEvent,
2770        replay_of_event_id: Option<&String>,
2771    ) -> Result<String, DispatchError> {
2772        let value = self
2773            .invoke_vm_callable(
2774                &expr.closure,
2775                binding_key,
2776                event,
2777                replay_of_event_id,
2778                "",
2779                "flow_control",
2780                AutonomyTier::Suggest,
2781                None,
2782                &mut self.cancel_tx.subscribe(),
2783            )
2784            .await?;
2785        Ok(json_value_to_gate(&vm_value_to_json(&value)))
2786    }
2787
2788    #[allow(clippy::too_many_arguments)]
2789    async fn invoke_vm_callable(
2790        &self,
2791        closure: &crate::value::VmClosure,
2792        binding_key: &str,
2793        event: &TriggerEvent,
2794        replay_of_event_id: Option<&String>,
2795        agent_id: &str,
2796        action: &str,
2797        autonomy_tier: AutonomyTier,
2798        wait_lease: Option<DispatchWaitLease>,
2799        cancel_rx: &mut broadcast::Receiver<()>,
2800    ) -> Result<VmValue, DispatchError> {
2801        let mut vm = self.base_vm.child_vm();
2802        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2803        if self.state.shutting_down.load(Ordering::SeqCst) {
2804            cancel_token.store(true, Ordering::SeqCst);
2805        }
2806        self.state
2807            .cancel_tokens
2808            .lock()
2809            .expect("dispatcher cancel tokens poisoned")
2810            .push(cancel_token.clone());
2811        vm.install_cancel_token(cancel_token.clone());
2812        let arg = event_to_handler_value(event)?;
2813        let args = [arg];
2814        let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2815        let effective_policy = match crate::orchestration::current_execution_policy() {
2816            Some(parent) => parent
2817                .intersect(&tier_policy)
2818                .map_err(|error| DispatchError::Local(error.to_string()))?,
2819            None => tier_policy,
2820        };
2821        crate::orchestration::push_execution_policy(effective_policy);
2822        let _policy_guard = DispatchExecutionPolicyGuard;
2823        let future = vm.call_closure_pub(closure, &args);
2824        pin_mut!(future);
2825        let (binding_id, binding_version) = split_binding_key(binding_key);
2826        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2827            slot.borrow_mut().replace(DispatchContext {
2828                trigger_event: event.clone(),
2829                replay_of_event_id: replay_of_event_id.cloned(),
2830                binding_id,
2831                binding_version,
2832                agent_id: agent_id.to_string(),
2833                action: action.to_string(),
2834                autonomy_tier,
2835            })
2836        });
2837        let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2838            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2839        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2840        crate::stdlib::hitl::reset_hitl_state();
2841        let mut poll = tokio::time::interval(Duration::from_millis(100));
2842        let result = loop {
2843            tokio::select! {
2844                result = &mut future => break result,
2845                _ = recv_cancel(cancel_rx) => {
2846                    cancel_token.store(true, Ordering::SeqCst);
2847                }
2848                _ = poll.tick() => {
2849                    if dispatch_cancel_requested(
2850                        &self.event_log,
2851                        binding_key,
2852                        &event.id.0,
2853                        replay_of_event_id,
2854                    )
2855                    .await? {
2856                        cancel_token.store(true, Ordering::SeqCst);
2857                    }
2858                }
2859            }
2860        };
2861        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2862            *slot.borrow_mut() = prior_context;
2863        });
2864        ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2865            *slot.borrow_mut() = prior_wait_lease;
2866        });
2867        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2868        {
2869            let mut tokens = self
2870                .state
2871                .cancel_tokens
2872                .lock()
2873                .expect("dispatcher cancel tokens poisoned");
2874            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2875        }
2876
2877        if cancel_token.load(Ordering::SeqCst) {
2878            if dispatch_cancel_requested(
2879                &self.event_log,
2880                binding_key,
2881                &event.id.0,
2882                replay_of_event_id,
2883            )
2884            .await?
2885            {
2886                Err(DispatchError::Cancelled(
2887                    "trigger cancel request cancelled local handler".to_string(),
2888                ))
2889            } else {
2890                Err(DispatchError::Cancelled(
2891                    "dispatcher shutdown cancelled local handler".to_string(),
2892                ))
2893            }
2894        } else {
2895            result.map_err(dispatch_error_from_vm_error)
2896        }
2897    }
2898
2899    async fn evaluate_predicate(
2900        &self,
2901        binding: &TriggerBinding,
2902        predicate: &super::registry::TriggerPredicateSpec,
2903        event: &TriggerEvent,
2904        replay_of_event_id: Option<&String>,
2905        autonomy_tier: AutonomyTier,
2906    ) -> Result<PredicateEvaluationRecord, DispatchError> {
2907        let event_id = event.id.0.clone();
2908        let trigger_id = binding.id.as_str().to_string();
2909        let now_ms = now_unix_ms();
2910        reset_binding_budget_windows(binding);
2911
2912        let breaker_open_until = {
2913            let state = binding
2914                .predicate_state
2915                .lock()
2916                .expect("trigger predicate state poisoned");
2917            if state
2918                .breaker_open_until_ms
2919                .is_some_and(|until_ms| until_ms > now_ms)
2920            {
2921                state.breaker_open_until_ms
2922            } else {
2923                None
2924            }
2925        };
2926
2927        if breaker_open_until.is_some() {
2928            let mut metadata = BTreeMap::new();
2929            metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2930            metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2931            metadata.insert(
2932                "breaker_open_until_ms".to_string(),
2933                serde_json::json!(breaker_open_until),
2934            );
2935            crate::events::log_warn_meta(
2936                "trigger.predicate.circuit_breaker",
2937                "trigger predicate circuit breaker is open; short-circuiting to false",
2938                metadata,
2939            );
2940            let record = PredicateEvaluationRecord {
2941                result: false,
2942                reason: Some("circuit_open".to_string()),
2943                ..Default::default()
2944            };
2945            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2946                .await?;
2947            return Ok(record);
2948        }
2949
2950        let expected_cost = expected_predicate_cost_usd_micros(binding);
2951        if let Some(reason) = binding_budget_would_exceed(binding, expected_cost)
2952            .or_else(|| orchestrator_budget_would_exceed(expected_cost))
2953        {
2954            self.append_budget_exhausted_event(
2955                binding,
2956                event,
2957                reason,
2958                expected_cost,
2959                None,
2960                replay_of_event_id,
2961            )
2962            .await?;
2963            let record = PredicateEvaluationRecord {
2964                result: binding.on_budget_exhausted == TriggerBudgetExhaustionStrategy::Warn,
2965                reason: Some(reason.to_string()),
2966                exhaustion_strategy: Some(binding.on_budget_exhausted),
2967                ..Default::default()
2968            };
2969            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2970                .await?;
2971            return Ok(record);
2972        }
2973
2974        let replay_cache = self
2975            .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2976            .await?;
2977        let guard = start_predicate_evaluation(
2978            binding.when_budget.clone().unwrap_or_default(),
2979            replay_cache,
2980        );
2981        let started = std::time::Instant::now();
2982        let eval = self
2983            .invoke_vm_callable_with_timeout(
2984                &predicate.closure,
2985                &binding.binding_key(),
2986                event,
2987                replay_of_event_id,
2988                binding.id.as_str(),
2989                &format!("{}.{}", event.provider.as_str(), event.kind),
2990                autonomy_tier,
2991                &mut self.cancel_tx.subscribe(),
2992                binding
2993                    .when_budget
2994                    .as_ref()
2995                    .and_then(|budget| budget.timeout()),
2996            )
2997            .await;
2998        let capture = guard.finish();
2999        let latency_ms = started.elapsed().as_millis() as u64;
3000        if replay_of_event_id.is_none() && !capture.entries.is_empty() {
3001            self.append_predicate_cache_record(binding, event, &capture.entries)
3002                .await?;
3003        }
3004
3005        let mut record = PredicateEvaluationRecord {
3006            result: false,
3007            cost_usd: capture.total_cost_usd,
3008            tokens: capture.total_tokens,
3009            latency_ms,
3010            cached: capture.cached,
3011            reason: None,
3012            exhaustion_strategy: None,
3013        };
3014
3015        let mut count_failure = false;
3016        let mut opened_breaker = false;
3017
3018        match eval {
3019            Ok(value) => match predicate_value_as_bool(value) {
3020                Ok(result) => {
3021                    record.result = result;
3022                }
3023                Err(reason) => {
3024                    count_failure = true;
3025                    record.reason = Some(reason);
3026                }
3027            },
3028            Err(error) => {
3029                count_failure = true;
3030                record.reason = Some(error.to_string());
3031            }
3032        }
3033
3034        let cost_usd_micros = usd_to_micros(record.cost_usd);
3035        if cost_usd_micros > 0 {
3036            binding
3037                .metrics
3038                .cost_total_usd_micros
3039                .fetch_add(cost_usd_micros, Ordering::Relaxed);
3040            binding
3041                .metrics
3042                .cost_today_usd_micros
3043                .fetch_add(cost_usd_micros, Ordering::Relaxed);
3044            binding
3045                .metrics
3046                .cost_hour_usd_micros
3047                .fetch_add(cost_usd_micros, Ordering::Relaxed);
3048            note_orchestrator_budget_cost(cost_usd_micros);
3049            record_predicate_cost_sample(binding, cost_usd_micros);
3050        }
3051
3052        let timed_out = matches!(
3053            record.reason.as_deref(),
3054            Some("predicate evaluation timed out")
3055        );
3056        if capture.budget_exceeded || timed_out {
3057            if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3058                record.result = false;
3059            } else if timed_out {
3060                record.result = true;
3061            }
3062            record.reason = Some("budget_exceeded".to_string());
3063            record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3064            self.append_budget_exhausted_event(
3065                binding,
3066                event,
3067                "budget_exceeded",
3068                cost_usd_micros,
3069                Some(record.tokens),
3070                replay_of_event_id,
3071            )
3072            .await?;
3073            self.append_lifecycle_event(
3074                "predicate.invocation_budget_exceeded",
3075                event,
3076                binding,
3077                serde_json::json!({
3078                    "trigger_id": binding.id.as_str(),
3079                    "event_id": event.id.0,
3080                    "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
3081                    "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
3082                    "cost_usd": record.cost_usd,
3083                    "tokens": record.tokens,
3084                    "replay_of_event_id": replay_of_event_id,
3085                }),
3086                replay_of_event_id,
3087            )
3088            .await?;
3089        }
3090
3091        if let Some(reason) =
3092            binding_budget_would_exceed(binding, 0).or_else(|| orchestrator_budget_would_exceed(0))
3093        {
3094            if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3095                record.result = false;
3096            }
3097            record.reason = Some(reason.to_string());
3098            record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3099            self.append_budget_exhausted_event(binding, event, reason, 0, None, replay_of_event_id)
3100                .await?;
3101        }
3102
3103        {
3104            let mut state = binding
3105                .predicate_state
3106                .lock()
3107                .expect("trigger predicate state poisoned");
3108            if count_failure {
3109                state.consecutive_failures = state.consecutive_failures.saturating_add(1);
3110                if state.consecutive_failures >= 3 {
3111                    state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
3112                    opened_breaker = true;
3113                }
3114            } else {
3115                state.consecutive_failures = 0;
3116                state.breaker_open_until_ms = None;
3117            }
3118        }
3119
3120        if opened_breaker {
3121            let mut metadata = BTreeMap::new();
3122            metadata.insert(
3123                "trigger_id".to_string(),
3124                serde_json::json!(binding.id.as_str()),
3125            );
3126            metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3127            metadata.insert("failure_count".to_string(), serde_json::json!(3));
3128            metadata.insert("reason".to_string(), serde_json::json!(record.reason));
3129            crate::events::log_warn_meta(
3130                "trigger.predicate.circuit_breaker",
3131                "trigger predicate circuit breaker opened for 5 minutes",
3132                metadata,
3133            );
3134        }
3135
3136        self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
3137            .await?;
3138        Ok(record)
3139    }
3140
3141    #[allow(clippy::too_many_arguments)]
3142    #[allow(clippy::too_many_arguments)]
3143    async fn invoke_vm_callable_with_timeout(
3144        &self,
3145        closure: &crate::value::VmClosure,
3146        binding_key: &str,
3147        event: &TriggerEvent,
3148        replay_of_event_id: Option<&String>,
3149        agent_id: &str,
3150        action: &str,
3151        autonomy_tier: AutonomyTier,
3152        cancel_rx: &mut broadcast::Receiver<()>,
3153        timeout: Option<Duration>,
3154    ) -> Result<VmValue, DispatchError> {
3155        let future = self.invoke_vm_callable(
3156            closure,
3157            binding_key,
3158            event,
3159            replay_of_event_id,
3160            agent_id,
3161            action,
3162            autonomy_tier,
3163            None,
3164            cancel_rx,
3165        );
3166        pin_mut!(future);
3167        if let Some(timeout) = timeout {
3168            match tokio::time::timeout(timeout, future).await {
3169                Ok(result) => result,
3170                Err(_) => Err(DispatchError::Local(
3171                    "predicate evaluation timed out".to_string(),
3172                )),
3173            }
3174        } else {
3175            future.await
3176        }
3177    }
3178
3179    async fn append_predicate_evaluated_event(
3180        &self,
3181        binding: &TriggerBinding,
3182        event: &TriggerEvent,
3183        record: &PredicateEvaluationRecord,
3184        replay_of_event_id: Option<&String>,
3185    ) -> Result<(), DispatchError> {
3186        if let Some(metrics) = self.metrics.as_ref() {
3187            metrics.record_trigger_predicate_evaluation(
3188                binding.id.as_str(),
3189                record.result,
3190                record.cost_usd,
3191            );
3192            metrics.set_trigger_budget_cost_today(
3193                binding.id.as_str(),
3194                current_predicate_daily_cost(binding),
3195            );
3196            if matches!(
3197                record.reason.as_deref(),
3198                Some(
3199                    "budget_exceeded"
3200                        | "daily_budget_exceeded"
3201                        | "hourly_budget_exceeded"
3202                        | "orchestrator_daily_budget_exceeded"
3203                        | "orchestrator_hourly_budget_exceeded"
3204                )
3205            ) {
3206                metrics.record_trigger_budget_exhausted(
3207                    binding.id.as_str(),
3208                    record
3209                        .exhaustion_strategy
3210                        .map(TriggerBudgetExhaustionStrategy::as_str)
3211                        .unwrap_or_else(|| record.reason.as_deref().unwrap_or("predicate")),
3212                );
3213            }
3214        }
3215        self.append_lifecycle_event(
3216            "predicate.evaluated",
3217            event,
3218            binding,
3219            serde_json::json!({
3220                "trigger_id": binding.id.as_str(),
3221                "event_id": event.id.0,
3222                "result": record.result,
3223                "cost_usd": record.cost_usd,
3224                "tokens": record.tokens,
3225                "latency_ms": record.latency_ms,
3226                "cached": record.cached,
3227                "reason": record.reason,
3228                "on_budget_exhausted": record.exhaustion_strategy.map(TriggerBudgetExhaustionStrategy::as_str),
3229                "replay_of_event_id": replay_of_event_id,
3230            }),
3231            replay_of_event_id,
3232        )
3233        .await
3234    }
3235
3236    async fn append_predicate_cache_record(
3237        &self,
3238        binding: &TriggerBinding,
3239        event: &TriggerEvent,
3240        entries: &[PredicateCacheEntry],
3241    ) -> Result<(), DispatchError> {
3242        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3243            .expect("static trigger inbox legacy topic name is valid");
3244        let payload = serde_json::to_value(PredicateCacheRecord {
3245            trigger_id: binding.id.as_str().to_string(),
3246            event_id: event.id.0.clone(),
3247            entries: entries.to_vec(),
3248        })
3249        .map_err(|error| DispatchError::Serde(error.to_string()))?;
3250        self.event_log
3251            .append(&topic, LogEvent::new("predicate_llm_cache", payload))
3252            .await
3253            .map_err(DispatchError::from)
3254            .map(|_| ())
3255    }
3256
3257    async fn read_predicate_cache_record(
3258        &self,
3259        event_id: &str,
3260    ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
3261        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3262            .expect("static trigger inbox legacy topic name is valid");
3263        let records = self
3264            .event_log
3265            .read_range(&topic, None, usize::MAX)
3266            .await
3267            .map_err(DispatchError::from)?;
3268        Ok(records
3269            .into_iter()
3270            .filter(|(_, event)| event.kind == "predicate_llm_cache")
3271            .filter_map(|(_, event)| {
3272                serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
3273            })
3274            .filter(|record| record.event_id == event_id)
3275            .flat_map(|record| record.entries)
3276            .collect())
3277    }
3278
3279    #[allow(clippy::too_many_arguments)]
3280    async fn append_dispatch_trust_record(
3281        &self,
3282        binding: &TriggerBinding,
3283        route: &DispatchUri,
3284        event: &TriggerEvent,
3285        replay_of_event_id: Option<&String>,
3286        autonomy_tier: AutonomyTier,
3287        outcome: TrustOutcome,
3288        terminal_status: &str,
3289        attempt_count: u32,
3290        error: Option<String>,
3291    ) -> Result<(), DispatchError> {
3292        let mut record = TrustRecord::new(
3293            binding.id.as_str().to_string(),
3294            format!("{}.{}", event.provider.as_str(), event.kind),
3295            None,
3296            outcome,
3297            event.trace_id.0.clone(),
3298            autonomy_tier,
3299        );
3300        record.metadata.insert(
3301            "binding_key".to_string(),
3302            serde_json::json!(binding.binding_key()),
3303        );
3304        record.metadata.insert(
3305            "binding_version".to_string(),
3306            serde_json::json!(binding.version),
3307        );
3308        record.metadata.insert(
3309            "provider".to_string(),
3310            serde_json::json!(event.provider.as_str()),
3311        );
3312        record
3313            .metadata
3314            .insert("event_kind".to_string(), serde_json::json!(event.kind));
3315        record
3316            .metadata
3317            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3318        record.metadata.insert(
3319            "target_uri".to_string(),
3320            serde_json::json!(route.target_uri()),
3321        );
3322        record.metadata.insert(
3323            "terminal_status".to_string(),
3324            serde_json::json!(terminal_status),
3325        );
3326        record.metadata.insert(
3327            "attempt_count".to_string(),
3328            serde_json::json!(attempt_count),
3329        );
3330        if let Some(replay_of_event_id) = replay_of_event_id {
3331            record.metadata.insert(
3332                "replay_of_event_id".to_string(),
3333                serde_json::json!(replay_of_event_id),
3334            );
3335        }
3336        if let Some(error) = error {
3337            record
3338                .metadata
3339                .insert("error".to_string(), serde_json::json!(error));
3340        }
3341        append_trust_record(&self.event_log, &record)
3342            .await
3343            .map(|_| ())
3344            .map_err(DispatchError::from)
3345    }
3346
3347    async fn append_attempt_record(
3348        &self,
3349        event: &TriggerEvent,
3350        binding: &TriggerBinding,
3351        attempt: &DispatchAttemptRecord,
3352        replay_of_event_id: Option<&String>,
3353    ) -> Result<(), DispatchError> {
3354        self.append_topic_event(
3355            TRIGGER_ATTEMPTS_TOPIC,
3356            "attempt_recorded",
3357            event,
3358            Some(binding),
3359            Some(attempt.attempt),
3360            serde_json::to_value(attempt)
3361                .map_err(|error| DispatchError::Serde(error.to_string()))?,
3362            replay_of_event_id,
3363        )
3364        .await
3365    }
3366
3367    async fn append_lifecycle_event(
3368        &self,
3369        kind: &str,
3370        event: &TriggerEvent,
3371        binding: &TriggerBinding,
3372        payload: serde_json::Value,
3373        replay_of_event_id: Option<&String>,
3374    ) -> Result<(), DispatchError> {
3375        self.append_topic_event(
3376            TRIGGERS_LIFECYCLE_TOPIC,
3377            kind,
3378            event,
3379            Some(binding),
3380            None,
3381            payload,
3382            replay_of_event_id,
3383        )
3384        .await
3385    }
3386
3387    async fn append_budget_exhausted_event(
3388        &self,
3389        binding: &TriggerBinding,
3390        event: &TriggerEvent,
3391        reason: &str,
3392        expected_cost_usd_micros: u64,
3393        tokens: Option<u64>,
3394        replay_of_event_id: Option<&String>,
3395    ) -> Result<(), DispatchError> {
3396        let payload = serde_json::json!({
3397            "trigger_id": binding.id.as_str(),
3398            "event_id": event.id.0,
3399            "reason": reason,
3400            "strategy": binding.on_budget_exhausted.as_str(),
3401            "expected_cost_usd": micros_to_usd(expected_cost_usd_micros),
3402            "cost_usd": micros_to_usd(expected_cost_usd_micros),
3403            "tokens": tokens,
3404            "daily_limit_usd": binding.daily_cost_usd,
3405            "hourly_limit_usd": binding.hourly_cost_usd,
3406            "cost_today_usd": current_predicate_daily_cost(binding),
3407            "cost_hour_usd": current_predicate_hourly_cost(binding),
3408            "replay_of_event_id": replay_of_event_id,
3409        });
3410        self.append_lifecycle_event(
3411            "predicate.budget_exceeded",
3412            event,
3413            binding,
3414            payload.clone(),
3415            replay_of_event_id,
3416        )
3417        .await?;
3418        let legacy_kind = match reason {
3419            "daily_budget_exceeded" => Some("predicate.daily_budget_exceeded"),
3420            "hourly_budget_exceeded" => Some("predicate.hourly_budget_exceeded"),
3421            "orchestrator_daily_budget_exceeded" => {
3422                Some("predicate.orchestrator_daily_budget_exceeded")
3423            }
3424            "orchestrator_hourly_budget_exceeded" => {
3425                Some("predicate.orchestrator_hourly_budget_exceeded")
3426            }
3427            _ => None,
3428        };
3429        if let Some(kind) = legacy_kind {
3430            self.append_lifecycle_event(kind, event, binding, payload, replay_of_event_id)
3431                .await?;
3432        }
3433        Ok(())
3434    }
3435
3436    async fn append_autonomy_budget_approval_request(
3437        &self,
3438        binding: &TriggerBinding,
3439        route: &DispatchUri,
3440        event: &TriggerEvent,
3441        replay_of_event_id: Option<&String>,
3442        reason: &str,
3443    ) -> Result<String, DispatchError> {
3444        let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
3445        let detail = serde_json::json!({
3446            "trigger_id": binding.id.as_str(),
3447            "binding_key": binding.binding_key(),
3448            "event_id": event.id.0,
3449            "reason": reason,
3450            "from_tier": AutonomyTier::ActAuto.as_str(),
3451            "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3452            "handler_kind": route.kind(),
3453            "target_uri": route.target_uri(),
3454            "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
3455            "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
3456            "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
3457            "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
3458            "replay_of_event_id": replay_of_event_id,
3459        });
3460        let request_id = crate::stdlib::hitl::append_approval_request_on(
3461            &self.event_log,
3462            binding.id.as_str().to_string(),
3463            event.trace_id.0.clone(),
3464            format!(
3465                "approve autonomous dispatch for trigger '{}' after {}",
3466                binding.id.as_str(),
3467                reason
3468            ),
3469            detail.clone(),
3470            reviewers.clone(),
3471        )
3472        .await
3473        .map_err(dispatch_error_from_vm_error)?;
3474        self.append_lifecycle_event(
3475            "autonomy.budget_exceeded",
3476            event,
3477            binding,
3478            serde_json::json!({
3479                "trigger_id": binding.id.as_str(),
3480                "event_id": event.id.0,
3481                "reason": reason,
3482                "request_id": request_id,
3483                "reviewers": reviewers,
3484                "from_tier": AutonomyTier::ActAuto.as_str(),
3485                "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3486                "replay_of_event_id": replay_of_event_id,
3487            }),
3488            replay_of_event_id,
3489        )
3490        .await?;
3491        Ok(request_id)
3492    }
3493
3494    #[allow(clippy::too_many_arguments)]
3495    async fn emit_autonomy_budget_approval_action_graph(
3496        &self,
3497        binding: &TriggerBinding,
3498        route: &DispatchUri,
3499        event: &TriggerEvent,
3500        source_node_id: &str,
3501        replay_of_event_id: Option<&String>,
3502        reason: &str,
3503        request_id: &str,
3504    ) -> Result<(), DispatchError> {
3505        let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3506        self.emit_action_graph(
3507            event,
3508            vec![RunActionGraphNodeRecord {
3509                id: approval_node_id.clone(),
3510                label: format!("approval required: {reason}"),
3511                kind: "approval".to_string(),
3512                status: "waiting".to_string(),
3513                outcome: "request_approval".to_string(),
3514                trace_id: Some(event.trace_id.0.clone()),
3515                stage_id: None,
3516                node_id: None,
3517                worker_id: None,
3518                run_id: None,
3519                run_path: None,
3520                metadata: BTreeMap::from([
3521                    (
3522                        "trigger_id".to_string(),
3523                        serde_json::json!(binding.id.as_str()),
3524                    ),
3525                    (
3526                        "binding_key".to_string(),
3527                        serde_json::json!(binding.binding_key()),
3528                    ),
3529                    ("event_id".to_string(), serde_json::json!(event.id.0)),
3530                    ("reason".to_string(), serde_json::json!(reason)),
3531                    ("request_id".to_string(), serde_json::json!(request_id)),
3532                    (
3533                        "reviewers".to_string(),
3534                        serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3535                    ),
3536                    ("handler_kind".to_string(), serde_json::json!(route.kind())),
3537                    (
3538                        "target_uri".to_string(),
3539                        serde_json::json!(route.target_uri()),
3540                    ),
3541                    (
3542                        "from_tier".to_string(),
3543                        serde_json::json!(AutonomyTier::ActAuto.as_str()),
3544                    ),
3545                    (
3546                        "requested_tier".to_string(),
3547                        serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3548                    ),
3549                    (
3550                        "replay_of_event_id".to_string(),
3551                        serde_json::json!(replay_of_event_id),
3552                    ),
3553                ]),
3554            }],
3555            vec![RunActionGraphEdgeRecord {
3556                from_id: source_node_id.to_string(),
3557                to_id: approval_node_id,
3558                kind: "approval_gate".to_string(),
3559                label: Some("autonomy budget".to_string()),
3560            }],
3561            serde_json::json!({
3562                "source": "dispatcher",
3563                "trigger_id": binding.id.as_str(),
3564                "binding_key": binding.binding_key(),
3565                "event_id": event.id.0,
3566                "reason": reason,
3567                "request_id": request_id,
3568                "replay_of_event_id": replay_of_event_id,
3569            }),
3570        )
3571        .await
3572    }
3573
3574    #[allow(clippy::too_many_arguments)]
3575    async fn append_tier_transition_trust_record(
3576        &self,
3577        binding: &TriggerBinding,
3578        event: &TriggerEvent,
3579        replay_of_event_id: Option<&String>,
3580        from_tier: AutonomyTier,
3581        to_tier: AutonomyTier,
3582        reason: &str,
3583        request_id: &str,
3584    ) -> Result<(), DispatchError> {
3585        let mut record = TrustRecord::new(
3586            binding.id.as_str().to_string(),
3587            "autonomy.tier_transition",
3588            Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3589            TrustOutcome::Denied,
3590            event.trace_id.0.clone(),
3591            to_tier,
3592        );
3593        record.metadata.insert(
3594            "binding_key".to_string(),
3595            serde_json::json!(binding.binding_key()),
3596        );
3597        record
3598            .metadata
3599            .insert("event_id".to_string(), serde_json::json!(event.id.0));
3600        record.metadata.insert(
3601            "from_tier".to_string(),
3602            serde_json::json!(from_tier.as_str()),
3603        );
3604        record
3605            .metadata
3606            .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3607        record
3608            .metadata
3609            .insert("reason".to_string(), serde_json::json!(reason));
3610        record
3611            .metadata
3612            .insert("request_id".to_string(), serde_json::json!(request_id));
3613        if let Some(replay_of_event_id) = replay_of_event_id {
3614            record.metadata.insert(
3615                "replay_of_event_id".to_string(),
3616                serde_json::json!(replay_of_event_id),
3617            );
3618        }
3619        append_trust_record(&self.event_log, &record)
3620            .await
3621            .map(|_| ())
3622            .map_err(DispatchError::from)
3623    }
3624
3625    async fn append_budget_deferred_event(
3626        &self,
3627        binding: &TriggerBinding,
3628        route: &DispatchUri,
3629        event: &TriggerEvent,
3630        replay_of_event_id: Option<&String>,
3631        reason: &str,
3632    ) -> Result<(), DispatchError> {
3633        self.append_topic_event(
3634            TRIGGER_ATTEMPTS_TOPIC,
3635            "budget_deferred",
3636            event,
3637            Some(binding),
3638            None,
3639            serde_json::json!({
3640                "event_id": event.id.0,
3641                "trigger_id": binding.id.as_str(),
3642                "binding_key": binding.binding_key(),
3643                "handler_kind": route.kind(),
3644                "target_uri": route.target_uri(),
3645                "reason": reason,
3646                "retry_at": next_budget_reset_rfc3339(binding),
3647                "replay_of_event_id": replay_of_event_id,
3648            }),
3649            replay_of_event_id,
3650        )
3651        .await?;
3652        self.append_skipped_outbox_event(
3653            binding,
3654            route,
3655            event,
3656            replay_of_event_id,
3657            DispatchSkipStage::Predicate,
3658            serde_json::json!({
3659                "deferred": true,
3660                "reason": reason,
3661                "retry_at": next_budget_reset_rfc3339(binding),
3662            }),
3663        )
3664        .await
3665    }
3666
3667    async fn move_budget_exhausted_to_dlq(
3668        &self,
3669        binding: &TriggerBinding,
3670        route: &DispatchUri,
3671        event: &TriggerEvent,
3672        replay_of_event_id: Option<&String>,
3673        final_error: &str,
3674    ) -> Result<(), DispatchError> {
3675        let dlq_entry = DlqEntry {
3676            trigger_id: binding.id.as_str().to_string(),
3677            binding_key: binding.binding_key(),
3678            event: event.clone(),
3679            attempt_count: 0,
3680            final_error: final_error.to_string(),
3681            attempts: Vec::new(),
3682        };
3683        self.state
3684            .dlq
3685            .lock()
3686            .expect("dispatcher dlq poisoned")
3687            .push(dlq_entry.clone());
3688        if let Some(metrics) = self.metrics.as_ref() {
3689            metrics.record_trigger_dlq(binding.id.as_str(), "budget_exhausted");
3690            metrics.record_trigger_accepted_to_dlq(
3691                binding.id.as_str(),
3692                &binding.binding_key(),
3693                event.provider.as_str(),
3694                tenant_id(event),
3695                "budget_exhausted",
3696                duration_between_ms(current_unix_ms(), accepted_at_ms(None, event)),
3697            );
3698        }
3699        tracing::info!(
3700            component = "dispatcher",
3701            lifecycle = "dlq_moved",
3702            trigger_id = %binding.id.as_str(),
3703            binding_key = %binding.binding_key(),
3704            event_id = %event.id.0,
3705            reason = "budget_exhausted",
3706            trace_id = %event.trace_id.0
3707        );
3708        self.emit_action_graph(
3709            event,
3710            vec![RunActionGraphNodeRecord {
3711                id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3712                label: binding.id.as_str().to_string(),
3713                kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3714                status: "queued".to_string(),
3715                outcome: "budget_exhausted".to_string(),
3716                trace_id: Some(event.trace_id.0.clone()),
3717                stage_id: None,
3718                node_id: None,
3719                worker_id: None,
3720                run_id: None,
3721                run_path: None,
3722                metadata: dlq_node_metadata(binding, event, 0, final_error),
3723            }],
3724            vec![RunActionGraphEdgeRecord {
3725                from_id: format!("predicate:{}:{}", binding.binding_key(), event.id.0),
3726                to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3727                kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3728                label: Some("budget exhausted".to_string()),
3729            }],
3730            serde_json::json!({
3731                "source": "dispatcher",
3732                "trigger_id": binding.id.as_str(),
3733                "binding_key": binding.binding_key(),
3734                "event_id": event.id.0,
3735                "handler_kind": route.kind(),
3736                "target_uri": route.target_uri(),
3737                "final_error": final_error,
3738                "replay_of_event_id": replay_of_event_id,
3739            }),
3740        )
3741        .await?;
3742        self.append_lifecycle_event(
3743            "DlqMoved",
3744            event,
3745            binding,
3746            serde_json::json!({
3747                "event_id": event.id.0,
3748                "attempt_count": 0,
3749                "final_error": final_error,
3750                "reason": "budget_exhausted",
3751                "replay_of_event_id": replay_of_event_id,
3752            }),
3753            replay_of_event_id,
3754        )
3755        .await?;
3756        self.append_topic_event(
3757            TRIGGER_DLQ_TOPIC,
3758            "dlq_moved",
3759            event,
3760            Some(binding),
3761            None,
3762            serde_json::to_value(&dlq_entry)
3763                .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3764            replay_of_event_id,
3765        )
3766        .await
3767    }
3768
3769    async fn move_circuit_open_to_dlq(
3770        &self,
3771        binding: &TriggerBinding,
3772        route: &DispatchUri,
3773        event: &TriggerEvent,
3774        replay_of_event_id: Option<&String>,
3775        final_error: &str,
3776        destination: &str,
3777    ) -> Result<(), DispatchError> {
3778        let dlq_entry = DlqEntry {
3779            trigger_id: binding.id.as_str().to_string(),
3780            binding_key: binding.binding_key(),
3781            event: event.clone(),
3782            attempt_count: 0,
3783            final_error: final_error.to_string(),
3784            attempts: Vec::new(),
3785        };
3786        self.state
3787            .dlq
3788            .lock()
3789            .expect("dispatcher dlq poisoned")
3790            .push(dlq_entry.clone());
3791        if let Some(metrics) = self.metrics.as_ref() {
3792            metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
3793            metrics.record_trigger_accepted_to_dlq(
3794                binding.id.as_str(),
3795                &binding.binding_key(),
3796                event.provider.as_str(),
3797                tenant_id(event),
3798                "circuit_open",
3799                Duration::ZERO,
3800            );
3801        }
3802        tracing::info!(
3803            component = "dispatcher",
3804            lifecycle = "dlq_moved",
3805            trigger_id = %binding.id.as_str(),
3806            binding_key = %binding.binding_key(),
3807            event_id = %event.id.0,
3808            reason = "circuit_open",
3809            destination,
3810            trace_id = %event.trace_id.0
3811        );
3812        self.emit_action_graph(
3813            event,
3814            vec![RunActionGraphNodeRecord {
3815                id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3816                label: binding.id.as_str().to_string(),
3817                kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3818                status: "queued".to_string(),
3819                outcome: "circuit_open".to_string(),
3820                trace_id: Some(event.trace_id.0.clone()),
3821                stage_id: None,
3822                node_id: None,
3823                worker_id: None,
3824                run_id: None,
3825                run_path: None,
3826                metadata: dlq_node_metadata(binding, event, 0, final_error),
3827            }],
3828            vec![RunActionGraphEdgeRecord {
3829                from_id: format!("trigger:{}", event.id.0),
3830                to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3831                kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3832                label: Some("circuit open".to_string()),
3833            }],
3834            serde_json::json!({
3835                "source": "dispatcher",
3836                "trigger_id": binding.id.as_str(),
3837                "binding_key": binding.binding_key(),
3838                "event_id": event.id.0,
3839                "handler_kind": route.kind(),
3840                "target_uri": route.target_uri(),
3841                "destination": destination,
3842                "final_error": final_error,
3843                "replay_of_event_id": replay_of_event_id,
3844            }),
3845        )
3846        .await?;
3847        self.append_lifecycle_event(
3848            "DlqMoved",
3849            event,
3850            binding,
3851            serde_json::json!({
3852                "event_id": event.id.0,
3853                "attempt_count": 0,
3854                "final_error": final_error,
3855                "reason": "circuit_open",
3856                "destination": destination,
3857                "replay_of_event_id": replay_of_event_id,
3858            }),
3859            replay_of_event_id,
3860        )
3861        .await?;
3862        self.append_topic_event(
3863            TRIGGER_DLQ_TOPIC,
3864            "dlq_moved",
3865            event,
3866            Some(binding),
3867            None,
3868            serde_json::to_value(&dlq_entry)
3869                .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3870            replay_of_event_id,
3871        )
3872        .await
3873    }
3874
3875    async fn append_skipped_outbox_event(
3876        &self,
3877        binding: &TriggerBinding,
3878        route: &DispatchUri,
3879        event: &TriggerEvent,
3880        replay_of_event_id: Option<&String>,
3881        stage: DispatchSkipStage,
3882        detail: serde_json::Value,
3883    ) -> Result<(), DispatchError> {
3884        self.append_topic_event(
3885            TRIGGER_OUTBOX_TOPIC,
3886            "dispatch_skipped",
3887            event,
3888            Some(binding),
3889            None,
3890            serde_json::json!({
3891                "event_id": event.id.0,
3892                "trigger_id": binding.id.as_str(),
3893                "binding_key": binding.binding_key(),
3894                "handler_kind": route.kind(),
3895                "target_uri": route.target_uri(),
3896                "skip_stage": stage.as_str(),
3897                "detail": detail,
3898                "replay_of_event_id": replay_of_event_id,
3899            }),
3900            replay_of_event_id,
3901        )
3902        .await
3903    }
3904
3905    async fn append_topic_event(
3906        &self,
3907        topic_name: &str,
3908        kind: &str,
3909        event: &TriggerEvent,
3910        binding: Option<&TriggerBinding>,
3911        attempt: Option<u32>,
3912        payload: serde_json::Value,
3913        replay_of_event_id: Option<&String>,
3914    ) -> Result<(), DispatchError> {
3915        let topic = Topic::new(topic_name)
3916            .expect("static trigger dispatcher topic names should always be valid");
3917        let headers = event_headers(event, binding, attempt, replay_of_event_id);
3918        self.event_log
3919            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3920            .await
3921            .map_err(DispatchError::from)
3922            .map(|_| ())
3923    }
3924
3925    async fn emit_action_graph(
3926        &self,
3927        event: &TriggerEvent,
3928        nodes: Vec<RunActionGraphNodeRecord>,
3929        edges: Vec<RunActionGraphEdgeRecord>,
3930        extra: serde_json::Value,
3931    ) -> Result<(), DispatchError> {
3932        let mut headers = BTreeMap::new();
3933        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3934        headers.insert("event_id".to_string(), event.id.0.clone());
3935        let observability = RunObservabilityRecord {
3936            schema_version: 1,
3937            action_graph_nodes: nodes,
3938            action_graph_edges: edges,
3939            ..Default::default()
3940        };
3941        append_action_graph_update(
3942            headers,
3943            serde_json::json!({
3944                "source": "dispatcher",
3945                "trace_id": event.trace_id.0,
3946                "event_id": event.id.0,
3947                "observability": observability,
3948                "context": extra,
3949            }),
3950        )
3951        .await
3952        .map_err(DispatchError::from)
3953    }
3954}
3955
3956async fn dispatch_cancel_requested(
3957    event_log: &Arc<AnyEventLog>,
3958    binding_key: &str,
3959    event_id: &str,
3960    replay_of_event_id: Option<&String>,
3961) -> Result<bool, DispatchError> {
3962    if replay_of_event_id.is_some() {
3963        return Ok(false);
3964    }
3965    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3966        .expect("static trigger cancel topic should always be valid");
3967    let events = event_log.read_range(&topic, None, usize::MAX).await?;
3968    let requested = events
3969        .into_iter()
3970        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3971        .filter_map(|(_, event)| {
3972            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3973        })
3974        .collect::<BTreeSet<_>>();
3975    Ok(requested
3976        .iter()
3977        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3978}
3979
3980async fn sleep_or_cancel_or_request(
3981    event_log: &Arc<AnyEventLog>,
3982    delay: Duration,
3983    binding_key: &str,
3984    event_id: &str,
3985    replay_of_event_id: Option<&String>,
3986    cancel_rx: &mut broadcast::Receiver<()>,
3987) -> Result<(), DispatchError> {
3988    let sleep = tokio::time::sleep(delay);
3989    pin_mut!(sleep);
3990    let mut poll = tokio::time::interval(Duration::from_millis(100));
3991    loop {
3992        tokio::select! {
3993            _ = &mut sleep => return Ok(()),
3994            _ = recv_cancel(cancel_rx) => {
3995                return Err(DispatchError::Cancelled(
3996                    "dispatcher shutdown cancelled retry wait".to_string(),
3997                ));
3998            }
3999            _ = poll.tick() => {
4000                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
4001                    return Err(DispatchError::Cancelled(
4002                        "trigger cancel request cancelled retry wait".to_string(),
4003                    ));
4004                }
4005            }
4006        }
4007    }
4008}
4009
4010fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
4011    let mut iter = events.into_iter();
4012    let Some(mut root) = iter.next() else {
4013        return Err(DispatchError::Registry(
4014            "batch dispatch produced an empty event list".to_string(),
4015        ));
4016    };
4017    let mut batch = Vec::new();
4018    batch.push(
4019        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
4020    );
4021    for event in iter {
4022        batch.push(
4023            serde_json::to_value(&event)
4024                .map_err(|error| DispatchError::Serde(error.to_string()))?,
4025        );
4026    }
4027    root.batch = Some(batch);
4028    Ok(root)
4029}
4030
4031fn json_value_to_gate(value: &serde_json::Value) -> String {
4032    match value {
4033        serde_json::Value::Null => "null".to_string(),
4034        serde_json::Value::String(text) => text.clone(),
4035        serde_json::Value::Bool(value) => value.to_string(),
4036        serde_json::Value::Number(value) => value.to_string(),
4037        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
4038    }
4039}
4040
4041fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
4042    let json =
4043        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4044    let value = json_to_vm_value(&json);
4045    match (&event.raw_body, value) {
4046        (Some(raw_body), VmValue::Dict(dict)) => {
4047            let mut map = (*dict).clone();
4048            map.insert(
4049                "raw_body".to_string(),
4050                VmValue::Bytes(Rc::new(raw_body.clone())),
4051            );
4052            Ok(VmValue::Dict(Rc::new(map)))
4053        }
4054        (_, other) => Ok(other),
4055    }
4056}
4057
4058fn decrement_in_flight(state: &DispatcherRuntimeState) {
4059    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
4060    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
4061        state.idle_notify.notify_waiters();
4062    }
4063}
4064
4065fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
4066    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
4067    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
4068        state.idle_notify.notify_waiters();
4069    }
4070}
4071
/// Test hook: stores `tx` in `TEST_INBOX_DEQUEUED_SIGNAL`, replacing any
/// sender that was stored there before.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(tx);
    });
}
4078
/// No-op outside of test builds.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test hook: takes the sender stored in `TEST_INBOX_DEQUEUED_SIGNAL` (if any)
/// and fires it once. A send failure is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(tx) = pending {
        let _ = tx.send(());
    }
}
4090
4091pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
4092    event_log: &L,
4093    event: &TriggerEvent,
4094) -> Result<u64, DispatchError> {
4095    let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
4096        .expect("static trigger.inbox.envelopes topic is valid");
4097    let headers = event_headers(event, None, None, None);
4098    let payload =
4099        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4100    event_log
4101        .append(
4102            &topic,
4103            LogEvent::new("event_ingested", payload).with_headers(headers),
4104        )
4105        .await
4106        .map_err(DispatchError::from)
4107}
4108
4109pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
4110    ACTIVE_DISPATCHER_STATE.with(|slot| {
4111        slot.borrow()
4112            .as_ref()
4113            .map(|state| DispatcherStatsSnapshot {
4114                in_flight: state.in_flight.load(Ordering::Relaxed),
4115                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
4116                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
4117            })
4118            .unwrap_or_default()
4119    })
4120}
4121
4122pub fn clear_dispatcher_state() {
4123    ACTIVE_DISPATCHER_STATE.with(|slot| {
4124        *slot.borrow_mut() = None;
4125    });
4126    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
4127        *slot.borrow_mut() = None;
4128    });
4129}
4130
4131fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
4132    if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
4133        return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
4134    }
4135    if is_cancelled_vm_error(&error) {
4136        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
4137    }
4138    if let VmError::Thrown(VmValue::String(message)) = &error {
4139        return DispatchError::Local(message.to_string());
4140    }
4141    match error_to_category(&error) {
4142        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
4143        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
4144        ErrorCategory::Cancelled => {
4145            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
4146        }
4147        _ => DispatchError::Local(error.to_string()),
4148    }
4149}
4150
4151fn dispatch_error_label(error: &DispatchError) -> &'static str {
4152    match error {
4153        DispatchError::Denied(_) => "denied",
4154        DispatchError::Timeout(_) => "timeout",
4155        DispatchError::Waiting(_) => "waiting",
4156        DispatchError::Cancelled(_) => "cancelled",
4157        _ => "failed",
4158    }
4159}
4160
4161fn destination_circuit_key(route: &DispatchUri) -> String {
4162    format!("{}:{}", route.kind(), route.target_uri())
4163}
4164
4165fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
4166    match route {
4167        DispatchUri::Worker { .. } => "enqueued",
4168        DispatchUri::A2a { .. }
4169            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
4170        {
4171            "pending"
4172        }
4173        DispatchUri::A2a { .. } => "completed",
4174        DispatchUri::Local { .. } => "success",
4175    }
4176}
4177
4178fn dispatch_node_id(
4179    route: &DispatchUri,
4180    binding_key: &str,
4181    event_id: &str,
4182    attempt: u32,
4183) -> String {
4184    let prefix = match route {
4185        DispatchUri::A2a { .. } => "a2a",
4186        _ => "dispatch",
4187    };
4188    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
4189}
4190
4191fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
4192    match route {
4193        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
4194        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
4195        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
4196    }
4197}
4198
4199fn dispatch_node_label(route: &DispatchUri) -> String {
4200    match route {
4201        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
4202        _ => route.target_uri(),
4203    }
4204}
4205
4206fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
4207    match route {
4208        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
4209        _ => None,
4210    }
4211}
4212
4213fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
4214    match route {
4215        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
4216        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
4217        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
4218    }
4219}
4220
4221fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
4222    match status {
4223        crate::triggers::SignatureStatus::Verified => "verified",
4224        crate::triggers::SignatureStatus::Unsigned => "unsigned",
4225        crate::triggers::SignatureStatus::Failed { .. } => "failed",
4226    }
4227}
4228
4229fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
4230    let mut metadata = BTreeMap::new();
4231    metadata.insert(
4232        "provider".to_string(),
4233        serde_json::json!(event.provider.as_str()),
4234    );
4235    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
4236    metadata.insert(
4237        "dedupe_key".to_string(),
4238        serde_json::json!(event.dedupe_key),
4239    );
4240    metadata.insert(
4241        "signature_status".to_string(),
4242        serde_json::json!(signature_status_label(&event.signature_status)),
4243    );
4244    metadata
4245}
4246
4247fn predicate_node_metadata(
4248    binding: &TriggerBinding,
4249    predicate: &super::registry::TriggerPredicateSpec,
4250    event: &TriggerEvent,
4251    evaluation: &PredicateEvaluationRecord,
4252) -> BTreeMap<String, serde_json::Value> {
4253    let mut metadata = BTreeMap::new();
4254    metadata.insert(
4255        "trigger_id".to_string(),
4256        serde_json::json!(binding.id.as_str()),
4257    );
4258    metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
4259    metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
4260    metadata.insert(
4261        "cost_usd".to_string(),
4262        serde_json::json!(evaluation.cost_usd),
4263    );
4264    metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
4265    metadata.insert(
4266        "latency_ms".to_string(),
4267        serde_json::json!(evaluation.latency_ms),
4268    );
4269    metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
4270    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4271    if let Some(reason) = evaluation.reason.as_ref() {
4272        metadata.insert("reason".to_string(), serde_json::json!(reason));
4273    }
4274    metadata
4275}
4276
4277fn dispatch_node_metadata(
4278    route: &DispatchUri,
4279    binding: &TriggerBinding,
4280    event: &TriggerEvent,
4281    attempt: u32,
4282) -> BTreeMap<String, serde_json::Value> {
4283    let mut metadata = BTreeMap::new();
4284    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
4285    metadata.insert(
4286        "target_uri".to_string(),
4287        serde_json::json!(route.target_uri()),
4288    );
4289    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4290    metadata.insert(
4291        "trigger_id".to_string(),
4292        serde_json::json!(binding.id.as_str()),
4293    );
4294    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4295    if let Some(target_agent) = dispatch_target_agent(route) {
4296        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
4297    }
4298    if let DispatchUri::Worker { queue } = route {
4299        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
4300    }
4301    metadata
4302}
4303
4304fn dispatch_success_metadata(
4305    route: &DispatchUri,
4306    binding: &TriggerBinding,
4307    event: &TriggerEvent,
4308    attempt: u32,
4309    result: &serde_json::Value,
4310) -> BTreeMap<String, serde_json::Value> {
4311    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4312    match route {
4313        DispatchUri::A2a { .. } => {
4314            if let Some(task_id) = result
4315                .get("task_id")
4316                .or_else(|| result.get("id"))
4317                .and_then(|value| value.as_str())
4318            {
4319                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
4320            }
4321            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
4322                metadata.insert("state".to_string(), serde_json::json!(state));
4323            }
4324        }
4325        DispatchUri::Worker { .. } => {
4326            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
4327            {
4328                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
4329            }
4330            if let Some(response_topic) = result
4331                .get("response_topic")
4332                .and_then(|value| value.as_str())
4333            {
4334                metadata.insert(
4335                    "response_topic".to_string(),
4336                    serde_json::json!(response_topic),
4337                );
4338            }
4339        }
4340        DispatchUri::Local { .. } => {}
4341    }
4342    metadata
4343}
4344
4345fn dispatch_error_metadata(
4346    route: &DispatchUri,
4347    binding: &TriggerBinding,
4348    event: &TriggerEvent,
4349    attempt: u32,
4350    error: &DispatchError,
4351) -> BTreeMap<String, serde_json::Value> {
4352    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4353    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4354    metadata
4355}
4356
4357fn retry_node_metadata(
4358    binding: &TriggerBinding,
4359    event: &TriggerEvent,
4360    attempt: u32,
4361    delay: Duration,
4362    error: &DispatchError,
4363) -> BTreeMap<String, serde_json::Value> {
4364    let mut metadata = BTreeMap::new();
4365    metadata.insert(
4366        "trigger_id".to_string(),
4367        serde_json::json!(binding.id.as_str()),
4368    );
4369    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4370    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4371    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
4372    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4373    metadata
4374}
4375
4376fn dlq_node_metadata(
4377    binding: &TriggerBinding,
4378    event: &TriggerEvent,
4379    attempt_count: u32,
4380    final_error: &str,
4381) -> BTreeMap<String, serde_json::Value> {
4382    let mut metadata = BTreeMap::new();
4383    metadata.insert(
4384        "trigger_id".to_string(),
4385        serde_json::json!(binding.id.as_str()),
4386    );
4387    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4388    metadata.insert(
4389        "attempt_count".to_string(),
4390        serde_json::json!(attempt_count),
4391    );
4392    metadata.insert("final_error".to_string(), serde_json::json!(final_error));
4393    metadata
4394}
4395
4396fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
4397    match value {
4398        VmValue::Bool(result) => Ok(result),
4399        VmValue::EnumVariant {
4400            enum_name,
4401            variant,
4402            fields,
4403        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Ok" => match fields.first() {
4404            Some(VmValue::Bool(result)) => Ok(*result),
4405            Some(other) => Err(format!(
4406                "predicate Result.Ok payload must be bool, got {}",
4407                other.type_name()
4408            )),
4409            None => Err("predicate Result.Ok payload is missing".to_string()),
4410        },
4411        VmValue::EnumVariant {
4412            enum_name,
4413            variant,
4414            fields,
4415        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => Err(fields
4416            .first()
4417            .map(VmValue::display)
4418            .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
4419        other => Err(format!(
4420            "predicate must return bool or Result<bool, _>, got {}",
4421            other.type_name()
4422        )),
4423    }
4424}
4425
4426fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
4427    micros_to_usd(
4428        binding
4429            .metrics
4430            .cost_today_usd_micros
4431            .load(Ordering::Relaxed),
4432    )
4433}
4434
4435fn current_predicate_hourly_cost(binding: &TriggerBinding) -> f64 {
4436    micros_to_usd(binding.metrics.cost_hour_usd_micros.load(Ordering::Relaxed))
4437}
4438
/// Splits a binding key of the form `<id>@v<version>` into `(id, version)`.
///
/// The split happens at the last `@v`. When the key has no `@v`, or when the
/// text after it is not a valid `u32`, the whole key is returned unchanged
/// with version `0`. A non-numeric suffix therefore no longer truncates an
/// id that merely happens to contain `@v`.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    if let Some((binding_id, suffix)) = binding_key.rsplit_once("@v") {
        if let Ok(version) = suffix.parse::<u32>() {
            return (binding_id.to_string(), version);
        }
    }
    (binding_key.to_string(), 0)
}
4446
/// Builds a binding key: `<trigger_id>@v<version>` when a version is given,
/// otherwise just `trigger_id`.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    if let Some(version) = binding_version {
        format!("{trigger_id}@v{version}")
    } else {
        trigger_id.to_string()
    }
}
4453
4454fn tenant_id(event: &TriggerEvent) -> Option<&str> {
4455    event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
4456}
4457
4458fn current_unix_ms() -> i64 {
4459    unix_ms(time::OffsetDateTime::now_utc())
4460}
4461
4462fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
4463    (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
4464}
4465
4466fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4467    lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
4468        .unwrap_or_else(|| unix_ms(event.received_at))
4469}
4470
4471fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4472    lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
4473        .unwrap_or_else(|| accepted_at_ms(headers, event))
4474}
4475
/// Looks up header `name` and parses it as an `i64`.
///
/// Returns `None` when there are no headers, the header is absent, or its
/// value does not parse.
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw = headers?.get(name)?;
    raw.parse::<i64>().ok()
}
4481
/// Duration from `earlier_ms` to `later_ms`, both in milliseconds.
///
/// The subtraction saturates, and a negative difference yields a zero duration.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    // A negative span fails the conversion and collapses to zero.
    Duration::from_millis(u64::try_from(elapsed_ms).unwrap_or(0))
}
4485
4486fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
4487    match result {
4488        Ok(_) => "succeeded",
4489        Err(DispatchError::Waiting(_)) => "waiting",
4490        Err(DispatchError::Cancelled(_)) => "cancelled",
4491        Err(DispatchError::Denied(_)) => "denied",
4492        Err(DispatchError::Timeout(_)) => "timeout",
4493        Err(_) => "failed",
4494    }
4495}
4496
4497fn is_cancelled_vm_error(error: &VmError) -> bool {
4498    matches!(
4499        error,
4500        VmError::Thrown(VmValue::String(message))
4501            if message.starts_with("kind:cancelled:")
4502    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
4503}
4504
4505fn event_headers(
4506    event: &TriggerEvent,
4507    binding: Option<&TriggerBinding>,
4508    attempt: Option<u32>,
4509    replay_of_event_id: Option<&String>,
4510) -> BTreeMap<String, String> {
4511    let mut headers = BTreeMap::new();
4512    headers.insert("event_id".to_string(), event.id.0.clone());
4513    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
4514    headers.insert("provider".to_string(), event.provider.as_str().to_string());
4515    headers.insert("kind".to_string(), event.kind.clone());
4516    if let Some(replay_of_event_id) = replay_of_event_id {
4517        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
4518    }
4519    if let Some(binding) = binding {
4520        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
4521        headers.insert("binding_key".to_string(), binding.binding_key());
4522        headers.insert(
4523            "handler_kind".to_string(),
4524            DispatchUri::from(&binding.handler).kind().to_string(),
4525        );
4526    }
4527    if let Some(attempt) = attempt {
4528        headers.insert("attempt".to_string(), attempt.to_string());
4529    }
4530    headers
4531}
4532
4533fn worker_queue_priority(
4534    binding: &super::registry::TriggerBinding,
4535    event: &TriggerEvent,
4536) -> crate::WorkerQueuePriority {
4537    match event
4538        .headers
4539        .get("priority")
4540        .map(|value| value.trim().to_ascii_lowercase())
4541        .as_deref()
4542    {
4543        Some("high") => crate::WorkerQueuePriority::High,
4544        Some("low") => crate::WorkerQueuePriority::Low,
4545        _ => binding.dispatch_priority,
4546    }
4547}
4548
/// Environment variable that makes `maybe_fail_before_outbox` abort the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Exits the process with status 86 when `TEST_FAIL_BEFORE_OUTBOX_ENV` is set
/// (to any value); otherwise does nothing.
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_none() {
        return;
    }
    std::process::exit(86);
}
4556
4557fn now_rfc3339() -> String {
4558    time::OffsetDateTime::now_utc()
4559        .format(&time::format_description::well_known::Rfc3339)
4560        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4561}
4562
4563fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
4564    let now = time::OffsetDateTime::now_utc();
4565    let reset = if binding.hourly_cost_usd.is_some() {
4566        let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
4567        time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
4568    } else {
4569        let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
4570        time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
4571    };
4572    reset
4573        .format(&time::format_description::well_known::Rfc3339)
4574        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4575}
4576
4577fn now_unix_ms() -> i64 {
4578    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
4579}
4580
4581fn cancelled_dispatch_outcome(
4582    binding: &TriggerBinding,
4583    route: &DispatchUri,
4584    event: &TriggerEvent,
4585    replay_of_event_id: Option<String>,
4586    attempt_count: u32,
4587    error: String,
4588) -> DispatchOutcome {
4589    DispatchOutcome {
4590        trigger_id: binding.id.as_str().to_string(),
4591        binding_key: binding.binding_key(),
4592        event_id: event.id.0.clone(),
4593        attempt_count,
4594        status: DispatchStatus::Cancelled,
4595        handler_kind: route.kind().to_string(),
4596        target_uri: route.target_uri(),
4597        replay_of_event_id,
4598        result: None,
4599        error: Some(error),
4600    }
4601}
4602
4603async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
4604    let _ = cancel_rx.recv().await;
4605}
4606
4607#[cfg(test)]
4608mod tests;