Skip to main content

harn_vm/triggers/dispatcher/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::vm_value_to_json;
16use crate::orchestration::{
17    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
18    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
19    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
20    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
21    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
22    ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
23    ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
24};
25use crate::stdlib::json_to_vm_value;
26use crate::trust_graph::{
27    append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
28};
29use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
30use crate::vm::Vm;
31
32use self::uri::DispatchUri;
33use super::registry::{
34    binding_autonomy_budget_would_exceed, matching_bindings, note_autonomous_decision,
35    TriggerBinding, TriggerBudgetExhaustionStrategy, TriggerHandlerSpec,
36};
37use super::{
38    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
39    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
40    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_OUTBOX_TOPIC,
41};
42use circuits::{
43    destination_circuit_key, dlq_node_metadata, DestinationCircuitProbe,
44    DestinationCircuitRegistry, DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
45};
46use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
47use predicate_eval::predicate_node_metadata;
48
49mod circuits;
50mod flow_control;
51mod predicate_eval;
52pub mod retry;
53pub mod uri;
54
55pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
56
/// Header key carrying the time, in milliseconds, at which a trigger event was
/// accepted. `enqueue_targeted_with_headers` fills it from
/// `unix_ms(event.received_at)` when the caller did not already supply it.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Header key for the normalization timestamp in milliseconds.
/// NOTE(review): nothing in this part of the file writes it — presumably set
/// by the ingest/normalization path; confirm against the producer.
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Header key carrying the time, in milliseconds, at which the event was
/// appended to the inbox queue. `enqueue_targeted_with_headers` defaults it to
/// the log event's `occurred_at_ms` when absent.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
60
// Per-thread ambient dispatcher slots.
thread_local! {
    // Runtime state of the dispatcher most recently constructed on this thread
    // (written by `Dispatcher::with_event_log_and_metrics`).
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context returned by `current_dispatch_context`; `None` when nothing is stored.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Wait lease returned by `current_dispatch_wait_lease`; `None` when nothing is stored.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only one-shot sender.
    // NOTE(review): presumably fired by `notify_test_inbox_dequeued` — confirm.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
68
// Task-local flag scoped around each dispatch by `dispatch_with_replay`;
// `true` when the dispatch was started with a `replay_of_event_id`.
tokio::task_local! {
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
72
/// Value stored in the `ACTIVE_DISPATCH_CONTEXT` thread-local and handed out
/// (cloned) by `current_dispatch_context`.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// Event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay, otherwise `None`.
    pub replay_of_event_id: Option<String>,
    /// Id of the binding handling the event.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    /// Agent the dispatch runs as.
    /// NOTE(review): how this is derived is not visible in this part of the file.
    pub agent_id: String,
    /// Action label for the dispatch.
    /// NOTE(review): the set of possible values is not visible here.
    pub action: String,
    /// Autonomy tier in effect for the dispatch.
    pub autonomy_tier: AutonomyTier,
}
83
/// Scope guard that pops one execution policy when dropped.
/// NOTE(review): presumably created right after a matching push elsewhere in
/// this module — confirm at the construction site.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    /// Calls `crate::orchestration::pop_execution_policy()`.
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
91
/// Default reviewer name for autonomy-budget handling.
/// NOTE(review): the use site is outside this part of the file — confirm how
/// the value is consumed.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
93
94pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
95    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
96}
97
98pub(crate) fn current_dispatch_is_replay() -> bool {
99    ACTIVE_DISPATCH_IS_REPLAY
100        .try_with(|is_replay| *is_replay)
101        .unwrap_or(false)
102}
103
104pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
105    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
106}
107
/// Trigger dispatcher: appends events to the inbox topic, consumes them, and
/// routes them to matching bindings.
///
/// Every field is reference-counted or a channel handle, so clones share the
/// same VM, event log, cancel channel and runtime state.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM supplied at construction time.
    base_vm: Rc<Vm>,
    /// Event log used for inbox appends and subscriptions.
    event_log: Arc<AnyEventLog>,
    /// Broadcast sender signalled by `shutdown`; `run` subscribes to it.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ entries, cancel tokens and flow-control state.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink; nothing is recorded when `None`.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
116
117#[derive(Debug)]
118struct DispatcherRuntimeState {
119    in_flight: AtomicU64,
120    retry_queue_depth: AtomicU64,
121    dlq: Mutex<Vec<DlqEntry>>,
122    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
123    shutting_down: std::sync::atomic::AtomicBool,
124    idle_notify: Notify,
125    flow_control: FlowControlManager,
126    destination_circuits: DestinationCircuitRegistry,
127}
128
129impl DispatcherRuntimeState {
130    fn new(event_log: Arc<AnyEventLog>) -> Self {
131        Self {
132            in_flight: AtomicU64::new(0),
133            retry_queue_depth: AtomicU64::new(0),
134            dlq: Mutex::new(Vec::new()),
135            cancel_tokens: Mutex::new(Vec::new()),
136            shutting_down: std::sync::atomic::AtomicBool::new(false),
137            idle_notify: Notify::new(),
138            flow_control: FlowControlManager::new(event_log),
139            destination_circuits: DestinationCircuitRegistry::default(),
140        }
141    }
142}
143
/// Point-in-time counters produced by `Dispatcher::snapshot`.
///
/// `#[serde(default)]` lets missing fields deserialize as zero.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Value of the in-flight counter.
    pub in_flight: u64,
    /// Value of the retry-queue depth counter.
    pub retry_queue_depth: u64,
    /// Number of entries in the in-memory DLQ.
    pub dlq_depth: u64,
}
151
/// Terminal (or parked) status of a single dispatch. Serialized in
/// `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    Succeeded,
    Failed,
    Dlq,
    Skipped,
    Waiting,
    Cancelled,
}

impl DispatchStatus {
    /// Returns the lowercase label for this status (for example `"succeeded"`
    /// or `"dlq"`); `dispatch_with_replay` uses it as the metrics outcome label.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::Succeeded => "succeeded",
            Self::Failed => "failed",
            Self::Dlq => "dlq",
            Self::Skipped => "skipped",
            Self::Waiting => "waiting",
            Self::Cancelled => "cancelled",
        }
    }
}
175
/// Result of dispatching one event to one binding.
///
/// `#[serde(default)]` fills missing fields from the `Default` impl when
/// deserializing.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger that handled the event.
    pub trigger_id: String,
    /// Binding key of the handling binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Final status of the dispatch.
    pub status: DispatchStatus,
    /// Kind label of the handler.
    pub handler_kind: String,
    /// Target URI of the handler.
    pub target_uri: String,
    /// Id of the original event when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result as JSON, when one was produced.
    pub result: Option<serde_json::Value>,
    /// Error message, when the dispatch did not succeed.
    pub error: Option<String>,
}
190
/// Payload of an `event_ingested` record on the inbox topic.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to this trigger's live binding
    /// rather than to every matching binding.
    pub trigger_id: Option<String>,
    /// Binding version passed to the registry when resolving the target binding.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
197
/// Result of `Dispatcher::drain`: whether the dispatcher went idle plus the
/// counters observed when the call returned.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when both `in_flight` and `retry_queue_depth` were observed at zero.
    pub drained: bool,
    /// In-flight counter at return time.
    pub in_flight: u64,
    /// Retry-queue depth at return time.
    pub retry_queue_depth: u64,
    /// DLQ entry count at return time.
    pub dlq_depth: u64,
}
206
// Hand-written because `DispatchStatus` has no `Default`; the default outcome
// is an empty record with status `Failed`.
impl Default for DispatchOutcome {
    /// Returns an outcome with empty strings, zero attempts, `Failed` status
    /// and no replay id, result or error.
    fn default() -> Self {
        Self {
            trigger_id: String::new(),
            binding_key: String::new(),
            event_id: String::new(),
            attempt_count: 0,
            status: DispatchStatus::Failed,
            handler_kind: String::new(),
            target_uri: String::new(),
            replay_of_event_id: None,
            result: None,
            error: None,
        }
    }
}
223
/// Record of a single dispatch attempt. Missing fields deserialize to their
/// defaults (`#[serde(default)]`).
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the trigger being dispatched.
    pub trigger_id: String,
    /// Binding key of the handling binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Attempt number.
    /// NOTE(review): whether numbering starts at 0 or 1 is not visible here.
    pub attempt: u32,
    /// Kind label of the handler.
    pub handler_kind: String,
    /// Attempt start timestamp as a string.
    /// NOTE(review): format not visible here — presumably RFC 3339; confirm.
    pub started_at: String,
    /// Attempt completion timestamp as a string (same caveat as `started_at`).
    pub completed_at: String,
    /// Outcome label of the attempt.
    pub outcome: String,
    /// Error message when the attempt failed.
    pub error_msg: Option<String>,
}
237
/// Request to cancel the dispatch of one event for one binding; serialized by
/// `append_dispatch_cancel_request` onto the cancel-requests topic.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Binding key of the dispatch to cancel.
    pub binding_key: String,
    /// Id of the event whose dispatch should be cancelled.
    pub event_id: String,
    /// When the cancellation was requested; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who requested the cancellation; optional, defaults to `None`.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Audit identifier; optional, defaults to `None`.
    #[serde(default)]
    pub audit_id: Option<String>,
}
249
/// Dead-letter entry kept in the dispatcher's in-memory DLQ and exposed by
/// `Dispatcher::dlq_entries`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the trigger whose dispatch was dead-lettered.
    pub trigger_id: String,
    /// Binding key of the handling binding.
    pub binding_key: String,
    /// The event that could not be dispatched.
    pub event: TriggerEvent,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Message of the last error.
    pub final_error: String,
    /// Error classification; deserializes as `"unknown"` when the field is absent.
    #[serde(default = "default_dlq_error_class")]
    pub error_class: String,
    /// Per-attempt records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
261
/// Serde default for `DlqEntry::error_class`: the literal `"unknown"`.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
265
/// Bookkeeping for a singleton gate taken by a dispatch.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Name of the singleton gate.
    gate: String,
    /// `true` while the gate is held; `DispatchWaitLease::suspend` clears it
    /// and `resume` sets it again.
    held: bool,
}
271
/// Bookkeeping for a concurrency gate taken by a dispatch, including the
/// parameters needed to re-acquire it after a suspend.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    /// Name of the concurrency gate.
    gate: String,
    /// Limit passed to `acquire_concurrency` on re-acquire.
    max: u32,
    /// Priority rank passed to `acquire_concurrency` on re-acquire.
    priority_rank: usize,
    /// Held permit; `None` while the lease is suspended.
    permit: Option<ConcurrencyPermit>,
}
279
/// Flow-control holdings of one dispatch; either part may be absent.
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    /// Singleton gate lease, if any.
    singleton: Option<SingletonLease>,
    /// Concurrency gate lease, if any.
    concurrency: Option<ConcurrencyLease>,
}
285
286#[derive(Clone)]
287pub(crate) struct DispatchWaitLease {
288    state: Arc<DispatcherRuntimeState>,
289    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
290    suspended: Arc<AtomicBool>,
291}
292
293impl DispatchWaitLease {
294    fn new(
295        state: Arc<DispatcherRuntimeState>,
296        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
297    ) -> Self {
298        Self {
299            state,
300            acquired,
301            suspended: Arc::new(AtomicBool::new(false)),
302        }
303    }
304
305    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
306        if self.suspended.swap(true, Ordering::SeqCst) {
307            return Ok(());
308        }
309        let (singleton_gate, concurrency_permit) = {
310            let mut acquired = self.acquired.lock().await;
311            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
312                if lease.held {
313                    lease.held = false;
314                    Some(lease.gate.clone())
315                } else {
316                    None
317                }
318            });
319            let concurrency_permit = acquired
320                .concurrency
321                .as_mut()
322                .and_then(|lease| lease.permit.take());
323            (singleton_gate, concurrency_permit)
324        };
325
326        if let Some(gate) = singleton_gate {
327            self.state
328                .flow_control
329                .release_singleton(&gate)
330                .await
331                .map_err(DispatchError::from)?;
332        }
333        if let Some(permit) = concurrency_permit {
334            self.state
335                .flow_control
336                .release_concurrency(permit)
337                .await
338                .map_err(DispatchError::from)?;
339        }
340        Ok(())
341    }
342
343    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
344        if !self.suspended.swap(false, Ordering::SeqCst) {
345            return Ok(());
346        }
347
348        let singleton_gate = {
349            let acquired = self.acquired.lock().await;
350            acquired.singleton.as_ref().and_then(|lease| {
351                if lease.held {
352                    None
353                } else {
354                    Some(lease.gate.clone())
355                }
356            })
357        };
358        if let Some(gate) = singleton_gate {
359            self.state
360                .flow_control
361                .acquire_singleton(&gate)
362                .await
363                .map_err(DispatchError::from)?;
364            let mut acquired = self.acquired.lock().await;
365            if let Some(lease) = acquired.singleton.as_mut() {
366                lease.held = true;
367            }
368        }
369
370        let concurrency_spec = {
371            let acquired = self.acquired.lock().await;
372            acquired.concurrency.as_ref().and_then(|lease| {
373                if lease.permit.is_some() {
374                    None
375                } else {
376                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
377                }
378            })
379        };
380        if let Some((gate, max, priority_rank)) = concurrency_spec {
381            let permit = self
382                .state
383                .flow_control
384                .acquire_concurrency(&gate, max, priority_rank)
385                .await
386                .map_err(DispatchError::from)?;
387            let mut acquired = self.acquired.lock().await;
388            if let Some(lease) = acquired.concurrency.as_mut() {
389                lease.permit = Some(permit);
390            }
391        }
392        Ok(())
393    }
394}
395
/// Decision produced by the flow-control stage for one event.
enum FlowControlOutcome {
    /// Proceed with `event`, carrying the gates/permits acquired for it.
    /// The event is boxed.
    /// NOTE(review): presumably to keep the enum small — confirm.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; `reason` explains why.
    Skip {
        reason: String,
    },
}
405
/// Pipeline stage at which a dispatch was skipped.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    /// Skipped at the predicate stage.
    Predicate,
    /// Skipped at the flow-control stage.
    FlowControl,
}
411
/// Error raised by the dispatcher. Each variant carries a human-readable
/// message and names the subsystem or condition that produced it.
#[derive(Debug)]
pub enum DispatchError {
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    /// Writes the carried message verbatim, without any variant prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::EventLog(text)
            | Self::Registry(text)
            | Self::Serde(text)
            | Self::Local(text)
            | Self::A2a(text)
            | Self::Denied(text)
            | Self::Timeout(text)
            | Self::Waiting(text)
            | Self::Cancelled(text)
            | Self::NotImplemented(text) => text.as_str(),
        };
        f.write_str(message)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Returns `false` for `Cancelled`, `Denied`, `NotImplemented` and
    /// `Waiting`; every other variant is considered retryable.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}
453
impl DispatchSkipStage {
    /// Returns the snake_case label of the stage: `"predicate"` or
    /// `"flow_control"`.
    fn as_str(&self) -> &'static str {
        match self {
            Self::Predicate => "predicate",
            Self::FlowControl => "flow_control",
        }
    }
}
462
463impl From<LogError> for DispatchError {
464    fn from(value: LogError) -> Self {
465        Self::EventLog(value.to_string())
466    }
467}
468
469pub async fn append_dispatch_cancel_request(
470    event_log: &Arc<AnyEventLog>,
471    request: &DispatchCancelRequest,
472) -> Result<u64, DispatchError> {
473    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
474        .expect("static trigger cancel topic should always be valid");
475    event_log
476        .append(
477            &topic,
478            LogEvent::new(
479                "dispatch_cancel_requested",
480                serde_json::to_value(request)
481                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
482            ),
483        )
484        .await
485        .map_err(DispatchError::from)
486}
487
488impl Dispatcher {
489    pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
490        self.event_log.clone()
491    }
492
493    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
494        let event_log = active_event_log().ok_or_else(|| {
495            DispatchError::EventLog("dispatcher requires an active event log".to_string())
496        })?;
497        Ok(Self::with_event_log(base_vm, event_log))
498    }
499
    /// Creates a dispatcher on an explicit event log, without a metrics
    /// registry. Delegates to `with_event_log_and_metrics`.
    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
        Self::with_event_log_and_metrics(base_vm, event_log, None)
    }
503
504    pub fn with_event_log_and_metrics(
505        base_vm: Vm,
506        event_log: Arc<AnyEventLog>,
507        metrics: Option<Arc<crate::MetricsRegistry>>,
508    ) -> Self {
509        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
510        ACTIVE_DISPATCHER_STATE.with(|slot| {
511            *slot.borrow_mut() = Some(state.clone());
512        });
513        let (cancel_tx, _) = broadcast::channel(32);
514        Self {
515            base_vm: Rc::new(base_vm),
516            event_log,
517            cancel_tx,
518            state,
519            metrics,
520        }
521    }
522
523    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
524        DispatcherStatsSnapshot {
525            in_flight: self.state.in_flight.load(Ordering::Relaxed),
526            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
527            dlq_depth: self
528                .state
529                .dlq
530                .lock()
531                .expect("dispatcher dlq poisoned")
532                .len() as u64,
533        }
534    }
535
536    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
537        self.state
538            .dlq
539            .lock()
540            .expect("dispatcher dlq poisoned")
541            .clone()
542    }
543
544    pub fn shutdown(&self) {
545        self.state.shutting_down.store(true, Ordering::SeqCst);
546        for token in self
547            .state
548            .cancel_tokens
549            .lock()
550            .expect("dispatcher cancel tokens poisoned")
551            .iter()
552        {
553            token.store(true, Ordering::SeqCst);
554        }
555        let _ = self.cancel_tx.send(());
556    }
557
    /// Appends `event` to the inbox without targeting a specific trigger or
    /// binding version. Delegates to `enqueue_targeted`.
    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
        self.enqueue_targeted(None, None, event).await
    }
561
    /// Appends `event` to the inbox, optionally targeting one trigger and
    /// binding version, with no parent headers. Delegates to
    /// `enqueue_targeted_with_headers`.
    pub async fn enqueue_targeted(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
    ) -> Result<u64, DispatchError> {
        self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
            .await
    }
571
    /// Wraps `event` in an `InboxEnvelope` and appends it to the inbox topic
    /// as an `event_ingested` record, returning the `u64` reported by the log
    /// append.
    ///
    /// Headers start from `parent_headers` (if any), are overlaid with the
    /// event-derived headers, and gain `trigger_id` / `binding_key` when a
    /// target trigger is given. The accepted-at and queue-appended-at timing
    /// headers are filled in only when not already present.
    ///
    /// When a metrics registry and a target trigger are both present, the
    /// pending-event gauge is noted and, unless the queue-appended header was
    /// inherited, the accepted-to-queue-append latency is recorded. Both
    /// happen before the append is awaited.
    ///
    /// # Errors
    ///
    /// Propagates `topic_for_event` failures, `DispatchError::Serde` on
    /// envelope serialization failure, and the converted append error.
    pub async fn enqueue_targeted_with_headers(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
        parent_headers: Option<&BTreeMap<String, String>>,
    ) -> Result<u64, DispatchError> {
        let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
        // `trigger_id` is moved into the envelope below; keep a copy for
        // headers and metrics.
        let trigger_id_for_metrics = trigger_id.clone();
        let mut headers = parent_headers.cloned().unwrap_or_default();
        // Event-derived headers overwrite same-named parent headers.
        headers.extend(event_headers(&event, None, None, None));
        if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
            headers.insert("trigger_id".to_string(), trigger_id.clone());
            headers.insert(
                "binding_key".to_string(),
                binding_key_from_parts(trigger_id, binding_version),
            );
        }
        // Preserve an inherited accepted-at timestamp; otherwise derive it
        // from the event's receive time.
        headers
            .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
            .or_insert_with(|| unix_ms(event.received_at).to_string());
        let payload = serde_json::to_value(InboxEnvelope {
            trigger_id,
            binding_version,
            event: event.clone(),
        })
        .map_err(|error| DispatchError::Serde(error.to_string()))?;
        let mut log_event = LogEvent::new("event_ingested", payload);
        // Remember whether the timestamp was inherited: in that case the
        // accepted-to-append latency below is not recorded again.
        let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
        // An inherited value that does not parse as i64 falls back to the log
        // event's own timestamp for metrics; the header itself is left as-is.
        let queue_appended_at_ms = headers
            .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
            .and_then(|value| value.parse::<i64>().ok())
            .unwrap_or(log_event.occurred_at_ms);
        headers
            .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
            .or_insert_with(|| log_event.occurred_at_ms.to_string());
        // Metrics are only emitted for targeted enqueues (a trigger id is
        // required to label them).
        if let (Some(metrics), Some(trigger_id)) =
            (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
        {
            let binding_key = binding_key_from_parts(trigger_id, binding_version);
            let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
            if !had_queue_appended_at {
                metrics.record_trigger_accepted_to_queue_append(
                    trigger_id,
                    &binding_key,
                    event.provider.as_str(),
                    tenant_id(&event),
                    "queued",
                    duration_between_ms(queue_appended_at_ms, accepted_at_ms),
                );
            }
            metrics.note_trigger_pending_event(
                event.id.0.as_str(),
                trigger_id,
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                accepted_at_ms,
                queue_appended_at_ms,
            );
        }
        log_event.headers = headers;
        self.event_log
            .append(&topic, log_event)
            .await
            .map_err(DispatchError::from)
    }
639
    /// Consumes the inbox topic and dispatches each `event_ingested` envelope,
    /// one at a time, until the stream ends or shutdown is signalled.
    ///
    /// The subscription starts from the position reported by `latest` when
    /// the call begins.
    /// NOTE(review): presumably this means only events appended after `run`
    /// starts are seen — confirm against `EventLog::subscribe` semantics.
    ///
    /// # Errors
    ///
    /// Returns an error (ending the loop) when the subscription cannot be set
    /// up, the stream yields an error, or an envelope fails to deserialize.
    /// Errors returned by an individual dispatch are discarded.
    ///
    /// # Panics
    ///
    /// Panics if the static inbox topic name is rejected by `Topic::new`.
    pub async fn run(&self) -> Result<(), DispatchError> {
        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
            .expect("static trigger inbox envelopes topic is valid");
        let start_from = self.event_log.latest(&topic).await?;
        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
        pin_mut!(stream);
        let mut cancel_rx = self.cancel_tx.subscribe();

        loop {
            tokio::select! {
                received = stream.next() => {
                    // `None` means the subscription stream has ended.
                    let Some(received) = received else {
                        break;
                    };
                    let (_, event) = received.map_err(DispatchError::from)?;
                    // Other record kinds on the topic are ignored.
                    if event.kind != "event_ingested" {
                        continue;
                    }
                    let parent_headers = event.headers.clone();
                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
                    notify_test_inbox_dequeued();
                    // NOTE(review): the dispatch result, including any error,
                    // is dropped here; presumably failures are recorded
                    // elsewhere in the dispatch pipeline — confirm.
                    let _ = self
                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
                        .await;
                }
                _ = recv_cancel(&mut cancel_rx) => break,
            }
        }

        Ok(())
    }
672
673    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
674        let deadline = tokio::time::Instant::now() + timeout;
675        loop {
676            let snapshot = self.snapshot();
677            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
678                return Ok(DispatcherDrainReport {
679                    drained: true,
680                    in_flight: snapshot.in_flight,
681                    retry_queue_depth: snapshot.retry_queue_depth,
682                    dlq_depth: snapshot.dlq_depth,
683                });
684            }
685
686            let notified = self.state.idle_notify.notified();
687            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
688            if remaining.is_zero() {
689                return Ok(DispatcherDrainReport {
690                    drained: false,
691                    in_flight: snapshot.in_flight,
692                    retry_queue_depth: snapshot.retry_queue_depth,
693                    dlq_depth: snapshot.dlq_depth,
694                });
695            }
696            if tokio::time::timeout(remaining, notified).await.is_err() {
697                let snapshot = self.snapshot();
698                return Ok(DispatcherDrainReport {
699                    drained: false,
700                    in_flight: snapshot.in_flight,
701                    retry_queue_depth: snapshot.retry_queue_depth,
702                    dlq_depth: snapshot.dlq_depth,
703                });
704            }
705        }
706    }
707
    /// Dispatches an inbox envelope without parent headers. Delegates to
    /// `dispatch_inbox_envelope_with_headers`.
    pub async fn dispatch_inbox_envelope(
        &self,
        envelope: InboxEnvelope,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_inbox_envelope_with_headers(envelope, None)
            .await
    }
715
    /// Dispatches an inbox envelope, passing `parent_headers` through to the
    /// dispatch. Delegates to `dispatch_inbox_envelope_with_headers`.
    pub async fn dispatch_inbox_envelope_with_parent_headers(
        &self,
        envelope: InboxEnvelope,
        parent_headers: &BTreeMap<String, String>,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
            .await
    }
724
725    async fn dispatch_inbox_envelope_with_headers(
726        &self,
727        envelope: InboxEnvelope,
728        parent_headers: Option<&BTreeMap<String, String>>,
729    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
730        if let Some(trigger_id) = envelope.trigger_id {
731            let binding = super::registry::resolve_live_trigger_binding(
732                &trigger_id,
733                envelope.binding_version,
734            )
735            .map_err(|error| DispatchError::Registry(error.to_string()))?;
736            return Ok(vec![
737                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
738                    .await?,
739            ]);
740        }
741
742        let cron_target = match &envelope.event.provider_payload {
743            crate::triggers::ProviderPayload::Known(
744                crate::triggers::event::KnownProviderPayload::Cron(payload),
745            ) => payload.cron_id.clone(),
746            _ => None,
747        };
748        if let Some(trigger_id) = cron_target {
749            let binding = super::registry::resolve_live_trigger_binding(
750                &trigger_id,
751                envelope.binding_version,
752            )
753            .map_err(|error| DispatchError::Registry(error.to_string()))?;
754            return Ok(vec![
755                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
756                    .await?,
757            ]);
758        }
759
760        self.dispatch_event_with_headers(envelope.event, parent_headers)
761            .await
762    }
763
    /// Dispatches `event` to every matching binding, without parent headers.
    /// Delegates to `dispatch_event_with_headers`.
    pub async fn dispatch_event(
        &self,
        event: TriggerEvent,
    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
        self.dispatch_event_with_headers(event, None).await
    }
770
771    async fn dispatch_event_with_headers(
772        &self,
773        event: TriggerEvent,
774        parent_headers: Option<&BTreeMap<String, String>>,
775    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
776        let bindings = matching_bindings(&event);
777        let mut outcomes = Vec::new();
778        for binding in bindings {
779            outcomes.push(
780                self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
781                    .await?,
782            );
783        }
784        Ok(outcomes)
785    }
786
    /// Dispatches `event` to `binding` as a fresh (non-replay) dispatch with no
    /// parent span id or parent headers.
    pub async fn dispatch(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, None, None, None)
            .await
    }
795
    /// Dispatches `event` to `binding` as a replay of `replay_of_event_id`.
    /// While it runs, `current_dispatch_is_replay()` reports `true`.
    pub async fn dispatch_replay(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
        replay_of_event_id: String,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
            .await
    }
805
    /// Dispatches `event` to `binding` as a non-replay dispatch, passing
    /// `parent_span_id` along for tracing-span parenting.
    pub async fn dispatch_with_parent_span_id(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
        parent_span_id: Option<String>,
    ) -> Result<DispatchOutcome, DispatchError> {
        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
            .await
    }
815
    /// Common entry point for every dispatch: records admission metrics, opens
    /// the `dispatch` tracing span, runs `dispatch_with_replay_inner` with the
    /// replay task-local scoped around it, then records outcome metrics (and
    /// span attributes when the `otel` feature is enabled).
    ///
    /// Returns the inner dispatch result unchanged.
    async fn dispatch_with_replay(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
        replay_of_event_id: Option<String>,
        parent_span_id: Option<String>,
        parent_headers: Option<&BTreeMap<String, String>>,
    ) -> Result<DispatchOutcome, DispatchError> {
        // Owned copy handed to the inner dispatch.
        let parent_headers_for_metrics = parent_headers.cloned();
        let admitted_at_ms = current_unix_ms();
        if let Some(metrics) = self.metrics.as_ref() {
            let binding_key = binding.binding_key();
            let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
            // Queue age = admission time minus queue-append time.
            metrics.record_trigger_queue_age_at_dispatch_admission(
                binding.id.as_str(),
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                "admitted",
                duration_between_ms(admitted_at_ms, queue_appended_at_ms),
            );
            // The event is no longer pending once admitted.
            metrics.clear_trigger_pending_event(
                event.id.0.as_str(),
                binding.id.as_str(),
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                admitted_at_ms,
            );
        }
        let span = tracing::info_span!(
            "dispatch",
            trigger_id = %binding.id.as_str(),
            binding_version = binding.version,
            trace_id = %event.trace_id.0
        );
        // `span` is moved into `.instrument(..)` below; keep a handle for the
        // post-dispatch attributes.
        #[cfg(feature = "otel")]
        let span_for_otel = span.clone();
        // Span parenting is best-effort; its result is ignored.
        let _ = if let Some(headers) = parent_headers {
            crate::observability::otel::set_span_parent_from_headers(
                &span,
                headers,
                &event.trace_id,
                parent_span_id.as_deref(),
            )
        } else {
            crate::observability::otel::set_span_parent(
                &span,
                &event.trace_id,
                parent_span_id.as_deref(),
            )
        };
        #[cfg(feature = "otel")]
        let started_at = Instant::now();
        let metrics = self.metrics.clone();
        // Scope the replay flag so `current_dispatch_is_replay()` is accurate
        // for everything running inside this dispatch.
        let outcome = ACTIVE_DISPATCH_IS_REPLAY
            .scope(
                replay_of_event_id.is_some(),
                self.dispatch_with_replay_inner(
                    binding,
                    event,
                    replay_of_event_id,
                    parent_headers_for_metrics,
                )
                .instrument(span),
            )
            .await;
        if let Some(metrics) = metrics.as_ref() {
            // Skipped counts as success, Waiting counts as neither, and every
            // other status (Failed, Dlq, Cancelled) or an `Err` counts as a
            // failure.
            match &outcome {
                Ok(dispatch_outcome) => match dispatch_outcome.status {
                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
                        metrics.record_dispatch_succeeded();
                    }
                    DispatchStatus::Waiting => {}
                    _ => metrics.record_dispatch_failed(),
                },
                Err(_) => metrics.record_dispatch_failed(),
            }
            let outcome_label = match &outcome {
                Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
                Err(DispatchError::Cancelled(_)) => "cancelled",
                Err(_) => "failed",
            };
            metrics.record_trigger_dispatched(
                binding.id.as_str(),
                binding.handler.kind(),
                outcome_label,
            );
            metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
        }
        #[cfg(feature = "otel")]
        {
            use tracing_opentelemetry::OpenTelemetrySpanExt as _;

            let duration_ms = started_at.elapsed().as_millis() as i64;
            let status = match &outcome {
                Ok(dispatch_outcome) => match dispatch_outcome.status {
                    DispatchStatus::Succeeded => "succeeded",
                    DispatchStatus::Skipped => "skipped",
                    DispatchStatus::Waiting => "waiting",
                    DispatchStatus::Cancelled => "cancelled",
                    DispatchStatus::Failed => "failed",
                    DispatchStatus::Dlq => "dlq",
                },
                Err(DispatchError::Cancelled(_)) => "cancelled",
                Err(_) => "failed",
            };
            span_for_otel.set_attribute("result.status", status);
            span_for_otel.set_attribute("result.duration_ms", duration_ms);
        }
        outcome
    }
928
929    async fn dispatch_with_replay_inner(
930        &self,
931        binding: &TriggerBinding,
932        event: TriggerEvent,
933        replay_of_event_id: Option<String>,
934        parent_headers: Option<BTreeMap<String, String>>,
935    ) -> Result<DispatchOutcome, DispatchError> {
936        let autonomy_tier = crate::resolve_agent_autonomy_tier(
937            &self.event_log,
938            binding.id.as_str(),
939            binding.autonomy_tier,
940        )
941        .await
942        .unwrap_or(binding.autonomy_tier);
943        let binding_key = binding.binding_key();
944        let route = DispatchUri::from(&binding.handler);
945        let trigger_id = binding.id.as_str().to_string();
946        let event_id = event.id.0.clone();
947        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
948        let begin = if replay_of_event_id.is_some() {
949            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
950        } else {
951            begin_in_flight(binding.id.as_str(), binding.version)
952        };
953        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
954
955        let mut attempts = Vec::new();
956        let mut source_node_id = format!("trigger:{}", event.id.0);
957        let mut initial_nodes = Vec::new();
958        let mut initial_edges = Vec::new();
959        if let Some(original_event_id) = replay_of_event_id.as_ref() {
960            let original_node_id = format!("trigger:{original_event_id}");
961            initial_nodes.push(RunActionGraphNodeRecord {
962                id: original_node_id.clone(),
963                label: format!(
964                    "{}:{} (original {})",
965                    event.provider.as_str(),
966                    event.kind,
967                    original_event_id
968                ),
969                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
970                status: "historical".to_string(),
971                outcome: "replayed_from".to_string(),
972                trace_id: Some(event.trace_id.0.clone()),
973                stage_id: None,
974                node_id: None,
975                worker_id: None,
976                run_id: None,
977                run_path: None,
978                metadata: trigger_node_metadata(&event),
979            });
980            initial_edges.push(RunActionGraphEdgeRecord {
981                from_id: original_node_id,
982                to_id: source_node_id.clone(),
983                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
984                label: Some("replay chain".to_string()),
985            });
986        }
987        initial_nodes.push(RunActionGraphNodeRecord {
988            id: source_node_id.clone(),
989            label: format!("{}:{}", event.provider.as_str(), event.kind),
990            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
991            status: "received".to_string(),
992            outcome: "received".to_string(),
993            trace_id: Some(event.trace_id.0.clone()),
994            stage_id: None,
995            node_id: None,
996            worker_id: None,
997            run_id: None,
998            run_path: None,
999            metadata: trigger_node_metadata(&event),
1000        });
1001        self.emit_action_graph(
1002            &event,
1003            initial_nodes,
1004            initial_edges,
1005            serde_json::json!({
1006                "source": "dispatcher",
1007                "trigger_id": trigger_id,
1008                "binding_key": binding_key,
1009                "event_id": event_id,
1010                "replay_of_event_id": replay_of_event_id,
1011            }),
1012        )
1013        .await?;
1014
1015        if dispatch_cancel_requested(
1016            &self.event_log,
1017            &binding_key,
1018            &event.id.0,
1019            replay_of_event_id.as_ref(),
1020        )
1021        .await?
1022        {
1023            finish_in_flight(
1024                binding.id.as_str(),
1025                binding.version,
1026                TriggerDispatchOutcome::Failed,
1027            )
1028            .await
1029            .map_err(|error| DispatchError::Registry(error.to_string()))?;
1030            decrement_in_flight(&self.state);
1031            return Ok(cancelled_dispatch_outcome(
1032                binding,
1033                &route,
1034                &event,
1035                replay_of_event_id,
1036                0,
1037                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1038            ));
1039        }
1040
1041        if let Some(predicate) = binding.when.as_ref() {
1042            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1043            let evaluation = self
1044                .evaluate_predicate(
1045                    binding,
1046                    predicate,
1047                    &event,
1048                    replay_of_event_id.as_ref(),
1049                    autonomy_tier,
1050                )
1051                .await?;
1052            let passed = evaluation.result;
1053            self.emit_action_graph(
1054                &event,
1055                vec![RunActionGraphNodeRecord {
1056                    id: predicate_node_id.clone(),
1057                    label: predicate.raw.clone(),
1058                    kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1059                    status: "completed".to_string(),
1060                    outcome: passed.to_string(),
1061                    trace_id: Some(event.trace_id.0.clone()),
1062                    stage_id: None,
1063                    node_id: None,
1064                    worker_id: None,
1065                    run_id: None,
1066                    run_path: None,
1067                    metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1068                }],
1069                vec![RunActionGraphEdgeRecord {
1070                    from_id: source_node_id.clone(),
1071                    to_id: predicate_node_id.clone(),
1072                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1073                    label: None,
1074                }],
1075                serde_json::json!({
1076                    "source": "dispatcher",
1077                    "trigger_id": binding.id.as_str(),
1078                    "binding_key": binding.binding_key(),
1079                    "event_id": event.id.0,
1080                    "predicate": predicate.raw,
1081                    "reason": evaluation.reason,
1082                    "cached": evaluation.cached,
1083                    "cost_usd": evaluation.cost_usd,
1084                    "tokens": evaluation.tokens,
1085                    "latency_ms": evaluation.latency_ms,
1086                    "replay_of_event_id": replay_of_event_id,
1087                }),
1088            )
1089            .await?;
1090
1091            if !passed {
1092                if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1093                    let final_error = format!(
1094                        "trigger budget exhausted: {}",
1095                        evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1096                    );
1097                    self.move_budget_exhausted_to_dlq(
1098                        binding,
1099                        &route,
1100                        &event,
1101                        replay_of_event_id.as_ref(),
1102                        &final_error,
1103                    )
1104                    .await?;
1105                    finish_in_flight(
1106                        binding.id.as_str(),
1107                        binding.version,
1108                        TriggerDispatchOutcome::Dlq,
1109                    )
1110                    .await
1111                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1112                    decrement_in_flight(&self.state);
1113                    self.append_dispatch_trust_record(
1114                        binding,
1115                        &route,
1116                        &event,
1117                        replay_of_event_id.as_ref(),
1118                        autonomy_tier,
1119                        TrustOutcome::Failure,
1120                        "dlq",
1121                        0,
1122                        Some(final_error.clone()),
1123                    )
1124                    .await?;
1125                    return Ok(DispatchOutcome {
1126                        trigger_id: binding.id.as_str().to_string(),
1127                        binding_key: binding.binding_key(),
1128                        event_id: event.id.0,
1129                        attempt_count: 0,
1130                        status: DispatchStatus::Dlq,
1131                        handler_kind: route.kind().to_string(),
1132                        target_uri: route.target_uri(),
1133                        replay_of_event_id,
1134                        result: None,
1135                        error: Some(final_error),
1136                    });
1137                }
1138
1139                if evaluation.exhaustion_strategy
1140                    == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1141                {
1142                    self.append_budget_deferred_event(
1143                        binding,
1144                        &route,
1145                        &event,
1146                        replay_of_event_id.as_ref(),
1147                        evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1148                    )
1149                    .await?;
1150                    finish_in_flight(
1151                        binding.id.as_str(),
1152                        binding.version,
1153                        TriggerDispatchOutcome::Dispatched,
1154                    )
1155                    .await
1156                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1157                    decrement_in_flight(&self.state);
1158                    self.append_dispatch_trust_record(
1159                        binding,
1160                        &route,
1161                        &event,
1162                        replay_of_event_id.as_ref(),
1163                        autonomy_tier,
1164                        TrustOutcome::Denied,
1165                        "waiting",
1166                        0,
1167                        evaluation.reason.clone(),
1168                    )
1169                    .await?;
1170                    return Ok(DispatchOutcome {
1171                        trigger_id: binding.id.as_str().to_string(),
1172                        binding_key: binding.binding_key(),
1173                        event_id: event.id.0,
1174                        attempt_count: 0,
1175                        status: DispatchStatus::Waiting,
1176                        handler_kind: route.kind().to_string(),
1177                        target_uri: route.target_uri(),
1178                        replay_of_event_id,
1179                        result: Some(serde_json::json!({
1180                            "deferred": true,
1181                            "predicate": predicate.raw,
1182                            "reason": evaluation.reason,
1183                        })),
1184                        error: None,
1185                    });
1186                }
1187
1188                self.append_skipped_outbox_event(
1189                    binding,
1190                    &route,
1191                    &event,
1192                    replay_of_event_id.as_ref(),
1193                    DispatchSkipStage::Predicate,
1194                    serde_json::json!({
1195                        "predicate": predicate.raw,
1196                        "reason": evaluation.reason,
1197                    }),
1198                )
1199                .await?;
1200                finish_in_flight(
1201                    binding.id.as_str(),
1202                    binding.version,
1203                    TriggerDispatchOutcome::Dispatched,
1204                )
1205                .await
1206                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1207                decrement_in_flight(&self.state);
1208                self.append_dispatch_trust_record(
1209                    binding,
1210                    &route,
1211                    &event,
1212                    replay_of_event_id.as_ref(),
1213                    autonomy_tier,
1214                    TrustOutcome::Denied,
1215                    "skipped",
1216                    0,
1217                    None,
1218                )
1219                .await?;
1220                return Ok(DispatchOutcome {
1221                    trigger_id: binding.id.as_str().to_string(),
1222                    binding_key: binding.binding_key(),
1223                    event_id: event.id.0,
1224                    attempt_count: 0,
1225                    status: DispatchStatus::Skipped,
1226                    handler_kind: route.kind().to_string(),
1227                    target_uri: route.target_uri(),
1228                    replay_of_event_id,
1229                    result: Some(serde_json::json!({
1230                        "skipped": true,
1231                        "predicate": predicate.raw,
1232                        "reason": evaluation.reason,
1233                    })),
1234                    error: None,
1235                });
1236            }
1237
1238            source_node_id = predicate_node_id;
1239        }
1240
1241        if autonomy_tier == AutonomyTier::ActAuto {
1242            if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1243                let request_id = self
1244                    .append_autonomy_budget_approval_request(
1245                        binding,
1246                        &route,
1247                        &event,
1248                        replay_of_event_id.as_ref(),
1249                        reason,
1250                    )
1251                    .await?;
1252                self.emit_autonomy_budget_approval_action_graph(
1253                    binding,
1254                    &route,
1255                    &event,
1256                    &source_node_id,
1257                    replay_of_event_id.as_ref(),
1258                    reason,
1259                    &request_id,
1260                )
1261                .await?;
1262                finish_in_flight(
1263                    binding.id.as_str(),
1264                    binding.version,
1265                    TriggerDispatchOutcome::Dispatched,
1266                )
1267                .await
1268                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1269                decrement_in_flight(&self.state);
1270                self.append_tier_transition_trust_record(
1271                    binding,
1272                    &event,
1273                    replay_of_event_id.as_ref(),
1274                    autonomy_tier,
1275                    AutonomyTier::ActWithApproval,
1276                    reason,
1277                    &request_id,
1278                )
1279                .await?;
1280                self.append_dispatch_trust_record(
1281                    binding,
1282                    &route,
1283                    &event,
1284                    replay_of_event_id.as_ref(),
1285                    autonomy_tier,
1286                    TrustOutcome::Denied,
1287                    "waiting",
1288                    0,
1289                    Some(reason.to_string()),
1290                )
1291                .await?;
1292                return Ok(DispatchOutcome {
1293                    trigger_id: binding.id.as_str().to_string(),
1294                    binding_key: binding.binding_key(),
1295                    event_id: event.id.0,
1296                    attempt_count: 0,
1297                    status: DispatchStatus::Waiting,
1298                    handler_kind: route.kind().to_string(),
1299                    target_uri: route.target_uri(),
1300                    replay_of_event_id,
1301                    result: Some(serde_json::json!({
1302                        "approval_required": true,
1303                        "request_id": request_id,
1304                        "reason": reason,
1305                        "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1306                    })),
1307                    error: None,
1308                });
1309            }
1310            note_autonomous_decision(binding);
1311        }
1312
1313        let (event, acquired_flow) = match self
1314            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1315            .await?
1316        {
1317            FlowControlOutcome::Dispatch { event, acquired } => {
1318                (*event, Arc::new(AsyncMutex::new(acquired)))
1319            }
1320            FlowControlOutcome::Skip { reason } => {
1321                self.append_skipped_outbox_event(
1322                    binding,
1323                    &route,
1324                    &event,
1325                    replay_of_event_id.as_ref(),
1326                    DispatchSkipStage::FlowControl,
1327                    serde_json::json!({
1328                        "flow_control": reason,
1329                    }),
1330                )
1331                .await?;
1332                finish_in_flight(
1333                    binding.id.as_str(),
1334                    binding.version,
1335                    TriggerDispatchOutcome::Dispatched,
1336                )
1337                .await
1338                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1339                decrement_in_flight(&self.state);
1340                return Ok(DispatchOutcome {
1341                    trigger_id: binding.id.as_str().to_string(),
1342                    binding_key: binding.binding_key(),
1343                    event_id: event.id.0,
1344                    attempt_count: 0,
1345                    status: DispatchStatus::Skipped,
1346                    handler_kind: route.kind().to_string(),
1347                    target_uri: route.target_uri(),
1348                    replay_of_event_id,
1349                    result: Some(serde_json::json!({
1350                        "skipped": true,
1351                        "flow_control": reason,
1352                    })),
1353                    error: None,
1354                });
1355            }
1356        };
1357
1358        let destination_key = destination_circuit_key(&route);
1359        let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1360            DestinationCircuitProbe::Allow { half_open } => {
1361                if half_open {
1362                    if let Some(metrics) = self.metrics.as_ref() {
1363                        metrics.record_backpressure_event("circuit", "half_open_probe");
1364                    }
1365                }
1366                half_open
1367            }
1368            DestinationCircuitProbe::Block { retry_after } => {
1369                if let Some(metrics) = self.metrics.as_ref() {
1370                    metrics.record_backpressure_event("circuit", "fail_fast");
1371                }
1372                let final_error = format!(
1373                    "destination circuit open for {}; retry after {}s",
1374                    destination_key,
1375                    retry_after.as_secs().max(1)
1376                );
1377                self.move_circuit_open_to_dlq(
1378                    binding,
1379                    &route,
1380                    &event,
1381                    replay_of_event_id.as_ref(),
1382                    &final_error,
1383                    &destination_key,
1384                )
1385                .await?;
1386                finish_in_flight(
1387                    binding.id.as_str(),
1388                    binding.version,
1389                    TriggerDispatchOutcome::Dlq,
1390                )
1391                .await
1392                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1393                self.release_flow_control(&acquired_flow).await?;
1394                decrement_in_flight(&self.state);
1395                self.append_dispatch_trust_record(
1396                    binding,
1397                    &route,
1398                    &event,
1399                    replay_of_event_id.as_ref(),
1400                    autonomy_tier,
1401                    TrustOutcome::Failure,
1402                    "dlq",
1403                    0,
1404                    Some(final_error.clone()),
1405                )
1406                .await?;
1407                return Ok(DispatchOutcome {
1408                    trigger_id: binding.id.as_str().to_string(),
1409                    binding_key: binding.binding_key(),
1410                    event_id: event.id.0,
1411                    attempt_count: 0,
1412                    status: DispatchStatus::Dlq,
1413                    handler_kind: route.kind().to_string(),
1414                    target_uri: route.target_uri(),
1415                    replay_of_event_id,
1416                    result: None,
1417                    error: Some(final_error),
1418                });
1419            }
1420        };
1421
1422        let mut previous_retry_node = None;
1423        let max_attempts = binding.retry.max_attempts();
1424        for attempt in 1..=max_attempts {
1425            if dispatch_cancel_requested(
1426                &self.event_log,
1427                &binding_key,
1428                &event.id.0,
1429                replay_of_event_id.as_ref(),
1430            )
1431            .await?
1432            {
1433                finish_in_flight(
1434                    binding.id.as_str(),
1435                    binding.version,
1436                    TriggerDispatchOutcome::Failed,
1437                )
1438                .await
1439                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1440                decrement_in_flight(&self.state);
1441                return Ok(cancelled_dispatch_outcome(
1442                    binding,
1443                    &route,
1444                    &event,
1445                    replay_of_event_id,
1446                    attempt.saturating_sub(1),
1447                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1448                ));
1449            }
1450            maybe_fail_before_outbox();
1451            let attempt_started_instant = Instant::now();
1452            let attempt_started_at_ms = current_unix_ms();
1453            let queue_age_at_start = duration_between_ms(
1454                attempt_started_at_ms,
1455                queue_appended_at_ms(parent_headers.as_ref(), &event),
1456            );
1457            if attempt == 1 {
1458                if let Some(metrics) = self.metrics.as_ref() {
1459                    metrics.record_trigger_queue_age_at_dispatch_start(
1460                        binding.id.as_str(),
1461                        &binding_key,
1462                        event.provider.as_str(),
1463                        tenant_id(&event),
1464                        "started",
1465                        queue_age_at_start,
1466                    );
1467                }
1468            }
1469            tracing::info!(
1470                component = "dispatcher",
1471                lifecycle = "dispatch_started",
1472                trigger_id = %binding.id.as_str(),
1473                binding_key = %binding_key,
1474                event_id = %event.id.0,
1475                attempt,
1476                queue_age_ms = queue_age_at_start.as_millis(),
1477                trace_id = %event.trace_id.0
1478            );
1479            let started_at = now_rfc3339();
1480            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1481            self.append_lifecycle_event(
1482                "DispatchStarted",
1483                &event,
1484                binding,
1485                serde_json::json!({
1486                    "event_id": event.id.0,
1487                    "attempt": attempt,
1488                    "handler_kind": route.kind(),
1489                    "target_uri": route.target_uri(),
1490                    "replay_of_event_id": replay_of_event_id,
1491                }),
1492                replay_of_event_id.as_ref(),
1493            )
1494            .await?;
1495            self.append_topic_event(
1496                TRIGGER_OUTBOX_TOPIC,
1497                "dispatch_started",
1498                &event,
1499                Some(binding),
1500                Some(attempt),
1501                serde_json::json!({
1502                    "event_id": event.id.0,
1503                    "attempt": attempt,
1504                    "trigger_id": binding.id.as_str(),
1505                    "binding_key": binding.binding_key(),
1506                    "handler_kind": route.kind(),
1507                    "target_uri": route.target_uri(),
1508                    "replay_of_event_id": replay_of_event_id,
1509                }),
1510                replay_of_event_id.as_ref(),
1511            )
1512            .await?;
1513
1514            let mut dispatch_edges = Vec::new();
1515            if attempt == 1 {
1516                dispatch_edges.push(RunActionGraphEdgeRecord {
1517                    from_id: source_node_id.clone(),
1518                    to_id: attempt_node_id.clone(),
1519                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1520                    label: binding.when.as_ref().map(|_| "true".to_string()),
1521                });
1522            } else if let Some(retry_node_id) = previous_retry_node.take() {
1523                dispatch_edges.push(RunActionGraphEdgeRecord {
1524                    from_id: retry_node_id,
1525                    to_id: attempt_node_id.clone(),
1526                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1527                    label: Some(format!("attempt {attempt}")),
1528                });
1529            }
1530
1531            self.emit_action_graph(
1532                &event,
1533                vec![RunActionGraphNodeRecord {
1534                    id: attempt_node_id.clone(),
1535                    label: dispatch_node_label(&route),
1536                    kind: dispatch_node_kind(&route).to_string(),
1537                    status: "running".to_string(),
1538                    outcome: format!("attempt_{attempt}"),
1539                    trace_id: Some(event.trace_id.0.clone()),
1540                    stage_id: None,
1541                    node_id: None,
1542                    worker_id: None,
1543                    run_id: None,
1544                    run_path: None,
1545                    metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1546                }],
1547                dispatch_edges,
1548                serde_json::json!({
1549                    "source": "dispatcher",
1550                    "trigger_id": binding.id.as_str(),
1551                    "binding_key": binding.binding_key(),
1552                    "event_id": event.id.0,
1553                    "attempt": attempt,
1554                    "handler_kind": route.kind(),
1555                    "target_uri": route.target_uri(),
1556                    "target_agent": dispatch_target_agent(&route),
1557                    "replay_of_event_id": replay_of_event_id,
1558                }),
1559            )
1560            .await?;
1561
1562            let result = self
1563                .dispatch_once(
1564                    binding,
1565                    &route,
1566                    &event,
1567                    autonomy_tier,
1568                    Some(DispatchWaitLease::new(
1569                        self.state.clone(),
1570                        acquired_flow.clone(),
1571                    )),
1572                    &mut self.cancel_tx.subscribe(),
1573                )
1574                .await;
1575            let attempt_runtime = attempt_started_instant.elapsed();
1576            let attempt_status = dispatch_result_status(&result);
1577            if let Some(metrics) = self.metrics.as_ref() {
1578                metrics.record_trigger_dispatch_runtime(
1579                    binding.id.as_str(),
1580                    &binding_key,
1581                    event.provider.as_str(),
1582                    tenant_id(&event),
1583                    attempt_status,
1584                    attempt_runtime,
1585                );
1586            }
1587            tracing::info!(
1588                component = "dispatcher",
1589                lifecycle = "handler_completed",
1590                trigger_id = %binding.id.as_str(),
1591                binding_key = %binding_key,
1592                event_id = %event.id.0,
1593                attempt,
1594                status = attempt_status,
1595                runtime_ms = attempt_runtime.as_millis(),
1596                trace_id = %event.trace_id.0
1597            );
1598            let completed_at = now_rfc3339();
1599
1600            match result {
1601                Ok(result) => {
1602                    let attempt_record = DispatchAttemptRecord {
1603                        trigger_id: binding.id.as_str().to_string(),
1604                        binding_key: binding.binding_key(),
1605                        event_id: event.id.0.clone(),
1606                        attempt,
1607                        handler_kind: route.kind().to_string(),
1608                        started_at,
1609                        completed_at,
1610                        outcome: "success".to_string(),
1611                        error_msg: None,
1612                    };
1613                    attempts.push(attempt_record.clone());
1614                    self.append_attempt_record(
1615                        &event,
1616                        binding,
1617                        &attempt_record,
1618                        replay_of_event_id.as_ref(),
1619                    )
1620                    .await?;
1621                    self.append_lifecycle_event(
1622                        "DispatchSucceeded",
1623                        &event,
1624                        binding,
1625                        serde_json::json!({
1626                            "event_id": event.id.0,
1627                            "attempt": attempt,
1628                            "handler_kind": route.kind(),
1629                            "target_uri": route.target_uri(),
1630                            "result": result,
1631                            "replay_of_event_id": replay_of_event_id,
1632                        }),
1633                        replay_of_event_id.as_ref(),
1634                    )
1635                    .await?;
1636                    self.append_topic_event(
1637                        TRIGGER_OUTBOX_TOPIC,
1638                        "dispatch_succeeded",
1639                        &event,
1640                        Some(binding),
1641                        Some(attempt),
1642                        serde_json::json!({
1643                            "event_id": event.id.0,
1644                            "attempt": attempt,
1645                            "trigger_id": binding.id.as_str(),
1646                            "binding_key": binding.binding_key(),
1647                            "handler_kind": route.kind(),
1648                            "target_uri": route.target_uri(),
1649                            "result": result,
1650                            "replay_of_event_id": replay_of_event_id,
1651                        }),
1652                        replay_of_event_id.as_ref(),
1653                    )
1654                    .await?;
1655                    self.emit_action_graph(
1656                        &event,
1657                        vec![RunActionGraphNodeRecord {
1658                            id: attempt_node_id.clone(),
1659                            label: dispatch_node_label(&route),
1660                            kind: dispatch_node_kind(&route).to_string(),
1661                            status: "completed".to_string(),
1662                            outcome: dispatch_success_outcome(&route, &result).to_string(),
1663                            trace_id: Some(event.trace_id.0.clone()),
1664                            stage_id: None,
1665                            node_id: None,
1666                            worker_id: None,
1667                            run_id: None,
1668                            run_path: None,
1669                            metadata: dispatch_success_metadata(
1670                                &route, binding, &event, attempt, &result,
1671                            ),
1672                        }],
1673                        Vec::new(),
1674                        serde_json::json!({
1675                            "source": "dispatcher",
1676                            "trigger_id": binding.id.as_str(),
1677                            "binding_key": binding.binding_key(),
1678                            "event_id": event.id.0,
1679                            "attempt": attempt,
1680                            "handler_kind": route.kind(),
1681                            "target_uri": route.target_uri(),
1682                            "result": result,
1683                            "replay_of_event_id": replay_of_event_id,
1684                        }),
1685                    )
1686                    .await?;
1687                    self.state
1688                        .destination_circuits
1689                        .record_success(&destination_key);
1690                    if half_open_probe {
1691                        if let Some(metrics) = self.metrics.as_ref() {
1692                            metrics.record_backpressure_event("circuit", "closed");
1693                        }
1694                    }
1695                    finish_in_flight(
1696                        binding.id.as_str(),
1697                        binding.version,
1698                        TriggerDispatchOutcome::Dispatched,
1699                    )
1700                    .await
1701                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1702                    self.release_flow_control(&acquired_flow).await?;
1703                    decrement_in_flight(&self.state);
1704                    self.append_dispatch_trust_record(
1705                        binding,
1706                        &route,
1707                        &event,
1708                        replay_of_event_id.as_ref(),
1709                        autonomy_tier,
1710                        TrustOutcome::Success,
1711                        "succeeded",
1712                        attempt,
1713                        None,
1714                    )
1715                    .await?;
1716                    return Ok(DispatchOutcome {
1717                        trigger_id: binding.id.as_str().to_string(),
1718                        binding_key: binding.binding_key(),
1719                        event_id: event.id.0,
1720                        attempt_count: attempt,
1721                        status: DispatchStatus::Succeeded,
1722                        handler_kind: route.kind().to_string(),
1723                        target_uri: route.target_uri(),
1724                        replay_of_event_id,
1725                        result: Some(result),
1726                        error: None,
1727                    });
1728                }
1729                Err(error) => {
1730                    let attempt_record = DispatchAttemptRecord {
1731                        trigger_id: binding.id.as_str().to_string(),
1732                        binding_key: binding.binding_key(),
1733                        event_id: event.id.0.clone(),
1734                        attempt,
1735                        handler_kind: route.kind().to_string(),
1736                        started_at,
1737                        completed_at,
1738                        outcome: dispatch_error_label(&error).to_string(),
1739                        error_msg: Some(error.to_string()),
1740                    };
1741                    attempts.push(attempt_record.clone());
1742                    self.append_attempt_record(
1743                        &event,
1744                        binding,
1745                        &attempt_record,
1746                        replay_of_event_id.as_ref(),
1747                    )
1748                    .await?;
1749                    if let DispatchError::Waiting(message) = &error {
1750                        self.append_lifecycle_event(
1751                            "DispatchWaiting",
1752                            &event,
1753                            binding,
1754                            serde_json::json!({
1755                                "event_id": event.id.0,
1756                                "attempt": attempt,
1757                                "handler_kind": route.kind(),
1758                                "target_uri": route.target_uri(),
1759                                "message": message,
1760                                "replay_of_event_id": replay_of_event_id,
1761                            }),
1762                            replay_of_event_id.as_ref(),
1763                        )
1764                        .await?;
1765                        self.append_topic_event(
1766                            TRIGGER_OUTBOX_TOPIC,
1767                            "dispatch_waiting",
1768                            &event,
1769                            Some(binding),
1770                            Some(attempt),
1771                            serde_json::json!({
1772                                "event_id": event.id.0,
1773                                "attempt": attempt,
1774                                "trigger_id": binding.id.as_str(),
1775                                "binding_key": binding.binding_key(),
1776                                "handler_kind": route.kind(),
1777                                "target_uri": route.target_uri(),
1778                                "message": message,
1779                                "replay_of_event_id": replay_of_event_id,
1780                            }),
1781                            replay_of_event_id.as_ref(),
1782                        )
1783                        .await?;
1784                        finish_in_flight(
1785                            binding.id.as_str(),
1786                            binding.version,
1787                            TriggerDispatchOutcome::Dispatched,
1788                        )
1789                        .await
1790                        .map_err(|registry_error| {
1791                            DispatchError::Registry(registry_error.to_string())
1792                        })?;
1793                        self.release_flow_control(&acquired_flow).await?;
1794                        decrement_in_flight(&self.state);
1795                        return Ok(DispatchOutcome {
1796                            trigger_id: binding.id.as_str().to_string(),
1797                            binding_key: binding.binding_key(),
1798                            event_id: event.id.0,
1799                            attempt_count: attempt,
1800                            status: DispatchStatus::Waiting,
1801                            handler_kind: route.kind().to_string(),
1802                            target_uri: route.target_uri(),
1803                            replay_of_event_id,
1804                            result: Some(serde_json::json!({
1805                                "waiting": true,
1806                                "message": message,
1807                            })),
1808                            error: None,
1809                        });
1810                    }
1811
1812                    self.append_lifecycle_event(
1813                        "DispatchFailed",
1814                        &event,
1815                        binding,
1816                        serde_json::json!({
1817                            "event_id": event.id.0,
1818                            "attempt": attempt,
1819                            "handler_kind": route.kind(),
1820                            "target_uri": route.target_uri(),
1821                            "error": error.to_string(),
1822                            "replay_of_event_id": replay_of_event_id,
1823                        }),
1824                        replay_of_event_id.as_ref(),
1825                    )
1826                    .await?;
1827                    self.append_topic_event(
1828                        TRIGGER_OUTBOX_TOPIC,
1829                        "dispatch_failed",
1830                        &event,
1831                        Some(binding),
1832                        Some(attempt),
1833                        serde_json::json!({
1834                            "event_id": event.id.0,
1835                            "attempt": attempt,
1836                            "trigger_id": binding.id.as_str(),
1837                            "binding_key": binding.binding_key(),
1838                            "handler_kind": route.kind(),
1839                            "target_uri": route.target_uri(),
1840                            "error": error.to_string(),
1841                            "replay_of_event_id": replay_of_event_id,
1842                        }),
1843                        replay_of_event_id.as_ref(),
1844                    )
1845                    .await?;
1846                    self.emit_action_graph(
1847                        &event,
1848                        vec![RunActionGraphNodeRecord {
1849                            id: attempt_node_id.clone(),
1850                            label: dispatch_node_label(&route),
1851                            kind: dispatch_node_kind(&route).to_string(),
1852                            status: if matches!(error, DispatchError::Cancelled(_)) {
1853                                "cancelled".to_string()
1854                            } else {
1855                                "failed".to_string()
1856                            },
1857                            outcome: dispatch_error_label(&error).to_string(),
1858                            trace_id: Some(event.trace_id.0.clone()),
1859                            stage_id: None,
1860                            node_id: None,
1861                            worker_id: None,
1862                            run_id: None,
1863                            run_path: None,
1864                            metadata: dispatch_error_metadata(
1865                                &route, binding, &event, attempt, &error,
1866                            ),
1867                        }],
1868                        Vec::new(),
1869                        serde_json::json!({
1870                            "source": "dispatcher",
1871                            "trigger_id": binding.id.as_str(),
1872                            "binding_key": binding.binding_key(),
1873                            "event_id": event.id.0,
1874                            "attempt": attempt,
1875                            "handler_kind": route.kind(),
1876                            "target_uri": route.target_uri(),
1877                            "error": error.to_string(),
1878                            "replay_of_event_id": replay_of_event_id,
1879                        }),
1880                    )
1881                    .await?;
1882
1883                    let circuit_opened = if error.retryable() {
1884                        self.state
1885                            .destination_circuits
1886                            .record_failure(&destination_key)
1887                    } else {
1888                        false
1889                    };
1890                    if circuit_opened {
1891                        if let Some(metrics) = self.metrics.as_ref() {
1892                            metrics.record_backpressure_event("circuit", "opened");
1893                            metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1894                            metrics.record_trigger_accepted_to_dlq(
1895                                binding.id.as_str(),
1896                                &binding_key,
1897                                event.provider.as_str(),
1898                                tenant_id(&event),
1899                                "circuit_open",
1900                                duration_between_ms(
1901                                    current_unix_ms(),
1902                                    accepted_at_ms(parent_headers.as_ref(), &event),
1903                                ),
1904                            );
1905                        }
1906                        let final_error = format!(
1907                            "destination circuit opened for {} after {} consecutive failures: {}",
1908                            destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
1909                        );
1910                        let dlq_entry = DlqEntry {
1911                            trigger_id: binding.id.as_str().to_string(),
1912                            binding_key: binding.binding_key(),
1913                            event: event.clone(),
1914                            attempt_count: attempt,
1915                            final_error: final_error.clone(),
1916                            error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
1917                                .to_string(),
1918                            attempts: attempts.clone(),
1919                        };
1920                        self.state
1921                            .dlq
1922                            .lock()
1923                            .expect("dispatcher dlq poisoned")
1924                            .push(dlq_entry.clone());
1925                        self.append_lifecycle_event(
1926                            "DlqMoved",
1927                            &event,
1928                            binding,
1929                            serde_json::json!({
1930                                "event_id": event.id.0,
1931                                "attempt_count": attempt,
1932                                "final_error": dlq_entry.final_error,
1933                                "reason": "circuit_open",
1934                                "destination": destination_key,
1935                                "replay_of_event_id": replay_of_event_id,
1936                            }),
1937                            replay_of_event_id.as_ref(),
1938                        )
1939                        .await?;
1940                        self.append_topic_event(
1941                            TRIGGER_DLQ_TOPIC,
1942                            "dlq_moved",
1943                            &event,
1944                            Some(binding),
1945                            Some(attempt),
1946                            serde_json::to_value(&dlq_entry).map_err(|serde_error| {
1947                                DispatchError::Serde(serde_error.to_string())
1948                            })?,
1949                            replay_of_event_id.as_ref(),
1950                        )
1951                        .await?;
1952                        finish_in_flight(
1953                            binding.id.as_str(),
1954                            binding.version,
1955                            TriggerDispatchOutcome::Dlq,
1956                        )
1957                        .await
1958                        .map_err(|registry_error| {
1959                            DispatchError::Registry(registry_error.to_string())
1960                        })?;
1961                        self.release_flow_control(&acquired_flow).await?;
1962                        decrement_in_flight(&self.state);
1963                        self.append_dispatch_trust_record(
1964                            binding,
1965                            &route,
1966                            &event,
1967                            replay_of_event_id.as_ref(),
1968                            autonomy_tier,
1969                            TrustOutcome::Failure,
1970                            "dlq",
1971                            attempt,
1972                            Some(final_error.clone()),
1973                        )
1974                        .await?;
1975                        return Ok(DispatchOutcome {
1976                            trigger_id: binding.id.as_str().to_string(),
1977                            binding_key: binding.binding_key(),
1978                            event_id: event.id.0,
1979                            attempt_count: attempt,
1980                            status: DispatchStatus::Dlq,
1981                            handler_kind: route.kind().to_string(),
1982                            target_uri: route.target_uri(),
1983                            replay_of_event_id,
1984                            result: None,
1985                            error: Some(final_error),
1986                        });
1987                    }
1988
1989                    if !error.retryable() {
1990                        finish_in_flight(
1991                            binding.id.as_str(),
1992                            binding.version,
1993                            TriggerDispatchOutcome::Failed,
1994                        )
1995                        .await
1996                        .map_err(|registry_error| {
1997                            DispatchError::Registry(registry_error.to_string())
1998                        })?;
1999                        self.release_flow_control(&acquired_flow).await?;
2000                        decrement_in_flight(&self.state);
2001                        let trust_outcome = match error {
2002                            DispatchError::Denied(_) => TrustOutcome::Denied,
2003                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
2004                            _ => TrustOutcome::Failure,
2005                        };
2006                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2007                            "cancelled"
2008                        } else {
2009                            "failed"
2010                        };
2011                        self.append_dispatch_trust_record(
2012                            binding,
2013                            &route,
2014                            &event,
2015                            replay_of_event_id.as_ref(),
2016                            autonomy_tier,
2017                            trust_outcome,
2018                            terminal_status,
2019                            attempt,
2020                            Some(error.to_string()),
2021                        )
2022                        .await?;
2023                        return Ok(DispatchOutcome {
2024                            trigger_id: binding.id.as_str().to_string(),
2025                            binding_key: binding.binding_key(),
2026                            event_id: event.id.0,
2027                            attempt_count: attempt,
2028                            status: if matches!(error, DispatchError::Cancelled(_)) {
2029                                DispatchStatus::Cancelled
2030                            } else {
2031                                DispatchStatus::Failed
2032                            },
2033                            handler_kind: route.kind().to_string(),
2034                            target_uri: route.target_uri(),
2035                            replay_of_event_id,
2036                            result: None,
2037                            error: Some(error.to_string()),
2038                        });
2039                    }
2040
2041                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2042                        if let Some(metrics) = self.metrics.as_ref() {
2043                            metrics.record_retry_scheduled();
2044                            metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2045                            metrics.record_trigger_retry_delay(
2046                                binding.id.as_str(),
2047                                &binding_key,
2048                                event.provider.as_str(),
2049                                tenant_id(&event),
2050                                "scheduled",
2051                                delay,
2052                            );
2053                        }
2054                        tracing::info!(
2055                            component = "dispatcher",
2056                            lifecycle = "retry_scheduled",
2057                            trigger_id = %binding.id.as_str(),
2058                            binding_key = %binding_key,
2059                            event_id = %event.id.0,
2060                            attempt = attempt + 1,
2061                            delay_ms = delay.as_millis(),
2062                            trace_id = %event.trace_id.0
2063                        );
2064                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2065                        previous_retry_node = Some(retry_node_id.clone());
2066                        self.emit_action_graph(
2067                            &event,
2068                            vec![RunActionGraphNodeRecord {
2069                                id: retry_node_id.clone(),
2070                                label: format!("retry in {}ms", delay.as_millis()),
2071                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2072                                status: "scheduled".to_string(),
2073                                outcome: format!("attempt_{}", attempt + 1),
2074                                trace_id: Some(event.trace_id.0.clone()),
2075                                stage_id: None,
2076                                node_id: None,
2077                                worker_id: None,
2078                                run_id: None,
2079                                run_path: None,
2080                                metadata: retry_node_metadata(
2081                                    binding,
2082                                    &event,
2083                                    attempt + 1,
2084                                    delay,
2085                                    &error,
2086                                ),
2087                            }],
2088                            vec![RunActionGraphEdgeRecord {
2089                                from_id: attempt_node_id,
2090                                to_id: retry_node_id.clone(),
2091                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2092                                label: Some(format!("attempt {}", attempt + 1)),
2093                            }],
2094                            serde_json::json!({
2095                                "source": "dispatcher",
2096                                "trigger_id": binding.id.as_str(),
2097                                "binding_key": binding.binding_key(),
2098                                "event_id": event.id.0,
2099                                "attempt": attempt + 1,
2100                                "delay_ms": delay.as_millis(),
2101                                "replay_of_event_id": replay_of_event_id,
2102                            }),
2103                        )
2104                        .await?;
2105                        self.append_lifecycle_event(
2106                            "RetryScheduled",
2107                            &event,
2108                            binding,
2109                            serde_json::json!({
2110                                "event_id": event.id.0,
2111                                "attempt": attempt + 1,
2112                                "delay_ms": delay.as_millis(),
2113                                "error": error.to_string(),
2114                                "replay_of_event_id": replay_of_event_id,
2115                            }),
2116                            replay_of_event_id.as_ref(),
2117                        )
2118                        .await?;
2119                        self.append_topic_event(
2120                            TRIGGER_ATTEMPTS_TOPIC,
2121                            "retry_scheduled",
2122                            &event,
2123                            Some(binding),
2124                            Some(attempt + 1),
2125                            serde_json::json!({
2126                                "event_id": event.id.0,
2127                                "attempt": attempt + 1,
2128                                "trigger_id": binding.id.as_str(),
2129                                "binding_key": binding.binding_key(),
2130                                "delay_ms": delay.as_millis(),
2131                                "error": error.to_string(),
2132                                "replay_of_event_id": replay_of_event_id,
2133                            }),
2134                            replay_of_event_id.as_ref(),
2135                        )
2136                        .await?;
2137                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2138                        let sleep_result = sleep_or_cancel_or_request(
2139                            &self.event_log,
2140                            delay,
2141                            &binding_key,
2142                            &event.id.0,
2143                            replay_of_event_id.as_ref(),
2144                            &mut self.cancel_tx.subscribe(),
2145                        )
2146                        .await;
2147                        decrement_retry_queue_depth(&self.state);
2148                        if sleep_result.is_err() {
2149                            finish_in_flight(
2150                                binding.id.as_str(),
2151                                binding.version,
2152                                TriggerDispatchOutcome::Failed,
2153                            )
2154                            .await
2155                            .map_err(|registry_error| {
2156                                DispatchError::Registry(registry_error.to_string())
2157                            })?;
2158                            self.release_flow_control(&acquired_flow).await?;
2159                            decrement_in_flight(&self.state);
2160                            self.append_dispatch_trust_record(
2161                                binding,
2162                                &route,
2163                                &event,
2164                                replay_of_event_id.as_ref(),
2165                                autonomy_tier,
2166                                TrustOutcome::Failure,
2167                                "cancelled",
2168                                attempt,
2169                                Some("dispatcher shutdown cancelled retry wait".to_string()),
2170                            )
2171                            .await?;
2172                            return Ok(DispatchOutcome {
2173                                trigger_id: binding.id.as_str().to_string(),
2174                                binding_key: binding.binding_key(),
2175                                event_id: event.id.0,
2176                                attempt_count: attempt,
2177                                status: DispatchStatus::Cancelled,
2178                                handler_kind: route.kind().to_string(),
2179                                target_uri: route.target_uri(),
2180                                replay_of_event_id,
2181                                result: None,
2182                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2183                            });
2184                        }
2185                        continue;
2186                    }
2187
2188                    let final_error = error.to_string();
2189                    let dlq_entry = DlqEntry {
2190                        trigger_id: binding.id.as_str().to_string(),
2191                        binding_key: binding.binding_key(),
2192                        event: event.clone(),
2193                        attempt_count: attempt,
2194                        final_error: final_error.clone(),
2195                        error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2196                            .to_string(),
2197                        attempts: attempts.clone(),
2198                    };
2199                    self.state
2200                        .dlq
2201                        .lock()
2202                        .expect("dispatcher dlq poisoned")
2203                        .push(dlq_entry.clone());
2204                    if let Some(metrics) = self.metrics.as_ref() {
2205                        metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2206                        metrics.record_trigger_accepted_to_dlq(
2207                            binding.id.as_str(),
2208                            &binding_key,
2209                            event.provider.as_str(),
2210                            tenant_id(&event),
2211                            "retry_exhausted",
2212                            duration_between_ms(
2213                                current_unix_ms(),
2214                                accepted_at_ms(parent_headers.as_ref(), &event),
2215                            ),
2216                        );
2217                    }
2218                    tracing::info!(
2219                        component = "dispatcher",
2220                        lifecycle = "dlq_moved",
2221                        trigger_id = %binding.id.as_str(),
2222                        binding_key = %binding_key,
2223                        event_id = %event.id.0,
2224                        attempt_count = attempt,
2225                        reason = "retry_exhausted",
2226                        trace_id = %event.trace_id.0
2227                    );
2228                    self.emit_action_graph(
2229                        &event,
2230                        vec![RunActionGraphNodeRecord {
2231                            id: format!("dlq:{binding_key}:{}", event.id.0),
2232                            label: binding.id.as_str().to_string(),
2233                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2234                            status: "queued".to_string(),
2235                            outcome: "retry_exhausted".to_string(),
2236                            trace_id: Some(event.trace_id.0.clone()),
2237                            stage_id: None,
2238                            node_id: None,
2239                            worker_id: None,
2240                            run_id: None,
2241                            run_path: None,
2242                            metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2243                        }],
2244                        vec![RunActionGraphEdgeRecord {
2245                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2246                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
2247                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2248                            label: Some(format!("{attempt} attempts")),
2249                        }],
2250                        serde_json::json!({
2251                            "source": "dispatcher",
2252                            "trigger_id": binding.id.as_str(),
2253                            "binding_key": binding.binding_key(),
2254                            "event_id": event.id.0,
2255                            "attempt_count": attempt,
2256                            "final_error": final_error,
2257                            "replay_of_event_id": replay_of_event_id,
2258                        }),
2259                    )
2260                    .await?;
2261                    self.append_lifecycle_event(
2262                        "DlqMoved",
2263                        &event,
2264                        binding,
2265                        serde_json::json!({
2266                            "event_id": event.id.0,
2267                            "attempt_count": attempt,
2268                            "final_error": dlq_entry.final_error,
2269                            "replay_of_event_id": replay_of_event_id,
2270                        }),
2271                        replay_of_event_id.as_ref(),
2272                    )
2273                    .await?;
2274                    self.append_topic_event(
2275                        TRIGGER_DLQ_TOPIC,
2276                        "dlq_moved",
2277                        &event,
2278                        Some(binding),
2279                        Some(attempt),
2280                        serde_json::to_value(&dlq_entry)
2281                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2282                        replay_of_event_id.as_ref(),
2283                    )
2284                    .await?;
2285                    finish_in_flight(
2286                        binding.id.as_str(),
2287                        binding.version,
2288                        TriggerDispatchOutcome::Dlq,
2289                    )
2290                    .await
2291                    .map_err(|registry_error| {
2292                        DispatchError::Registry(registry_error.to_string())
2293                    })?;
2294                    self.release_flow_control(&acquired_flow).await?;
2295                    decrement_in_flight(&self.state);
2296                    self.append_dispatch_trust_record(
2297                        binding,
2298                        &route,
2299                        &event,
2300                        replay_of_event_id.as_ref(),
2301                        autonomy_tier,
2302                        TrustOutcome::Failure,
2303                        "dlq",
2304                        attempt,
2305                        Some(error.to_string()),
2306                    )
2307                    .await?;
2308                    return Ok(DispatchOutcome {
2309                        trigger_id: binding.id.as_str().to_string(),
2310                        binding_key: binding.binding_key(),
2311                        event_id: event.id.0,
2312                        attempt_count: attempt,
2313                        status: DispatchStatus::Dlq,
2314                        handler_kind: route.kind().to_string(),
2315                        target_uri: route.target_uri(),
2316                        replay_of_event_id,
2317                        result: None,
2318                        error: Some(error.to_string()),
2319                    });
2320                }
2321            }
2322        }
2323
2324        finish_in_flight(
2325            binding.id.as_str(),
2326            binding.version,
2327            TriggerDispatchOutcome::Failed,
2328        )
2329        .await
2330        .map_err(|error| DispatchError::Registry(error.to_string()))?;
2331        self.release_flow_control(&acquired_flow).await?;
2332        decrement_in_flight(&self.state);
2333        self.append_dispatch_trust_record(
2334            binding,
2335            &route,
2336            &event,
2337            replay_of_event_id.as_ref(),
2338            autonomy_tier,
2339            TrustOutcome::Failure,
2340            "failed",
2341            max_attempts,
2342            Some("dispatch exhausted without terminal outcome".to_string()),
2343        )
2344        .await?;
2345        Ok(DispatchOutcome {
2346            trigger_id: binding.id.as_str().to_string(),
2347            binding_key: binding.binding_key(),
2348            event_id: event.id.0,
2349            attempt_count: max_attempts,
2350            status: DispatchStatus::Failed,
2351            handler_kind: route.kind().to_string(),
2352            target_uri: route.target_uri(),
2353            replay_of_event_id,
2354            result: None,
2355            error: Some("dispatch exhausted without terminal outcome".to_string()),
2356        })
2357    }
2358
2359    async fn dispatch_once(
2360        &self,
2361        binding: &TriggerBinding,
2362        route: &DispatchUri,
2363        event: &TriggerEvent,
2364        autonomy_tier: AutonomyTier,
2365        wait_lease: Option<DispatchWaitLease>,
2366        cancel_rx: &mut broadcast::Receiver<()>,
2367    ) -> Result<serde_json::Value, DispatchError> {
2368        match route {
2369            DispatchUri::Local { .. } => {
2370                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2371                    return Err(DispatchError::Local(format!(
2372                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2373                        binding.id.as_str()
2374                    )));
2375                };
2376                let value = self
2377                    .invoke_vm_callable(
2378                        closure,
2379                        &binding.binding_key(),
2380                        event,
2381                        None,
2382                        binding.id.as_str(),
2383                        &format!("{}.{}", event.provider.as_str(), event.kind),
2384                        autonomy_tier,
2385                        wait_lease,
2386                        cancel_rx,
2387                    )
2388                    .await?;
2389                Ok(vm_value_to_json(&value))
2390            }
2391            DispatchUri::A2a {
2392                target,
2393                allow_cleartext,
2394            } => {
2395                if self.state.shutting_down.load(Ordering::SeqCst) {
2396                    return Err(DispatchError::Cancelled(
2397                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
2398                    ));
2399                }
2400                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2401                    target,
2402                    *allow_cleartext,
2403                    binding.id.as_str(),
2404                    &binding.binding_key(),
2405                    event,
2406                    cancel_rx,
2407                )
2408                .await
2409                .map_err(|error| match error {
2410                    crate::a2a::A2aClientError::Cancelled(message) => {
2411                        DispatchError::Cancelled(message)
2412                    }
2413                    other => DispatchError::A2a(other.to_string()),
2414                })?;
2415                match ack {
2416                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2417                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2418                }
2419            }
2420            DispatchUri::Worker { queue } => {
2421                let receipt = crate::WorkerQueue::new(self.event_log.clone())
2422                    .enqueue(&crate::WorkerQueueJob {
2423                        queue: queue.clone(),
2424                        trigger_id: binding.id.as_str().to_string(),
2425                        binding_key: binding.binding_key(),
2426                        binding_version: binding.version,
2427                        event: event.clone(),
2428                        replay_of_event_id: current_dispatch_context()
2429                            .and_then(|context| context.replay_of_event_id),
2430                        priority: worker_queue_priority(binding, event),
2431                    })
2432                    .await
2433                    .map_err(DispatchError::from)?;
2434                Ok(serde_json::to_value(receipt)
2435                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2436            }
2437        }
2438    }
2439
2440    async fn apply_flow_control(
2441        &self,
2442        binding: &TriggerBinding,
2443        event: &TriggerEvent,
2444        replay_of_event_id: Option<&String>,
2445    ) -> Result<FlowControlOutcome, DispatchError> {
2446        let flow = &binding.flow_control;
2447        let mut managed_event = event.clone();
2448
2449        if let Some(batch) = &flow.batch {
2450            let gate = self
2451                .resolve_flow_gate(
2452                    &binding.binding_key(),
2453                    batch.key.as_ref(),
2454                    &managed_event,
2455                    replay_of_event_id,
2456                )
2457                .await?;
2458            match self
2459                .state
2460                .flow_control
2461                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2462                .await
2463                .map_err(DispatchError::from)?
2464            {
2465                BatchDecision::Dispatch(events) => {
2466                    managed_event = build_batched_event(events)?;
2467                }
2468                BatchDecision::Merged => {
2469                    return Ok(FlowControlOutcome::Skip {
2470                        reason: "batch_merged".to_string(),
2471                    })
2472                }
2473            }
2474        }
2475
2476        if let Some(debounce) = &flow.debounce {
2477            let gate = self
2478                .resolve_flow_gate(
2479                    &binding.binding_key(),
2480                    Some(&debounce.key),
2481                    &managed_event,
2482                    replay_of_event_id,
2483                )
2484                .await?;
2485            let latest = self
2486                .state
2487                .flow_control
2488                .debounce(&gate, debounce.period)
2489                .await
2490                .map_err(DispatchError::from)?;
2491            if !latest {
2492                return Ok(FlowControlOutcome::Skip {
2493                    reason: "debounced".to_string(),
2494                });
2495            }
2496        }
2497
2498        if let Some(rate_limit) = &flow.rate_limit {
2499            let gate = self
2500                .resolve_flow_gate(
2501                    &binding.binding_key(),
2502                    rate_limit.key.as_ref(),
2503                    &managed_event,
2504                    replay_of_event_id,
2505                )
2506                .await?;
2507            let allowed = self
2508                .state
2509                .flow_control
2510                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2511                .await
2512                .map_err(DispatchError::from)?;
2513            if !allowed {
2514                return Ok(FlowControlOutcome::Skip {
2515                    reason: "rate_limited".to_string(),
2516                });
2517            }
2518        }
2519
2520        if let Some(throttle) = &flow.throttle {
2521            let gate = self
2522                .resolve_flow_gate(
2523                    &binding.binding_key(),
2524                    throttle.key.as_ref(),
2525                    &managed_event,
2526                    replay_of_event_id,
2527                )
2528                .await?;
2529            self.state
2530                .flow_control
2531                .wait_for_throttle(&gate, throttle.period, throttle.max)
2532                .await
2533                .map_err(DispatchError::from)?;
2534        }
2535
2536        let mut acquired = AcquiredFlowControl::default();
2537        if let Some(singleton) = &flow.singleton {
2538            let gate = self
2539                .resolve_flow_gate(
2540                    &binding.binding_key(),
2541                    singleton.key.as_ref(),
2542                    &managed_event,
2543                    replay_of_event_id,
2544                )
2545                .await?;
2546            let acquired_singleton = self
2547                .state
2548                .flow_control
2549                .try_acquire_singleton(&gate)
2550                .await
2551                .map_err(DispatchError::from)?;
2552            if !acquired_singleton {
2553                return Ok(FlowControlOutcome::Skip {
2554                    reason: "singleton_active".to_string(),
2555                });
2556            }
2557            acquired.singleton = Some(SingletonLease { gate, held: true });
2558        }
2559
2560        if let Some(concurrency) = &flow.concurrency {
2561            let gate = self
2562                .resolve_flow_gate(
2563                    &binding.binding_key(),
2564                    concurrency.key.as_ref(),
2565                    &managed_event,
2566                    replay_of_event_id,
2567                )
2568                .await?;
2569            let priority_rank = self
2570                .resolve_priority_rank(
2571                    &binding.binding_key(),
2572                    flow.priority.as_ref(),
2573                    &managed_event,
2574                    replay_of_event_id,
2575                )
2576                .await?;
2577            let permit = self
2578                .state
2579                .flow_control
2580                .acquire_concurrency(&gate, concurrency.max, priority_rank)
2581                .await
2582                .map_err(DispatchError::from)?;
2583            acquired.concurrency = Some(ConcurrencyLease {
2584                gate,
2585                max: concurrency.max,
2586                priority_rank,
2587                permit: Some(permit),
2588            });
2589        }
2590
2591        Ok(FlowControlOutcome::Dispatch {
2592            event: Box::new(managed_event),
2593            acquired,
2594        })
2595    }
2596
2597    async fn release_flow_control(
2598        &self,
2599        acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2600    ) -> Result<(), DispatchError> {
2601        let (singleton_gate, concurrency_permit) = {
2602            let mut acquired = acquired.lock().await;
2603            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2604                if lease.held {
2605                    lease.held = false;
2606                    Some(lease.gate.clone())
2607                } else {
2608                    None
2609                }
2610            });
2611            let concurrency_permit = acquired
2612                .concurrency
2613                .as_mut()
2614                .and_then(|lease| lease.permit.take());
2615            (singleton_gate, concurrency_permit)
2616        };
2617        if let Some(gate) = singleton_gate {
2618            self.state
2619                .flow_control
2620                .release_singleton(&gate)
2621                .await
2622                .map_err(DispatchError::from)?;
2623        }
2624        if let Some(permit) = concurrency_permit {
2625            self.state
2626                .flow_control
2627                .release_concurrency(permit)
2628                .await
2629                .map_err(DispatchError::from)?;
2630        }
2631        Ok(())
2632    }
2633
2634    async fn resolve_flow_gate(
2635        &self,
2636        binding_key: &str,
2637        expr: Option<&crate::triggers::TriggerExpressionSpec>,
2638        event: &TriggerEvent,
2639        replay_of_event_id: Option<&String>,
2640    ) -> Result<String, DispatchError> {
2641        let key = match expr {
2642            Some(expr) => {
2643                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2644                    .await?
2645            }
2646            None => "_global".to_string(),
2647        };
2648        Ok(format!("{binding_key}:{key}"))
2649    }
2650
2651    async fn resolve_priority_rank(
2652        &self,
2653        binding_key: &str,
2654        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2655        event: &TriggerEvent,
2656        replay_of_event_id: Option<&String>,
2657    ) -> Result<usize, DispatchError> {
2658        let Some(priority) = priority else {
2659            return Ok(0);
2660        };
2661        let value = self
2662            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2663            .await?;
2664        Ok(priority
2665            .order
2666            .iter()
2667            .position(|candidate| candidate == &value)
2668            .unwrap_or(priority.order.len()))
2669    }
2670
2671    async fn evaluate_flow_expression(
2672        &self,
2673        binding_key: &str,
2674        expr: &crate::triggers::TriggerExpressionSpec,
2675        event: &TriggerEvent,
2676        replay_of_event_id: Option<&String>,
2677    ) -> Result<String, DispatchError> {
2678        let value = self
2679            .invoke_vm_callable(
2680                &expr.closure,
2681                binding_key,
2682                event,
2683                replay_of_event_id,
2684                "",
2685                "flow_control",
2686                AutonomyTier::Suggest,
2687                None,
2688                &mut self.cancel_tx.subscribe(),
2689            )
2690            .await?;
2691        Ok(json_value_to_gate(&vm_value_to_json(&value)))
2692    }
2693
2694    #[allow(clippy::too_many_arguments)]
2695    async fn invoke_vm_callable(
2696        &self,
2697        closure: &crate::value::VmClosure,
2698        binding_key: &str,
2699        event: &TriggerEvent,
2700        replay_of_event_id: Option<&String>,
2701        agent_id: &str,
2702        action: &str,
2703        autonomy_tier: AutonomyTier,
2704        wait_lease: Option<DispatchWaitLease>,
2705        cancel_rx: &mut broadcast::Receiver<()>,
2706    ) -> Result<VmValue, DispatchError> {
2707        let mut vm = self.base_vm.child_vm();
2708        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2709        if self.state.shutting_down.load(Ordering::SeqCst) {
2710            cancel_token.store(true, Ordering::SeqCst);
2711        }
2712        self.state
2713            .cancel_tokens
2714            .lock()
2715            .expect("dispatcher cancel tokens poisoned")
2716            .push(cancel_token.clone());
2717        vm.install_cancel_token(cancel_token.clone());
2718        let arg = event_to_handler_value(event)?;
2719        let args = [arg];
2720        let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2721        let effective_policy = match crate::orchestration::current_execution_policy() {
2722            Some(parent) => parent
2723                .intersect(&tier_policy)
2724                .map_err(|error| DispatchError::Local(error.to_string()))?,
2725            None => tier_policy,
2726        };
2727        crate::orchestration::push_execution_policy(effective_policy);
2728        let _policy_guard = DispatchExecutionPolicyGuard;
2729        let future = vm.call_closure_pub(closure, &args);
2730        pin_mut!(future);
2731        let (binding_id, binding_version) = split_binding_key(binding_key);
2732        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2733            slot.borrow_mut().replace(DispatchContext {
2734                trigger_event: event.clone(),
2735                replay_of_event_id: replay_of_event_id.cloned(),
2736                binding_id,
2737                binding_version,
2738                agent_id: agent_id.to_string(),
2739                action: action.to_string(),
2740                autonomy_tier,
2741            })
2742        });
2743        let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2744            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2745        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2746        crate::stdlib::hitl::reset_hitl_state();
2747        let mut poll = tokio::time::interval(Duration::from_millis(100));
2748        let result = loop {
2749            tokio::select! {
2750                result = &mut future => break result,
2751                _ = recv_cancel(cancel_rx) => {
2752                    cancel_token.store(true, Ordering::SeqCst);
2753                }
2754                _ = poll.tick() => {
2755                    if dispatch_cancel_requested(
2756                        &self.event_log,
2757                        binding_key,
2758                        &event.id.0,
2759                        replay_of_event_id,
2760                    )
2761                    .await? {
2762                        cancel_token.store(true, Ordering::SeqCst);
2763                    }
2764                }
2765            }
2766        };
2767        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2768            *slot.borrow_mut() = prior_context;
2769        });
2770        ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2771            *slot.borrow_mut() = prior_wait_lease;
2772        });
2773        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2774        {
2775            let mut tokens = self
2776                .state
2777                .cancel_tokens
2778                .lock()
2779                .expect("dispatcher cancel tokens poisoned");
2780            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2781        }
2782
2783        if cancel_token.load(Ordering::SeqCst) {
2784            if dispatch_cancel_requested(
2785                &self.event_log,
2786                binding_key,
2787                &event.id.0,
2788                replay_of_event_id,
2789            )
2790            .await?
2791            {
2792                Err(DispatchError::Cancelled(
2793                    "trigger cancel request cancelled local handler".to_string(),
2794                ))
2795            } else {
2796                Err(DispatchError::Cancelled(
2797                    "dispatcher shutdown cancelled local handler".to_string(),
2798                ))
2799            }
2800        } else {
2801            result.map_err(dispatch_error_from_vm_error)
2802        }
2803    }
2804
2805    #[allow(clippy::too_many_arguments)]
2806    async fn invoke_vm_callable_with_timeout(
2807        &self,
2808        closure: &crate::value::VmClosure,
2809        binding_key: &str,
2810        event: &TriggerEvent,
2811        replay_of_event_id: Option<&String>,
2812        agent_id: &str,
2813        action: &str,
2814        autonomy_tier: AutonomyTier,
2815        cancel_rx: &mut broadcast::Receiver<()>,
2816        timeout: Option<Duration>,
2817    ) -> Result<VmValue, DispatchError> {
2818        let future = self.invoke_vm_callable(
2819            closure,
2820            binding_key,
2821            event,
2822            replay_of_event_id,
2823            agent_id,
2824            action,
2825            autonomy_tier,
2826            None,
2827            cancel_rx,
2828        );
2829        pin_mut!(future);
2830        if let Some(timeout) = timeout {
2831            match tokio::time::timeout(timeout, future).await {
2832                Ok(result) => result,
2833                Err(_) => Err(DispatchError::Local(
2834                    "predicate evaluation timed out".to_string(),
2835                )),
2836            }
2837        } else {
2838            future.await
2839        }
2840    }
2841
2842    #[allow(clippy::too_many_arguments)]
2843    async fn append_dispatch_trust_record(
2844        &self,
2845        binding: &TriggerBinding,
2846        route: &DispatchUri,
2847        event: &TriggerEvent,
2848        replay_of_event_id: Option<&String>,
2849        autonomy_tier: AutonomyTier,
2850        outcome: TrustOutcome,
2851        terminal_status: &str,
2852        attempt_count: u32,
2853        error: Option<String>,
2854    ) -> Result<(), DispatchError> {
2855        let mut record = TrustRecord::new(
2856            binding.id.as_str().to_string(),
2857            format!("{}.{}", event.provider.as_str(), event.kind),
2858            None,
2859            outcome,
2860            event.trace_id.0.clone(),
2861            autonomy_tier,
2862        );
2863        record.metadata.insert(
2864            "binding_key".to_string(),
2865            serde_json::json!(binding.binding_key()),
2866        );
2867        record.metadata.insert(
2868            "binding_version".to_string(),
2869            serde_json::json!(binding.version),
2870        );
2871        record.metadata.insert(
2872            "provider".to_string(),
2873            serde_json::json!(event.provider.as_str()),
2874        );
2875        record
2876            .metadata
2877            .insert("event_kind".to_string(), serde_json::json!(event.kind));
2878        record
2879            .metadata
2880            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2881        record.metadata.insert(
2882            "target_uri".to_string(),
2883            serde_json::json!(route.target_uri()),
2884        );
2885        record.metadata.insert(
2886            "terminal_status".to_string(),
2887            serde_json::json!(terminal_status),
2888        );
2889        record.metadata.insert(
2890            "attempt_count".to_string(),
2891            serde_json::json!(attempt_count),
2892        );
2893        if let Some(replay_of_event_id) = replay_of_event_id {
2894            record.metadata.insert(
2895                "replay_of_event_id".to_string(),
2896                serde_json::json!(replay_of_event_id),
2897            );
2898        }
2899        if let Some(error) = error {
2900            record
2901                .metadata
2902                .insert("error".to_string(), serde_json::json!(error));
2903        }
2904        append_trust_record(&self.event_log, &record)
2905            .await
2906            .map(|_| ())
2907            .map_err(DispatchError::from)
2908    }
2909
2910    async fn append_attempt_record(
2911        &self,
2912        event: &TriggerEvent,
2913        binding: &TriggerBinding,
2914        attempt: &DispatchAttemptRecord,
2915        replay_of_event_id: Option<&String>,
2916    ) -> Result<(), DispatchError> {
2917        self.append_topic_event(
2918            TRIGGER_ATTEMPTS_TOPIC,
2919            "attempt_recorded",
2920            event,
2921            Some(binding),
2922            Some(attempt.attempt),
2923            serde_json::to_value(attempt)
2924                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2925            replay_of_event_id,
2926        )
2927        .await
2928    }
2929
2930    async fn append_lifecycle_event(
2931        &self,
2932        kind: &str,
2933        event: &TriggerEvent,
2934        binding: &TriggerBinding,
2935        payload: serde_json::Value,
2936        replay_of_event_id: Option<&String>,
2937    ) -> Result<(), DispatchError> {
2938        self.append_topic_event(
2939            TRIGGERS_LIFECYCLE_TOPIC,
2940            kind,
2941            event,
2942            Some(binding),
2943            None,
2944            payload,
2945            replay_of_event_id,
2946        )
2947        .await
2948    }
2949
2950    async fn append_autonomy_budget_approval_request(
2951        &self,
2952        binding: &TriggerBinding,
2953        route: &DispatchUri,
2954        event: &TriggerEvent,
2955        replay_of_event_id: Option<&String>,
2956        reason: &str,
2957    ) -> Result<String, DispatchError> {
2958        let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
2959        let detail = serde_json::json!({
2960            "trigger_id": binding.id.as_str(),
2961            "binding_key": binding.binding_key(),
2962            "event_id": event.id.0,
2963            "reason": reason,
2964            "from_tier": AutonomyTier::ActAuto.as_str(),
2965            "requested_tier": AutonomyTier::ActWithApproval.as_str(),
2966            "handler_kind": route.kind(),
2967            "target_uri": route.target_uri(),
2968            "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
2969            "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
2970            "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
2971            "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
2972            "replay_of_event_id": replay_of_event_id,
2973        });
2974        let request_id = crate::stdlib::hitl::append_approval_request_on(
2975            &self.event_log,
2976            binding.id.as_str().to_string(),
2977            event.trace_id.0.clone(),
2978            format!(
2979                "approve autonomous dispatch for trigger '{}' after {}",
2980                binding.id.as_str(),
2981                reason
2982            ),
2983            detail.clone(),
2984            reviewers.clone(),
2985        )
2986        .await
2987        .map_err(dispatch_error_from_vm_error)?;
2988        self.append_lifecycle_event(
2989            "autonomy.budget_exceeded",
2990            event,
2991            binding,
2992            serde_json::json!({
2993                "trigger_id": binding.id.as_str(),
2994                "event_id": event.id.0,
2995                "reason": reason,
2996                "request_id": request_id,
2997                "reviewers": reviewers,
2998                "from_tier": AutonomyTier::ActAuto.as_str(),
2999                "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3000                "replay_of_event_id": replay_of_event_id,
3001            }),
3002            replay_of_event_id,
3003        )
3004        .await?;
3005        Ok(request_id)
3006    }
3007
3008    #[allow(clippy::too_many_arguments)]
3009    async fn emit_autonomy_budget_approval_action_graph(
3010        &self,
3011        binding: &TriggerBinding,
3012        route: &DispatchUri,
3013        event: &TriggerEvent,
3014        source_node_id: &str,
3015        replay_of_event_id: Option<&String>,
3016        reason: &str,
3017        request_id: &str,
3018    ) -> Result<(), DispatchError> {
3019        let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3020        self.emit_action_graph(
3021            event,
3022            vec![RunActionGraphNodeRecord {
3023                id: approval_node_id.clone(),
3024                label: format!("approval required: {reason}"),
3025                kind: "approval".to_string(),
3026                status: "waiting".to_string(),
3027                outcome: "request_approval".to_string(),
3028                trace_id: Some(event.trace_id.0.clone()),
3029                stage_id: None,
3030                node_id: None,
3031                worker_id: None,
3032                run_id: None,
3033                run_path: None,
3034                metadata: BTreeMap::from([
3035                    (
3036                        "trigger_id".to_string(),
3037                        serde_json::json!(binding.id.as_str()),
3038                    ),
3039                    (
3040                        "binding_key".to_string(),
3041                        serde_json::json!(binding.binding_key()),
3042                    ),
3043                    ("event_id".to_string(), serde_json::json!(event.id.0)),
3044                    ("reason".to_string(), serde_json::json!(reason)),
3045                    ("request_id".to_string(), serde_json::json!(request_id)),
3046                    (
3047                        "reviewers".to_string(),
3048                        serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3049                    ),
3050                    ("handler_kind".to_string(), serde_json::json!(route.kind())),
3051                    (
3052                        "target_uri".to_string(),
3053                        serde_json::json!(route.target_uri()),
3054                    ),
3055                    (
3056                        "from_tier".to_string(),
3057                        serde_json::json!(AutonomyTier::ActAuto.as_str()),
3058                    ),
3059                    (
3060                        "requested_tier".to_string(),
3061                        serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3062                    ),
3063                    (
3064                        "replay_of_event_id".to_string(),
3065                        serde_json::json!(replay_of_event_id),
3066                    ),
3067                ]),
3068            }],
3069            vec![RunActionGraphEdgeRecord {
3070                from_id: source_node_id.to_string(),
3071                to_id: approval_node_id,
3072                kind: "approval_gate".to_string(),
3073                label: Some("autonomy budget".to_string()),
3074            }],
3075            serde_json::json!({
3076                "source": "dispatcher",
3077                "trigger_id": binding.id.as_str(),
3078                "binding_key": binding.binding_key(),
3079                "event_id": event.id.0,
3080                "reason": reason,
3081                "request_id": request_id,
3082                "replay_of_event_id": replay_of_event_id,
3083            }),
3084        )
3085        .await
3086    }
3087
3088    #[allow(clippy::too_many_arguments)]
3089    async fn append_tier_transition_trust_record(
3090        &self,
3091        binding: &TriggerBinding,
3092        event: &TriggerEvent,
3093        replay_of_event_id: Option<&String>,
3094        from_tier: AutonomyTier,
3095        to_tier: AutonomyTier,
3096        reason: &str,
3097        request_id: &str,
3098    ) -> Result<(), DispatchError> {
3099        let mut record = TrustRecord::new(
3100            binding.id.as_str().to_string(),
3101            "autonomy.tier_transition",
3102            Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3103            TrustOutcome::Denied,
3104            event.trace_id.0.clone(),
3105            to_tier,
3106        );
3107        record.metadata.insert(
3108            "binding_key".to_string(),
3109            serde_json::json!(binding.binding_key()),
3110        );
3111        record
3112            .metadata
3113            .insert("event_id".to_string(), serde_json::json!(event.id.0));
3114        record.metadata.insert(
3115            "from_tier".to_string(),
3116            serde_json::json!(from_tier.as_str()),
3117        );
3118        record
3119            .metadata
3120            .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3121        record
3122            .metadata
3123            .insert("reason".to_string(), serde_json::json!(reason));
3124        record
3125            .metadata
3126            .insert("request_id".to_string(), serde_json::json!(request_id));
3127        if let Some(replay_of_event_id) = replay_of_event_id {
3128            record.metadata.insert(
3129                "replay_of_event_id".to_string(),
3130                serde_json::json!(replay_of_event_id),
3131            );
3132        }
3133        append_trust_record(&self.event_log, &record)
3134            .await
3135            .map(|_| ())
3136            .map_err(DispatchError::from)
3137    }
3138
3139    async fn append_budget_deferred_event(
3140        &self,
3141        binding: &TriggerBinding,
3142        route: &DispatchUri,
3143        event: &TriggerEvent,
3144        replay_of_event_id: Option<&String>,
3145        reason: &str,
3146    ) -> Result<(), DispatchError> {
3147        self.append_topic_event(
3148            TRIGGER_ATTEMPTS_TOPIC,
3149            "budget_deferred",
3150            event,
3151            Some(binding),
3152            None,
3153            serde_json::json!({
3154                "event_id": event.id.0,
3155                "trigger_id": binding.id.as_str(),
3156                "binding_key": binding.binding_key(),
3157                "handler_kind": route.kind(),
3158                "target_uri": route.target_uri(),
3159                "reason": reason,
3160                "retry_at": next_budget_reset_rfc3339(binding),
3161                "replay_of_event_id": replay_of_event_id,
3162            }),
3163            replay_of_event_id,
3164        )
3165        .await?;
3166        self.append_skipped_outbox_event(
3167            binding,
3168            route,
3169            event,
3170            replay_of_event_id,
3171            DispatchSkipStage::Predicate,
3172            serde_json::json!({
3173                "deferred": true,
3174                "reason": reason,
3175                "retry_at": next_budget_reset_rfc3339(binding),
3176            }),
3177        )
3178        .await
3179    }
3180
3181    async fn append_skipped_outbox_event(
3182        &self,
3183        binding: &TriggerBinding,
3184        route: &DispatchUri,
3185        event: &TriggerEvent,
3186        replay_of_event_id: Option<&String>,
3187        stage: DispatchSkipStage,
3188        detail: serde_json::Value,
3189    ) -> Result<(), DispatchError> {
3190        self.append_topic_event(
3191            TRIGGER_OUTBOX_TOPIC,
3192            "dispatch_skipped",
3193            event,
3194            Some(binding),
3195            None,
3196            serde_json::json!({
3197                "event_id": event.id.0,
3198                "trigger_id": binding.id.as_str(),
3199                "binding_key": binding.binding_key(),
3200                "handler_kind": route.kind(),
3201                "target_uri": route.target_uri(),
3202                "skip_stage": stage.as_str(),
3203                "detail": detail,
3204                "replay_of_event_id": replay_of_event_id,
3205            }),
3206            replay_of_event_id,
3207        )
3208        .await
3209    }
3210
3211    async fn append_topic_event(
3212        &self,
3213        topic_name: &str,
3214        kind: &str,
3215        event: &TriggerEvent,
3216        binding: Option<&TriggerBinding>,
3217        attempt: Option<u32>,
3218        payload: serde_json::Value,
3219        replay_of_event_id: Option<&String>,
3220    ) -> Result<(), DispatchError> {
3221        let topic = topic_for_event(event, topic_name)?;
3222        let headers = event_headers(event, binding, attempt, replay_of_event_id);
3223        self.event_log
3224            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3225            .await
3226            .map_err(DispatchError::from)
3227            .map(|_| ())
3228    }
3229
3230    async fn emit_action_graph(
3231        &self,
3232        event: &TriggerEvent,
3233        nodes: Vec<RunActionGraphNodeRecord>,
3234        edges: Vec<RunActionGraphEdgeRecord>,
3235        extra: serde_json::Value,
3236    ) -> Result<(), DispatchError> {
3237        let mut headers = BTreeMap::new();
3238        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3239        headers.insert("event_id".to_string(), event.id.0.clone());
3240        let observability = RunObservabilityRecord {
3241            schema_version: 1,
3242            action_graph_nodes: nodes,
3243            action_graph_edges: edges,
3244            ..Default::default()
3245        };
3246        append_action_graph_update(
3247            headers,
3248            serde_json::json!({
3249                "source": "dispatcher",
3250                "trace_id": event.trace_id.0,
3251                "event_id": event.id.0,
3252                "observability": observability,
3253                "context": extra,
3254            }),
3255        )
3256        .await
3257        .map_err(DispatchError::from)
3258    }
3259}
3260
3261async fn dispatch_cancel_requested(
3262    event_log: &Arc<AnyEventLog>,
3263    binding_key: &str,
3264    event_id: &str,
3265    replay_of_event_id: Option<&String>,
3266) -> Result<bool, DispatchError> {
3267    if replay_of_event_id.is_some() {
3268        return Ok(false);
3269    }
3270    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3271        .expect("static trigger cancel topic should always be valid");
3272    let events = event_log.read_range(&topic, None, usize::MAX).await?;
3273    let requested = events
3274        .into_iter()
3275        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3276        .filter_map(|(_, event)| {
3277            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3278        })
3279        .collect::<BTreeSet<_>>();
3280    Ok(requested
3281        .iter()
3282        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3283}
3284
3285async fn sleep_or_cancel_or_request(
3286    event_log: &Arc<AnyEventLog>,
3287    delay: Duration,
3288    binding_key: &str,
3289    event_id: &str,
3290    replay_of_event_id: Option<&String>,
3291    cancel_rx: &mut broadcast::Receiver<()>,
3292) -> Result<(), DispatchError> {
3293    let sleep = tokio::time::sleep(delay);
3294    pin_mut!(sleep);
3295    let mut poll = tokio::time::interval(Duration::from_millis(100));
3296    loop {
3297        tokio::select! {
3298            _ = &mut sleep => return Ok(()),
3299            _ = recv_cancel(cancel_rx) => {
3300                return Err(DispatchError::Cancelled(
3301                    "dispatcher shutdown cancelled retry wait".to_string(),
3302                ));
3303            }
3304            _ = poll.tick() => {
3305                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
3306                    return Err(DispatchError::Cancelled(
3307                        "trigger cancel request cancelled retry wait".to_string(),
3308                    ));
3309                }
3310            }
3311        }
3312    }
3313}
3314
3315fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
3316    let mut iter = events.into_iter();
3317    let Some(mut root) = iter.next() else {
3318        return Err(DispatchError::Registry(
3319            "batch dispatch produced an empty event list".to_string(),
3320        ));
3321    };
3322    let mut batch = Vec::new();
3323    batch.push(
3324        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
3325    );
3326    for event in iter {
3327        batch.push(
3328            serde_json::to_value(&event)
3329                .map_err(|error| DispatchError::Serde(error.to_string()))?,
3330        );
3331    }
3332    root.batch = Some(batch);
3333    Ok(root)
3334}
3335
3336fn json_value_to_gate(value: &serde_json::Value) -> String {
3337    match value {
3338        serde_json::Value::Null => "null".to_string(),
3339        serde_json::Value::String(text) => text.clone(),
3340        serde_json::Value::Bool(value) => value.to_string(),
3341        serde_json::Value::Number(value) => value.to_string(),
3342        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
3343    }
3344}
3345
3346fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
3347    let json =
3348        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3349    let value = json_to_vm_value(&json);
3350    match (&event.raw_body, value) {
3351        (Some(raw_body), VmValue::Dict(dict)) => {
3352            let mut map = (*dict).clone();
3353            map.insert(
3354                "raw_body".to_string(),
3355                VmValue::Bytes(Rc::new(raw_body.clone())),
3356            );
3357            Ok(VmValue::Dict(Rc::new(map)))
3358        }
3359        (_, other) => Ok(other),
3360    }
3361}
3362
3363fn decrement_in_flight(state: &DispatcherRuntimeState) {
3364    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
3365    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
3366        state.idle_notify.notify_waiters();
3367    }
3368}
3369
3370fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
3371    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
3372    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
3373        state.idle_notify.notify_waiters();
3374    }
3375}
3376
/// Test hook: stores `tx` as the sender that `notify_test_inbox_dequeued` will
/// fire, replacing any sender installed earlier.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(tx);
    });
}
3383
/// No-op stand-in for the test-only dequeue notification; compiled only
/// outside test builds.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {
    // Intentionally empty.
}
3386
/// Test hook: fires the installed dequeue signal, if any, at most once.
///
/// The sender is taken out of the slot before sending, so later calls do
/// nothing until a new sender is installed. A send failure is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let installed = slot.borrow_mut().take();
        if let Some(tx) = installed {
            let _ = tx.send(());
        }
    });
}
3395
3396pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
3397    event_log: &L,
3398    event: &TriggerEvent,
3399) -> Result<u64, DispatchError> {
3400    let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
3401    let headers = event_headers(event, None, None, None);
3402    let payload =
3403        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3404    event_log
3405        .append(
3406            &topic,
3407            LogEvent::new("event_ingested", payload).with_headers(headers),
3408        )
3409        .await
3410        .map_err(DispatchError::from)
3411}
3412
3413pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
3414    ACTIVE_DISPATCHER_STATE.with(|slot| {
3415        slot.borrow()
3416            .as_ref()
3417            .map(|state| DispatcherStatsSnapshot {
3418                in_flight: state.in_flight.load(Ordering::Relaxed),
3419                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
3420                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
3421            })
3422            .unwrap_or_default()
3423    })
3424}
3425
3426pub fn clear_dispatcher_state() {
3427    ACTIVE_DISPATCHER_STATE.with(|slot| {
3428        *slot.borrow_mut() = None;
3429    });
3430    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
3431        *slot.borrow_mut() = None;
3432    });
3433}
3434
3435fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
3436    if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
3437        return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
3438    }
3439    if is_cancelled_vm_error(&error) {
3440        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
3441    }
3442    if let VmError::Thrown(VmValue::String(message)) = &error {
3443        return DispatchError::Local(message.to_string());
3444    }
3445    match error_to_category(&error) {
3446        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
3447        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
3448        ErrorCategory::Cancelled => {
3449            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
3450        }
3451        _ => DispatchError::Local(error.to_string()),
3452    }
3453}
3454
3455fn dispatch_error_label(error: &DispatchError) -> &'static str {
3456    match error {
3457        DispatchError::Denied(_) => "denied",
3458        DispatchError::Timeout(_) => "timeout",
3459        DispatchError::Waiting(_) => "waiting",
3460        DispatchError::Cancelled(_) => "cancelled",
3461        _ => "failed",
3462    }
3463}
3464
3465fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
3466    match route {
3467        DispatchUri::Worker { .. } => "enqueued",
3468        DispatchUri::A2a { .. }
3469            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
3470        {
3471            "pending"
3472        }
3473        DispatchUri::A2a { .. } => "completed",
3474        DispatchUri::Local { .. } => "success",
3475    }
3476}
3477
3478fn dispatch_node_id(
3479    route: &DispatchUri,
3480    binding_key: &str,
3481    event_id: &str,
3482    attempt: u32,
3483) -> String {
3484    let prefix = match route {
3485        DispatchUri::A2a { .. } => "a2a",
3486        _ => "dispatch",
3487    };
3488    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
3489}
3490
3491fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3492    match route {
3493        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3494        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3495        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3496    }
3497}
3498
3499fn dispatch_node_label(route: &DispatchUri) -> String {
3500    match route {
3501        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3502        _ => route.target_uri(),
3503    }
3504}
3505
3506fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3507    match route {
3508        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3509        _ => None,
3510    }
3511}
3512
3513fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3514    match route {
3515        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3516        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3517        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3518    }
3519}
3520
3521fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3522    match status {
3523        crate::triggers::SignatureStatus::Verified => "verified",
3524        crate::triggers::SignatureStatus::Unsigned => "unsigned",
3525        crate::triggers::SignatureStatus::Failed { .. } => "failed",
3526    }
3527}
3528
3529fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3530    let mut metadata = BTreeMap::new();
3531    metadata.insert(
3532        "provider".to_string(),
3533        serde_json::json!(event.provider.as_str()),
3534    );
3535    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3536    metadata.insert(
3537        "dedupe_key".to_string(),
3538        serde_json::json!(event.dedupe_key),
3539    );
3540    metadata.insert(
3541        "signature_status".to_string(),
3542        serde_json::json!(signature_status_label(&event.signature_status)),
3543    );
3544    metadata
3545}
3546
3547fn dispatch_node_metadata(
3548    route: &DispatchUri,
3549    binding: &TriggerBinding,
3550    event: &TriggerEvent,
3551    attempt: u32,
3552) -> BTreeMap<String, serde_json::Value> {
3553    let mut metadata = BTreeMap::new();
3554    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3555    metadata.insert(
3556        "target_uri".to_string(),
3557        serde_json::json!(route.target_uri()),
3558    );
3559    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3560    metadata.insert(
3561        "trigger_id".to_string(),
3562        serde_json::json!(binding.id.as_str()),
3563    );
3564    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3565    if let Some(target_agent) = dispatch_target_agent(route) {
3566        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3567    }
3568    if let DispatchUri::Worker { queue } = route {
3569        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3570    }
3571    metadata
3572}
3573
3574fn dispatch_success_metadata(
3575    route: &DispatchUri,
3576    binding: &TriggerBinding,
3577    event: &TriggerEvent,
3578    attempt: u32,
3579    result: &serde_json::Value,
3580) -> BTreeMap<String, serde_json::Value> {
3581    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3582    match route {
3583        DispatchUri::A2a { .. } => {
3584            if let Some(task_id) = result
3585                .get("task_id")
3586                .or_else(|| result.get("id"))
3587                .and_then(|value| value.as_str())
3588            {
3589                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3590            }
3591            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3592                metadata.insert("state".to_string(), serde_json::json!(state));
3593            }
3594        }
3595        DispatchUri::Worker { .. } => {
3596            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3597            {
3598                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3599            }
3600            if let Some(response_topic) = result
3601                .get("response_topic")
3602                .and_then(|value| value.as_str())
3603            {
3604                metadata.insert(
3605                    "response_topic".to_string(),
3606                    serde_json::json!(response_topic),
3607                );
3608            }
3609        }
3610        DispatchUri::Local { .. } => {}
3611    }
3612    metadata
3613}
3614
3615fn dispatch_error_metadata(
3616    route: &DispatchUri,
3617    binding: &TriggerBinding,
3618    event: &TriggerEvent,
3619    attempt: u32,
3620    error: &DispatchError,
3621) -> BTreeMap<String, serde_json::Value> {
3622    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3623    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3624    metadata
3625}
3626
3627fn retry_node_metadata(
3628    binding: &TriggerBinding,
3629    event: &TriggerEvent,
3630    attempt: u32,
3631    delay: Duration,
3632    error: &DispatchError,
3633) -> BTreeMap<String, serde_json::Value> {
3634    let mut metadata = BTreeMap::new();
3635    metadata.insert(
3636        "trigger_id".to_string(),
3637        serde_json::json!(binding.id.as_str()),
3638    );
3639    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3640    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3641    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3642    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3643    metadata
3644}
3645
/// Splits a binding key of the form `<id>@v<version>` into its id and version.
///
/// The split happens at the *last* `@v`. A key without `@v` is returned whole
/// with version `0`; a suffix that does not parse as `u32` also yields version
/// `0` (the id is still the part before `@v`).
fn split_binding_key(binding_key: &str) -> (String, u32) {
    match binding_key.rsplit_once("@v") {
        Some((binding_id, suffix)) => (
            binding_id.to_string(),
            suffix.parse::<u32>().unwrap_or(0),
        ),
        None => (binding_key.to_string(), 0),
    }
}
3653
/// Builds a binding key from a trigger id and an optional version:
/// `<trigger_id>@v<version>` when a version is given, otherwise the bare
/// trigger id.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    binding_version.map_or_else(
        || trigger_id.to_string(),
        |version| format!("{trigger_id}@v{version}"),
    )
}
3660
3661fn tenant_id(event: &TriggerEvent) -> Option<&str> {
3662    event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
3663}
3664
3665fn current_unix_ms() -> i64 {
3666    unix_ms(time::OffsetDateTime::now_utc())
3667}
3668
3669fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
3670    (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
3671}
3672
3673fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3674    lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
3675        .unwrap_or_else(|| unix_ms(event.received_at))
3676}
3677
3678fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3679    lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
3680        .unwrap_or_else(|| accepted_at_ms(headers, event))
3681}
3682
/// Reads header `name` and parses it as an `i64` millisecond value.
///
/// Returns `None` when there are no headers, the header is absent, or its value
/// does not parse as an `i64`.
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw = headers?.get(name)?;
    raw.parse::<i64>().ok()
}
3688
/// Non-negative duration between two millisecond timestamps.
///
/// The subtraction saturates instead of overflowing, and a negative difference
/// (when `earlier_ms` is actually later) is clamped to zero.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    // A negative `i64` does not fit in `u64`, so the conversion failure is what
    // performs the clamp to zero.
    Duration::from_millis(u64::try_from(elapsed_ms).unwrap_or(0))
}
3692
3693fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
3694    match result {
3695        Ok(_) => "succeeded",
3696        Err(DispatchError::Waiting(_)) => "waiting",
3697        Err(DispatchError::Cancelled(_)) => "cancelled",
3698        Err(DispatchError::Denied(_)) => "denied",
3699        Err(DispatchError::Timeout(_)) => "timeout",
3700        Err(_) => "failed",
3701    }
3702}
3703
3704fn is_cancelled_vm_error(error: &VmError) -> bool {
3705    matches!(
3706        error,
3707        VmError::Thrown(VmValue::String(message))
3708            if message.starts_with("kind:cancelled:")
3709    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3710}
3711
3712fn event_headers(
3713    event: &TriggerEvent,
3714    binding: Option<&TriggerBinding>,
3715    attempt: Option<u32>,
3716    replay_of_event_id: Option<&String>,
3717) -> BTreeMap<String, String> {
3718    let mut headers = BTreeMap::new();
3719    headers.insert("event_id".to_string(), event.id.0.clone());
3720    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3721    headers.insert("provider".to_string(), event.provider.as_str().to_string());
3722    headers.insert("kind".to_string(), event.kind.clone());
3723    if let Some(replay_of_event_id) = replay_of_event_id {
3724        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3725    }
3726    if let Some(tenant_id) = event.tenant_id.as_ref() {
3727        headers.insert("tenant_id".to_string(), tenant_id.0.clone());
3728    }
3729    if let Some(binding) = binding {
3730        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3731        headers.insert("binding_key".to_string(), binding.binding_key());
3732        headers.insert(
3733            "handler_kind".to_string(),
3734            DispatchUri::from(&binding.handler).kind().to_string(),
3735        );
3736    }
3737    if let Some(attempt) = attempt {
3738        headers.insert("attempt".to_string(), attempt.to_string());
3739    }
3740    headers
3741}
3742
3743fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
3744    let topic = Topic::new(topic_name)
3745        .expect("static trigger dispatcher topic names should always be valid");
3746    match event.tenant_id.as_ref() {
3747        Some(tenant_id) => crate::tenant_topic(tenant_id, &topic).map_err(DispatchError::from),
3748        None => Ok(topic),
3749    }
3750}
3751
3752fn worker_queue_priority(
3753    binding: &super::registry::TriggerBinding,
3754    event: &TriggerEvent,
3755) -> crate::WorkerQueuePriority {
3756    match event
3757        .headers
3758        .get("priority")
3759        .map(|value| value.trim().to_ascii_lowercase())
3760        .as_deref()
3761    {
3762        Some("high") => crate::WorkerQueuePriority::High,
3763        Some("low") => crate::WorkerQueuePriority::Low,
3764        _ => binding.dispatch_priority,
3765    }
3766}
3767
/// Environment variable that, when set to any value, makes
/// `maybe_fail_before_outbox` terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Fault-injection hook: exits the process with status 86 when
/// `TEST_FAIL_BEFORE_OUTBOX_ENV` is set, and does nothing otherwise.
fn maybe_fail_before_outbox() {
    let fault_armed = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if fault_armed {
        std::process::exit(86);
    }
}
3775
3776fn now_rfc3339() -> String {
3777    time::OffsetDateTime::now_utc()
3778        .format(&time::format_description::well_known::Rfc3339)
3779        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3780}
3781
3782fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
3783    let now = time::OffsetDateTime::now_utc();
3784    let reset = if binding.hourly_cost_usd.is_some() {
3785        let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
3786        time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
3787    } else {
3788        let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
3789        time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
3790    };
3791    reset
3792        .format(&time::format_description::well_known::Rfc3339)
3793        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3794}
3795
3796fn now_unix_ms() -> i64 {
3797    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3798}
3799
3800fn cancelled_dispatch_outcome(
3801    binding: &TriggerBinding,
3802    route: &DispatchUri,
3803    event: &TriggerEvent,
3804    replay_of_event_id: Option<String>,
3805    attempt_count: u32,
3806    error: String,
3807) -> DispatchOutcome {
3808    DispatchOutcome {
3809        trigger_id: binding.id.as_str().to_string(),
3810        binding_key: binding.binding_key(),
3811        event_id: event.id.0.clone(),
3812        attempt_count,
3813        status: DispatchStatus::Cancelled,
3814        handler_kind: route.kind().to_string(),
3815        target_uri: route.target_uri(),
3816        replay_of_event_id,
3817        result: None,
3818        error: Some(error),
3819    }
3820}
3821
3822async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3823    let _ = cancel_rx.recv().await;
3824}
3825
3826#[cfg(test)]
3827mod tests;