Skip to main content

harn_vm/triggers/dispatcher/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
16use crate::llm::vm_value_to_json;
17use crate::orchestration::{
18    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
19    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
20    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
21    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
22    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
23    ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
24    ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
25};
26use crate::stdlib::json_to_vm_value;
27use crate::trust_graph::{
28    append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
29};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::{
35    binding_autonomy_budget_would_exceed, binding_budget_would_exceed,
36    expected_predicate_cost_usd_micros, matching_bindings, micros_to_usd, note_autonomous_decision,
37    note_orchestrator_budget_cost, orchestrator_budget_would_exceed, record_predicate_cost_sample,
38    reset_binding_budget_windows, usd_to_micros, TriggerBinding, TriggerBudgetExhaustionStrategy,
39    TriggerHandlerSpec,
40};
41use super::{
42    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
43    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
44    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
45    TRIGGER_OUTBOX_TOPIC,
46};
47use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
48
49mod flow_control;
50pub mod retry;
51pub mod uri;
52
53pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
54
/// Log-event header holding the time (unix ms) a trigger event was accepted.
/// `enqueue_targeted_with_headers` fills it from the event's `received_at` when absent.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Log-event header for the time (unix ms) the event was normalized.
/// NOTE(review): not written in this part of the file — presumably set by the ingest path.
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Log-event header holding the time (unix ms) the event was appended to the inbox queue.
/// It is only set when absent, so the first hop's timestamp is preserved.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
/// Number of recorded failures at which a destination's circuit opens.
const DESTINATION_CIRCUIT_FAILURE_THRESHOLD: u32 = 5;
/// How long an open destination circuit blocks dispatches before allowing half-open probes.
const DESTINATION_CIRCUIT_BACKOFF: Duration = Duration::from_secs(60);
60
// Per-thread dispatcher globals. These are thread-locals rather than task-locals; the
// dispatcher holds an `Rc<Vm>` and is therefore not `Send`, so it stays on one thread.
thread_local! {
    // Runtime state of the most recently constructed `Dispatcher` on this thread
    // (installed by `Dispatcher::with_event_log_and_metrics`).
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context of the dispatch currently executing on this thread; read through
    // `current_dispatch_context`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Flow-control lease of the current dispatch; read through
    // `current_dispatch_wait_lease`, which exposes suspend/resume of the held gates.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only hook; presumably fired by `notify_test_inbox_dequeued` when `run`
    // dequeues an envelope — confirm at its definition.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
68
tokio::task_local! {
    // Whether the current task is executing a replay dispatch. Read through
    // `current_dispatch_is_replay`, which yields `false` outside a scope that sets it.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
72
/// Metadata about the trigger dispatch currently executing on this thread.
///
/// Exposed crate-wide via `current_dispatch_context`.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay.
    pub replay_of_event_id: Option<String>,
    /// Id of the trigger binding handling the event.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    /// NOTE(review): presumably the agent/action pair that trust records are attributed
    /// to — confirm where this context is populated.
    pub agent_id: String,
    pub action: String,
    /// Autonomy tier in effect for this dispatch.
    pub autonomy_tier: AutonomyTier,
}
83
/// Drop guard that pops the orchestration execution policy when it goes out of scope.
///
/// NOTE(review): presumably constructed right after a matching policy push so the pop
/// runs on every exit path — confirm at the construction site.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
91
/// Serializable set of cached predicate evaluations keyed by trigger and event.
///
/// NOTE(review): presumably persisted so replays can reuse predicate results — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    /// Trigger whose predicate produced the entries.
    trigger_id: String,
    /// Event the predicate was evaluated against.
    event_id: String,
    /// Cached evaluation entries.
    entries: Vec<PredicateCacheEntry>,
}
98
/// Summary of one predicate evaluation for a dispatch.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed.
    result: bool,
    /// Cost of the evaluation in USD.
    cost_usd: f64,
    /// Tokens consumed by the evaluation.
    tokens: u64,
    /// Evaluation wall-clock latency in milliseconds.
    latency_ms: u64,
    /// Whether the result came from the predicate cache.
    cached: bool,
    /// Optional explanation for the result.
    reason: Option<String>,
    /// Budget-exhaustion strategy applied, if the evaluation hit a budget limit
    /// (presumably — confirm where this is set).
    exhaustion_strategy: Option<TriggerBudgetExhaustionStrategy>,
}
109
/// Reviewer name used by default for autonomy-budget escalations (presumably when a
/// binding exceeds its autonomy budget — confirm at the use site).
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
111
112pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
113    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
114}
115
116pub(crate) fn current_dispatch_is_replay() -> bool {
117    ACTIVE_DISPATCH_IS_REPLAY
118        .try_with(|is_replay| *is_replay)
119        .unwrap_or(false)
120}
121
122pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
123    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
124}
125
/// Routes trigger events from the inbox topic to their bindings' handlers.
///
/// Clones are cheap and share the base VM (`Rc`), event log, cancellation channel,
/// runtime state and metrics. Because it holds an `Rc`, a `Dispatcher` is not `Send`.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM that handler execution is based on.
    base_vm: Rc<Vm>,
    /// Event log backing the inbox and dispatch bookkeeping topics.
    event_log: Arc<AnyEventLog>,
    /// Broadcast that `shutdown` fires to stop `run` loops.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ, cancellation tokens and flow-control state shared by all clones.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink for queue/dispatch latency reporting.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
134
135#[derive(Debug)]
136struct DispatcherRuntimeState {
137    in_flight: AtomicU64,
138    retry_queue_depth: AtomicU64,
139    dlq: Mutex<Vec<DlqEntry>>,
140    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
141    shutting_down: std::sync::atomic::AtomicBool,
142    idle_notify: Notify,
143    flow_control: FlowControlManager,
144    destination_circuits: DestinationCircuitRegistry,
145}
146
147impl DispatcherRuntimeState {
148    fn new(event_log: Arc<AnyEventLog>) -> Self {
149        Self {
150            in_flight: AtomicU64::new(0),
151            retry_queue_depth: AtomicU64::new(0),
152            dlq: Mutex::new(Vec::new()),
153            cancel_tokens: Mutex::new(Vec::new()),
154            shutting_down: std::sync::atomic::AtomicBool::new(false),
155            idle_notify: Notify::new(),
156            flow_control: FlowControlManager::new(event_log),
157            destination_circuits: DestinationCircuitRegistry::default(),
158        }
159    }
160}
161
/// Outcome of consulting a destination's circuit breaker before dispatching.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum DestinationCircuitProbe {
    /// Dispatch may proceed. `half_open` is set when the circuit is open but its backoff
    /// has elapsed, so this call acts as a trial probe.
    Allow { half_open: bool },
    /// Circuit is open; the caller should wait `retry_after` before trying again.
    Block { retry_after: Duration },
}
167
/// Failure-counting circuit breakers keyed by destination string.
#[derive(Debug)]
struct DestinationCircuitRegistry {
    /// Recorded failures at which a circuit opens.
    threshold: u32,
    /// Time an open circuit blocks before allowing half-open probes.
    backoff: Duration,
    /// Per-destination state; destinations with no entry are closed.
    states: Mutex<BTreeMap<String, DestinationCircuitState>>,
}
174
/// Circuit state for one destination.
#[derive(Clone, Debug)]
struct DestinationCircuitState {
    /// Failures recorded since the last success.
    failures: u32,
    /// When the circuit (last) opened; `None` while closed.
    opened_at: Option<Instant>,
}
180
181impl Default for DestinationCircuitRegistry {
182    fn default() -> Self {
183        Self {
184            threshold: DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
185            backoff: DESTINATION_CIRCUIT_BACKOFF,
186            states: Mutex::new(BTreeMap::new()),
187        }
188    }
189}
190
191impl DestinationCircuitRegistry {
192    fn check(&self, destination: &str) -> DestinationCircuitProbe {
193        let mut states = self
194            .states
195            .lock()
196            .expect("destination circuit registry poisoned");
197        let Some(state) = states.get_mut(destination) else {
198            return DestinationCircuitProbe::Allow { half_open: false };
199        };
200        let Some(opened_at) = state.opened_at else {
201            return DestinationCircuitProbe::Allow { half_open: false };
202        };
203        let elapsed = opened_at.elapsed();
204        if elapsed >= self.backoff {
205            DestinationCircuitProbe::Allow { half_open: true }
206        } else {
207            DestinationCircuitProbe::Block {
208                retry_after: self.backoff.saturating_sub(elapsed),
209            }
210        }
211    }
212
213    fn record_success(&self, destination: &str) {
214        let mut states = self
215            .states
216            .lock()
217            .expect("destination circuit registry poisoned");
218        states.remove(destination);
219    }
220
221    fn record_failure(&self, destination: &str) -> bool {
222        let mut states = self
223            .states
224            .lock()
225            .expect("destination circuit registry poisoned");
226        let state = states
227            .entry(destination.to_string())
228            .or_insert(DestinationCircuitState {
229                failures: 0,
230                opened_at: None,
231            });
232        if state.opened_at.is_some() {
233            state.failures = self.threshold;
234            state.opened_at = Some(Instant::now());
235            return true;
236        }
237        state.failures = state.failures.saturating_add(1);
238        if state.failures >= self.threshold {
239            state.opened_at = Some(Instant::now());
240            true
241        } else {
242            false
243        }
244    }
245}
246
/// Point-in-time dispatcher counters, as returned by `Dispatcher::snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches currently in flight.
    pub in_flight: u64,
    /// Dispatches waiting to be retried.
    pub retry_queue_depth: u64,
    /// Entries in the dispatcher's in-memory dead-letter queue.
    pub dlq_depth: u64,
}
254
/// Final status of one dispatch. Serialized in snake_case, matching [`DispatchStatus::as_str`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch failed.
    Failed,
    /// The dispatch was moved to the dead-letter queue.
    Dlq,
    /// The dispatch was skipped (e.g. by a predicate or flow control).
    Skipped,
    /// The dispatch is parked waiting.
    Waiting,
    /// The dispatch was cancelled.
    Cancelled,
}
265
impl DispatchStatus {
    /// Stable lowercase name of the status; identical to its serde representation.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Succeeded => "succeeded",
            Self::Failed => "failed",
            Self::Dlq => "dlq",
            Self::Skipped => "skipped",
            Self::Waiting => "waiting",
            Self::Cancelled => "cancelled",
        }
    }
}
278
/// Result of dispatching one event to one binding.
///
/// Missing fields deserialize from the `Default` impl (status `Failed`, everything else empty).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Trigger that handled the event.
    pub trigger_id: String,
    /// Binding key (trigger id plus version) that handled the event.
    pub binding_key: String,
    /// Dispatched event id.
    pub event_id: String,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Final status.
    pub status: DispatchStatus,
    /// Kind of handler invoked.
    pub handler_kind: String,
    /// Handler target URI.
    pub target_uri: String,
    /// Original event id when this was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result, if any.
    pub result: Option<serde_json::Value>,
    /// Error message, if the dispatch did not succeed.
    pub error: Option<String>,
}
293
/// Payload of an `event_ingested` record on the trigger inbox topic.
///
/// When `trigger_id` is set (or the event is a cron event carrying a `cron_id`), the event
/// is dispatched to that single binding. Otherwise it fans out to every matching binding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// Target trigger, if the event is aimed at a specific binding.
    pub trigger_id: Option<String>,
    /// Binding version to resolve for the target trigger.
    pub binding_version: Option<u32>,
    /// The ingested event.
    pub event: TriggerEvent,
}
300
/// Result of `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// Whether both in-flight and retry-queue counts reached zero.
    pub drained: bool,
    /// In-flight dispatches at the time of the report.
    pub in_flight: u64,
    /// Queued retries at the time of the report.
    pub retry_queue_depth: u64,
    /// Dead-letter queue depth at the time of the report.
    pub dlq_depth: u64,
}
309
310impl Default for DispatchOutcome {
311    fn default() -> Self {
312        Self {
313            trigger_id: String::new(),
314            binding_key: String::new(),
315            event_id: String::new(),
316            attempt_count: 0,
317            status: DispatchStatus::Failed,
318            handler_kind: String::new(),
319            target_uri: String::new(),
320            replay_of_event_id: None,
321            result: None,
322            error: None,
323        }
324    }
325}
326
/// Record of a single handler attempt within a dispatch.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// Attempt number within the dispatch.
    pub attempt: u32,
    pub handler_kind: String,
    /// Attempt start / completion timestamps (string-encoded; format set by the writer —
    /// presumably RFC 3339, confirm).
    pub started_at: String,
    pub completed_at: String,
    /// Outcome label of the attempt.
    pub outcome: String,
    /// Error message when the attempt failed.
    pub error_msg: Option<String>,
}
340
/// Request to cancel the dispatch of `event_id` on `binding_key`; appended to the cancel
/// requests topic by [`append_dispatch_cancel_request`].
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    pub binding_key: String,
    pub event_id: String,
    /// When the cancellation was requested, serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who requested the cancellation, if known.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit correlation id.
    #[serde(default)]
    pub audit_id: Option<String>,
}
352
/// A dispatch that was moved to the dead-letter queue.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    pub trigger_id: String,
    pub binding_key: String,
    /// The event that could not be delivered.
    pub event: TriggerEvent,
    /// Number of attempts made before dead-lettering.
    pub attempt_count: u32,
    /// Error message of the last attempt.
    pub final_error: String,
    /// Error classification; records without it deserialize as `"unknown"`.
    #[serde(default = "default_dlq_error_class")]
    pub error_class: String,
    /// Per-attempt history.
    pub attempts: Vec<DispatchAttemptRecord>,
}
359    pub final_error: String,
360    #[serde(default = "default_dlq_error_class")]
361    pub error_class: String,
362    pub attempts: Vec<DispatchAttemptRecord>,
363}
364
365fn default_dlq_error_class() -> String {
366    "unknown".to_string()
367}
368
369#[derive(Clone, Debug)]
370struct SingletonLease {
371    gate: String,
372    held: bool,
373}
374
375#[derive(Clone, Debug)]
376struct ConcurrencyLease {
377    gate: String,
378    max: u32,
379    priority_rank: usize,
380    permit: Option<ConcurrencyPermit>,
381}
382
383#[derive(Default, Debug)]
384struct AcquiredFlowControl {
385    singleton: Option<SingletonLease>,
386    concurrency: Option<ConcurrencyLease>,
387}
388
/// Shareable handle for a dispatch's flow-control gates, allowing them to be given back
/// while the dispatch waits (`suspend`) and re-acquired afterwards (`resume`).
///
/// Clones share the same underlying state.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state whose flow-control manager owns the gates.
    state: Arc<DispatcherRuntimeState>,
    /// Gates acquired for this dispatch.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// Whether the gates are currently given back.
    suspended: Arc<AtomicBool>,
}
395
396impl DispatchWaitLease {
397    fn new(
398        state: Arc<DispatcherRuntimeState>,
399        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
400    ) -> Self {
401        Self {
402            state,
403            acquired,
404            suspended: Arc::new(AtomicBool::new(false)),
405        }
406    }
407
408    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
409        if self.suspended.swap(true, Ordering::SeqCst) {
410            return Ok(());
411        }
412        let (singleton_gate, concurrency_permit) = {
413            let mut acquired = self.acquired.lock().await;
414            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
415                if lease.held {
416                    lease.held = false;
417                    Some(lease.gate.clone())
418                } else {
419                    None
420                }
421            });
422            let concurrency_permit = acquired
423                .concurrency
424                .as_mut()
425                .and_then(|lease| lease.permit.take());
426            (singleton_gate, concurrency_permit)
427        };
428
429        if let Some(gate) = singleton_gate {
430            self.state
431                .flow_control
432                .release_singleton(&gate)
433                .await
434                .map_err(DispatchError::from)?;
435        }
436        if let Some(permit) = concurrency_permit {
437            self.state
438                .flow_control
439                .release_concurrency(permit)
440                .await
441                .map_err(DispatchError::from)?;
442        }
443        Ok(())
444    }
445
446    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
447        if !self.suspended.swap(false, Ordering::SeqCst) {
448            return Ok(());
449        }
450
451        let singleton_gate = {
452            let acquired = self.acquired.lock().await;
453            acquired.singleton.as_ref().and_then(|lease| {
454                if lease.held {
455                    None
456                } else {
457                    Some(lease.gate.clone())
458                }
459            })
460        };
461        if let Some(gate) = singleton_gate {
462            self.state
463                .flow_control
464                .acquire_singleton(&gate)
465                .await
466                .map_err(DispatchError::from)?;
467            let mut acquired = self.acquired.lock().await;
468            if let Some(lease) = acquired.singleton.as_mut() {
469                lease.held = true;
470            }
471        }
472
473        let concurrency_spec = {
474            let acquired = self.acquired.lock().await;
475            acquired.concurrency.as_ref().and_then(|lease| {
476                if lease.permit.is_some() {
477                    None
478                } else {
479                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
480                }
481            })
482        };
483        if let Some((gate, max, priority_rank)) = concurrency_spec {
484            let permit = self
485                .state
486                .flow_control
487                .acquire_concurrency(&gate, max, priority_rank)
488                .await
489                .map_err(DispatchError::from)?;
490            let mut acquired = self.acquired.lock().await;
491            if let Some(lease) = acquired.concurrency.as_mut() {
492                lease.permit = Some(permit);
493            }
494        }
495        Ok(())
496    }
497}
498
/// Result of applying a binding's flow control to an event.
enum FlowControlOutcome {
    /// Proceed with `event` while holding the `acquired` gates. The event is boxed,
    /// presumably to keep the enum small.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; `reason` explains why.
    Skip {
        reason: String,
    },
}

/// Stage of the pipeline that decided to skip a dispatch.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    /// The binding's predicate rejected the event.
    Predicate,
    /// Flow control (batching, throttling, etc.) held the event back.
    FlowControl,
}
514
/// Errors produced while enqueueing or dispatching trigger events.
///
/// Every variant wraps a human-readable message, which is exactly what `Display` prints.
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log read/append failure (also produced from `LogError`).
    EventLog(String),
    /// Trigger registry failure, e.g. a binding could not be resolved.
    Registry(String),
    /// JSON (de)serialization failure.
    Serde(String),
    /// Failure of a local handler (presumably).
    Local(String),
    /// Failure of an A2A handler (presumably).
    A2a(String),
    /// The dispatch was denied.
    Denied(String),
    /// The dispatch timed out.
    Timeout(String),
    /// The dispatch is waiting and cannot complete yet.
    Waiting(String),
    /// The dispatch was cancelled.
    Cancelled(String),
    /// The requested handler kind is not implemented.
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::EventLog(text)
            | Self::Registry(text)
            | Self::Serde(text)
            | Self::Local(text)
            | Self::A2a(text)
            | Self::Denied(text)
            | Self::Timeout(text)
            | Self::Waiting(text)
            | Self::Cancelled(text)
            | Self::NotImplemented(text) => text,
        };
        f.write_str(message)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Whether an attempt that failed with this error is classified as retryable.
    ///
    /// Cancellations, denials, unimplemented handlers and waits are terminal; every other
    /// variant is treated as transient. The match lists every variant on purpose, so that
    /// adding one forces a decision here.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}
556
impl DispatchSkipStage {
    /// Stable snake_case label for the stage.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Predicate => "predicate",
            Self::FlowControl => "flow_control",
        }
    }
}
565
/// Event-log failures surface as [`DispatchError::EventLog`] carrying the error's message,
/// which lets `?` propagate them from event-log calls.
impl From<LogError> for DispatchError {
    fn from(value: LogError) -> Self {
        Self::EventLog(value.to_string())
    }
}
571
572pub async fn append_dispatch_cancel_request(
573    event_log: &Arc<AnyEventLog>,
574    request: &DispatchCancelRequest,
575) -> Result<u64, DispatchError> {
576    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
577        .expect("static trigger cancel topic should always be valid");
578    event_log
579        .append(
580            &topic,
581            LogEvent::new(
582                "dispatch_cancel_requested",
583                serde_json::to_value(request)
584                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
585            ),
586        )
587        .await
588        .map_err(DispatchError::from)
589}
590
591impl Dispatcher {
592    pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
593        self.event_log.clone()
594    }
595
596    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
597        let event_log = active_event_log().ok_or_else(|| {
598            DispatchError::EventLog("dispatcher requires an active event log".to_string())
599        })?;
600        Ok(Self::with_event_log(base_vm, event_log))
601    }
602
    /// Builds a dispatcher over an explicit event log, without metrics.
    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
        Self::with_event_log_and_metrics(base_vm, event_log, None)
    }
606
    /// Builds a dispatcher over `event_log`, optionally reporting to `metrics`.
    ///
    /// Side effect: installs the new runtime state as this thread's
    /// `ACTIVE_DISPATCHER_STATE`, replacing any state installed by an earlier dispatcher.
    pub fn with_event_log_and_metrics(
        base_vm: Vm,
        event_log: Arc<AnyEventLog>,
        metrics: Option<Arc<crate::MetricsRegistry>>,
    ) -> Self {
        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
        ACTIVE_DISPATCHER_STATE.with(|slot| {
            *slot.borrow_mut() = Some(state.clone());
        });
        // The initial receiver is dropped; `run` subscribes its own receiver, and
        // `shutdown` sends a single `()` per call.
        let (cancel_tx, _) = broadcast::channel(32);
        Self {
            base_vm: Rc::new(base_vm),
            event_log,
            cancel_tx,
            state,
            metrics,
        }
    }
625
626    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
627        DispatcherStatsSnapshot {
628            in_flight: self.state.in_flight.load(Ordering::Relaxed),
629            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
630            dlq_depth: self
631                .state
632                .dlq
633                .lock()
634                .expect("dispatcher dlq poisoned")
635                .len() as u64,
636        }
637    }
638
639    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
640        self.state
641            .dlq
642            .lock()
643            .expect("dispatcher dlq poisoned")
644            .clone()
645    }
646
647    pub fn shutdown(&self) {
648        self.state.shutting_down.store(true, Ordering::SeqCst);
649        for token in self
650            .state
651            .cancel_tokens
652            .lock()
653            .expect("dispatcher cancel tokens poisoned")
654            .iter()
655        {
656            token.store(true, Ordering::SeqCst);
657        }
658        let _ = self.cancel_tx.send(());
659    }
660
    /// Appends `event` to the trigger inbox without targeting a specific binding, so it
    /// fans out to all matching bindings when dispatched.
    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
        self.enqueue_targeted(None, None, event).await
    }
664
    /// Appends `event` to the trigger inbox, optionally targeting one trigger/binding
    /// version, with no inherited parent headers.
    pub async fn enqueue_targeted(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
    ) -> Result<u64, DispatchError> {
        self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
            .await
    }
674
    /// Appends `event` to the trigger inbox as an `event_ingested` [`InboxEnvelope`].
    ///
    /// `parent_headers` are copied first, and the event's own headers are merged over them.
    /// With a `trigger_id`, `trigger_id` and `binding_key` headers are added as well. The
    /// accepted-at and queue-appended-at timestamp headers are only filled in when absent,
    /// so timestamps from an upstream hop are preserved. When metrics are configured and
    /// the event is targeted, queue-latency and pending-event metrics are recorded.
    ///
    /// Returns the value produced by the event log's `append` (presumably the record's
    /// offset — confirm against `EventLog`).
    ///
    /// # Errors
    /// Fails if the inbox topic cannot be derived for the event, the envelope cannot be
    /// serialized, or the append fails.
    pub async fn enqueue_targeted_with_headers(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
        parent_headers: Option<&BTreeMap<String, String>>,
    ) -> Result<u64, DispatchError> {
        let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
        // `trigger_id` itself is moved into the envelope below.
        let trigger_id_for_metrics = trigger_id.clone();
        let mut headers = parent_headers.cloned().unwrap_or_default();
        // Event-derived headers override same-named parent headers.
        headers.extend(event_headers(&event, None, None, None));
        if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
            headers.insert("trigger_id".to_string(), trigger_id.clone());
            headers.insert(
                "binding_key".to_string(),
                binding_key_from_parts(trigger_id, binding_version),
            );
        }
        headers
            .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
            .or_insert_with(|| unix_ms(event.received_at).to_string());
        let payload = serde_json::to_value(InboxEnvelope {
            trigger_id,
            binding_version,
            event: event.clone(),
        })
        .map_err(|error| DispatchError::Serde(error.to_string()))?;
        let mut log_event = LogEvent::new("event_ingested", payload);
        // Accepted→queue-append latency is only recorded on the first hop that appends.
        let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
        // Prefer an upstream queue-append timestamp; otherwise use this record's occurrence time.
        let queue_appended_at_ms = headers
            .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
            .and_then(|value| value.parse::<i64>().ok())
            .unwrap_or(log_event.occurred_at_ms);
        headers
            .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
            .or_insert_with(|| log_event.occurred_at_ms.to_string());
        if let (Some(metrics), Some(trigger_id)) =
            (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
        {
            let binding_key = binding_key_from_parts(trigger_id, binding_version);
            let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
            if !had_queue_appended_at {
                metrics.record_trigger_accepted_to_queue_append(
                    trigger_id,
                    &binding_key,
                    event.provider.as_str(),
                    tenant_id(&event),
                    "queued",
                    duration_between_ms(queue_appended_at_ms, accepted_at_ms),
                );
            }
            // Cleared again by `dispatch_with_replay` when the event is admitted.
            metrics.note_trigger_pending_event(
                event.id.0.as_str(),
                trigger_id,
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                accepted_at_ms,
                queue_appended_at_ms,
            );
        }
        log_event.headers = headers;
        self.event_log
            .append(&topic, log_event)
            .await
            .map_err(DispatchError::from)
    }
742
743    pub async fn run(&self) -> Result<(), DispatchError> {
744        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
745            .expect("static trigger inbox envelopes topic is valid");
746        let start_from = self.event_log.latest(&topic).await?;
747        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
748        pin_mut!(stream);
749        let mut cancel_rx = self.cancel_tx.subscribe();
750
751        loop {
752            tokio::select! {
753                received = stream.next() => {
754                    let Some(received) = received else {
755                        break;
756                    };
757                    let (_, event) = received.map_err(DispatchError::from)?;
758                    if event.kind != "event_ingested" {
759                        continue;
760                    }
761                    let parent_headers = event.headers.clone();
762                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
763                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
764                    notify_test_inbox_dequeued();
765                    let _ = self
766                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
767                        .await;
768                }
769                _ = recv_cancel(&mut cancel_rx) => break,
770            }
771        }
772
773        Ok(())
774    }
775
776    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
777        let deadline = tokio::time::Instant::now() + timeout;
778        loop {
779            let snapshot = self.snapshot();
780            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
781                return Ok(DispatcherDrainReport {
782                    drained: true,
783                    in_flight: snapshot.in_flight,
784                    retry_queue_depth: snapshot.retry_queue_depth,
785                    dlq_depth: snapshot.dlq_depth,
786                });
787            }
788
789            let notified = self.state.idle_notify.notified();
790            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
791            if remaining.is_zero() {
792                return Ok(DispatcherDrainReport {
793                    drained: false,
794                    in_flight: snapshot.in_flight,
795                    retry_queue_depth: snapshot.retry_queue_depth,
796                    dlq_depth: snapshot.dlq_depth,
797                });
798            }
799            if tokio::time::timeout(remaining, notified).await.is_err() {
800                let snapshot = self.snapshot();
801                return Ok(DispatcherDrainReport {
802                    drained: false,
803                    in_flight: snapshot.in_flight,
804                    retry_queue_depth: snapshot.retry_queue_depth,
805                    dlq_depth: snapshot.dlq_depth,
806                });
807            }
808        }
809    }
810
    /// Dispatches one inbox envelope (targeted or fan-out) with no parent headers.
    pub async fn dispatch_inbox_envelope(
        &self,
        envelope: InboxEnvelope,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_inbox_envelope_with_headers(envelope, None)
            .await
    }
818
    /// Dispatches one inbox envelope, passing `parent_headers` (e.g. the inbox record's
    /// headers) through to dispatch for timing metrics and tracing.
    pub async fn dispatch_inbox_envelope_with_parent_headers(
        &self,
        envelope: InboxEnvelope,
        parent_headers: &BTreeMap<String, String>,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
            .await
    }
827
828    async fn dispatch_inbox_envelope_with_headers(
829        &self,
830        envelope: InboxEnvelope,
831        parent_headers: Option<&BTreeMap<String, String>>,
832    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
833        if let Some(trigger_id) = envelope.trigger_id {
834            let binding = super::registry::resolve_live_trigger_binding(
835                &trigger_id,
836                envelope.binding_version,
837            )
838            .map_err(|error| DispatchError::Registry(error.to_string()))?;
839            return Ok(vec![
840                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
841                    .await?,
842            ]);
843        }
844
845        let cron_target = match &envelope.event.provider_payload {
846            crate::triggers::ProviderPayload::Known(
847                crate::triggers::event::KnownProviderPayload::Cron(payload),
848            ) => payload.cron_id.clone(),
849            _ => None,
850        };
851        if let Some(trigger_id) = cron_target {
852            let binding = super::registry::resolve_live_trigger_binding(
853                &trigger_id,
854                envelope.binding_version,
855            )
856            .map_err(|error| DispatchError::Registry(error.to_string()))?;
857            return Ok(vec![
858                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
859                    .await?,
860            ]);
861        }
862
863        self.dispatch_event_with_headers(envelope.event, parent_headers)
864            .await
865    }
866
    /// Dispatches `event` to every matching binding, in order, with no parent headers.
    ///
    /// Stops at the first dispatch error and returns it.
    pub async fn dispatch_event(
        &self,
        event: TriggerEvent,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_event_with_headers(event, None).await
    }
873
874    async fn dispatch_event_with_headers(
875        &self,
876        event: TriggerEvent,
877        parent_headers: Option<&BTreeMap<String, String>>,
878    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
879        let bindings = matching_bindings(&event);
880        let mut outcomes = Vec::new();
881        for binding in bindings {
882            outcomes.push(
883                self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
884                    .await?,
885            );
886        }
887        Ok(outcomes)
888    }
889
    /// Dispatches `event` to one specific `binding` (not a replay, no parent span or headers).
    pub async fn dispatch(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, None, None, None)
            .await
    }
898
    /// Dispatches `event` to `binding` as a replay of the event `replay_of_event_id`.
    pub async fn dispatch_replay(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
        replay_of_event_id: String,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
            .await
    }
908
909    pub async fn dispatch_with_parent_span_id(
910        &self,
911        binding: &TriggerBinding,
912        event: TriggerEvent,
913        parent_span_id: Option<String>,
914    ) -> Result<DispatchOutcome, DispatchError> {
915        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
916            .await
917    }
918
919    async fn dispatch_with_replay(
920        &self,
921        binding: &TriggerBinding,
922        event: TriggerEvent,
923        replay_of_event_id: Option<String>,
924        parent_span_id: Option<String>,
925        parent_headers: Option<&BTreeMap<String, String>>,
926    ) -> Result<DispatchOutcome, DispatchError> {
927        let parent_headers_for_metrics = parent_headers.cloned();
928        let admitted_at_ms = current_unix_ms();
929        if let Some(metrics) = self.metrics.as_ref() {
930            let binding_key = binding.binding_key();
931            let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
932            metrics.record_trigger_queue_age_at_dispatch_admission(
933                binding.id.as_str(),
934                &binding_key,
935                event.provider.as_str(),
936                tenant_id(&event),
937                "admitted",
938                duration_between_ms(admitted_at_ms, queue_appended_at_ms),
939            );
940            metrics.clear_trigger_pending_event(
941                event.id.0.as_str(),
942                binding.id.as_str(),
943                &binding_key,
944                event.provider.as_str(),
945                tenant_id(&event),
946                admitted_at_ms,
947            );
948        }
949        let span = tracing::info_span!(
950            "dispatch",
951            trigger_id = %binding.id.as_str(),
952            binding_version = binding.version,
953            trace_id = %event.trace_id.0
954        );
955        #[cfg(feature = "otel")]
956        let span_for_otel = span.clone();
957        let _ = if let Some(headers) = parent_headers {
958            crate::observability::otel::set_span_parent_from_headers(
959                &span,
960                headers,
961                &event.trace_id,
962                parent_span_id.as_deref(),
963            )
964        } else {
965            crate::observability::otel::set_span_parent(
966                &span,
967                &event.trace_id,
968                parent_span_id.as_deref(),
969            )
970        };
971        #[cfg(feature = "otel")]
972        let started_at = Instant::now();
973        let metrics = self.metrics.clone();
974        let outcome = ACTIVE_DISPATCH_IS_REPLAY
975            .scope(
976                replay_of_event_id.is_some(),
977                self.dispatch_with_replay_inner(
978                    binding,
979                    event,
980                    replay_of_event_id,
981                    parent_headers_for_metrics,
982                )
983                .instrument(span),
984            )
985            .await;
986        if let Some(metrics) = metrics.as_ref() {
987            match &outcome {
988                Ok(dispatch_outcome) => match dispatch_outcome.status {
989                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
990                        metrics.record_dispatch_succeeded();
991                    }
992                    DispatchStatus::Waiting => {}
993                    _ => metrics.record_dispatch_failed(),
994                },
995                Err(_) => metrics.record_dispatch_failed(),
996            }
997            let outcome_label = match &outcome {
998                Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
999                Err(DispatchError::Cancelled(_)) => "cancelled",
1000                Err(_) => "failed",
1001            };
1002            metrics.record_trigger_dispatched(
1003                binding.id.as_str(),
1004                binding.handler.kind(),
1005                outcome_label,
1006            );
1007            metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
1008        }
1009        #[cfg(feature = "otel")]
1010        {
1011            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
1012
1013            let duration_ms = started_at.elapsed().as_millis() as i64;
1014            let status = match &outcome {
1015                Ok(dispatch_outcome) => match dispatch_outcome.status {
1016                    DispatchStatus::Succeeded => "succeeded",
1017                    DispatchStatus::Skipped => "skipped",
1018                    DispatchStatus::Waiting => "waiting",
1019                    DispatchStatus::Cancelled => "cancelled",
1020                    DispatchStatus::Failed => "failed",
1021                    DispatchStatus::Dlq => "dlq",
1022                },
1023                Err(DispatchError::Cancelled(_)) => "cancelled",
1024                Err(_) => "failed",
1025            };
1026            span_for_otel.set_attribute("result.status", status);
1027            span_for_otel.set_attribute("result.duration_ms", duration_ms);
1028        }
1029        outcome
1030    }
1031
1032    async fn dispatch_with_replay_inner(
1033        &self,
1034        binding: &TriggerBinding,
1035        event: TriggerEvent,
1036        replay_of_event_id: Option<String>,
1037        parent_headers: Option<BTreeMap<String, String>>,
1038    ) -> Result<DispatchOutcome, DispatchError> {
1039        let autonomy_tier = crate::resolve_agent_autonomy_tier(
1040            &self.event_log,
1041            binding.id.as_str(),
1042            binding.autonomy_tier,
1043        )
1044        .await
1045        .unwrap_or(binding.autonomy_tier);
1046        let binding_key = binding.binding_key();
1047        let route = DispatchUri::from(&binding.handler);
1048        let trigger_id = binding.id.as_str().to_string();
1049        let event_id = event.id.0.clone();
1050        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
1051        let begin = if replay_of_event_id.is_some() {
1052            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
1053        } else {
1054            begin_in_flight(binding.id.as_str(), binding.version)
1055        };
1056        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
1057
1058        let mut attempts = Vec::new();
1059        let mut source_node_id = format!("trigger:{}", event.id.0);
1060        let mut initial_nodes = Vec::new();
1061        let mut initial_edges = Vec::new();
1062        if let Some(original_event_id) = replay_of_event_id.as_ref() {
1063            let original_node_id = format!("trigger:{original_event_id}");
1064            initial_nodes.push(RunActionGraphNodeRecord {
1065                id: original_node_id.clone(),
1066                label: format!(
1067                    "{}:{} (original {})",
1068                    event.provider.as_str(),
1069                    event.kind,
1070                    original_event_id
1071                ),
1072                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1073                status: "historical".to_string(),
1074                outcome: "replayed_from".to_string(),
1075                trace_id: Some(event.trace_id.0.clone()),
1076                stage_id: None,
1077                node_id: None,
1078                worker_id: None,
1079                run_id: None,
1080                run_path: None,
1081                metadata: trigger_node_metadata(&event),
1082            });
1083            initial_edges.push(RunActionGraphEdgeRecord {
1084                from_id: original_node_id,
1085                to_id: source_node_id.clone(),
1086                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1087                label: Some("replay chain".to_string()),
1088            });
1089        }
1090        initial_nodes.push(RunActionGraphNodeRecord {
1091            id: source_node_id.clone(),
1092            label: format!("{}:{}", event.provider.as_str(), event.kind),
1093            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1094            status: "received".to_string(),
1095            outcome: "received".to_string(),
1096            trace_id: Some(event.trace_id.0.clone()),
1097            stage_id: None,
1098            node_id: None,
1099            worker_id: None,
1100            run_id: None,
1101            run_path: None,
1102            metadata: trigger_node_metadata(&event),
1103        });
1104        self.emit_action_graph(
1105            &event,
1106            initial_nodes,
1107            initial_edges,
1108            serde_json::json!({
1109                "source": "dispatcher",
1110                "trigger_id": trigger_id,
1111                "binding_key": binding_key,
1112                "event_id": event_id,
1113                "replay_of_event_id": replay_of_event_id,
1114            }),
1115        )
1116        .await?;
1117
1118        if dispatch_cancel_requested(
1119            &self.event_log,
1120            &binding_key,
1121            &event.id.0,
1122            replay_of_event_id.as_ref(),
1123        )
1124        .await?
1125        {
1126            finish_in_flight(
1127                binding.id.as_str(),
1128                binding.version,
1129                TriggerDispatchOutcome::Failed,
1130            )
1131            .await
1132            .map_err(|error| DispatchError::Registry(error.to_string()))?;
1133            decrement_in_flight(&self.state);
1134            return Ok(cancelled_dispatch_outcome(
1135                binding,
1136                &route,
1137                &event,
1138                replay_of_event_id,
1139                0,
1140                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1141            ));
1142        }
1143
1144        if let Some(predicate) = binding.when.as_ref() {
1145            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1146            let evaluation = self
1147                .evaluate_predicate(
1148                    binding,
1149                    predicate,
1150                    &event,
1151                    replay_of_event_id.as_ref(),
1152                    autonomy_tier,
1153                )
1154                .await?;
1155            let passed = evaluation.result;
1156            self.emit_action_graph(
1157                &event,
1158                vec![RunActionGraphNodeRecord {
1159                    id: predicate_node_id.clone(),
1160                    label: predicate.raw.clone(),
1161                    kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1162                    status: "completed".to_string(),
1163                    outcome: passed.to_string(),
1164                    trace_id: Some(event.trace_id.0.clone()),
1165                    stage_id: None,
1166                    node_id: None,
1167                    worker_id: None,
1168                    run_id: None,
1169                    run_path: None,
1170                    metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1171                }],
1172                vec![RunActionGraphEdgeRecord {
1173                    from_id: source_node_id.clone(),
1174                    to_id: predicate_node_id.clone(),
1175                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1176                    label: None,
1177                }],
1178                serde_json::json!({
1179                    "source": "dispatcher",
1180                    "trigger_id": binding.id.as_str(),
1181                    "binding_key": binding.binding_key(),
1182                    "event_id": event.id.0,
1183                    "predicate": predicate.raw,
1184                    "reason": evaluation.reason,
1185                    "cached": evaluation.cached,
1186                    "cost_usd": evaluation.cost_usd,
1187                    "tokens": evaluation.tokens,
1188                    "latency_ms": evaluation.latency_ms,
1189                    "replay_of_event_id": replay_of_event_id,
1190                }),
1191            )
1192            .await?;
1193
1194            if !passed {
1195                if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1196                    let final_error = format!(
1197                        "trigger budget exhausted: {}",
1198                        evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1199                    );
1200                    self.move_budget_exhausted_to_dlq(
1201                        binding,
1202                        &route,
1203                        &event,
1204                        replay_of_event_id.as_ref(),
1205                        &final_error,
1206                    )
1207                    .await?;
1208                    finish_in_flight(
1209                        binding.id.as_str(),
1210                        binding.version,
1211                        TriggerDispatchOutcome::Dlq,
1212                    )
1213                    .await
1214                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1215                    decrement_in_flight(&self.state);
1216                    self.append_dispatch_trust_record(
1217                        binding,
1218                        &route,
1219                        &event,
1220                        replay_of_event_id.as_ref(),
1221                        autonomy_tier,
1222                        TrustOutcome::Failure,
1223                        "dlq",
1224                        0,
1225                        Some(final_error.clone()),
1226                    )
1227                    .await?;
1228                    return Ok(DispatchOutcome {
1229                        trigger_id: binding.id.as_str().to_string(),
1230                        binding_key: binding.binding_key(),
1231                        event_id: event.id.0,
1232                        attempt_count: 0,
1233                        status: DispatchStatus::Dlq,
1234                        handler_kind: route.kind().to_string(),
1235                        target_uri: route.target_uri(),
1236                        replay_of_event_id,
1237                        result: None,
1238                        error: Some(final_error),
1239                    });
1240                }
1241
1242                if evaluation.exhaustion_strategy
1243                    == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1244                {
1245                    self.append_budget_deferred_event(
1246                        binding,
1247                        &route,
1248                        &event,
1249                        replay_of_event_id.as_ref(),
1250                        evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1251                    )
1252                    .await?;
1253                    finish_in_flight(
1254                        binding.id.as_str(),
1255                        binding.version,
1256                        TriggerDispatchOutcome::Dispatched,
1257                    )
1258                    .await
1259                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1260                    decrement_in_flight(&self.state);
1261                    self.append_dispatch_trust_record(
1262                        binding,
1263                        &route,
1264                        &event,
1265                        replay_of_event_id.as_ref(),
1266                        autonomy_tier,
1267                        TrustOutcome::Denied,
1268                        "waiting",
1269                        0,
1270                        evaluation.reason.clone(),
1271                    )
1272                    .await?;
1273                    return Ok(DispatchOutcome {
1274                        trigger_id: binding.id.as_str().to_string(),
1275                        binding_key: binding.binding_key(),
1276                        event_id: event.id.0,
1277                        attempt_count: 0,
1278                        status: DispatchStatus::Waiting,
1279                        handler_kind: route.kind().to_string(),
1280                        target_uri: route.target_uri(),
1281                        replay_of_event_id,
1282                        result: Some(serde_json::json!({
1283                            "deferred": true,
1284                            "predicate": predicate.raw,
1285                            "reason": evaluation.reason,
1286                        })),
1287                        error: None,
1288                    });
1289                }
1290
1291                self.append_skipped_outbox_event(
1292                    binding,
1293                    &route,
1294                    &event,
1295                    replay_of_event_id.as_ref(),
1296                    DispatchSkipStage::Predicate,
1297                    serde_json::json!({
1298                        "predicate": predicate.raw,
1299                        "reason": evaluation.reason,
1300                    }),
1301                )
1302                .await?;
1303                finish_in_flight(
1304                    binding.id.as_str(),
1305                    binding.version,
1306                    TriggerDispatchOutcome::Dispatched,
1307                )
1308                .await
1309                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1310                decrement_in_flight(&self.state);
1311                self.append_dispatch_trust_record(
1312                    binding,
1313                    &route,
1314                    &event,
1315                    replay_of_event_id.as_ref(),
1316                    autonomy_tier,
1317                    TrustOutcome::Denied,
1318                    "skipped",
1319                    0,
1320                    None,
1321                )
1322                .await?;
1323                return Ok(DispatchOutcome {
1324                    trigger_id: binding.id.as_str().to_string(),
1325                    binding_key: binding.binding_key(),
1326                    event_id: event.id.0,
1327                    attempt_count: 0,
1328                    status: DispatchStatus::Skipped,
1329                    handler_kind: route.kind().to_string(),
1330                    target_uri: route.target_uri(),
1331                    replay_of_event_id,
1332                    result: Some(serde_json::json!({
1333                        "skipped": true,
1334                        "predicate": predicate.raw,
1335                        "reason": evaluation.reason,
1336                    })),
1337                    error: None,
1338                });
1339            }
1340
1341            source_node_id = predicate_node_id;
1342        }
1343
1344        if autonomy_tier == AutonomyTier::ActAuto {
1345            if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1346                let request_id = self
1347                    .append_autonomy_budget_approval_request(
1348                        binding,
1349                        &route,
1350                        &event,
1351                        replay_of_event_id.as_ref(),
1352                        reason,
1353                    )
1354                    .await?;
1355                self.emit_autonomy_budget_approval_action_graph(
1356                    binding,
1357                    &route,
1358                    &event,
1359                    &source_node_id,
1360                    replay_of_event_id.as_ref(),
1361                    reason,
1362                    &request_id,
1363                )
1364                .await?;
1365                finish_in_flight(
1366                    binding.id.as_str(),
1367                    binding.version,
1368                    TriggerDispatchOutcome::Dispatched,
1369                )
1370                .await
1371                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1372                decrement_in_flight(&self.state);
1373                self.append_tier_transition_trust_record(
1374                    binding,
1375                    &event,
1376                    replay_of_event_id.as_ref(),
1377                    autonomy_tier,
1378                    AutonomyTier::ActWithApproval,
1379                    reason,
1380                    &request_id,
1381                )
1382                .await?;
1383                self.append_dispatch_trust_record(
1384                    binding,
1385                    &route,
1386                    &event,
1387                    replay_of_event_id.as_ref(),
1388                    autonomy_tier,
1389                    TrustOutcome::Denied,
1390                    "waiting",
1391                    0,
1392                    Some(reason.to_string()),
1393                )
1394                .await?;
1395                return Ok(DispatchOutcome {
1396                    trigger_id: binding.id.as_str().to_string(),
1397                    binding_key: binding.binding_key(),
1398                    event_id: event.id.0,
1399                    attempt_count: 0,
1400                    status: DispatchStatus::Waiting,
1401                    handler_kind: route.kind().to_string(),
1402                    target_uri: route.target_uri(),
1403                    replay_of_event_id,
1404                    result: Some(serde_json::json!({
1405                        "approval_required": true,
1406                        "request_id": request_id,
1407                        "reason": reason,
1408                        "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1409                    })),
1410                    error: None,
1411                });
1412            }
1413            note_autonomous_decision(binding);
1414        }
1415
1416        let (event, acquired_flow) = match self
1417            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1418            .await?
1419        {
1420            FlowControlOutcome::Dispatch { event, acquired } => {
1421                (*event, Arc::new(AsyncMutex::new(acquired)))
1422            }
1423            FlowControlOutcome::Skip { reason } => {
1424                self.append_skipped_outbox_event(
1425                    binding,
1426                    &route,
1427                    &event,
1428                    replay_of_event_id.as_ref(),
1429                    DispatchSkipStage::FlowControl,
1430                    serde_json::json!({
1431                        "flow_control": reason,
1432                    }),
1433                )
1434                .await?;
1435                finish_in_flight(
1436                    binding.id.as_str(),
1437                    binding.version,
1438                    TriggerDispatchOutcome::Dispatched,
1439                )
1440                .await
1441                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1442                decrement_in_flight(&self.state);
1443                return Ok(DispatchOutcome {
1444                    trigger_id: binding.id.as_str().to_string(),
1445                    binding_key: binding.binding_key(),
1446                    event_id: event.id.0,
1447                    attempt_count: 0,
1448                    status: DispatchStatus::Skipped,
1449                    handler_kind: route.kind().to_string(),
1450                    target_uri: route.target_uri(),
1451                    replay_of_event_id,
1452                    result: Some(serde_json::json!({
1453                        "skipped": true,
1454                        "flow_control": reason,
1455                    })),
1456                    error: None,
1457                });
1458            }
1459        };
1460
1461        let destination_key = destination_circuit_key(&route);
1462        let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1463            DestinationCircuitProbe::Allow { half_open } => {
1464                if half_open {
1465                    if let Some(metrics) = self.metrics.as_ref() {
1466                        metrics.record_backpressure_event("circuit", "half_open_probe");
1467                    }
1468                }
1469                half_open
1470            }
1471            DestinationCircuitProbe::Block { retry_after } => {
1472                if let Some(metrics) = self.metrics.as_ref() {
1473                    metrics.record_backpressure_event("circuit", "fail_fast");
1474                }
1475                let final_error = format!(
1476                    "destination circuit open for {}; retry after {}s",
1477                    destination_key,
1478                    retry_after.as_secs().max(1)
1479                );
1480                self.move_circuit_open_to_dlq(
1481                    binding,
1482                    &route,
1483                    &event,
1484                    replay_of_event_id.as_ref(),
1485                    &final_error,
1486                    &destination_key,
1487                )
1488                .await?;
1489                finish_in_flight(
1490                    binding.id.as_str(),
1491                    binding.version,
1492                    TriggerDispatchOutcome::Dlq,
1493                )
1494                .await
1495                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1496                self.release_flow_control(&acquired_flow).await?;
1497                decrement_in_flight(&self.state);
1498                self.append_dispatch_trust_record(
1499                    binding,
1500                    &route,
1501                    &event,
1502                    replay_of_event_id.as_ref(),
1503                    autonomy_tier,
1504                    TrustOutcome::Failure,
1505                    "dlq",
1506                    0,
1507                    Some(final_error.clone()),
1508                )
1509                .await?;
1510                return Ok(DispatchOutcome {
1511                    trigger_id: binding.id.as_str().to_string(),
1512                    binding_key: binding.binding_key(),
1513                    event_id: event.id.0,
1514                    attempt_count: 0,
1515                    status: DispatchStatus::Dlq,
1516                    handler_kind: route.kind().to_string(),
1517                    target_uri: route.target_uri(),
1518                    replay_of_event_id,
1519                    result: None,
1520                    error: Some(final_error),
1521                });
1522            }
1523        };
1524
1525        let mut previous_retry_node = None;
1526        let max_attempts = binding.retry.max_attempts();
1527        for attempt in 1..=max_attempts {
1528            if dispatch_cancel_requested(
1529                &self.event_log,
1530                &binding_key,
1531                &event.id.0,
1532                replay_of_event_id.as_ref(),
1533            )
1534            .await?
1535            {
1536                finish_in_flight(
1537                    binding.id.as_str(),
1538                    binding.version,
1539                    TriggerDispatchOutcome::Failed,
1540                )
1541                .await
1542                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1543                decrement_in_flight(&self.state);
1544                return Ok(cancelled_dispatch_outcome(
1545                    binding,
1546                    &route,
1547                    &event,
1548                    replay_of_event_id,
1549                    attempt.saturating_sub(1),
1550                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1551                ));
1552            }
1553            maybe_fail_before_outbox();
1554            let attempt_started_instant = Instant::now();
1555            let attempt_started_at_ms = current_unix_ms();
1556            let queue_age_at_start = duration_between_ms(
1557                attempt_started_at_ms,
1558                queue_appended_at_ms(parent_headers.as_ref(), &event),
1559            );
1560            if attempt == 1 {
1561                if let Some(metrics) = self.metrics.as_ref() {
1562                    metrics.record_trigger_queue_age_at_dispatch_start(
1563                        binding.id.as_str(),
1564                        &binding_key,
1565                        event.provider.as_str(),
1566                        tenant_id(&event),
1567                        "started",
1568                        queue_age_at_start,
1569                    );
1570                }
1571            }
1572            tracing::info!(
1573                component = "dispatcher",
1574                lifecycle = "dispatch_started",
1575                trigger_id = %binding.id.as_str(),
1576                binding_key = %binding_key,
1577                event_id = %event.id.0,
1578                attempt,
1579                queue_age_ms = queue_age_at_start.as_millis(),
1580                trace_id = %event.trace_id.0
1581            );
1582            let started_at = now_rfc3339();
1583            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1584            self.append_lifecycle_event(
1585                "DispatchStarted",
1586                &event,
1587                binding,
1588                serde_json::json!({
1589                    "event_id": event.id.0,
1590                    "attempt": attempt,
1591                    "handler_kind": route.kind(),
1592                    "target_uri": route.target_uri(),
1593                    "replay_of_event_id": replay_of_event_id,
1594                }),
1595                replay_of_event_id.as_ref(),
1596            )
1597            .await?;
1598            self.append_topic_event(
1599                TRIGGER_OUTBOX_TOPIC,
1600                "dispatch_started",
1601                &event,
1602                Some(binding),
1603                Some(attempt),
1604                serde_json::json!({
1605                    "event_id": event.id.0,
1606                    "attempt": attempt,
1607                    "trigger_id": binding.id.as_str(),
1608                    "binding_key": binding.binding_key(),
1609                    "handler_kind": route.kind(),
1610                    "target_uri": route.target_uri(),
1611                    "replay_of_event_id": replay_of_event_id,
1612                }),
1613                replay_of_event_id.as_ref(),
1614            )
1615            .await?;
1616
1617            let mut dispatch_edges = Vec::new();
1618            if attempt == 1 {
1619                dispatch_edges.push(RunActionGraphEdgeRecord {
1620                    from_id: source_node_id.clone(),
1621                    to_id: attempt_node_id.clone(),
1622                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1623                    label: binding.when.as_ref().map(|_| "true".to_string()),
1624                });
1625            } else if let Some(retry_node_id) = previous_retry_node.take() {
1626                dispatch_edges.push(RunActionGraphEdgeRecord {
1627                    from_id: retry_node_id,
1628                    to_id: attempt_node_id.clone(),
1629                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1630                    label: Some(format!("attempt {attempt}")),
1631                });
1632            }
1633
1634            self.emit_action_graph(
1635                &event,
1636                vec![RunActionGraphNodeRecord {
1637                    id: attempt_node_id.clone(),
1638                    label: dispatch_node_label(&route),
1639                    kind: dispatch_node_kind(&route).to_string(),
1640                    status: "running".to_string(),
1641                    outcome: format!("attempt_{attempt}"),
1642                    trace_id: Some(event.trace_id.0.clone()),
1643                    stage_id: None,
1644                    node_id: None,
1645                    worker_id: None,
1646                    run_id: None,
1647                    run_path: None,
1648                    metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1649                }],
1650                dispatch_edges,
1651                serde_json::json!({
1652                    "source": "dispatcher",
1653                    "trigger_id": binding.id.as_str(),
1654                    "binding_key": binding.binding_key(),
1655                    "event_id": event.id.0,
1656                    "attempt": attempt,
1657                    "handler_kind": route.kind(),
1658                    "target_uri": route.target_uri(),
1659                    "target_agent": dispatch_target_agent(&route),
1660                    "replay_of_event_id": replay_of_event_id,
1661                }),
1662            )
1663            .await?;
1664
1665            let result = self
1666                .dispatch_once(
1667                    binding,
1668                    &route,
1669                    &event,
1670                    autonomy_tier,
1671                    Some(DispatchWaitLease::new(
1672                        self.state.clone(),
1673                        acquired_flow.clone(),
1674                    )),
1675                    &mut self.cancel_tx.subscribe(),
1676                )
1677                .await;
1678            let attempt_runtime = attempt_started_instant.elapsed();
1679            let attempt_status = dispatch_result_status(&result);
1680            if let Some(metrics) = self.metrics.as_ref() {
1681                metrics.record_trigger_dispatch_runtime(
1682                    binding.id.as_str(),
1683                    &binding_key,
1684                    event.provider.as_str(),
1685                    tenant_id(&event),
1686                    attempt_status,
1687                    attempt_runtime,
1688                );
1689            }
1690            tracing::info!(
1691                component = "dispatcher",
1692                lifecycle = "handler_completed",
1693                trigger_id = %binding.id.as_str(),
1694                binding_key = %binding_key,
1695                event_id = %event.id.0,
1696                attempt,
1697                status = attempt_status,
1698                runtime_ms = attempt_runtime.as_millis(),
1699                trace_id = %event.trace_id.0
1700            );
1701            let completed_at = now_rfc3339();
1702
1703            match result {
1704                Ok(result) => {
1705                    let attempt_record = DispatchAttemptRecord {
1706                        trigger_id: binding.id.as_str().to_string(),
1707                        binding_key: binding.binding_key(),
1708                        event_id: event.id.0.clone(),
1709                        attempt,
1710                        handler_kind: route.kind().to_string(),
1711                        started_at,
1712                        completed_at,
1713                        outcome: "success".to_string(),
1714                        error_msg: None,
1715                    };
1716                    attempts.push(attempt_record.clone());
1717                    self.append_attempt_record(
1718                        &event,
1719                        binding,
1720                        &attempt_record,
1721                        replay_of_event_id.as_ref(),
1722                    )
1723                    .await?;
1724                    self.append_lifecycle_event(
1725                        "DispatchSucceeded",
1726                        &event,
1727                        binding,
1728                        serde_json::json!({
1729                            "event_id": event.id.0,
1730                            "attempt": attempt,
1731                            "handler_kind": route.kind(),
1732                            "target_uri": route.target_uri(),
1733                            "result": result,
1734                            "replay_of_event_id": replay_of_event_id,
1735                        }),
1736                        replay_of_event_id.as_ref(),
1737                    )
1738                    .await?;
1739                    self.append_topic_event(
1740                        TRIGGER_OUTBOX_TOPIC,
1741                        "dispatch_succeeded",
1742                        &event,
1743                        Some(binding),
1744                        Some(attempt),
1745                        serde_json::json!({
1746                            "event_id": event.id.0,
1747                            "attempt": attempt,
1748                            "trigger_id": binding.id.as_str(),
1749                            "binding_key": binding.binding_key(),
1750                            "handler_kind": route.kind(),
1751                            "target_uri": route.target_uri(),
1752                            "result": result,
1753                            "replay_of_event_id": replay_of_event_id,
1754                        }),
1755                        replay_of_event_id.as_ref(),
1756                    )
1757                    .await?;
1758                    self.emit_action_graph(
1759                        &event,
1760                        vec![RunActionGraphNodeRecord {
1761                            id: attempt_node_id.clone(),
1762                            label: dispatch_node_label(&route),
1763                            kind: dispatch_node_kind(&route).to_string(),
1764                            status: "completed".to_string(),
1765                            outcome: dispatch_success_outcome(&route, &result).to_string(),
1766                            trace_id: Some(event.trace_id.0.clone()),
1767                            stage_id: None,
1768                            node_id: None,
1769                            worker_id: None,
1770                            run_id: None,
1771                            run_path: None,
1772                            metadata: dispatch_success_metadata(
1773                                &route, binding, &event, attempt, &result,
1774                            ),
1775                        }],
1776                        Vec::new(),
1777                        serde_json::json!({
1778                            "source": "dispatcher",
1779                            "trigger_id": binding.id.as_str(),
1780                            "binding_key": binding.binding_key(),
1781                            "event_id": event.id.0,
1782                            "attempt": attempt,
1783                            "handler_kind": route.kind(),
1784                            "target_uri": route.target_uri(),
1785                            "result": result,
1786                            "replay_of_event_id": replay_of_event_id,
1787                        }),
1788                    )
1789                    .await?;
1790                    self.state
1791                        .destination_circuits
1792                        .record_success(&destination_key);
1793                    if half_open_probe {
1794                        if let Some(metrics) = self.metrics.as_ref() {
1795                            metrics.record_backpressure_event("circuit", "closed");
1796                        }
1797                    }
1798                    finish_in_flight(
1799                        binding.id.as_str(),
1800                        binding.version,
1801                        TriggerDispatchOutcome::Dispatched,
1802                    )
1803                    .await
1804                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1805                    self.release_flow_control(&acquired_flow).await?;
1806                    decrement_in_flight(&self.state);
1807                    self.append_dispatch_trust_record(
1808                        binding,
1809                        &route,
1810                        &event,
1811                        replay_of_event_id.as_ref(),
1812                        autonomy_tier,
1813                        TrustOutcome::Success,
1814                        "succeeded",
1815                        attempt,
1816                        None,
1817                    )
1818                    .await?;
1819                    return Ok(DispatchOutcome {
1820                        trigger_id: binding.id.as_str().to_string(),
1821                        binding_key: binding.binding_key(),
1822                        event_id: event.id.0,
1823                        attempt_count: attempt,
1824                        status: DispatchStatus::Succeeded,
1825                        handler_kind: route.kind().to_string(),
1826                        target_uri: route.target_uri(),
1827                        replay_of_event_id,
1828                        result: Some(result),
1829                        error: None,
1830                    });
1831                }
1832                Err(error) => {
1833                    let attempt_record = DispatchAttemptRecord {
1834                        trigger_id: binding.id.as_str().to_string(),
1835                        binding_key: binding.binding_key(),
1836                        event_id: event.id.0.clone(),
1837                        attempt,
1838                        handler_kind: route.kind().to_string(),
1839                        started_at,
1840                        completed_at,
1841                        outcome: dispatch_error_label(&error).to_string(),
1842                        error_msg: Some(error.to_string()),
1843                    };
1844                    attempts.push(attempt_record.clone());
1845                    self.append_attempt_record(
1846                        &event,
1847                        binding,
1848                        &attempt_record,
1849                        replay_of_event_id.as_ref(),
1850                    )
1851                    .await?;
1852                    if let DispatchError::Waiting(message) = &error {
1853                        self.append_lifecycle_event(
1854                            "DispatchWaiting",
1855                            &event,
1856                            binding,
1857                            serde_json::json!({
1858                                "event_id": event.id.0,
1859                                "attempt": attempt,
1860                                "handler_kind": route.kind(),
1861                                "target_uri": route.target_uri(),
1862                                "message": message,
1863                                "replay_of_event_id": replay_of_event_id,
1864                            }),
1865                            replay_of_event_id.as_ref(),
1866                        )
1867                        .await?;
1868                        self.append_topic_event(
1869                            TRIGGER_OUTBOX_TOPIC,
1870                            "dispatch_waiting",
1871                            &event,
1872                            Some(binding),
1873                            Some(attempt),
1874                            serde_json::json!({
1875                                "event_id": event.id.0,
1876                                "attempt": attempt,
1877                                "trigger_id": binding.id.as_str(),
1878                                "binding_key": binding.binding_key(),
1879                                "handler_kind": route.kind(),
1880                                "target_uri": route.target_uri(),
1881                                "message": message,
1882                                "replay_of_event_id": replay_of_event_id,
1883                            }),
1884                            replay_of_event_id.as_ref(),
1885                        )
1886                        .await?;
1887                        finish_in_flight(
1888                            binding.id.as_str(),
1889                            binding.version,
1890                            TriggerDispatchOutcome::Dispatched,
1891                        )
1892                        .await
1893                        .map_err(|registry_error| {
1894                            DispatchError::Registry(registry_error.to_string())
1895                        })?;
1896                        self.release_flow_control(&acquired_flow).await?;
1897                        decrement_in_flight(&self.state);
1898                        return Ok(DispatchOutcome {
1899                            trigger_id: binding.id.as_str().to_string(),
1900                            binding_key: binding.binding_key(),
1901                            event_id: event.id.0,
1902                            attempt_count: attempt,
1903                            status: DispatchStatus::Waiting,
1904                            handler_kind: route.kind().to_string(),
1905                            target_uri: route.target_uri(),
1906                            replay_of_event_id,
1907                            result: Some(serde_json::json!({
1908                                "waiting": true,
1909                                "message": message,
1910                            })),
1911                            error: None,
1912                        });
1913                    }
1914
1915                    self.append_lifecycle_event(
1916                        "DispatchFailed",
1917                        &event,
1918                        binding,
1919                        serde_json::json!({
1920                            "event_id": event.id.0,
1921                            "attempt": attempt,
1922                            "handler_kind": route.kind(),
1923                            "target_uri": route.target_uri(),
1924                            "error": error.to_string(),
1925                            "replay_of_event_id": replay_of_event_id,
1926                        }),
1927                        replay_of_event_id.as_ref(),
1928                    )
1929                    .await?;
1930                    self.append_topic_event(
1931                        TRIGGER_OUTBOX_TOPIC,
1932                        "dispatch_failed",
1933                        &event,
1934                        Some(binding),
1935                        Some(attempt),
1936                        serde_json::json!({
1937                            "event_id": event.id.0,
1938                            "attempt": attempt,
1939                            "trigger_id": binding.id.as_str(),
1940                            "binding_key": binding.binding_key(),
1941                            "handler_kind": route.kind(),
1942                            "target_uri": route.target_uri(),
1943                            "error": error.to_string(),
1944                            "replay_of_event_id": replay_of_event_id,
1945                        }),
1946                        replay_of_event_id.as_ref(),
1947                    )
1948                    .await?;
1949                    self.emit_action_graph(
1950                        &event,
1951                        vec![RunActionGraphNodeRecord {
1952                            id: attempt_node_id.clone(),
1953                            label: dispatch_node_label(&route),
1954                            kind: dispatch_node_kind(&route).to_string(),
1955                            status: if matches!(error, DispatchError::Cancelled(_)) {
1956                                "cancelled".to_string()
1957                            } else {
1958                                "failed".to_string()
1959                            },
1960                            outcome: dispatch_error_label(&error).to_string(),
1961                            trace_id: Some(event.trace_id.0.clone()),
1962                            stage_id: None,
1963                            node_id: None,
1964                            worker_id: None,
1965                            run_id: None,
1966                            run_path: None,
1967                            metadata: dispatch_error_metadata(
1968                                &route, binding, &event, attempt, &error,
1969                            ),
1970                        }],
1971                        Vec::new(),
1972                        serde_json::json!({
1973                            "source": "dispatcher",
1974                            "trigger_id": binding.id.as_str(),
1975                            "binding_key": binding.binding_key(),
1976                            "event_id": event.id.0,
1977                            "attempt": attempt,
1978                            "handler_kind": route.kind(),
1979                            "target_uri": route.target_uri(),
1980                            "error": error.to_string(),
1981                            "replay_of_event_id": replay_of_event_id,
1982                        }),
1983                    )
1984                    .await?;
1985
1986                    let circuit_opened = if error.retryable() {
1987                        self.state
1988                            .destination_circuits
1989                            .record_failure(&destination_key)
1990                    } else {
1991                        false
1992                    };
1993                    if circuit_opened {
1994                        if let Some(metrics) = self.metrics.as_ref() {
1995                            metrics.record_backpressure_event("circuit", "opened");
1996                            metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1997                            metrics.record_trigger_accepted_to_dlq(
1998                                binding.id.as_str(),
1999                                &binding_key,
2000                                event.provider.as_str(),
2001                                tenant_id(&event),
2002                                "circuit_open",
2003                                duration_between_ms(
2004                                    current_unix_ms(),
2005                                    accepted_at_ms(parent_headers.as_ref(), &event),
2006                                ),
2007                            );
2008                        }
2009                        let final_error = format!(
2010                            "destination circuit opened for {} after {} consecutive failures: {}",
2011                            destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
2012                        );
2013                        let dlq_entry = DlqEntry {
2014                            trigger_id: binding.id.as_str().to_string(),
2015                            binding_key: binding.binding_key(),
2016                            event: event.clone(),
2017                            attempt_count: attempt,
2018                            final_error: final_error.clone(),
2019                            error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2020                                .to_string(),
2021                            attempts: attempts.clone(),
2022                        };
2023                        self.state
2024                            .dlq
2025                            .lock()
2026                            .expect("dispatcher dlq poisoned")
2027                            .push(dlq_entry.clone());
2028                        self.append_lifecycle_event(
2029                            "DlqMoved",
2030                            &event,
2031                            binding,
2032                            serde_json::json!({
2033                                "event_id": event.id.0,
2034                                "attempt_count": attempt,
2035                                "final_error": dlq_entry.final_error,
2036                                "reason": "circuit_open",
2037                                "destination": destination_key,
2038                                "replay_of_event_id": replay_of_event_id,
2039                            }),
2040                            replay_of_event_id.as_ref(),
2041                        )
2042                        .await?;
2043                        self.append_topic_event(
2044                            TRIGGER_DLQ_TOPIC,
2045                            "dlq_moved",
2046                            &event,
2047                            Some(binding),
2048                            Some(attempt),
2049                            serde_json::to_value(&dlq_entry).map_err(|serde_error| {
2050                                DispatchError::Serde(serde_error.to_string())
2051                            })?,
2052                            replay_of_event_id.as_ref(),
2053                        )
2054                        .await?;
2055                        finish_in_flight(
2056                            binding.id.as_str(),
2057                            binding.version,
2058                            TriggerDispatchOutcome::Dlq,
2059                        )
2060                        .await
2061                        .map_err(|registry_error| {
2062                            DispatchError::Registry(registry_error.to_string())
2063                        })?;
2064                        self.release_flow_control(&acquired_flow).await?;
2065                        decrement_in_flight(&self.state);
2066                        self.append_dispatch_trust_record(
2067                            binding,
2068                            &route,
2069                            &event,
2070                            replay_of_event_id.as_ref(),
2071                            autonomy_tier,
2072                            TrustOutcome::Failure,
2073                            "dlq",
2074                            attempt,
2075                            Some(final_error.clone()),
2076                        )
2077                        .await?;
2078                        return Ok(DispatchOutcome {
2079                            trigger_id: binding.id.as_str().to_string(),
2080                            binding_key: binding.binding_key(),
2081                            event_id: event.id.0,
2082                            attempt_count: attempt,
2083                            status: DispatchStatus::Dlq,
2084                            handler_kind: route.kind().to_string(),
2085                            target_uri: route.target_uri(),
2086                            replay_of_event_id,
2087                            result: None,
2088                            error: Some(final_error),
2089                        });
2090                    }
2091
2092                    if !error.retryable() {
2093                        finish_in_flight(
2094                            binding.id.as_str(),
2095                            binding.version,
2096                            TriggerDispatchOutcome::Failed,
2097                        )
2098                        .await
2099                        .map_err(|registry_error| {
2100                            DispatchError::Registry(registry_error.to_string())
2101                        })?;
2102                        self.release_flow_control(&acquired_flow).await?;
2103                        decrement_in_flight(&self.state);
2104                        let trust_outcome = match error {
2105                            DispatchError::Denied(_) => TrustOutcome::Denied,
2106                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
2107                            _ => TrustOutcome::Failure,
2108                        };
2109                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2110                            "cancelled"
2111                        } else {
2112                            "failed"
2113                        };
2114                        self.append_dispatch_trust_record(
2115                            binding,
2116                            &route,
2117                            &event,
2118                            replay_of_event_id.as_ref(),
2119                            autonomy_tier,
2120                            trust_outcome,
2121                            terminal_status,
2122                            attempt,
2123                            Some(error.to_string()),
2124                        )
2125                        .await?;
2126                        return Ok(DispatchOutcome {
2127                            trigger_id: binding.id.as_str().to_string(),
2128                            binding_key: binding.binding_key(),
2129                            event_id: event.id.0,
2130                            attempt_count: attempt,
2131                            status: if matches!(error, DispatchError::Cancelled(_)) {
2132                                DispatchStatus::Cancelled
2133                            } else {
2134                                DispatchStatus::Failed
2135                            },
2136                            handler_kind: route.kind().to_string(),
2137                            target_uri: route.target_uri(),
2138                            replay_of_event_id,
2139                            result: None,
2140                            error: Some(error.to_string()),
2141                        });
2142                    }
2143
2144                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2145                        if let Some(metrics) = self.metrics.as_ref() {
2146                            metrics.record_retry_scheduled();
2147                            metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2148                            metrics.record_trigger_retry_delay(
2149                                binding.id.as_str(),
2150                                &binding_key,
2151                                event.provider.as_str(),
2152                                tenant_id(&event),
2153                                "scheduled",
2154                                delay,
2155                            );
2156                        }
2157                        tracing::info!(
2158                            component = "dispatcher",
2159                            lifecycle = "retry_scheduled",
2160                            trigger_id = %binding.id.as_str(),
2161                            binding_key = %binding_key,
2162                            event_id = %event.id.0,
2163                            attempt = attempt + 1,
2164                            delay_ms = delay.as_millis(),
2165                            trace_id = %event.trace_id.0
2166                        );
2167                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2168                        previous_retry_node = Some(retry_node_id.clone());
2169                        self.emit_action_graph(
2170                            &event,
2171                            vec![RunActionGraphNodeRecord {
2172                                id: retry_node_id.clone(),
2173                                label: format!("retry in {}ms", delay.as_millis()),
2174                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2175                                status: "scheduled".to_string(),
2176                                outcome: format!("attempt_{}", attempt + 1),
2177                                trace_id: Some(event.trace_id.0.clone()),
2178                                stage_id: None,
2179                                node_id: None,
2180                                worker_id: None,
2181                                run_id: None,
2182                                run_path: None,
2183                                metadata: retry_node_metadata(
2184                                    binding,
2185                                    &event,
2186                                    attempt + 1,
2187                                    delay,
2188                                    &error,
2189                                ),
2190                            }],
2191                            vec![RunActionGraphEdgeRecord {
2192                                from_id: attempt_node_id,
2193                                to_id: retry_node_id.clone(),
2194                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2195                                label: Some(format!("attempt {}", attempt + 1)),
2196                            }],
2197                            serde_json::json!({
2198                                "source": "dispatcher",
2199                                "trigger_id": binding.id.as_str(),
2200                                "binding_key": binding.binding_key(),
2201                                "event_id": event.id.0,
2202                                "attempt": attempt + 1,
2203                                "delay_ms": delay.as_millis(),
2204                                "replay_of_event_id": replay_of_event_id,
2205                            }),
2206                        )
2207                        .await?;
2208                        self.append_lifecycle_event(
2209                            "RetryScheduled",
2210                            &event,
2211                            binding,
2212                            serde_json::json!({
2213                                "event_id": event.id.0,
2214                                "attempt": attempt + 1,
2215                                "delay_ms": delay.as_millis(),
2216                                "error": error.to_string(),
2217                                "replay_of_event_id": replay_of_event_id,
2218                            }),
2219                            replay_of_event_id.as_ref(),
2220                        )
2221                        .await?;
2222                        self.append_topic_event(
2223                            TRIGGER_ATTEMPTS_TOPIC,
2224                            "retry_scheduled",
2225                            &event,
2226                            Some(binding),
2227                            Some(attempt + 1),
2228                            serde_json::json!({
2229                                "event_id": event.id.0,
2230                                "attempt": attempt + 1,
2231                                "trigger_id": binding.id.as_str(),
2232                                "binding_key": binding.binding_key(),
2233                                "delay_ms": delay.as_millis(),
2234                                "error": error.to_string(),
2235                                "replay_of_event_id": replay_of_event_id,
2236                            }),
2237                            replay_of_event_id.as_ref(),
2238                        )
2239                        .await?;
2240                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2241                        let sleep_result = sleep_or_cancel_or_request(
2242                            &self.event_log,
2243                            delay,
2244                            &binding_key,
2245                            &event.id.0,
2246                            replay_of_event_id.as_ref(),
2247                            &mut self.cancel_tx.subscribe(),
2248                        )
2249                        .await;
2250                        decrement_retry_queue_depth(&self.state);
2251                        if sleep_result.is_err() {
2252                            finish_in_flight(
2253                                binding.id.as_str(),
2254                                binding.version,
2255                                TriggerDispatchOutcome::Failed,
2256                            )
2257                            .await
2258                            .map_err(|registry_error| {
2259                                DispatchError::Registry(registry_error.to_string())
2260                            })?;
2261                            self.release_flow_control(&acquired_flow).await?;
2262                            decrement_in_flight(&self.state);
2263                            self.append_dispatch_trust_record(
2264                                binding,
2265                                &route,
2266                                &event,
2267                                replay_of_event_id.as_ref(),
2268                                autonomy_tier,
2269                                TrustOutcome::Failure,
2270                                "cancelled",
2271                                attempt,
2272                                Some("dispatcher shutdown cancelled retry wait".to_string()),
2273                            )
2274                            .await?;
2275                            return Ok(DispatchOutcome {
2276                                trigger_id: binding.id.as_str().to_string(),
2277                                binding_key: binding.binding_key(),
2278                                event_id: event.id.0,
2279                                attempt_count: attempt,
2280                                status: DispatchStatus::Cancelled,
2281                                handler_kind: route.kind().to_string(),
2282                                target_uri: route.target_uri(),
2283                                replay_of_event_id,
2284                                result: None,
2285                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2286                            });
2287                        }
2288                        continue;
2289                    }
2290
2291                    let final_error = error.to_string();
2292                    let dlq_entry = DlqEntry {
2293                        trigger_id: binding.id.as_str().to_string(),
2294                        binding_key: binding.binding_key(),
2295                        event: event.clone(),
2296                        attempt_count: attempt,
2297                        final_error: final_error.clone(),
2298                        error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2299                            .to_string(),
2300                        attempts: attempts.clone(),
2301                    };
2302                    self.state
2303                        .dlq
2304                        .lock()
2305                        .expect("dispatcher dlq poisoned")
2306                        .push(dlq_entry.clone());
2307                    if let Some(metrics) = self.metrics.as_ref() {
2308                        metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2309                        metrics.record_trigger_accepted_to_dlq(
2310                            binding.id.as_str(),
2311                            &binding_key,
2312                            event.provider.as_str(),
2313                            tenant_id(&event),
2314                            "retry_exhausted",
2315                            duration_between_ms(
2316                                current_unix_ms(),
2317                                accepted_at_ms(parent_headers.as_ref(), &event),
2318                            ),
2319                        );
2320                    }
2321                    tracing::info!(
2322                        component = "dispatcher",
2323                        lifecycle = "dlq_moved",
2324                        trigger_id = %binding.id.as_str(),
2325                        binding_key = %binding_key,
2326                        event_id = %event.id.0,
2327                        attempt_count = attempt,
2328                        reason = "retry_exhausted",
2329                        trace_id = %event.trace_id.0
2330                    );
2331                    self.emit_action_graph(
2332                        &event,
2333                        vec![RunActionGraphNodeRecord {
2334                            id: format!("dlq:{binding_key}:{}", event.id.0),
2335                            label: binding.id.as_str().to_string(),
2336                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2337                            status: "queued".to_string(),
2338                            outcome: "retry_exhausted".to_string(),
2339                            trace_id: Some(event.trace_id.0.clone()),
2340                            stage_id: None,
2341                            node_id: None,
2342                            worker_id: None,
2343                            run_id: None,
2344                            run_path: None,
2345                            metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2346                        }],
2347                        vec![RunActionGraphEdgeRecord {
2348                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2349                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
2350                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2351                            label: Some(format!("{attempt} attempts")),
2352                        }],
2353                        serde_json::json!({
2354                            "source": "dispatcher",
2355                            "trigger_id": binding.id.as_str(),
2356                            "binding_key": binding.binding_key(),
2357                            "event_id": event.id.0,
2358                            "attempt_count": attempt,
2359                            "final_error": final_error,
2360                            "replay_of_event_id": replay_of_event_id,
2361                        }),
2362                    )
2363                    .await?;
2364                    self.append_lifecycle_event(
2365                        "DlqMoved",
2366                        &event,
2367                        binding,
2368                        serde_json::json!({
2369                            "event_id": event.id.0,
2370                            "attempt_count": attempt,
2371                            "final_error": dlq_entry.final_error,
2372                            "replay_of_event_id": replay_of_event_id,
2373                        }),
2374                        replay_of_event_id.as_ref(),
2375                    )
2376                    .await?;
2377                    self.append_topic_event(
2378                        TRIGGER_DLQ_TOPIC,
2379                        "dlq_moved",
2380                        &event,
2381                        Some(binding),
2382                        Some(attempt),
2383                        serde_json::to_value(&dlq_entry)
2384                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2385                        replay_of_event_id.as_ref(),
2386                    )
2387                    .await?;
2388                    finish_in_flight(
2389                        binding.id.as_str(),
2390                        binding.version,
2391                        TriggerDispatchOutcome::Dlq,
2392                    )
2393                    .await
2394                    .map_err(|registry_error| {
2395                        DispatchError::Registry(registry_error.to_string())
2396                    })?;
2397                    self.release_flow_control(&acquired_flow).await?;
2398                    decrement_in_flight(&self.state);
2399                    self.append_dispatch_trust_record(
2400                        binding,
2401                        &route,
2402                        &event,
2403                        replay_of_event_id.as_ref(),
2404                        autonomy_tier,
2405                        TrustOutcome::Failure,
2406                        "dlq",
2407                        attempt,
2408                        Some(error.to_string()),
2409                    )
2410                    .await?;
2411                    return Ok(DispatchOutcome {
2412                        trigger_id: binding.id.as_str().to_string(),
2413                        binding_key: binding.binding_key(),
2414                        event_id: event.id.0,
2415                        attempt_count: attempt,
2416                        status: DispatchStatus::Dlq,
2417                        handler_kind: route.kind().to_string(),
2418                        target_uri: route.target_uri(),
2419                        replay_of_event_id,
2420                        result: None,
2421                        error: Some(error.to_string()),
2422                    });
2423                }
2424            }
2425        }
2426
2427        finish_in_flight(
2428            binding.id.as_str(),
2429            binding.version,
2430            TriggerDispatchOutcome::Failed,
2431        )
2432        .await
2433        .map_err(|error| DispatchError::Registry(error.to_string()))?;
2434        self.release_flow_control(&acquired_flow).await?;
2435        decrement_in_flight(&self.state);
2436        self.append_dispatch_trust_record(
2437            binding,
2438            &route,
2439            &event,
2440            replay_of_event_id.as_ref(),
2441            autonomy_tier,
2442            TrustOutcome::Failure,
2443            "failed",
2444            max_attempts,
2445            Some("dispatch exhausted without terminal outcome".to_string()),
2446        )
2447        .await?;
2448        Ok(DispatchOutcome {
2449            trigger_id: binding.id.as_str().to_string(),
2450            binding_key: binding.binding_key(),
2451            event_id: event.id.0,
2452            attempt_count: max_attempts,
2453            status: DispatchStatus::Failed,
2454            handler_kind: route.kind().to_string(),
2455            target_uri: route.target_uri(),
2456            replay_of_event_id,
2457            result: None,
2458            error: Some("dispatch exhausted without terminal outcome".to_string()),
2459        })
2460    }
2461
2462    async fn dispatch_once(
2463        &self,
2464        binding: &TriggerBinding,
2465        route: &DispatchUri,
2466        event: &TriggerEvent,
2467        autonomy_tier: AutonomyTier,
2468        wait_lease: Option<DispatchWaitLease>,
2469        cancel_rx: &mut broadcast::Receiver<()>,
2470    ) -> Result<serde_json::Value, DispatchError> {
2471        match route {
2472            DispatchUri::Local { .. } => {
2473                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2474                    return Err(DispatchError::Local(format!(
2475                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2476                        binding.id.as_str()
2477                    )));
2478                };
2479                let value = self
2480                    .invoke_vm_callable(
2481                        closure,
2482                        &binding.binding_key(),
2483                        event,
2484                        None,
2485                        binding.id.as_str(),
2486                        &format!("{}.{}", event.provider.as_str(), event.kind),
2487                        autonomy_tier,
2488                        wait_lease,
2489                        cancel_rx,
2490                    )
2491                    .await?;
2492                Ok(vm_value_to_json(&value))
2493            }
2494            DispatchUri::A2a {
2495                target,
2496                allow_cleartext,
2497            } => {
2498                if self.state.shutting_down.load(Ordering::SeqCst) {
2499                    return Err(DispatchError::Cancelled(
2500                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
2501                    ));
2502                }
2503                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2504                    target,
2505                    *allow_cleartext,
2506                    binding.id.as_str(),
2507                    &binding.binding_key(),
2508                    event,
2509                    cancel_rx,
2510                )
2511                .await
2512                .map_err(|error| match error {
2513                    crate::a2a::A2aClientError::Cancelled(message) => {
2514                        DispatchError::Cancelled(message)
2515                    }
2516                    other => DispatchError::A2a(other.to_string()),
2517                })?;
2518                match ack {
2519                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2520                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2521                }
2522            }
2523            DispatchUri::Worker { queue } => {
2524                let receipt = crate::WorkerQueue::new(self.event_log.clone())
2525                    .enqueue(&crate::WorkerQueueJob {
2526                        queue: queue.clone(),
2527                        trigger_id: binding.id.as_str().to_string(),
2528                        binding_key: binding.binding_key(),
2529                        binding_version: binding.version,
2530                        event: event.clone(),
2531                        replay_of_event_id: current_dispatch_context()
2532                            .and_then(|context| context.replay_of_event_id),
2533                        priority: worker_queue_priority(binding, event),
2534                    })
2535                    .await
2536                    .map_err(DispatchError::from)?;
2537                Ok(serde_json::to_value(receipt)
2538                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2539            }
2540        }
2541    }
2542
2543    async fn apply_flow_control(
2544        &self,
2545        binding: &TriggerBinding,
2546        event: &TriggerEvent,
2547        replay_of_event_id: Option<&String>,
2548    ) -> Result<FlowControlOutcome, DispatchError> {
2549        let flow = &binding.flow_control;
2550        let mut managed_event = event.clone();
2551
2552        if let Some(batch) = &flow.batch {
2553            let gate = self
2554                .resolve_flow_gate(
2555                    &binding.binding_key(),
2556                    batch.key.as_ref(),
2557                    &managed_event,
2558                    replay_of_event_id,
2559                )
2560                .await?;
2561            match self
2562                .state
2563                .flow_control
2564                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2565                .await
2566                .map_err(DispatchError::from)?
2567            {
2568                BatchDecision::Dispatch(events) => {
2569                    managed_event = build_batched_event(events)?;
2570                }
2571                BatchDecision::Merged => {
2572                    return Ok(FlowControlOutcome::Skip {
2573                        reason: "batch_merged".to_string(),
2574                    })
2575                }
2576            }
2577        }
2578
2579        if let Some(debounce) = &flow.debounce {
2580            let gate = self
2581                .resolve_flow_gate(
2582                    &binding.binding_key(),
2583                    Some(&debounce.key),
2584                    &managed_event,
2585                    replay_of_event_id,
2586                )
2587                .await?;
2588            let latest = self
2589                .state
2590                .flow_control
2591                .debounce(&gate, debounce.period)
2592                .await
2593                .map_err(DispatchError::from)?;
2594            if !latest {
2595                return Ok(FlowControlOutcome::Skip {
2596                    reason: "debounced".to_string(),
2597                });
2598            }
2599        }
2600
2601        if let Some(rate_limit) = &flow.rate_limit {
2602            let gate = self
2603                .resolve_flow_gate(
2604                    &binding.binding_key(),
2605                    rate_limit.key.as_ref(),
2606                    &managed_event,
2607                    replay_of_event_id,
2608                )
2609                .await?;
2610            let allowed = self
2611                .state
2612                .flow_control
2613                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2614                .await
2615                .map_err(DispatchError::from)?;
2616            if !allowed {
2617                return Ok(FlowControlOutcome::Skip {
2618                    reason: "rate_limited".to_string(),
2619                });
2620            }
2621        }
2622
2623        if let Some(throttle) = &flow.throttle {
2624            let gate = self
2625                .resolve_flow_gate(
2626                    &binding.binding_key(),
2627                    throttle.key.as_ref(),
2628                    &managed_event,
2629                    replay_of_event_id,
2630                )
2631                .await?;
2632            self.state
2633                .flow_control
2634                .wait_for_throttle(&gate, throttle.period, throttle.max)
2635                .await
2636                .map_err(DispatchError::from)?;
2637        }
2638
2639        let mut acquired = AcquiredFlowControl::default();
2640        if let Some(singleton) = &flow.singleton {
2641            let gate = self
2642                .resolve_flow_gate(
2643                    &binding.binding_key(),
2644                    singleton.key.as_ref(),
2645                    &managed_event,
2646                    replay_of_event_id,
2647                )
2648                .await?;
2649            let acquired_singleton = self
2650                .state
2651                .flow_control
2652                .try_acquire_singleton(&gate)
2653                .await
2654                .map_err(DispatchError::from)?;
2655            if !acquired_singleton {
2656                return Ok(FlowControlOutcome::Skip {
2657                    reason: "singleton_active".to_string(),
2658                });
2659            }
2660            acquired.singleton = Some(SingletonLease { gate, held: true });
2661        }
2662
2663        if let Some(concurrency) = &flow.concurrency {
2664            let gate = self
2665                .resolve_flow_gate(
2666                    &binding.binding_key(),
2667                    concurrency.key.as_ref(),
2668                    &managed_event,
2669                    replay_of_event_id,
2670                )
2671                .await?;
2672            let priority_rank = self
2673                .resolve_priority_rank(
2674                    &binding.binding_key(),
2675                    flow.priority.as_ref(),
2676                    &managed_event,
2677                    replay_of_event_id,
2678                )
2679                .await?;
2680            let permit = self
2681                .state
2682                .flow_control
2683                .acquire_concurrency(&gate, concurrency.max, priority_rank)
2684                .await
2685                .map_err(DispatchError::from)?;
2686            acquired.concurrency = Some(ConcurrencyLease {
2687                gate,
2688                max: concurrency.max,
2689                priority_rank,
2690                permit: Some(permit),
2691            });
2692        }
2693
2694        Ok(FlowControlOutcome::Dispatch {
2695            event: Box::new(managed_event),
2696            acquired,
2697        })
2698    }
2699
2700    async fn release_flow_control(
2701        &self,
2702        acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2703    ) -> Result<(), DispatchError> {
2704        let (singleton_gate, concurrency_permit) = {
2705            let mut acquired = acquired.lock().await;
2706            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2707                if lease.held {
2708                    lease.held = false;
2709                    Some(lease.gate.clone())
2710                } else {
2711                    None
2712                }
2713            });
2714            let concurrency_permit = acquired
2715                .concurrency
2716                .as_mut()
2717                .and_then(|lease| lease.permit.take());
2718            (singleton_gate, concurrency_permit)
2719        };
2720        if let Some(gate) = singleton_gate {
2721            self.state
2722                .flow_control
2723                .release_singleton(&gate)
2724                .await
2725                .map_err(DispatchError::from)?;
2726        }
2727        if let Some(permit) = concurrency_permit {
2728            self.state
2729                .flow_control
2730                .release_concurrency(permit)
2731                .await
2732                .map_err(DispatchError::from)?;
2733        }
2734        Ok(())
2735    }
2736
2737    async fn resolve_flow_gate(
2738        &self,
2739        binding_key: &str,
2740        expr: Option<&crate::triggers::TriggerExpressionSpec>,
2741        event: &TriggerEvent,
2742        replay_of_event_id: Option<&String>,
2743    ) -> Result<String, DispatchError> {
2744        let key = match expr {
2745            Some(expr) => {
2746                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2747                    .await?
2748            }
2749            None => "_global".to_string(),
2750        };
2751        Ok(format!("{binding_key}:{key}"))
2752    }
2753
2754    async fn resolve_priority_rank(
2755        &self,
2756        binding_key: &str,
2757        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2758        event: &TriggerEvent,
2759        replay_of_event_id: Option<&String>,
2760    ) -> Result<usize, DispatchError> {
2761        let Some(priority) = priority else {
2762            return Ok(0);
2763        };
2764        let value = self
2765            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2766            .await?;
2767        Ok(priority
2768            .order
2769            .iter()
2770            .position(|candidate| candidate == &value)
2771            .unwrap_or(priority.order.len()))
2772    }
2773
2774    async fn evaluate_flow_expression(
2775        &self,
2776        binding_key: &str,
2777        expr: &crate::triggers::TriggerExpressionSpec,
2778        event: &TriggerEvent,
2779        replay_of_event_id: Option<&String>,
2780    ) -> Result<String, DispatchError> {
2781        let value = self
2782            .invoke_vm_callable(
2783                &expr.closure,
2784                binding_key,
2785                event,
2786                replay_of_event_id,
2787                "",
2788                "flow_control",
2789                AutonomyTier::Suggest,
2790                None,
2791                &mut self.cancel_tx.subscribe(),
2792            )
2793            .await?;
2794        Ok(json_value_to_gate(&vm_value_to_json(&value)))
2795    }
2796
2797    #[allow(clippy::too_many_arguments)]
2798    async fn invoke_vm_callable(
2799        &self,
2800        closure: &crate::value::VmClosure,
2801        binding_key: &str,
2802        event: &TriggerEvent,
2803        replay_of_event_id: Option<&String>,
2804        agent_id: &str,
2805        action: &str,
2806        autonomy_tier: AutonomyTier,
2807        wait_lease: Option<DispatchWaitLease>,
2808        cancel_rx: &mut broadcast::Receiver<()>,
2809    ) -> Result<VmValue, DispatchError> {
2810        let mut vm = self.base_vm.child_vm();
2811        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2812        if self.state.shutting_down.load(Ordering::SeqCst) {
2813            cancel_token.store(true, Ordering::SeqCst);
2814        }
2815        self.state
2816            .cancel_tokens
2817            .lock()
2818            .expect("dispatcher cancel tokens poisoned")
2819            .push(cancel_token.clone());
2820        vm.install_cancel_token(cancel_token.clone());
2821        let arg = event_to_handler_value(event)?;
2822        let args = [arg];
2823        let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2824        let effective_policy = match crate::orchestration::current_execution_policy() {
2825            Some(parent) => parent
2826                .intersect(&tier_policy)
2827                .map_err(|error| DispatchError::Local(error.to_string()))?,
2828            None => tier_policy,
2829        };
2830        crate::orchestration::push_execution_policy(effective_policy);
2831        let _policy_guard = DispatchExecutionPolicyGuard;
2832        let future = vm.call_closure_pub(closure, &args);
2833        pin_mut!(future);
2834        let (binding_id, binding_version) = split_binding_key(binding_key);
2835        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2836            slot.borrow_mut().replace(DispatchContext {
2837                trigger_event: event.clone(),
2838                replay_of_event_id: replay_of_event_id.cloned(),
2839                binding_id,
2840                binding_version,
2841                agent_id: agent_id.to_string(),
2842                action: action.to_string(),
2843                autonomy_tier,
2844            })
2845        });
2846        let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2847            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2848        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2849        crate::stdlib::hitl::reset_hitl_state();
2850        let mut poll = tokio::time::interval(Duration::from_millis(100));
2851        let result = loop {
2852            tokio::select! {
2853                result = &mut future => break result,
2854                _ = recv_cancel(cancel_rx) => {
2855                    cancel_token.store(true, Ordering::SeqCst);
2856                }
2857                _ = poll.tick() => {
2858                    if dispatch_cancel_requested(
2859                        &self.event_log,
2860                        binding_key,
2861                        &event.id.0,
2862                        replay_of_event_id,
2863                    )
2864                    .await? {
2865                        cancel_token.store(true, Ordering::SeqCst);
2866                    }
2867                }
2868            }
2869        };
2870        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2871            *slot.borrow_mut() = prior_context;
2872        });
2873        ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2874            *slot.borrow_mut() = prior_wait_lease;
2875        });
2876        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2877        {
2878            let mut tokens = self
2879                .state
2880                .cancel_tokens
2881                .lock()
2882                .expect("dispatcher cancel tokens poisoned");
2883            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2884        }
2885
2886        if cancel_token.load(Ordering::SeqCst) {
2887            if dispatch_cancel_requested(
2888                &self.event_log,
2889                binding_key,
2890                &event.id.0,
2891                replay_of_event_id,
2892            )
2893            .await?
2894            {
2895                Err(DispatchError::Cancelled(
2896                    "trigger cancel request cancelled local handler".to_string(),
2897                ))
2898            } else {
2899                Err(DispatchError::Cancelled(
2900                    "dispatcher shutdown cancelled local handler".to_string(),
2901                ))
2902            }
2903        } else {
2904            result.map_err(dispatch_error_from_vm_error)
2905        }
2906    }
2907
2908    async fn evaluate_predicate(
2909        &self,
2910        binding: &TriggerBinding,
2911        predicate: &super::registry::TriggerPredicateSpec,
2912        event: &TriggerEvent,
2913        replay_of_event_id: Option<&String>,
2914        autonomy_tier: AutonomyTier,
2915    ) -> Result<PredicateEvaluationRecord, DispatchError> {
2916        let event_id = event.id.0.clone();
2917        let trigger_id = binding.id.as_str().to_string();
2918        let now_ms = now_unix_ms();
2919        reset_binding_budget_windows(binding);
2920
2921        let breaker_open_until = {
2922            let state = binding
2923                .predicate_state
2924                .lock()
2925                .expect("trigger predicate state poisoned");
2926            if state
2927                .breaker_open_until_ms
2928                .is_some_and(|until_ms| until_ms > now_ms)
2929            {
2930                state.breaker_open_until_ms
2931            } else {
2932                None
2933            }
2934        };
2935
2936        if breaker_open_until.is_some() {
2937            let mut metadata = BTreeMap::new();
2938            metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2939            metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2940            metadata.insert(
2941                "breaker_open_until_ms".to_string(),
2942                serde_json::json!(breaker_open_until),
2943            );
2944            crate::events::log_warn_meta(
2945                "trigger.predicate.circuit_breaker",
2946                "trigger predicate circuit breaker is open; short-circuiting to false",
2947                metadata,
2948            );
2949            let record = PredicateEvaluationRecord {
2950                result: false,
2951                reason: Some("circuit_open".to_string()),
2952                ..Default::default()
2953            };
2954            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2955                .await?;
2956            return Ok(record);
2957        }
2958
2959        let expected_cost = expected_predicate_cost_usd_micros(binding);
2960        if let Some(reason) = binding_budget_would_exceed(binding, expected_cost)
2961            .or_else(|| orchestrator_budget_would_exceed(expected_cost))
2962        {
2963            self.append_budget_exhausted_event(
2964                binding,
2965                event,
2966                reason,
2967                expected_cost,
2968                None,
2969                replay_of_event_id,
2970            )
2971            .await?;
2972            let record = PredicateEvaluationRecord {
2973                result: binding.on_budget_exhausted == TriggerBudgetExhaustionStrategy::Warn,
2974                reason: Some(reason.to_string()),
2975                exhaustion_strategy: Some(binding.on_budget_exhausted),
2976                ..Default::default()
2977            };
2978            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2979                .await?;
2980            return Ok(record);
2981        }
2982
2983        let replay_cache = self
2984            .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2985            .await?;
2986        let guard = start_predicate_evaluation(
2987            binding.when_budget.clone().unwrap_or_default(),
2988            replay_cache,
2989        );
2990        let started = std::time::Instant::now();
2991        let eval = self
2992            .invoke_vm_callable_with_timeout(
2993                &predicate.closure,
2994                &binding.binding_key(),
2995                event,
2996                replay_of_event_id,
2997                binding.id.as_str(),
2998                &format!("{}.{}", event.provider.as_str(), event.kind),
2999                autonomy_tier,
3000                &mut self.cancel_tx.subscribe(),
3001                binding
3002                    .when_budget
3003                    .as_ref()
3004                    .and_then(|budget| budget.timeout()),
3005            )
3006            .await;
3007        let capture = guard.finish();
3008        let latency_ms = started.elapsed().as_millis() as u64;
3009        if replay_of_event_id.is_none() && !capture.entries.is_empty() {
3010            self.append_predicate_cache_record(binding, event, &capture.entries)
3011                .await?;
3012        }
3013
3014        let mut record = PredicateEvaluationRecord {
3015            result: false,
3016            cost_usd: capture.total_cost_usd,
3017            tokens: capture.total_tokens,
3018            latency_ms,
3019            cached: capture.cached,
3020            reason: None,
3021            exhaustion_strategy: None,
3022        };
3023
3024        let mut count_failure = false;
3025        let mut opened_breaker = false;
3026
3027        match eval {
3028            Ok(value) => match predicate_value_as_bool(value) {
3029                Ok(result) => {
3030                    record.result = result;
3031                }
3032                Err(reason) => {
3033                    count_failure = true;
3034                    record.reason = Some(reason);
3035                }
3036            },
3037            Err(error) => {
3038                count_failure = true;
3039                record.reason = Some(error.to_string());
3040            }
3041        }
3042
3043        let cost_usd_micros = usd_to_micros(record.cost_usd);
3044        if cost_usd_micros > 0 {
3045            binding
3046                .metrics
3047                .cost_total_usd_micros
3048                .fetch_add(cost_usd_micros, Ordering::Relaxed);
3049            binding
3050                .metrics
3051                .cost_today_usd_micros
3052                .fetch_add(cost_usd_micros, Ordering::Relaxed);
3053            binding
3054                .metrics
3055                .cost_hour_usd_micros
3056                .fetch_add(cost_usd_micros, Ordering::Relaxed);
3057            note_orchestrator_budget_cost(cost_usd_micros);
3058            record_predicate_cost_sample(binding, cost_usd_micros);
3059        }
3060
3061        let timed_out = matches!(
3062            record.reason.as_deref(),
3063            Some("predicate evaluation timed out")
3064        );
3065        if capture.budget_exceeded || timed_out {
3066            if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3067                record.result = false;
3068            } else if timed_out {
3069                record.result = true;
3070            }
3071            record.reason = Some("budget_exceeded".to_string());
3072            record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3073            self.append_budget_exhausted_event(
3074                binding,
3075                event,
3076                "budget_exceeded",
3077                cost_usd_micros,
3078                Some(record.tokens),
3079                replay_of_event_id,
3080            )
3081            .await?;
3082            self.append_lifecycle_event(
3083                "predicate.invocation_budget_exceeded",
3084                event,
3085                binding,
3086                serde_json::json!({
3087                    "trigger_id": binding.id.as_str(),
3088                    "event_id": event.id.0,
3089                    "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
3090                    "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
3091                    "cost_usd": record.cost_usd,
3092                    "tokens": record.tokens,
3093                    "replay_of_event_id": replay_of_event_id,
3094                }),
3095                replay_of_event_id,
3096            )
3097            .await?;
3098        }
3099
3100        if let Some(reason) =
3101            binding_budget_would_exceed(binding, 0).or_else(|| orchestrator_budget_would_exceed(0))
3102        {
3103            if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3104                record.result = false;
3105            }
3106            record.reason = Some(reason.to_string());
3107            record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3108            self.append_budget_exhausted_event(binding, event, reason, 0, None, replay_of_event_id)
3109                .await?;
3110        }
3111
3112        {
3113            let mut state = binding
3114                .predicate_state
3115                .lock()
3116                .expect("trigger predicate state poisoned");
3117            if count_failure {
3118                state.consecutive_failures = state.consecutive_failures.saturating_add(1);
3119                if state.consecutive_failures >= 3 {
3120                    state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
3121                    opened_breaker = true;
3122                }
3123            } else {
3124                state.consecutive_failures = 0;
3125                state.breaker_open_until_ms = None;
3126            }
3127        }
3128
3129        if opened_breaker {
3130            let mut metadata = BTreeMap::new();
3131            metadata.insert(
3132                "trigger_id".to_string(),
3133                serde_json::json!(binding.id.as_str()),
3134            );
3135            metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3136            metadata.insert("failure_count".to_string(), serde_json::json!(3));
3137            metadata.insert("reason".to_string(), serde_json::json!(record.reason));
3138            crate::events::log_warn_meta(
3139                "trigger.predicate.circuit_breaker",
3140                "trigger predicate circuit breaker opened for 5 minutes",
3141                metadata,
3142            );
3143        }
3144
3145        self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
3146            .await?;
3147        Ok(record)
3148    }
3149
3150    #[allow(clippy::too_many_arguments)]
3151    #[allow(clippy::too_many_arguments)]
3152    async fn invoke_vm_callable_with_timeout(
3153        &self,
3154        closure: &crate::value::VmClosure,
3155        binding_key: &str,
3156        event: &TriggerEvent,
3157        replay_of_event_id: Option<&String>,
3158        agent_id: &str,
3159        action: &str,
3160        autonomy_tier: AutonomyTier,
3161        cancel_rx: &mut broadcast::Receiver<()>,
3162        timeout: Option<Duration>,
3163    ) -> Result<VmValue, DispatchError> {
3164        let future = self.invoke_vm_callable(
3165            closure,
3166            binding_key,
3167            event,
3168            replay_of_event_id,
3169            agent_id,
3170            action,
3171            autonomy_tier,
3172            None,
3173            cancel_rx,
3174        );
3175        pin_mut!(future);
3176        if let Some(timeout) = timeout {
3177            match tokio::time::timeout(timeout, future).await {
3178                Ok(result) => result,
3179                Err(_) => Err(DispatchError::Local(
3180                    "predicate evaluation timed out".to_string(),
3181                )),
3182            }
3183        } else {
3184            future.await
3185        }
3186    }
3187
3188    async fn append_predicate_evaluated_event(
3189        &self,
3190        binding: &TriggerBinding,
3191        event: &TriggerEvent,
3192        record: &PredicateEvaluationRecord,
3193        replay_of_event_id: Option<&String>,
3194    ) -> Result<(), DispatchError> {
3195        if let Some(metrics) = self.metrics.as_ref() {
3196            metrics.record_trigger_predicate_evaluation(
3197                binding.id.as_str(),
3198                record.result,
3199                record.cost_usd,
3200            );
3201            metrics.set_trigger_budget_cost_today(
3202                binding.id.as_str(),
3203                current_predicate_daily_cost(binding),
3204            );
3205            if matches!(
3206                record.reason.as_deref(),
3207                Some(
3208                    "budget_exceeded"
3209                        | "daily_budget_exceeded"
3210                        | "hourly_budget_exceeded"
3211                        | "orchestrator_daily_budget_exceeded"
3212                        | "orchestrator_hourly_budget_exceeded"
3213                )
3214            ) {
3215                metrics.record_trigger_budget_exhausted(
3216                    binding.id.as_str(),
3217                    record
3218                        .exhaustion_strategy
3219                        .map(TriggerBudgetExhaustionStrategy::as_str)
3220                        .unwrap_or_else(|| record.reason.as_deref().unwrap_or("predicate")),
3221                );
3222            }
3223        }
3224        self.append_lifecycle_event(
3225            "predicate.evaluated",
3226            event,
3227            binding,
3228            serde_json::json!({
3229                "trigger_id": binding.id.as_str(),
3230                "event_id": event.id.0,
3231                "result": record.result,
3232                "cost_usd": record.cost_usd,
3233                "tokens": record.tokens,
3234                "latency_ms": record.latency_ms,
3235                "cached": record.cached,
3236                "reason": record.reason,
3237                "on_budget_exhausted": record.exhaustion_strategy.map(TriggerBudgetExhaustionStrategy::as_str),
3238                "replay_of_event_id": replay_of_event_id,
3239            }),
3240            replay_of_event_id,
3241        )
3242        .await
3243    }
3244
3245    async fn append_predicate_cache_record(
3246        &self,
3247        binding: &TriggerBinding,
3248        event: &TriggerEvent,
3249        entries: &[PredicateCacheEntry],
3250    ) -> Result<(), DispatchError> {
3251        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3252            .expect("static trigger inbox legacy topic name is valid");
3253        let payload = serde_json::to_value(PredicateCacheRecord {
3254            trigger_id: binding.id.as_str().to_string(),
3255            event_id: event.id.0.clone(),
3256            entries: entries.to_vec(),
3257        })
3258        .map_err(|error| DispatchError::Serde(error.to_string()))?;
3259        self.event_log
3260            .append(&topic, LogEvent::new("predicate_llm_cache", payload))
3261            .await
3262            .map_err(DispatchError::from)
3263            .map(|_| ())
3264    }
3265
3266    async fn read_predicate_cache_record(
3267        &self,
3268        event_id: &str,
3269    ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
3270        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3271            .expect("static trigger inbox legacy topic name is valid");
3272        let records = self
3273            .event_log
3274            .read_range(&topic, None, usize::MAX)
3275            .await
3276            .map_err(DispatchError::from)?;
3277        Ok(records
3278            .into_iter()
3279            .filter(|(_, event)| event.kind == "predicate_llm_cache")
3280            .filter_map(|(_, event)| {
3281                serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
3282            })
3283            .filter(|record| record.event_id == event_id)
3284            .flat_map(|record| record.entries)
3285            .collect())
3286    }
3287
3288    #[allow(clippy::too_many_arguments)]
3289    async fn append_dispatch_trust_record(
3290        &self,
3291        binding: &TriggerBinding,
3292        route: &DispatchUri,
3293        event: &TriggerEvent,
3294        replay_of_event_id: Option<&String>,
3295        autonomy_tier: AutonomyTier,
3296        outcome: TrustOutcome,
3297        terminal_status: &str,
3298        attempt_count: u32,
3299        error: Option<String>,
3300    ) -> Result<(), DispatchError> {
3301        let mut record = TrustRecord::new(
3302            binding.id.as_str().to_string(),
3303            format!("{}.{}", event.provider.as_str(), event.kind),
3304            None,
3305            outcome,
3306            event.trace_id.0.clone(),
3307            autonomy_tier,
3308        );
3309        record.metadata.insert(
3310            "binding_key".to_string(),
3311            serde_json::json!(binding.binding_key()),
3312        );
3313        record.metadata.insert(
3314            "binding_version".to_string(),
3315            serde_json::json!(binding.version),
3316        );
3317        record.metadata.insert(
3318            "provider".to_string(),
3319            serde_json::json!(event.provider.as_str()),
3320        );
3321        record
3322            .metadata
3323            .insert("event_kind".to_string(), serde_json::json!(event.kind));
3324        record
3325            .metadata
3326            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3327        record.metadata.insert(
3328            "target_uri".to_string(),
3329            serde_json::json!(route.target_uri()),
3330        );
3331        record.metadata.insert(
3332            "terminal_status".to_string(),
3333            serde_json::json!(terminal_status),
3334        );
3335        record.metadata.insert(
3336            "attempt_count".to_string(),
3337            serde_json::json!(attempt_count),
3338        );
3339        if let Some(replay_of_event_id) = replay_of_event_id {
3340            record.metadata.insert(
3341                "replay_of_event_id".to_string(),
3342                serde_json::json!(replay_of_event_id),
3343            );
3344        }
3345        if let Some(error) = error {
3346            record
3347                .metadata
3348                .insert("error".to_string(), serde_json::json!(error));
3349        }
3350        append_trust_record(&self.event_log, &record)
3351            .await
3352            .map(|_| ())
3353            .map_err(DispatchError::from)
3354    }
3355
3356    async fn append_attempt_record(
3357        &self,
3358        event: &TriggerEvent,
3359        binding: &TriggerBinding,
3360        attempt: &DispatchAttemptRecord,
3361        replay_of_event_id: Option<&String>,
3362    ) -> Result<(), DispatchError> {
3363        self.append_topic_event(
3364            TRIGGER_ATTEMPTS_TOPIC,
3365            "attempt_recorded",
3366            event,
3367            Some(binding),
3368            Some(attempt.attempt),
3369            serde_json::to_value(attempt)
3370                .map_err(|error| DispatchError::Serde(error.to_string()))?,
3371            replay_of_event_id,
3372        )
3373        .await
3374    }
3375
3376    async fn append_lifecycle_event(
3377        &self,
3378        kind: &str,
3379        event: &TriggerEvent,
3380        binding: &TriggerBinding,
3381        payload: serde_json::Value,
3382        replay_of_event_id: Option<&String>,
3383    ) -> Result<(), DispatchError> {
3384        self.append_topic_event(
3385            TRIGGERS_LIFECYCLE_TOPIC,
3386            kind,
3387            event,
3388            Some(binding),
3389            None,
3390            payload,
3391            replay_of_event_id,
3392        )
3393        .await
3394    }
3395
3396    async fn append_budget_exhausted_event(
3397        &self,
3398        binding: &TriggerBinding,
3399        event: &TriggerEvent,
3400        reason: &str,
3401        expected_cost_usd_micros: u64,
3402        tokens: Option<u64>,
3403        replay_of_event_id: Option<&String>,
3404    ) -> Result<(), DispatchError> {
3405        let payload = serde_json::json!({
3406            "trigger_id": binding.id.as_str(),
3407            "event_id": event.id.0,
3408            "reason": reason,
3409            "strategy": binding.on_budget_exhausted.as_str(),
3410            "expected_cost_usd": micros_to_usd(expected_cost_usd_micros),
3411            "cost_usd": micros_to_usd(expected_cost_usd_micros),
3412            "tokens": tokens,
3413            "daily_limit_usd": binding.daily_cost_usd,
3414            "hourly_limit_usd": binding.hourly_cost_usd,
3415            "cost_today_usd": current_predicate_daily_cost(binding),
3416            "cost_hour_usd": current_predicate_hourly_cost(binding),
3417            "replay_of_event_id": replay_of_event_id,
3418        });
3419        self.append_lifecycle_event(
3420            "predicate.budget_exceeded",
3421            event,
3422            binding,
3423            payload.clone(),
3424            replay_of_event_id,
3425        )
3426        .await?;
3427        let legacy_kind = match reason {
3428            "daily_budget_exceeded" => Some("predicate.daily_budget_exceeded"),
3429            "hourly_budget_exceeded" => Some("predicate.hourly_budget_exceeded"),
3430            "orchestrator_daily_budget_exceeded" => {
3431                Some("predicate.orchestrator_daily_budget_exceeded")
3432            }
3433            "orchestrator_hourly_budget_exceeded" => {
3434                Some("predicate.orchestrator_hourly_budget_exceeded")
3435            }
3436            _ => None,
3437        };
3438        if let Some(kind) = legacy_kind {
3439            self.append_lifecycle_event(kind, event, binding, payload, replay_of_event_id)
3440                .await?;
3441        }
3442        Ok(())
3443    }
3444
3445    async fn append_autonomy_budget_approval_request(
3446        &self,
3447        binding: &TriggerBinding,
3448        route: &DispatchUri,
3449        event: &TriggerEvent,
3450        replay_of_event_id: Option<&String>,
3451        reason: &str,
3452    ) -> Result<String, DispatchError> {
3453        let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
3454        let detail = serde_json::json!({
3455            "trigger_id": binding.id.as_str(),
3456            "binding_key": binding.binding_key(),
3457            "event_id": event.id.0,
3458            "reason": reason,
3459            "from_tier": AutonomyTier::ActAuto.as_str(),
3460            "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3461            "handler_kind": route.kind(),
3462            "target_uri": route.target_uri(),
3463            "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
3464            "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
3465            "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
3466            "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
3467            "replay_of_event_id": replay_of_event_id,
3468        });
3469        let request_id = crate::stdlib::hitl::append_approval_request_on(
3470            &self.event_log,
3471            binding.id.as_str().to_string(),
3472            event.trace_id.0.clone(),
3473            format!(
3474                "approve autonomous dispatch for trigger '{}' after {}",
3475                binding.id.as_str(),
3476                reason
3477            ),
3478            detail.clone(),
3479            reviewers.clone(),
3480        )
3481        .await
3482        .map_err(dispatch_error_from_vm_error)?;
3483        self.append_lifecycle_event(
3484            "autonomy.budget_exceeded",
3485            event,
3486            binding,
3487            serde_json::json!({
3488                "trigger_id": binding.id.as_str(),
3489                "event_id": event.id.0,
3490                "reason": reason,
3491                "request_id": request_id,
3492                "reviewers": reviewers,
3493                "from_tier": AutonomyTier::ActAuto.as_str(),
3494                "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3495                "replay_of_event_id": replay_of_event_id,
3496            }),
3497            replay_of_event_id,
3498        )
3499        .await?;
3500        Ok(request_id)
3501    }
3502
3503    #[allow(clippy::too_many_arguments)]
3504    async fn emit_autonomy_budget_approval_action_graph(
3505        &self,
3506        binding: &TriggerBinding,
3507        route: &DispatchUri,
3508        event: &TriggerEvent,
3509        source_node_id: &str,
3510        replay_of_event_id: Option<&String>,
3511        reason: &str,
3512        request_id: &str,
3513    ) -> Result<(), DispatchError> {
3514        let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3515        self.emit_action_graph(
3516            event,
3517            vec![RunActionGraphNodeRecord {
3518                id: approval_node_id.clone(),
3519                label: format!("approval required: {reason}"),
3520                kind: "approval".to_string(),
3521                status: "waiting".to_string(),
3522                outcome: "request_approval".to_string(),
3523                trace_id: Some(event.trace_id.0.clone()),
3524                stage_id: None,
3525                node_id: None,
3526                worker_id: None,
3527                run_id: None,
3528                run_path: None,
3529                metadata: BTreeMap::from([
3530                    (
3531                        "trigger_id".to_string(),
3532                        serde_json::json!(binding.id.as_str()),
3533                    ),
3534                    (
3535                        "binding_key".to_string(),
3536                        serde_json::json!(binding.binding_key()),
3537                    ),
3538                    ("event_id".to_string(), serde_json::json!(event.id.0)),
3539                    ("reason".to_string(), serde_json::json!(reason)),
3540                    ("request_id".to_string(), serde_json::json!(request_id)),
3541                    (
3542                        "reviewers".to_string(),
3543                        serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3544                    ),
3545                    ("handler_kind".to_string(), serde_json::json!(route.kind())),
3546                    (
3547                        "target_uri".to_string(),
3548                        serde_json::json!(route.target_uri()),
3549                    ),
3550                    (
3551                        "from_tier".to_string(),
3552                        serde_json::json!(AutonomyTier::ActAuto.as_str()),
3553                    ),
3554                    (
3555                        "requested_tier".to_string(),
3556                        serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3557                    ),
3558                    (
3559                        "replay_of_event_id".to_string(),
3560                        serde_json::json!(replay_of_event_id),
3561                    ),
3562                ]),
3563            }],
3564            vec![RunActionGraphEdgeRecord {
3565                from_id: source_node_id.to_string(),
3566                to_id: approval_node_id,
3567                kind: "approval_gate".to_string(),
3568                label: Some("autonomy budget".to_string()),
3569            }],
3570            serde_json::json!({
3571                "source": "dispatcher",
3572                "trigger_id": binding.id.as_str(),
3573                "binding_key": binding.binding_key(),
3574                "event_id": event.id.0,
3575                "reason": reason,
3576                "request_id": request_id,
3577                "replay_of_event_id": replay_of_event_id,
3578            }),
3579        )
3580        .await
3581    }
3582
3583    #[allow(clippy::too_many_arguments)]
3584    async fn append_tier_transition_trust_record(
3585        &self,
3586        binding: &TriggerBinding,
3587        event: &TriggerEvent,
3588        replay_of_event_id: Option<&String>,
3589        from_tier: AutonomyTier,
3590        to_tier: AutonomyTier,
3591        reason: &str,
3592        request_id: &str,
3593    ) -> Result<(), DispatchError> {
3594        let mut record = TrustRecord::new(
3595            binding.id.as_str().to_string(),
3596            "autonomy.tier_transition",
3597            Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3598            TrustOutcome::Denied,
3599            event.trace_id.0.clone(),
3600            to_tier,
3601        );
3602        record.metadata.insert(
3603            "binding_key".to_string(),
3604            serde_json::json!(binding.binding_key()),
3605        );
3606        record
3607            .metadata
3608            .insert("event_id".to_string(), serde_json::json!(event.id.0));
3609        record.metadata.insert(
3610            "from_tier".to_string(),
3611            serde_json::json!(from_tier.as_str()),
3612        );
3613        record
3614            .metadata
3615            .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3616        record
3617            .metadata
3618            .insert("reason".to_string(), serde_json::json!(reason));
3619        record
3620            .metadata
3621            .insert("request_id".to_string(), serde_json::json!(request_id));
3622        if let Some(replay_of_event_id) = replay_of_event_id {
3623            record.metadata.insert(
3624                "replay_of_event_id".to_string(),
3625                serde_json::json!(replay_of_event_id),
3626            );
3627        }
3628        append_trust_record(&self.event_log, &record)
3629            .await
3630            .map(|_| ())
3631            .map_err(DispatchError::from)
3632    }
3633
3634    async fn append_budget_deferred_event(
3635        &self,
3636        binding: &TriggerBinding,
3637        route: &DispatchUri,
3638        event: &TriggerEvent,
3639        replay_of_event_id: Option<&String>,
3640        reason: &str,
3641    ) -> Result<(), DispatchError> {
3642        self.append_topic_event(
3643            TRIGGER_ATTEMPTS_TOPIC,
3644            "budget_deferred",
3645            event,
3646            Some(binding),
3647            None,
3648            serde_json::json!({
3649                "event_id": event.id.0,
3650                "trigger_id": binding.id.as_str(),
3651                "binding_key": binding.binding_key(),
3652                "handler_kind": route.kind(),
3653                "target_uri": route.target_uri(),
3654                "reason": reason,
3655                "retry_at": next_budget_reset_rfc3339(binding),
3656                "replay_of_event_id": replay_of_event_id,
3657            }),
3658            replay_of_event_id,
3659        )
3660        .await?;
3661        self.append_skipped_outbox_event(
3662            binding,
3663            route,
3664            event,
3665            replay_of_event_id,
3666            DispatchSkipStage::Predicate,
3667            serde_json::json!({
3668                "deferred": true,
3669                "reason": reason,
3670                "retry_at": next_budget_reset_rfc3339(binding),
3671            }),
3672        )
3673        .await
3674    }
3675
3676    async fn move_budget_exhausted_to_dlq(
3677        &self,
3678        binding: &TriggerBinding,
3679        route: &DispatchUri,
3680        event: &TriggerEvent,
3681        replay_of_event_id: Option<&String>,
3682        final_error: &str,
3683    ) -> Result<(), DispatchError> {
3684        let dlq_entry = DlqEntry {
3685            trigger_id: binding.id.as_str().to_string(),
3686            binding_key: binding.binding_key(),
3687            event: event.clone(),
3688            attempt_count: 0,
3689            final_error: final_error.to_string(),
3690            error_class: crate::triggers::classify_trigger_dlq_error(final_error).to_string(),
3691            attempts: Vec::new(),
3692        };
3693        self.state
3694            .dlq
3695            .lock()
3696            .expect("dispatcher dlq poisoned")
3697            .push(dlq_entry.clone());
3698        if let Some(metrics) = self.metrics.as_ref() {
3699            metrics.record_trigger_dlq(binding.id.as_str(), "budget_exhausted");
3700            metrics.record_trigger_accepted_to_dlq(
3701                binding.id.as_str(),
3702                &binding.binding_key(),
3703                event.provider.as_str(),
3704                tenant_id(event),
3705                "budget_exhausted",
3706                duration_between_ms(current_unix_ms(), accepted_at_ms(None, event)),
3707            );
3708        }
3709        tracing::info!(
3710            component = "dispatcher",
3711            lifecycle = "dlq_moved",
3712            trigger_id = %binding.id.as_str(),
3713            binding_key = %binding.binding_key(),
3714            event_id = %event.id.0,
3715            reason = "budget_exhausted",
3716            trace_id = %event.trace_id.0
3717        );
3718        self.emit_action_graph(
3719            event,
3720            vec![RunActionGraphNodeRecord {
3721                id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3722                label: binding.id.as_str().to_string(),
3723                kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3724                status: "queued".to_string(),
3725                outcome: "budget_exhausted".to_string(),
3726                trace_id: Some(event.trace_id.0.clone()),
3727                stage_id: None,
3728                node_id: None,
3729                worker_id: None,
3730                run_id: None,
3731                run_path: None,
3732                metadata: dlq_node_metadata(binding, event, 0, final_error),
3733            }],
3734            vec![RunActionGraphEdgeRecord {
3735                from_id: format!("predicate:{}:{}", binding.binding_key(), event.id.0),
3736                to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3737                kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3738                label: Some("budget exhausted".to_string()),
3739            }],
3740            serde_json::json!({
3741                "source": "dispatcher",
3742                "trigger_id": binding.id.as_str(),
3743                "binding_key": binding.binding_key(),
3744                "event_id": event.id.0,
3745                "handler_kind": route.kind(),
3746                "target_uri": route.target_uri(),
3747                "final_error": final_error,
3748                "replay_of_event_id": replay_of_event_id,
3749            }),
3750        )
3751        .await?;
3752        self.append_lifecycle_event(
3753            "DlqMoved",
3754            event,
3755            binding,
3756            serde_json::json!({
3757                "event_id": event.id.0,
3758                "attempt_count": 0,
3759                "final_error": final_error,
3760                "reason": "budget_exhausted",
3761                "replay_of_event_id": replay_of_event_id,
3762            }),
3763            replay_of_event_id,
3764        )
3765        .await?;
3766        self.append_topic_event(
3767            TRIGGER_DLQ_TOPIC,
3768            "dlq_moved",
3769            event,
3770            Some(binding),
3771            None,
3772            serde_json::to_value(&dlq_entry)
3773                .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3774            replay_of_event_id,
3775        )
3776        .await
3777    }
3778
3779    async fn move_circuit_open_to_dlq(
3780        &self,
3781        binding: &TriggerBinding,
3782        route: &DispatchUri,
3783        event: &TriggerEvent,
3784        replay_of_event_id: Option<&String>,
3785        final_error: &str,
3786        destination: &str,
3787    ) -> Result<(), DispatchError> {
3788        let dlq_entry = DlqEntry {
3789            trigger_id: binding.id.as_str().to_string(),
3790            binding_key: binding.binding_key(),
3791            event: event.clone(),
3792            attempt_count: 0,
3793            final_error: final_error.to_string(),
3794            error_class: crate::triggers::classify_trigger_dlq_error(final_error).to_string(),
3795            attempts: Vec::new(),
3796        };
3797        self.state
3798            .dlq
3799            .lock()
3800            .expect("dispatcher dlq poisoned")
3801            .push(dlq_entry.clone());
3802        if let Some(metrics) = self.metrics.as_ref() {
3803            metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
3804            metrics.record_trigger_accepted_to_dlq(
3805                binding.id.as_str(),
3806                &binding.binding_key(),
3807                event.provider.as_str(),
3808                tenant_id(event),
3809                "circuit_open",
3810                Duration::ZERO,
3811            );
3812        }
3813        tracing::info!(
3814            component = "dispatcher",
3815            lifecycle = "dlq_moved",
3816            trigger_id = %binding.id.as_str(),
3817            binding_key = %binding.binding_key(),
3818            event_id = %event.id.0,
3819            reason = "circuit_open",
3820            destination,
3821            trace_id = %event.trace_id.0
3822        );
3823        self.emit_action_graph(
3824            event,
3825            vec![RunActionGraphNodeRecord {
3826                id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3827                label: binding.id.as_str().to_string(),
3828                kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3829                status: "queued".to_string(),
3830                outcome: "circuit_open".to_string(),
3831                trace_id: Some(event.trace_id.0.clone()),
3832                stage_id: None,
3833                node_id: None,
3834                worker_id: None,
3835                run_id: None,
3836                run_path: None,
3837                metadata: dlq_node_metadata(binding, event, 0, final_error),
3838            }],
3839            vec![RunActionGraphEdgeRecord {
3840                from_id: format!("trigger:{}", event.id.0),
3841                to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3842                kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3843                label: Some("circuit open".to_string()),
3844            }],
3845            serde_json::json!({
3846                "source": "dispatcher",
3847                "trigger_id": binding.id.as_str(),
3848                "binding_key": binding.binding_key(),
3849                "event_id": event.id.0,
3850                "handler_kind": route.kind(),
3851                "target_uri": route.target_uri(),
3852                "destination": destination,
3853                "final_error": final_error,
3854                "replay_of_event_id": replay_of_event_id,
3855            }),
3856        )
3857        .await?;
3858        self.append_lifecycle_event(
3859            "DlqMoved",
3860            event,
3861            binding,
3862            serde_json::json!({
3863                "event_id": event.id.0,
3864                "attempt_count": 0,
3865                "final_error": final_error,
3866                "reason": "circuit_open",
3867                "destination": destination,
3868                "replay_of_event_id": replay_of_event_id,
3869            }),
3870            replay_of_event_id,
3871        )
3872        .await?;
3873        self.append_topic_event(
3874            TRIGGER_DLQ_TOPIC,
3875            "dlq_moved",
3876            event,
3877            Some(binding),
3878            None,
3879            serde_json::to_value(&dlq_entry)
3880                .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3881            replay_of_event_id,
3882        )
3883        .await
3884    }
3885
3886    async fn append_skipped_outbox_event(
3887        &self,
3888        binding: &TriggerBinding,
3889        route: &DispatchUri,
3890        event: &TriggerEvent,
3891        replay_of_event_id: Option<&String>,
3892        stage: DispatchSkipStage,
3893        detail: serde_json::Value,
3894    ) -> Result<(), DispatchError> {
3895        self.append_topic_event(
3896            TRIGGER_OUTBOX_TOPIC,
3897            "dispatch_skipped",
3898            event,
3899            Some(binding),
3900            None,
3901            serde_json::json!({
3902                "event_id": event.id.0,
3903                "trigger_id": binding.id.as_str(),
3904                "binding_key": binding.binding_key(),
3905                "handler_kind": route.kind(),
3906                "target_uri": route.target_uri(),
3907                "skip_stage": stage.as_str(),
3908                "detail": detail,
3909                "replay_of_event_id": replay_of_event_id,
3910            }),
3911            replay_of_event_id,
3912        )
3913        .await
3914    }
3915
3916    async fn append_topic_event(
3917        &self,
3918        topic_name: &str,
3919        kind: &str,
3920        event: &TriggerEvent,
3921        binding: Option<&TriggerBinding>,
3922        attempt: Option<u32>,
3923        payload: serde_json::Value,
3924        replay_of_event_id: Option<&String>,
3925    ) -> Result<(), DispatchError> {
3926        let topic = topic_for_event(event, topic_name)?;
3927        let headers = event_headers(event, binding, attempt, replay_of_event_id);
3928        self.event_log
3929            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3930            .await
3931            .map_err(DispatchError::from)
3932            .map(|_| ())
3933    }
3934
3935    async fn emit_action_graph(
3936        &self,
3937        event: &TriggerEvent,
3938        nodes: Vec<RunActionGraphNodeRecord>,
3939        edges: Vec<RunActionGraphEdgeRecord>,
3940        extra: serde_json::Value,
3941    ) -> Result<(), DispatchError> {
3942        let mut headers = BTreeMap::new();
3943        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3944        headers.insert("event_id".to_string(), event.id.0.clone());
3945        let observability = RunObservabilityRecord {
3946            schema_version: 1,
3947            action_graph_nodes: nodes,
3948            action_graph_edges: edges,
3949            ..Default::default()
3950        };
3951        append_action_graph_update(
3952            headers,
3953            serde_json::json!({
3954                "source": "dispatcher",
3955                "trace_id": event.trace_id.0,
3956                "event_id": event.id.0,
3957                "observability": observability,
3958                "context": extra,
3959            }),
3960        )
3961        .await
3962        .map_err(DispatchError::from)
3963    }
3964}
3965
3966async fn dispatch_cancel_requested(
3967    event_log: &Arc<AnyEventLog>,
3968    binding_key: &str,
3969    event_id: &str,
3970    replay_of_event_id: Option<&String>,
3971) -> Result<bool, DispatchError> {
3972    if replay_of_event_id.is_some() {
3973        return Ok(false);
3974    }
3975    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3976        .expect("static trigger cancel topic should always be valid");
3977    let events = event_log.read_range(&topic, None, usize::MAX).await?;
3978    let requested = events
3979        .into_iter()
3980        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3981        .filter_map(|(_, event)| {
3982            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3983        })
3984        .collect::<BTreeSet<_>>();
3985    Ok(requested
3986        .iter()
3987        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3988}
3989
3990async fn sleep_or_cancel_or_request(
3991    event_log: &Arc<AnyEventLog>,
3992    delay: Duration,
3993    binding_key: &str,
3994    event_id: &str,
3995    replay_of_event_id: Option<&String>,
3996    cancel_rx: &mut broadcast::Receiver<()>,
3997) -> Result<(), DispatchError> {
3998    let sleep = tokio::time::sleep(delay);
3999    pin_mut!(sleep);
4000    let mut poll = tokio::time::interval(Duration::from_millis(100));
4001    loop {
4002        tokio::select! {
4003            _ = &mut sleep => return Ok(()),
4004            _ = recv_cancel(cancel_rx) => {
4005                return Err(DispatchError::Cancelled(
4006                    "dispatcher shutdown cancelled retry wait".to_string(),
4007                ));
4008            }
4009            _ = poll.tick() => {
4010                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
4011                    return Err(DispatchError::Cancelled(
4012                        "trigger cancel request cancelled retry wait".to_string(),
4013                    ));
4014                }
4015            }
4016        }
4017    }
4018}
4019
4020fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
4021    let mut iter = events.into_iter();
4022    let Some(mut root) = iter.next() else {
4023        return Err(DispatchError::Registry(
4024            "batch dispatch produced an empty event list".to_string(),
4025        ));
4026    };
4027    let mut batch = Vec::new();
4028    batch.push(
4029        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
4030    );
4031    for event in iter {
4032        batch.push(
4033            serde_json::to_value(&event)
4034                .map_err(|error| DispatchError::Serde(error.to_string()))?,
4035        );
4036    }
4037    root.batch = Some(batch);
4038    Ok(root)
4039}
4040
4041fn json_value_to_gate(value: &serde_json::Value) -> String {
4042    match value {
4043        serde_json::Value::Null => "null".to_string(),
4044        serde_json::Value::String(text) => text.clone(),
4045        serde_json::Value::Bool(value) => value.to_string(),
4046        serde_json::Value::Number(value) => value.to_string(),
4047        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
4048    }
4049}
4050
4051fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
4052    let json =
4053        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4054    let value = json_to_vm_value(&json);
4055    match (&event.raw_body, value) {
4056        (Some(raw_body), VmValue::Dict(dict)) => {
4057            let mut map = (*dict).clone();
4058            map.insert(
4059                "raw_body".to_string(),
4060                VmValue::Bytes(Rc::new(raw_body.clone())),
4061            );
4062            Ok(VmValue::Dict(Rc::new(map)))
4063        }
4064        (_, other) => Ok(other),
4065    }
4066}
4067
4068fn decrement_in_flight(state: &DispatcherRuntimeState) {
4069    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
4070    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
4071        state.idle_notify.notify_waiters();
4072    }
4073}
4074
4075fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
4076    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
4077    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
4078        state.idle_notify.notify_waiters();
4079    }
4080}
4081
/// Test-only: arms a one-shot signal that `notify_test_inbox_dequeued` fires.
///
/// Any previously installed sender is replaced and dropped.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    let _replaced = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.replace(Some(tx)));
}
4088
/// No-op in non-test builds. The `cfg(test)` variant fires the one-shot
/// signal installed by `install_test_inbox_dequeued_signal`.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}
4091
/// Test-only: fires and clears the installed inbox-dequeued signal, if any.
///
/// A dropped receiver is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.take());
    if let Some(tx) = pending {
        let _ = tx.send(());
    }
}
4100
4101pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
4102    event_log: &L,
4103    event: &TriggerEvent,
4104) -> Result<u64, DispatchError> {
4105    let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
4106    let headers = event_headers(event, None, None, None);
4107    let payload =
4108        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4109    event_log
4110        .append(
4111            &topic,
4112            LogEvent::new("event_ingested", payload).with_headers(headers),
4113        )
4114        .await
4115        .map_err(DispatchError::from)
4116}
4117
4118pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
4119    ACTIVE_DISPATCHER_STATE.with(|slot| {
4120        slot.borrow()
4121            .as_ref()
4122            .map(|state| DispatcherStatsSnapshot {
4123                in_flight: state.in_flight.load(Ordering::Relaxed),
4124                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
4125                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
4126            })
4127            .unwrap_or_default()
4128    })
4129}
4130
4131pub fn clear_dispatcher_state() {
4132    ACTIVE_DISPATCHER_STATE.with(|slot| {
4133        *slot.borrow_mut() = None;
4134    });
4135    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
4136        *slot.borrow_mut() = None;
4137    });
4138}
4139
4140fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
4141    if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
4142        return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
4143    }
4144    if is_cancelled_vm_error(&error) {
4145        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
4146    }
4147    if let VmError::Thrown(VmValue::String(message)) = &error {
4148        return DispatchError::Local(message.to_string());
4149    }
4150    match error_to_category(&error) {
4151        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
4152        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
4153        ErrorCategory::Cancelled => {
4154            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
4155        }
4156        _ => DispatchError::Local(error.to_string()),
4157    }
4158}
4159
4160fn dispatch_error_label(error: &DispatchError) -> &'static str {
4161    match error {
4162        DispatchError::Denied(_) => "denied",
4163        DispatchError::Timeout(_) => "timeout",
4164        DispatchError::Waiting(_) => "waiting",
4165        DispatchError::Cancelled(_) => "cancelled",
4166        _ => "failed",
4167    }
4168}
4169
4170fn destination_circuit_key(route: &DispatchUri) -> String {
4171    format!("{}:{}", route.kind(), route.target_uri())
4172}
4173
4174fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
4175    match route {
4176        DispatchUri::Worker { .. } => "enqueued",
4177        DispatchUri::A2a { .. }
4178            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
4179        {
4180            "pending"
4181        }
4182        DispatchUri::A2a { .. } => "completed",
4183        DispatchUri::Local { .. } => "success",
4184    }
4185}
4186
4187fn dispatch_node_id(
4188    route: &DispatchUri,
4189    binding_key: &str,
4190    event_id: &str,
4191    attempt: u32,
4192) -> String {
4193    let prefix = match route {
4194        DispatchUri::A2a { .. } => "a2a",
4195        _ => "dispatch",
4196    };
4197    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
4198}
4199
4200fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
4201    match route {
4202        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
4203        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
4204        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
4205    }
4206}
4207
4208fn dispatch_node_label(route: &DispatchUri) -> String {
4209    match route {
4210        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
4211        _ => route.target_uri(),
4212    }
4213}
4214
4215fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
4216    match route {
4217        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
4218        _ => None,
4219    }
4220}
4221
4222fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
4223    match route {
4224        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
4225        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
4226        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
4227    }
4228}
4229
4230fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
4231    match status {
4232        crate::triggers::SignatureStatus::Verified => "verified",
4233        crate::triggers::SignatureStatus::Unsigned => "unsigned",
4234        crate::triggers::SignatureStatus::Failed { .. } => "failed",
4235    }
4236}
4237
4238fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
4239    let mut metadata = BTreeMap::new();
4240    metadata.insert(
4241        "provider".to_string(),
4242        serde_json::json!(event.provider.as_str()),
4243    );
4244    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
4245    metadata.insert(
4246        "dedupe_key".to_string(),
4247        serde_json::json!(event.dedupe_key),
4248    );
4249    metadata.insert(
4250        "signature_status".to_string(),
4251        serde_json::json!(signature_status_label(&event.signature_status)),
4252    );
4253    metadata
4254}
4255
4256fn predicate_node_metadata(
4257    binding: &TriggerBinding,
4258    predicate: &super::registry::TriggerPredicateSpec,
4259    event: &TriggerEvent,
4260    evaluation: &PredicateEvaluationRecord,
4261) -> BTreeMap<String, serde_json::Value> {
4262    let mut metadata = BTreeMap::new();
4263    metadata.insert(
4264        "trigger_id".to_string(),
4265        serde_json::json!(binding.id.as_str()),
4266    );
4267    metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
4268    metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
4269    metadata.insert(
4270        "cost_usd".to_string(),
4271        serde_json::json!(evaluation.cost_usd),
4272    );
4273    metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
4274    metadata.insert(
4275        "latency_ms".to_string(),
4276        serde_json::json!(evaluation.latency_ms),
4277    );
4278    metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
4279    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4280    if let Some(reason) = evaluation.reason.as_ref() {
4281        metadata.insert("reason".to_string(), serde_json::json!(reason));
4282    }
4283    metadata
4284}
4285
4286fn dispatch_node_metadata(
4287    route: &DispatchUri,
4288    binding: &TriggerBinding,
4289    event: &TriggerEvent,
4290    attempt: u32,
4291) -> BTreeMap<String, serde_json::Value> {
4292    let mut metadata = BTreeMap::new();
4293    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
4294    metadata.insert(
4295        "target_uri".to_string(),
4296        serde_json::json!(route.target_uri()),
4297    );
4298    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4299    metadata.insert(
4300        "trigger_id".to_string(),
4301        serde_json::json!(binding.id.as_str()),
4302    );
4303    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4304    if let Some(target_agent) = dispatch_target_agent(route) {
4305        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
4306    }
4307    if let DispatchUri::Worker { queue } = route {
4308        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
4309    }
4310    metadata
4311}
4312
4313fn dispatch_success_metadata(
4314    route: &DispatchUri,
4315    binding: &TriggerBinding,
4316    event: &TriggerEvent,
4317    attempt: u32,
4318    result: &serde_json::Value,
4319) -> BTreeMap<String, serde_json::Value> {
4320    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4321    match route {
4322        DispatchUri::A2a { .. } => {
4323            if let Some(task_id) = result
4324                .get("task_id")
4325                .or_else(|| result.get("id"))
4326                .and_then(|value| value.as_str())
4327            {
4328                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
4329            }
4330            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
4331                metadata.insert("state".to_string(), serde_json::json!(state));
4332            }
4333        }
4334        DispatchUri::Worker { .. } => {
4335            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
4336            {
4337                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
4338            }
4339            if let Some(response_topic) = result
4340                .get("response_topic")
4341                .and_then(|value| value.as_str())
4342            {
4343                metadata.insert(
4344                    "response_topic".to_string(),
4345                    serde_json::json!(response_topic),
4346                );
4347            }
4348        }
4349        DispatchUri::Local { .. } => {}
4350    }
4351    metadata
4352}
4353
4354fn dispatch_error_metadata(
4355    route: &DispatchUri,
4356    binding: &TriggerBinding,
4357    event: &TriggerEvent,
4358    attempt: u32,
4359    error: &DispatchError,
4360) -> BTreeMap<String, serde_json::Value> {
4361    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4362    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4363    metadata
4364}
4365
4366fn retry_node_metadata(
4367    binding: &TriggerBinding,
4368    event: &TriggerEvent,
4369    attempt: u32,
4370    delay: Duration,
4371    error: &DispatchError,
4372) -> BTreeMap<String, serde_json::Value> {
4373    let mut metadata = BTreeMap::new();
4374    metadata.insert(
4375        "trigger_id".to_string(),
4376        serde_json::json!(binding.id.as_str()),
4377    );
4378    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4379    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4380    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
4381    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4382    metadata
4383}
4384
4385fn dlq_node_metadata(
4386    binding: &TriggerBinding,
4387    event: &TriggerEvent,
4388    attempt_count: u32,
4389    final_error: &str,
4390) -> BTreeMap<String, serde_json::Value> {
4391    let mut metadata = BTreeMap::new();
4392    metadata.insert(
4393        "trigger_id".to_string(),
4394        serde_json::json!(binding.id.as_str()),
4395    );
4396    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4397    metadata.insert(
4398        "attempt_count".to_string(),
4399        serde_json::json!(attempt_count),
4400    );
4401    metadata.insert("final_error".to_string(), serde_json::json!(final_error));
4402    metadata.insert(
4403        "error_class".to_string(),
4404        serde_json::json!(crate::triggers::classify_trigger_dlq_error(final_error)),
4405    );
4406    metadata
4407}
4408
4409fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
4410    match value {
4411        VmValue::Bool(result) => Ok(result),
4412        VmValue::EnumVariant {
4413            enum_name,
4414            variant,
4415            fields,
4416        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Ok" => match fields.first() {
4417            Some(VmValue::Bool(result)) => Ok(*result),
4418            Some(other) => Err(format!(
4419                "predicate Result.Ok payload must be bool, got {}",
4420                other.type_name()
4421            )),
4422            None => Err("predicate Result.Ok payload is missing".to_string()),
4423        },
4424        VmValue::EnumVariant {
4425            enum_name,
4426            variant,
4427            fields,
4428        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => Err(fields
4429            .first()
4430            .map(VmValue::display)
4431            .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
4432        other => Err(format!(
4433            "predicate must return bool or Result<bool, _>, got {}",
4434            other.type_name()
4435        )),
4436    }
4437}
4438
4439fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
4440    micros_to_usd(
4441        binding
4442            .metrics
4443            .cost_today_usd_micros
4444            .load(Ordering::Relaxed),
4445    )
4446}
4447
4448fn current_predicate_hourly_cost(binding: &TriggerBinding) -> f64 {
4449    micros_to_usd(binding.metrics.cost_hour_usd_micros.load(Ordering::Relaxed))
4450}
4451
/// Splits a `"<id>@v<version>"` binding key at its last `@v`.
///
/// - No `@v` in the key: returns the whole key with version `0`.
/// - Suffix that is not a `u32`: still strips it, and uses version `0`.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    match binding_key.rsplit_once("@v") {
        Some((binding_id, version_text)) => (
            binding_id.to_string(),
            version_text.parse::<u32>().unwrap_or(0),
        ),
        None => (binding_key.to_string(), 0),
    }
}
4459
/// Builds a binding key.
///
/// Returns `"<trigger_id>@v<version>"` when a version is given, otherwise
/// the bare trigger id.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    binding_version.map_or_else(
        || trigger_id.to_string(),
        |version| format!("{trigger_id}@v{version}"),
    )
}
4466
4467fn tenant_id(event: &TriggerEvent) -> Option<&str> {
4468    event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
4469}
4470
4471fn current_unix_ms() -> i64 {
4472    unix_ms(time::OffsetDateTime::now_utc())
4473}
4474
4475fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
4476    (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
4477}
4478
4479fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4480    lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
4481        .unwrap_or_else(|| unix_ms(event.received_at))
4482}
4483
4484fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4485    lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
4486        .unwrap_or_else(|| accepted_at_ms(headers, event))
4487}
4488
/// Parses header `name` as an `i64` millisecond timestamp.
///
/// Returns `None` if there are no headers, the header is missing, or its
/// value is not a valid `i64` (surrounding whitespace is not accepted).
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw = headers?.get(name)?;
    raw.parse::<i64>().ok()
}
4494
/// Non-negative duration from `earlier_ms` to `later_ms`.
///
/// Clamps to zero when `later_ms` precedes `earlier_ms`. The subtraction
/// saturates rather than overflowing.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let delta = later_ms.saturating_sub(earlier_ms);
    Duration::from_millis(u64::try_from(delta).unwrap_or(0))
}
4498
4499fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
4500    match result {
4501        Ok(_) => "succeeded",
4502        Err(DispatchError::Waiting(_)) => "waiting",
4503        Err(DispatchError::Cancelled(_)) => "cancelled",
4504        Err(DispatchError::Denied(_)) => "denied",
4505        Err(DispatchError::Timeout(_)) => "timeout",
4506        Err(_) => "failed",
4507    }
4508}
4509
4510fn is_cancelled_vm_error(error: &VmError) -> bool {
4511    matches!(
4512        error,
4513        VmError::Thrown(VmValue::String(message))
4514            if message.starts_with("kind:cancelled:")
4515    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
4516}
4517
4518fn event_headers(
4519    event: &TriggerEvent,
4520    binding: Option<&TriggerBinding>,
4521    attempt: Option<u32>,
4522    replay_of_event_id: Option<&String>,
4523) -> BTreeMap<String, String> {
4524    let mut headers = BTreeMap::new();
4525    headers.insert("event_id".to_string(), event.id.0.clone());
4526    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
4527    headers.insert("provider".to_string(), event.provider.as_str().to_string());
4528    headers.insert("kind".to_string(), event.kind.clone());
4529    if let Some(replay_of_event_id) = replay_of_event_id {
4530        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
4531    }
4532    if let Some(tenant_id) = event.tenant_id.as_ref() {
4533        headers.insert("tenant_id".to_string(), tenant_id.0.clone());
4534    }
4535    if let Some(binding) = binding {
4536        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
4537        headers.insert("binding_key".to_string(), binding.binding_key());
4538        headers.insert(
4539            "handler_kind".to_string(),
4540            DispatchUri::from(&binding.handler).kind().to_string(),
4541        );
4542    }
4543    if let Some(attempt) = attempt {
4544        headers.insert("attempt".to_string(), attempt.to_string());
4545    }
4546    headers
4547}
4548
4549fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
4550    let topic = Topic::new(topic_name)
4551        .expect("static trigger dispatcher topic names should always be valid");
4552    match event.tenant_id.as_ref() {
4553        Some(tenant_id) => crate::tenant_topic(tenant_id, &topic).map_err(DispatchError::from),
4554        None => Ok(topic),
4555    }
4556}
4557
4558fn worker_queue_priority(
4559    binding: &super::registry::TriggerBinding,
4560    event: &TriggerEvent,
4561) -> crate::WorkerQueuePriority {
4562    match event
4563        .headers
4564        .get("priority")
4565        .map(|value| value.trim().to_ascii_lowercase())
4566        .as_deref()
4567    {
4568        Some("high") => crate::WorkerQueuePriority::High,
4569        Some("low") => crate::WorkerQueuePriority::Low,
4570        _ => binding.dispatch_priority,
4571    }
4572}
4573
/// Environment variable checked by `maybe_fail_before_outbox`.
///
/// When it is set to any value, that function exits the process with
/// status 86.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";
4575
4576fn maybe_fail_before_outbox() {
4577    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some() {
4578        std::process::exit(86);
4579    }
4580}
4581
4582fn now_rfc3339() -> String {
4583    time::OffsetDateTime::now_utc()
4584        .format(&time::format_description::well_known::Rfc3339)
4585        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4586}
4587
4588fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
4589    let now = time::OffsetDateTime::now_utc();
4590    let reset = if binding.hourly_cost_usd.is_some() {
4591        let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
4592        time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
4593    } else {
4594        let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
4595        time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
4596    };
4597    reset
4598        .format(&time::format_description::well_known::Rfc3339)
4599        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4600}
4601
4602fn now_unix_ms() -> i64 {
4603    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
4604}
4605
4606fn cancelled_dispatch_outcome(
4607    binding: &TriggerBinding,
4608    route: &DispatchUri,
4609    event: &TriggerEvent,
4610    replay_of_event_id: Option<String>,
4611    attempt_count: u32,
4612    error: String,
4613) -> DispatchOutcome {
4614    DispatchOutcome {
4615        trigger_id: binding.id.as_str().to_string(),
4616        binding_key: binding.binding_key(),
4617        event_id: event.id.0.clone(),
4618        attempt_count,
4619        status: DispatchStatus::Cancelled,
4620        handler_kind: route.kind().to_string(),
4621        target_uri: route.target_uri(),
4622        replay_of_event_id,
4623        result: None,
4624        error: Some(error),
4625    }
4626}
4627
/// Waits for the next message on the dispatcher's cancellation broadcast.
///
/// The `recv` result is discarded, so this also returns as soon as `recv`
/// yields an error (for example, when the channel is closed or the receiver
/// lagged). `sleep_or_cancel_or_request` treats any return from here as a
/// cancellation.
async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
    let _ = cancel_rx.recv().await;
}
4631
// Unit tests for this module live in the separate `tests` module file and
// are compiled only for test builds.
#[cfg(test)]
mod tests;