Skip to main content

harn_vm/triggers/dispatcher/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::vm_value_to_json;
16use crate::orchestration::{
17    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
18    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
19    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
20    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
21    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
22    ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
23    ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
24};
25use crate::stdlib::json_to_vm_value;
26use crate::trust_graph::{
27    append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
28};
29use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
30use crate::vm::Vm;
31
32use self::uri::DispatchUri;
33use super::registry::{
34    binding_autonomy_budget_would_exceed, matching_bindings, note_autonomous_decision,
35    TriggerBinding, TriggerBudgetExhaustionStrategy, TriggerHandlerSpec,
36};
37use super::{
38    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
39    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
40    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_OUTBOX_TOPIC,
41};
42use circuits::{
43    destination_circuit_key, dlq_node_metadata, DestinationCircuitProbe,
44    DestinationCircuitRegistry, DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
45};
46use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
47use predicate_eval::predicate_node_metadata;
48
49mod circuits;
50mod flow_control;
51mod predicate_eval;
52pub mod retry;
53pub mod uri;
54
55pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
56
/// Inbox-envelope header carrying the time (milliseconds, per the `_ms` suffix) at
/// which a trigger event was accepted. `Dispatcher::enqueue_targeted_with_headers`
/// fills it from the event's `received_at` unless a parent header already set it.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Header key for the time (milliseconds) at which a trigger event was normalized.
/// NOTE(review): not written by the code in this section — confirm the producer.
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Inbox-envelope header carrying the time (milliseconds) at which the event was
/// appended to the inbox topic; defaults to the log event's `occurred_at_ms`.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
60
61thread_local! {
62    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
63    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
64    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
65    #[cfg(test)]
66    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
67}
68
69tokio::task_local! {
70    static ACTIVE_DISPATCH_IS_REPLAY: bool;
71}
72
73#[derive(Clone, Debug)]
74pub(crate) struct DispatchContext {
75    pub trigger_event: TriggerEvent,
76    pub replay_of_event_id: Option<String>,
77    pub binding_id: String,
78    pub binding_version: u32,
79    pub agent_id: String,
80    pub action: String,
81    pub autonomy_tier: AutonomyTier,
82}
83
84struct DispatchExecutionPolicyGuard;
85
86impl Drop for DispatchExecutionPolicyGuard {
87    fn drop(&mut self) {
88        crate::orchestration::pop_execution_policy();
89    }
90}
91
/// Reviewer name used by default for autonomy-budget decisions.
/// NOTE(review): not referenced in the visible section — confirm the call site.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
93
94pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
95    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
96}
97
98pub(crate) fn current_dispatch_is_replay() -> bool {
99    ACTIVE_DISPATCH_IS_REPLAY
100        .try_with(|is_replay| *is_replay)
101        .unwrap_or(false)
102}
103
104pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
105    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
106}
107
108#[derive(Clone)]
109pub struct Dispatcher {
110    base_vm: Rc<Vm>,
111    event_log: Arc<AnyEventLog>,
112    cancel_tx: broadcast::Sender<()>,
113    state: Arc<DispatcherRuntimeState>,
114    metrics: Option<Arc<crate::MetricsRegistry>>,
115}
116
117#[derive(Debug)]
118struct DispatcherRuntimeState {
119    in_flight: AtomicU64,
120    retry_queue_depth: AtomicU64,
121    dlq: Mutex<Vec<DlqEntry>>,
122    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
123    shutting_down: std::sync::atomic::AtomicBool,
124    idle_notify: Notify,
125    flow_control: FlowControlManager,
126    destination_circuits: DestinationCircuitRegistry,
127}
128
129impl DispatcherRuntimeState {
130    fn new(event_log: Arc<AnyEventLog>) -> Self {
131        Self {
132            in_flight: AtomicU64::new(0),
133            retry_queue_depth: AtomicU64::new(0),
134            dlq: Mutex::new(Vec::new()),
135            cancel_tokens: Mutex::new(Vec::new()),
136            shutting_down: std::sync::atomic::AtomicBool::new(false),
137            idle_notify: Notify::new(),
138            flow_control: FlowControlManager::new(event_log),
139            destination_circuits: DestinationCircuitRegistry::default(),
140        }
141    }
142}
143
144#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
145#[serde(default)]
146pub struct DispatcherStatsSnapshot {
147    pub in_flight: u64,
148    pub retry_queue_depth: u64,
149    pub dlq_depth: u64,
150}
151
152#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
153#[serde(rename_all = "snake_case")]
154pub enum DispatchStatus {
155    Succeeded,
156    Failed,
157    Dlq,
158    Skipped,
159    Waiting,
160    Cancelled,
161}
162
163impl DispatchStatus {
164    pub fn as_str(&self) -> &'static str {
165        match self {
166            Self::Succeeded => "succeeded",
167            Self::Failed => "failed",
168            Self::Dlq => "dlq",
169            Self::Skipped => "skipped",
170            Self::Waiting => "waiting",
171            Self::Cancelled => "cancelled",
172        }
173    }
174}
175
176#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
177#[serde(default)]
178pub struct DispatchOutcome {
179    pub trigger_id: String,
180    pub binding_key: String,
181    pub event_id: String,
182    pub attempt_count: u32,
183    pub status: DispatchStatus,
184    pub handler_kind: String,
185    pub target_uri: String,
186    pub replay_of_event_id: Option<String>,
187    pub result: Option<serde_json::Value>,
188    pub error: Option<String>,
189}
190
191#[derive(Clone, Debug, Serialize, Deserialize)]
192pub struct InboxEnvelope {
193    pub trigger_id: Option<String>,
194    pub binding_version: Option<u32>,
195    pub event: TriggerEvent,
196}
197
198#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
199#[serde(default)]
200pub struct DispatcherDrainReport {
201    pub drained: bool,
202    pub in_flight: u64,
203    pub retry_queue_depth: u64,
204    pub dlq_depth: u64,
205}
206
207impl Default for DispatchOutcome {
208    fn default() -> Self {
209        Self {
210            trigger_id: String::new(),
211            binding_key: String::new(),
212            event_id: String::new(),
213            attempt_count: 0,
214            status: DispatchStatus::Failed,
215            handler_kind: String::new(),
216            target_uri: String::new(),
217            replay_of_event_id: None,
218            result: None,
219            error: None,
220        }
221    }
222}
223
224#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
225#[serde(default)]
226pub struct DispatchAttemptRecord {
227    pub trigger_id: String,
228    pub binding_key: String,
229    pub event_id: String,
230    pub attempt: u32,
231    pub handler_kind: String,
232    pub started_at: String,
233    pub completed_at: String,
234    pub outcome: String,
235    pub error_msg: Option<String>,
236}
237
238#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
239pub struct DispatchCancelRequest {
240    pub binding_key: String,
241    pub event_id: String,
242    #[serde(with = "time::serde::rfc3339")]
243    pub requested_at: time::OffsetDateTime,
244    #[serde(default)]
245    pub requested_by: Option<String>,
246    #[serde(default)]
247    pub audit_id: Option<String>,
248}
249
250#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
251pub struct DlqEntry {
252    pub trigger_id: String,
253    pub binding_key: String,
254    pub event: TriggerEvent,
255    pub attempt_count: u32,
256    pub final_error: String,
257    #[serde(default = "default_dlq_error_class")]
258    pub error_class: String,
259    pub attempts: Vec<DispatchAttemptRecord>,
260}
261
/// Serde default for [`DlqEntry::error_class`]: entries serialized without a class
/// read back as `"unknown"`.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
265
266#[derive(Clone, Debug)]
267struct SingletonLease {
268    gate: String,
269    held: bool,
270}
271
272#[derive(Clone, Debug)]
273struct ConcurrencyLease {
274    gate: String,
275    max: u32,
276    priority_rank: usize,
277    permit: Option<ConcurrencyPermit>,
278}
279
280#[derive(Default, Debug)]
281struct AcquiredFlowControl {
282    singleton: Option<SingletonLease>,
283    concurrency: Option<ConcurrencyLease>,
284}
285
286#[derive(Clone)]
287pub(crate) struct DispatchWaitLease {
288    state: Arc<DispatcherRuntimeState>,
289    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
290    suspended: Arc<AtomicBool>,
291}
292
293impl DispatchWaitLease {
294    fn new(
295        state: Arc<DispatcherRuntimeState>,
296        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
297    ) -> Self {
298        Self {
299            state,
300            acquired,
301            suspended: Arc::new(AtomicBool::new(false)),
302        }
303    }
304
305    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
306        if self.suspended.swap(true, Ordering::SeqCst) {
307            return Ok(());
308        }
309        let (singleton_gate, concurrency_permit) = {
310            let mut acquired = self.acquired.lock().await;
311            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
312                if lease.held {
313                    lease.held = false;
314                    Some(lease.gate.clone())
315                } else {
316                    None
317                }
318            });
319            let concurrency_permit = acquired
320                .concurrency
321                .as_mut()
322                .and_then(|lease| lease.permit.take());
323            (singleton_gate, concurrency_permit)
324        };
325
326        if let Some(gate) = singleton_gate {
327            self.state
328                .flow_control
329                .release_singleton(&gate)
330                .await
331                .map_err(DispatchError::from)?;
332        }
333        if let Some(permit) = concurrency_permit {
334            self.state
335                .flow_control
336                .release_concurrency(permit)
337                .await
338                .map_err(DispatchError::from)?;
339        }
340        Ok(())
341    }
342
343    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
344        if !self.suspended.swap(false, Ordering::SeqCst) {
345            return Ok(());
346        }
347
348        let singleton_gate = {
349            let acquired = self.acquired.lock().await;
350            acquired.singleton.as_ref().and_then(|lease| {
351                if lease.held {
352                    None
353                } else {
354                    Some(lease.gate.clone())
355                }
356            })
357        };
358        if let Some(gate) = singleton_gate {
359            self.state
360                .flow_control
361                .acquire_singleton(&gate)
362                .await
363                .map_err(DispatchError::from)?;
364            let mut acquired = self.acquired.lock().await;
365            if let Some(lease) = acquired.singleton.as_mut() {
366                lease.held = true;
367            }
368        }
369
370        let concurrency_spec = {
371            let acquired = self.acquired.lock().await;
372            acquired.concurrency.as_ref().and_then(|lease| {
373                if lease.permit.is_some() {
374                    None
375                } else {
376                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
377                }
378            })
379        };
380        if let Some((gate, max, priority_rank)) = concurrency_spec {
381            let permit = self
382                .state
383                .flow_control
384                .acquire_concurrency(&gate, max, priority_rank)
385                .await
386                .map_err(DispatchError::from)?;
387            let mut acquired = self.acquired.lock().await;
388            if let Some(lease) = acquired.concurrency.as_mut() {
389                lease.permit = Some(permit);
390            }
391        }
392        Ok(())
393    }
394}
395
396enum FlowControlOutcome {
397    Dispatch {
398        event: Box<TriggerEvent>,
399        acquired: AcquiredFlowControl,
400    },
401    Skip {
402        reason: String,
403    },
404}
405
406#[derive(Clone, Debug)]
407enum DispatchSkipStage {
408    Predicate,
409    FlowControl,
410}
411
/// Errors produced while enqueueing or dispatching trigger events.
///
/// Every variant carries a message, and `Display` prints exactly that message.
#[derive(Debug)]
pub enum DispatchError {
    /// Reading from or appending to the event log failed.
    EventLog(String),
    /// Resolving a trigger binding failed.
    Registry(String),
    /// Serializing or deserializing a payload failed.
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl DispatchError {
    /// Whether a failed attempt with this error may be retried.
    ///
    /// Cancellation, denial, unimplemented handlers and waiting states are final.
    /// Every other variant is retryable.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}

impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::EventLog(text)
            | Self::Registry(text)
            | Self::Serde(text)
            | Self::Local(text)
            | Self::A2a(text)
            | Self::Denied(text)
            | Self::Timeout(text)
            | Self::Waiting(text)
            | Self::Cancelled(text)
            | Self::NotImplemented(text) => text,
        };
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}
453
454impl DispatchSkipStage {
455    fn as_str(&self) -> &'static str {
456        match self {
457            Self::Predicate => "predicate",
458            Self::FlowControl => "flow_control",
459        }
460    }
461}
462
463impl From<LogError> for DispatchError {
464    fn from(value: LogError) -> Self {
465        Self::EventLog(value.to_string())
466    }
467}
468
469pub async fn append_dispatch_cancel_request(
470    event_log: &Arc<AnyEventLog>,
471    request: &DispatchCancelRequest,
472) -> Result<u64, DispatchError> {
473    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
474        .expect("static trigger cancel topic should always be valid");
475    event_log
476        .append(
477            &topic,
478            LogEvent::new(
479                "dispatch_cancel_requested",
480                serde_json::to_value(request)
481                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
482            ),
483        )
484        .await
485        .map_err(DispatchError::from)
486}
487
488impl Dispatcher {
489    pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
490        self.event_log.clone()
491    }
492
493    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
494        let event_log = active_event_log().ok_or_else(|| {
495            DispatchError::EventLog("dispatcher requires an active event log".to_string())
496        })?;
497        Ok(Self::with_event_log(base_vm, event_log))
498    }
499
500    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
501        Self::with_event_log_and_metrics(base_vm, event_log, None)
502    }
503
504    pub fn with_event_log_and_metrics(
505        base_vm: Vm,
506        event_log: Arc<AnyEventLog>,
507        metrics: Option<Arc<crate::MetricsRegistry>>,
508    ) -> Self {
509        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
510        ACTIVE_DISPATCHER_STATE.with(|slot| {
511            *slot.borrow_mut() = Some(state.clone());
512        });
513        let (cancel_tx, _) = broadcast::channel(32);
514        Self {
515            base_vm: Rc::new(base_vm),
516            event_log,
517            cancel_tx,
518            state,
519            metrics,
520        }
521    }
522
523    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
524        DispatcherStatsSnapshot {
525            in_flight: self.state.in_flight.load(Ordering::Relaxed),
526            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
527            dlq_depth: self
528                .state
529                .dlq
530                .lock()
531                .expect("dispatcher dlq poisoned")
532                .len() as u64,
533        }
534    }
535
536    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
537        self.state
538            .dlq
539            .lock()
540            .expect("dispatcher dlq poisoned")
541            .clone()
542    }
543
544    pub fn shutdown(&self) {
545        self.state.shutting_down.store(true, Ordering::SeqCst);
546        for token in self
547            .state
548            .cancel_tokens
549            .lock()
550            .expect("dispatcher cancel tokens poisoned")
551            .iter()
552        {
553            token.store(true, Ordering::SeqCst);
554        }
555        let _ = self.cancel_tx.send(());
556    }
557
558    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
559        self.enqueue_targeted(None, None, event).await
560    }
561
562    pub async fn enqueue_targeted(
563        &self,
564        trigger_id: Option<String>,
565        binding_version: Option<u32>,
566        event: TriggerEvent,
567    ) -> Result<u64, DispatchError> {
568        self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
569            .await
570    }
571
572    pub async fn enqueue_targeted_with_headers(
573        &self,
574        trigger_id: Option<String>,
575        binding_version: Option<u32>,
576        event: TriggerEvent,
577        parent_headers: Option<&BTreeMap<String, String>>,
578    ) -> Result<u64, DispatchError> {
579        let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
580        let trigger_id_for_metrics = trigger_id.clone();
581        let mut headers = parent_headers.cloned().unwrap_or_default();
582        headers.extend(event_headers(&event, None, None, None));
583        if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
584            headers.insert("trigger_id".to_string(), trigger_id.clone());
585            headers.insert(
586                "binding_key".to_string(),
587                binding_key_from_parts(trigger_id, binding_version),
588            );
589        }
590        headers
591            .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
592            .or_insert_with(|| unix_ms(event.received_at).to_string());
593        let payload = serde_json::to_value(InboxEnvelope {
594            trigger_id,
595            binding_version,
596            event: event.clone(),
597        })
598        .map_err(|error| DispatchError::Serde(error.to_string()))?;
599        let mut log_event = LogEvent::new("event_ingested", payload);
600        let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
601        let queue_appended_at_ms = headers
602            .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
603            .and_then(|value| value.parse::<i64>().ok())
604            .unwrap_or(log_event.occurred_at_ms);
605        headers
606            .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
607            .or_insert_with(|| log_event.occurred_at_ms.to_string());
608        if let (Some(metrics), Some(trigger_id)) =
609            (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
610        {
611            let binding_key = binding_key_from_parts(trigger_id, binding_version);
612            let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
613            if !had_queue_appended_at {
614                metrics.record_trigger_accepted_to_queue_append(
615                    trigger_id,
616                    &binding_key,
617                    event.provider.as_str(),
618                    tenant_id(&event),
619                    "queued",
620                    duration_between_ms(queue_appended_at_ms, accepted_at_ms),
621                );
622            }
623            metrics.note_trigger_pending_event(
624                event.id.0.as_str(),
625                trigger_id,
626                &binding_key,
627                event.provider.as_str(),
628                tenant_id(&event),
629                accepted_at_ms,
630                queue_appended_at_ms,
631            );
632        }
633        log_event.headers = headers;
634        self.event_log
635            .append(&topic, log_event)
636            .await
637            .map_err(DispatchError::from)
638    }
639
640    pub async fn run(&self) -> Result<(), DispatchError> {
641        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
642            .expect("static trigger inbox envelopes topic is valid");
643        let start_from = self.event_log.latest(&topic).await?;
644        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
645        pin_mut!(stream);
646        let mut cancel_rx = self.cancel_tx.subscribe();
647
648        loop {
649            tokio::select! {
650                received = stream.next() => {
651                    let Some(received) = received else {
652                        break;
653                    };
654                    let (_, event) = received.map_err(DispatchError::from)?;
655                    if event.kind != "event_ingested" {
656                        continue;
657                    }
658                    let parent_headers = event.headers.clone();
659                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
660                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
661                    notify_test_inbox_dequeued();
662                    let _ = self
663                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
664                        .await;
665                }
666                _ = recv_cancel(&mut cancel_rx) => break,
667            }
668        }
669
670        Ok(())
671    }
672
673    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
674        let deadline = tokio::time::Instant::now() + timeout;
675        loop {
676            let snapshot = self.snapshot();
677            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
678                return Ok(DispatcherDrainReport {
679                    drained: true,
680                    in_flight: snapshot.in_flight,
681                    retry_queue_depth: snapshot.retry_queue_depth,
682                    dlq_depth: snapshot.dlq_depth,
683                });
684            }
685
686            let notified = self.state.idle_notify.notified();
687            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
688            if remaining.is_zero() {
689                return Ok(DispatcherDrainReport {
690                    drained: false,
691                    in_flight: snapshot.in_flight,
692                    retry_queue_depth: snapshot.retry_queue_depth,
693                    dlq_depth: snapshot.dlq_depth,
694                });
695            }
696            if tokio::time::timeout(remaining, notified).await.is_err() {
697                let snapshot = self.snapshot();
698                return Ok(DispatcherDrainReport {
699                    drained: false,
700                    in_flight: snapshot.in_flight,
701                    retry_queue_depth: snapshot.retry_queue_depth,
702                    dlq_depth: snapshot.dlq_depth,
703                });
704            }
705        }
706    }
707
708    pub async fn dispatch_inbox_envelope(
709        &self,
710        envelope: InboxEnvelope,
711    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
712        self.dispatch_inbox_envelope_with_headers(envelope, None)
713            .await
714    }
715
716    pub async fn dispatch_inbox_envelope_with_parent_headers(
717        &self,
718        envelope: InboxEnvelope,
719        parent_headers: &BTreeMap<String, String>,
720    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
721        self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
722            .await
723    }
724
725    async fn dispatch_inbox_envelope_with_headers(
726        &self,
727        envelope: InboxEnvelope,
728        parent_headers: Option<&BTreeMap<String, String>>,
729    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
730        if let Some(trigger_id) = envelope.trigger_id {
731            let binding = super::registry::resolve_live_trigger_binding(
732                &trigger_id,
733                envelope.binding_version,
734            )
735            .map_err(|error| DispatchError::Registry(error.to_string()))?;
736            return Ok(vec![
737                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
738                    .await?,
739            ]);
740        }
741
742        let cron_target = match &envelope.event.provider_payload {
743            crate::triggers::ProviderPayload::Known(
744                crate::triggers::event::KnownProviderPayload::Cron(payload),
745            ) => payload.cron_id.clone(),
746            _ => None,
747        };
748        if let Some(trigger_id) = cron_target {
749            let binding = super::registry::resolve_live_trigger_binding(
750                &trigger_id,
751                envelope.binding_version,
752            )
753            .map_err(|error| DispatchError::Registry(error.to_string()))?;
754            return Ok(vec![
755                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
756                    .await?,
757            ]);
758        }
759
760        self.dispatch_event_with_headers(envelope.event, parent_headers)
761            .await
762    }
763
764    pub async fn dispatch_event(
765        &self,
766        event: TriggerEvent,
767    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
768        self.dispatch_event_with_headers(event, None).await
769    }
770
771    async fn dispatch_event_with_headers(
772        &self,
773        event: TriggerEvent,
774        parent_headers: Option<&BTreeMap<String, String>>,
775    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
776        let bindings = matching_bindings(&event);
777        let mut outcomes = Vec::new();
778        for binding in bindings {
779            outcomes.push(
780                self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
781                    .await?,
782            );
783        }
784        Ok(outcomes)
785    }
786
787    pub async fn dispatch(
788        &self,
789        binding: &TriggerBinding,
790        event: TriggerEvent,
791    ) -> Result<DispatchOutcome, DispatchError> {
792        self.dispatch_with_replay(binding, event, None, None, None)
793            .await
794    }
795
796    pub async fn dispatch_replay(
797        &self,
798        binding: &TriggerBinding,
799        event: TriggerEvent,
800        replay_of_event_id: String,
801    ) -> Result<DispatchOutcome, DispatchError> {
802        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
803            .await
804    }
805
806    pub async fn dispatch_with_parent_span_id(
807        &self,
808        binding: &TriggerBinding,
809        event: TriggerEvent,
810        parent_span_id: Option<String>,
811    ) -> Result<DispatchOutcome, DispatchError> {
812        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
813            .await
814    }
815
816    async fn dispatch_with_replay(
817        &self,
818        binding: &TriggerBinding,
819        event: TriggerEvent,
820        replay_of_event_id: Option<String>,
821        parent_span_id: Option<String>,
822        parent_headers: Option<&BTreeMap<String, String>>,
823    ) -> Result<DispatchOutcome, DispatchError> {
824        let parent_headers_for_metrics = parent_headers.cloned();
825        let admitted_at_ms = current_unix_ms();
826        if let Some(metrics) = self.metrics.as_ref() {
827            let binding_key = binding.binding_key();
828            let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
829            metrics.record_trigger_queue_age_at_dispatch_admission(
830                binding.id.as_str(),
831                &binding_key,
832                event.provider.as_str(),
833                tenant_id(&event),
834                "admitted",
835                duration_between_ms(admitted_at_ms, queue_appended_at_ms),
836            );
837            metrics.clear_trigger_pending_event(
838                event.id.0.as_str(),
839                binding.id.as_str(),
840                &binding_key,
841                event.provider.as_str(),
842                tenant_id(&event),
843                admitted_at_ms,
844            );
845        }
846        let span = tracing::info_span!(
847            "dispatch",
848            trigger_id = %binding.id.as_str(),
849            binding_version = binding.version,
850            trace_id = %event.trace_id.0
851        );
852        #[cfg(feature = "otel")]
853        let span_for_otel = span.clone();
854        let _ = if let Some(headers) = parent_headers {
855            crate::observability::otel::set_span_parent_from_headers(
856                &span,
857                headers,
858                &event.trace_id,
859                parent_span_id.as_deref(),
860            )
861        } else {
862            crate::observability::otel::set_span_parent(
863                &span,
864                &event.trace_id,
865                parent_span_id.as_deref(),
866            )
867        };
868        #[cfg(feature = "otel")]
869        let started_at = Instant::now();
870        let metrics = self.metrics.clone();
871        let outcome = ACTIVE_DISPATCH_IS_REPLAY
872            .scope(
873                replay_of_event_id.is_some(),
874                self.dispatch_with_replay_inner(
875                    binding,
876                    event,
877                    replay_of_event_id,
878                    parent_headers_for_metrics,
879                )
880                .instrument(span),
881            )
882            .await;
883        if let Some(metrics) = metrics.as_ref() {
884            match &outcome {
885                Ok(dispatch_outcome) => match dispatch_outcome.status {
886                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
887                        metrics.record_dispatch_succeeded();
888                    }
889                    DispatchStatus::Waiting => {}
890                    _ => metrics.record_dispatch_failed(),
891                },
892                Err(_) => metrics.record_dispatch_failed(),
893            }
894            let outcome_label = match &outcome {
895                Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
896                Err(DispatchError::Cancelled(_)) => "cancelled",
897                Err(_) => "failed",
898            };
899            metrics.record_trigger_dispatched(
900                binding.id.as_str(),
901                binding.handler.kind(),
902                outcome_label,
903            );
904            metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
905        }
906        #[cfg(feature = "otel")]
907        {
908            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
909
910            let duration_ms = started_at.elapsed().as_millis() as i64;
911            let status = match &outcome {
912                Ok(dispatch_outcome) => match dispatch_outcome.status {
913                    DispatchStatus::Succeeded => "succeeded",
914                    DispatchStatus::Skipped => "skipped",
915                    DispatchStatus::Waiting => "waiting",
916                    DispatchStatus::Cancelled => "cancelled",
917                    DispatchStatus::Failed => "failed",
918                    DispatchStatus::Dlq => "dlq",
919                },
920                Err(DispatchError::Cancelled(_)) => "cancelled",
921                Err(_) => "failed",
922            };
923            span_for_otel.set_attribute("result.status", status);
924            span_for_otel.set_attribute("result.duration_ms", duration_ms);
925        }
926        outcome
927    }
928
929    async fn dispatch_with_replay_inner(
930        &self,
931        binding: &TriggerBinding,
932        event: TriggerEvent,
933        replay_of_event_id: Option<String>,
934        parent_headers: Option<BTreeMap<String, String>>,
935    ) -> Result<DispatchOutcome, DispatchError> {
936        let autonomy_tier = crate::resolve_agent_autonomy_tier(
937            &self.event_log,
938            binding.id.as_str(),
939            binding.autonomy_tier,
940        )
941        .await
942        .unwrap_or(binding.autonomy_tier);
943        let binding_key = binding.binding_key();
944        let route = DispatchUri::from(&binding.handler);
945        let trigger_id = binding.id.as_str().to_string();
946        let event_id = event.id.0.clone();
947        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
948        let begin = if replay_of_event_id.is_some() {
949            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
950        } else {
951            begin_in_flight(binding.id.as_str(), binding.version)
952        };
953        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
954
955        let mut attempts = Vec::new();
956        let mut source_node_id = format!("trigger:{}", event.id.0);
957        let mut initial_nodes = Vec::new();
958        let mut initial_edges = Vec::new();
959        if let Some(original_event_id) = replay_of_event_id.as_ref() {
960            let original_node_id = format!("trigger:{original_event_id}");
961            initial_nodes.push(RunActionGraphNodeRecord {
962                id: original_node_id.clone(),
963                label: format!(
964                    "{}:{} (original {})",
965                    event.provider.as_str(),
966                    event.kind,
967                    original_event_id
968                ),
969                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
970                status: "historical".to_string(),
971                outcome: "replayed_from".to_string(),
972                trace_id: Some(event.trace_id.0.clone()),
973                stage_id: None,
974                node_id: None,
975                worker_id: None,
976                run_id: None,
977                run_path: None,
978                metadata: trigger_node_metadata(&event),
979            });
980            initial_edges.push(RunActionGraphEdgeRecord {
981                from_id: original_node_id,
982                to_id: source_node_id.clone(),
983                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
984                label: Some("replay chain".to_string()),
985            });
986        }
987        initial_nodes.push(RunActionGraphNodeRecord {
988            id: source_node_id.clone(),
989            label: format!("{}:{}", event.provider.as_str(), event.kind),
990            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
991            status: "received".to_string(),
992            outcome: "received".to_string(),
993            trace_id: Some(event.trace_id.0.clone()),
994            stage_id: None,
995            node_id: None,
996            worker_id: None,
997            run_id: None,
998            run_path: None,
999            metadata: trigger_node_metadata(&event),
1000        });
1001        self.emit_action_graph(
1002            &event,
1003            initial_nodes,
1004            initial_edges,
1005            serde_json::json!({
1006                "source": "dispatcher",
1007                "trigger_id": trigger_id,
1008                "binding_key": binding_key,
1009                "event_id": event_id,
1010                "replay_of_event_id": replay_of_event_id,
1011            }),
1012        )
1013        .await?;
1014
1015        if dispatch_cancel_requested(
1016            &self.event_log,
1017            &binding_key,
1018            &event.id.0,
1019            replay_of_event_id.as_ref(),
1020        )
1021        .await?
1022        {
1023            finish_in_flight(
1024                binding.id.as_str(),
1025                binding.version,
1026                TriggerDispatchOutcome::Failed,
1027            )
1028            .await
1029            .map_err(|error| DispatchError::Registry(error.to_string()))?;
1030            decrement_in_flight(&self.state);
1031            return Ok(cancelled_dispatch_outcome(
1032                binding,
1033                &route,
1034                &event,
1035                replay_of_event_id,
1036                0,
1037                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1038            ));
1039        }
1040
1041        if let Some(predicate) = binding.when.as_ref() {
1042            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1043            let evaluation = self
1044                .evaluate_predicate(
1045                    binding,
1046                    predicate,
1047                    &event,
1048                    replay_of_event_id.as_ref(),
1049                    autonomy_tier,
1050                )
1051                .await?;
1052            let passed = evaluation.result;
1053            self.emit_action_graph(
1054                &event,
1055                vec![RunActionGraphNodeRecord {
1056                    id: predicate_node_id.clone(),
1057                    label: predicate.raw.clone(),
1058                    kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1059                    status: "completed".to_string(),
1060                    outcome: passed.to_string(),
1061                    trace_id: Some(event.trace_id.0.clone()),
1062                    stage_id: None,
1063                    node_id: None,
1064                    worker_id: None,
1065                    run_id: None,
1066                    run_path: None,
1067                    metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1068                }],
1069                vec![RunActionGraphEdgeRecord {
1070                    from_id: source_node_id.clone(),
1071                    to_id: predicate_node_id.clone(),
1072                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1073                    label: None,
1074                }],
1075                serde_json::json!({
1076                    "source": "dispatcher",
1077                    "trigger_id": binding.id.as_str(),
1078                    "binding_key": binding.binding_key(),
1079                    "event_id": event.id.0,
1080                    "predicate": predicate.raw,
1081                    "reason": evaluation.reason,
1082                    "cached": evaluation.cached,
1083                    "cost_usd": evaluation.cost_usd,
1084                    "tokens": evaluation.tokens,
1085                    "latency_ms": evaluation.latency_ms,
1086                    "replay_of_event_id": replay_of_event_id,
1087                }),
1088            )
1089            .await?;
1090
1091            if !passed {
1092                if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1093                    let final_error = format!(
1094                        "trigger budget exhausted: {}",
1095                        evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1096                    );
1097                    self.move_budget_exhausted_to_dlq(
1098                        binding,
1099                        &route,
1100                        &event,
1101                        replay_of_event_id.as_ref(),
1102                        &final_error,
1103                    )
1104                    .await?;
1105                    finish_in_flight(
1106                        binding.id.as_str(),
1107                        binding.version,
1108                        TriggerDispatchOutcome::Dlq,
1109                    )
1110                    .await
1111                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1112                    decrement_in_flight(&self.state);
1113                    self.append_dispatch_trust_record(
1114                        binding,
1115                        &route,
1116                        &event,
1117                        replay_of_event_id.as_ref(),
1118                        autonomy_tier,
1119                        TrustOutcome::Failure,
1120                        "dlq",
1121                        0,
1122                        Some(final_error.clone()),
1123                    )
1124                    .await?;
1125                    return Ok(DispatchOutcome {
1126                        trigger_id: binding.id.as_str().to_string(),
1127                        binding_key: binding.binding_key(),
1128                        event_id: event.id.0,
1129                        attempt_count: 0,
1130                        status: DispatchStatus::Dlq,
1131                        handler_kind: route.kind().to_string(),
1132                        target_uri: route.target_uri(),
1133                        replay_of_event_id,
1134                        result: None,
1135                        error: Some(final_error),
1136                    });
1137                }
1138
1139                if evaluation.exhaustion_strategy
1140                    == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1141                {
1142                    self.append_budget_deferred_event(
1143                        binding,
1144                        &route,
1145                        &event,
1146                        replay_of_event_id.as_ref(),
1147                        evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1148                    )
1149                    .await?;
1150                    finish_in_flight(
1151                        binding.id.as_str(),
1152                        binding.version,
1153                        TriggerDispatchOutcome::Dispatched,
1154                    )
1155                    .await
1156                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1157                    decrement_in_flight(&self.state);
1158                    self.append_dispatch_trust_record(
1159                        binding,
1160                        &route,
1161                        &event,
1162                        replay_of_event_id.as_ref(),
1163                        autonomy_tier,
1164                        TrustOutcome::Denied,
1165                        "waiting",
1166                        0,
1167                        evaluation.reason.clone(),
1168                    )
1169                    .await?;
1170                    return Ok(DispatchOutcome {
1171                        trigger_id: binding.id.as_str().to_string(),
1172                        binding_key: binding.binding_key(),
1173                        event_id: event.id.0,
1174                        attempt_count: 0,
1175                        status: DispatchStatus::Waiting,
1176                        handler_kind: route.kind().to_string(),
1177                        target_uri: route.target_uri(),
1178                        replay_of_event_id,
1179                        result: Some(serde_json::json!({
1180                            "deferred": true,
1181                            "predicate": predicate.raw,
1182                            "reason": evaluation.reason,
1183                        })),
1184                        error: None,
1185                    });
1186                }
1187
1188                self.append_skipped_outbox_event(
1189                    binding,
1190                    &route,
1191                    &event,
1192                    replay_of_event_id.as_ref(),
1193                    DispatchSkipStage::Predicate,
1194                    serde_json::json!({
1195                        "predicate": predicate.raw,
1196                        "reason": evaluation.reason,
1197                    }),
1198                )
1199                .await?;
1200                finish_in_flight(
1201                    binding.id.as_str(),
1202                    binding.version,
1203                    TriggerDispatchOutcome::Dispatched,
1204                )
1205                .await
1206                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1207                decrement_in_flight(&self.state);
1208                self.append_dispatch_trust_record(
1209                    binding,
1210                    &route,
1211                    &event,
1212                    replay_of_event_id.as_ref(),
1213                    autonomy_tier,
1214                    TrustOutcome::Denied,
1215                    "skipped",
1216                    0,
1217                    None,
1218                )
1219                .await?;
1220                return Ok(DispatchOutcome {
1221                    trigger_id: binding.id.as_str().to_string(),
1222                    binding_key: binding.binding_key(),
1223                    event_id: event.id.0,
1224                    attempt_count: 0,
1225                    status: DispatchStatus::Skipped,
1226                    handler_kind: route.kind().to_string(),
1227                    target_uri: route.target_uri(),
1228                    replay_of_event_id,
1229                    result: Some(serde_json::json!({
1230                        "skipped": true,
1231                        "predicate": predicate.raw,
1232                        "reason": evaluation.reason,
1233                    })),
1234                    error: None,
1235                });
1236            }
1237
1238            source_node_id = predicate_node_id;
1239        }
1240
1241        if autonomy_tier == AutonomyTier::ActAuto {
1242            if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1243                let request_id = self
1244                    .append_autonomy_budget_approval_request(
1245                        binding,
1246                        &route,
1247                        &event,
1248                        replay_of_event_id.as_ref(),
1249                        reason,
1250                    )
1251                    .await?;
1252                self.emit_autonomy_budget_approval_action_graph(
1253                    binding,
1254                    &route,
1255                    &event,
1256                    &source_node_id,
1257                    replay_of_event_id.as_ref(),
1258                    reason,
1259                    &request_id,
1260                )
1261                .await?;
1262                finish_in_flight(
1263                    binding.id.as_str(),
1264                    binding.version,
1265                    TriggerDispatchOutcome::Dispatched,
1266                )
1267                .await
1268                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1269                decrement_in_flight(&self.state);
1270                self.append_tier_transition_trust_record(
1271                    binding,
1272                    &event,
1273                    replay_of_event_id.as_ref(),
1274                    autonomy_tier,
1275                    AutonomyTier::ActWithApproval,
1276                    reason,
1277                    &request_id,
1278                )
1279                .await?;
1280                self.append_dispatch_trust_record(
1281                    binding,
1282                    &route,
1283                    &event,
1284                    replay_of_event_id.as_ref(),
1285                    autonomy_tier,
1286                    TrustOutcome::Denied,
1287                    "waiting",
1288                    0,
1289                    Some(reason.to_string()),
1290                )
1291                .await?;
1292                return Ok(DispatchOutcome {
1293                    trigger_id: binding.id.as_str().to_string(),
1294                    binding_key: binding.binding_key(),
1295                    event_id: event.id.0,
1296                    attempt_count: 0,
1297                    status: DispatchStatus::Waiting,
1298                    handler_kind: route.kind().to_string(),
1299                    target_uri: route.target_uri(),
1300                    replay_of_event_id,
1301                    result: Some(serde_json::json!({
1302                        "approval_required": true,
1303                        "request_id": request_id,
1304                        "reason": reason,
1305                        "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1306                    })),
1307                    error: None,
1308                });
1309            }
1310            note_autonomous_decision(binding);
1311        }
1312
1313        let (event, acquired_flow) = match self
1314            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1315            .await?
1316        {
1317            FlowControlOutcome::Dispatch { event, acquired } => {
1318                (*event, Arc::new(AsyncMutex::new(acquired)))
1319            }
1320            FlowControlOutcome::Skip { reason } => {
1321                self.append_skipped_outbox_event(
1322                    binding,
1323                    &route,
1324                    &event,
1325                    replay_of_event_id.as_ref(),
1326                    DispatchSkipStage::FlowControl,
1327                    serde_json::json!({
1328                        "flow_control": reason,
1329                    }),
1330                )
1331                .await?;
1332                finish_in_flight(
1333                    binding.id.as_str(),
1334                    binding.version,
1335                    TriggerDispatchOutcome::Dispatched,
1336                )
1337                .await
1338                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1339                decrement_in_flight(&self.state);
1340                return Ok(DispatchOutcome {
1341                    trigger_id: binding.id.as_str().to_string(),
1342                    binding_key: binding.binding_key(),
1343                    event_id: event.id.0,
1344                    attempt_count: 0,
1345                    status: DispatchStatus::Skipped,
1346                    handler_kind: route.kind().to_string(),
1347                    target_uri: route.target_uri(),
1348                    replay_of_event_id,
1349                    result: Some(serde_json::json!({
1350                        "skipped": true,
1351                        "flow_control": reason,
1352                    })),
1353                    error: None,
1354                });
1355            }
1356        };
1357
1358        let destination_key = destination_circuit_key(&route);
1359        let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1360            DestinationCircuitProbe::Allow { half_open } => {
1361                if half_open {
1362                    if let Some(metrics) = self.metrics.as_ref() {
1363                        metrics.record_backpressure_event("circuit", "half_open_probe");
1364                    }
1365                }
1366                half_open
1367            }
1368            DestinationCircuitProbe::Block { retry_after } => {
1369                if let Some(metrics) = self.metrics.as_ref() {
1370                    metrics.record_backpressure_event("circuit", "fail_fast");
1371                }
1372                let final_error = format!(
1373                    "destination circuit open for {}; retry after {}s",
1374                    destination_key,
1375                    retry_after.as_secs().max(1)
1376                );
1377                self.move_circuit_open_to_dlq(
1378                    binding,
1379                    &route,
1380                    &event,
1381                    replay_of_event_id.as_ref(),
1382                    &final_error,
1383                    &destination_key,
1384                )
1385                .await?;
1386                finish_in_flight(
1387                    binding.id.as_str(),
1388                    binding.version,
1389                    TriggerDispatchOutcome::Dlq,
1390                )
1391                .await
1392                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1393                self.release_flow_control(&acquired_flow).await?;
1394                decrement_in_flight(&self.state);
1395                self.append_dispatch_trust_record(
1396                    binding,
1397                    &route,
1398                    &event,
1399                    replay_of_event_id.as_ref(),
1400                    autonomy_tier,
1401                    TrustOutcome::Failure,
1402                    "dlq",
1403                    0,
1404                    Some(final_error.clone()),
1405                )
1406                .await?;
1407                return Ok(DispatchOutcome {
1408                    trigger_id: binding.id.as_str().to_string(),
1409                    binding_key: binding.binding_key(),
1410                    event_id: event.id.0,
1411                    attempt_count: 0,
1412                    status: DispatchStatus::Dlq,
1413                    handler_kind: route.kind().to_string(),
1414                    target_uri: route.target_uri(),
1415                    replay_of_event_id,
1416                    result: None,
1417                    error: Some(final_error),
1418                });
1419            }
1420        };
1421
1422        let mut previous_retry_node = None;
1423        let max_attempts = binding.retry.max_attempts();
1424        for attempt in 1..=max_attempts {
1425            if dispatch_cancel_requested(
1426                &self.event_log,
1427                &binding_key,
1428                &event.id.0,
1429                replay_of_event_id.as_ref(),
1430            )
1431            .await?
1432            {
1433                finish_in_flight(
1434                    binding.id.as_str(),
1435                    binding.version,
1436                    TriggerDispatchOutcome::Failed,
1437                )
1438                .await
1439                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1440                decrement_in_flight(&self.state);
1441                return Ok(cancelled_dispatch_outcome(
1442                    binding,
1443                    &route,
1444                    &event,
1445                    replay_of_event_id,
1446                    attempt.saturating_sub(1),
1447                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1448                ));
1449            }
1450            maybe_fail_before_outbox();
1451            let attempt_started_instant = Instant::now();
1452            let attempt_started_at_ms = current_unix_ms();
1453            let queue_age_at_start = duration_between_ms(
1454                attempt_started_at_ms,
1455                queue_appended_at_ms(parent_headers.as_ref(), &event),
1456            );
1457            if attempt == 1 {
1458                if let Some(metrics) = self.metrics.as_ref() {
1459                    metrics.record_trigger_queue_age_at_dispatch_start(
1460                        binding.id.as_str(),
1461                        &binding_key,
1462                        event.provider.as_str(),
1463                        tenant_id(&event),
1464                        "started",
1465                        queue_age_at_start,
1466                    );
1467                }
1468            }
1469            tracing::info!(
1470                component = "dispatcher",
1471                lifecycle = "dispatch_started",
1472                trigger_id = %binding.id.as_str(),
1473                binding_key = %binding_key,
1474                event_id = %event.id.0,
1475                attempt,
1476                queue_age_ms = queue_age_at_start.as_millis(),
1477                trace_id = %event.trace_id.0
1478            );
1479            let started_at = now_rfc3339();
1480            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1481            self.append_lifecycle_event(
1482                "DispatchStarted",
1483                &event,
1484                binding,
1485                serde_json::json!({
1486                    "event_id": event.id.0,
1487                    "attempt": attempt,
1488                    "handler_kind": route.kind(),
1489                    "target_uri": route.target_uri(),
1490                    "replay_of_event_id": replay_of_event_id,
1491                }),
1492                replay_of_event_id.as_ref(),
1493            )
1494            .await?;
1495            self.append_topic_event(
1496                TRIGGER_OUTBOX_TOPIC,
1497                "dispatch_started",
1498                &event,
1499                Some(binding),
1500                Some(attempt),
1501                serde_json::json!({
1502                    "event_id": event.id.0,
1503                    "attempt": attempt,
1504                    "trigger_id": binding.id.as_str(),
1505                    "binding_key": binding.binding_key(),
1506                    "handler_kind": route.kind(),
1507                    "target_uri": route.target_uri(),
1508                    "replay_of_event_id": replay_of_event_id,
1509                }),
1510                replay_of_event_id.as_ref(),
1511            )
1512            .await?;
1513
1514            let mut dispatch_edges = Vec::new();
1515            if attempt == 1 {
1516                dispatch_edges.push(RunActionGraphEdgeRecord {
1517                    from_id: source_node_id.clone(),
1518                    to_id: attempt_node_id.clone(),
1519                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1520                    label: binding.when.as_ref().map(|_| "true".to_string()),
1521                });
1522            } else if let Some(retry_node_id) = previous_retry_node.take() {
1523                dispatch_edges.push(RunActionGraphEdgeRecord {
1524                    from_id: retry_node_id,
1525                    to_id: attempt_node_id.clone(),
1526                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1527                    label: Some(format!("attempt {attempt}")),
1528                });
1529            }
1530
1531            self.emit_action_graph(
1532                &event,
1533                vec![RunActionGraphNodeRecord {
1534                    id: attempt_node_id.clone(),
1535                    label: dispatch_node_label(&route),
1536                    kind: dispatch_node_kind(&route).to_string(),
1537                    status: "running".to_string(),
1538                    outcome: format!("attempt_{attempt}"),
1539                    trace_id: Some(event.trace_id.0.clone()),
1540                    stage_id: None,
1541                    node_id: None,
1542                    worker_id: None,
1543                    run_id: None,
1544                    run_path: None,
1545                    metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1546                }],
1547                dispatch_edges,
1548                serde_json::json!({
1549                    "source": "dispatcher",
1550                    "trigger_id": binding.id.as_str(),
1551                    "binding_key": binding.binding_key(),
1552                    "event_id": event.id.0,
1553                    "attempt": attempt,
1554                    "handler_kind": route.kind(),
1555                    "target_uri": route.target_uri(),
1556                    "target_agent": dispatch_target_agent(&route),
1557                    "replay_of_event_id": replay_of_event_id,
1558                }),
1559            )
1560            .await?;
1561
1562            let result = self
1563                .dispatch_once(
1564                    binding,
1565                    &route,
1566                    &event,
1567                    autonomy_tier,
1568                    Some(DispatchWaitLease::new(
1569                        self.state.clone(),
1570                        acquired_flow.clone(),
1571                    )),
1572                    &mut self.cancel_tx.subscribe(),
1573                )
1574                .await;
1575            let attempt_runtime = attempt_started_instant.elapsed();
1576            let attempt_status = dispatch_result_status(&result);
1577            if let Some(metrics) = self.metrics.as_ref() {
1578                metrics.record_trigger_dispatch_runtime(
1579                    binding.id.as_str(),
1580                    &binding_key,
1581                    event.provider.as_str(),
1582                    tenant_id(&event),
1583                    attempt_status,
1584                    attempt_runtime,
1585                );
1586            }
1587            tracing::info!(
1588                component = "dispatcher",
1589                lifecycle = "handler_completed",
1590                trigger_id = %binding.id.as_str(),
1591                binding_key = %binding_key,
1592                event_id = %event.id.0,
1593                attempt,
1594                status = attempt_status,
1595                runtime_ms = attempt_runtime.as_millis(),
1596                trace_id = %event.trace_id.0
1597            );
1598            let completed_at = now_rfc3339();
1599
1600            match result {
1601                Ok(result) => {
1602                    let attempt_record = DispatchAttemptRecord {
1603                        trigger_id: binding.id.as_str().to_string(),
1604                        binding_key: binding.binding_key(),
1605                        event_id: event.id.0.clone(),
1606                        attempt,
1607                        handler_kind: route.kind().to_string(),
1608                        started_at,
1609                        completed_at,
1610                        outcome: "success".to_string(),
1611                        error_msg: None,
1612                    };
1613                    attempts.push(attempt_record.clone());
1614                    self.append_attempt_record(
1615                        &event,
1616                        binding,
1617                        &attempt_record,
1618                        replay_of_event_id.as_ref(),
1619                    )
1620                    .await?;
1621                    self.append_lifecycle_event(
1622                        "DispatchSucceeded",
1623                        &event,
1624                        binding,
1625                        serde_json::json!({
1626                            "event_id": event.id.0,
1627                            "attempt": attempt,
1628                            "handler_kind": route.kind(),
1629                            "target_uri": route.target_uri(),
1630                            "result": result,
1631                            "replay_of_event_id": replay_of_event_id,
1632                        }),
1633                        replay_of_event_id.as_ref(),
1634                    )
1635                    .await?;
1636                    self.append_topic_event(
1637                        TRIGGER_OUTBOX_TOPIC,
1638                        "dispatch_succeeded",
1639                        &event,
1640                        Some(binding),
1641                        Some(attempt),
1642                        serde_json::json!({
1643                            "event_id": event.id.0,
1644                            "attempt": attempt,
1645                            "trigger_id": binding.id.as_str(),
1646                            "binding_key": binding.binding_key(),
1647                            "handler_kind": route.kind(),
1648                            "target_uri": route.target_uri(),
1649                            "result": result,
1650                            "replay_of_event_id": replay_of_event_id,
1651                        }),
1652                        replay_of_event_id.as_ref(),
1653                    )
1654                    .await?;
1655                    self.emit_action_graph(
1656                        &event,
1657                        vec![RunActionGraphNodeRecord {
1658                            id: attempt_node_id.clone(),
1659                            label: dispatch_node_label(&route),
1660                            kind: dispatch_node_kind(&route).to_string(),
1661                            status: "completed".to_string(),
1662                            outcome: dispatch_success_outcome(&route, &result).to_string(),
1663                            trace_id: Some(event.trace_id.0.clone()),
1664                            stage_id: None,
1665                            node_id: None,
1666                            worker_id: None,
1667                            run_id: None,
1668                            run_path: None,
1669                            metadata: dispatch_success_metadata(
1670                                &route, binding, &event, attempt, &result,
1671                            ),
1672                        }],
1673                        Vec::new(),
1674                        serde_json::json!({
1675                            "source": "dispatcher",
1676                            "trigger_id": binding.id.as_str(),
1677                            "binding_key": binding.binding_key(),
1678                            "event_id": event.id.0,
1679                            "attempt": attempt,
1680                            "handler_kind": route.kind(),
1681                            "target_uri": route.target_uri(),
1682                            "result": result,
1683                            "replay_of_event_id": replay_of_event_id,
1684                        }),
1685                    )
1686                    .await?;
1687                    self.state
1688                        .destination_circuits
1689                        .record_success(&destination_key);
1690                    if half_open_probe {
1691                        if let Some(metrics) = self.metrics.as_ref() {
1692                            metrics.record_backpressure_event("circuit", "closed");
1693                        }
1694                    }
1695                    finish_in_flight(
1696                        binding.id.as_str(),
1697                        binding.version,
1698                        TriggerDispatchOutcome::Dispatched,
1699                    )
1700                    .await
1701                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1702                    self.release_flow_control(&acquired_flow).await?;
1703                    decrement_in_flight(&self.state);
1704                    self.append_dispatch_trust_record(
1705                        binding,
1706                        &route,
1707                        &event,
1708                        replay_of_event_id.as_ref(),
1709                        autonomy_tier,
1710                        TrustOutcome::Success,
1711                        "succeeded",
1712                        attempt,
1713                        None,
1714                    )
1715                    .await?;
1716                    return Ok(DispatchOutcome {
1717                        trigger_id: binding.id.as_str().to_string(),
1718                        binding_key: binding.binding_key(),
1719                        event_id: event.id.0,
1720                        attempt_count: attempt,
1721                        status: DispatchStatus::Succeeded,
1722                        handler_kind: route.kind().to_string(),
1723                        target_uri: route.target_uri(),
1724                        replay_of_event_id,
1725                        result: Some(result),
1726                        error: None,
1727                    });
1728                }
1729                Err(error) => {
1730                    let attempt_record = DispatchAttemptRecord {
1731                        trigger_id: binding.id.as_str().to_string(),
1732                        binding_key: binding.binding_key(),
1733                        event_id: event.id.0.clone(),
1734                        attempt,
1735                        handler_kind: route.kind().to_string(),
1736                        started_at,
1737                        completed_at,
1738                        outcome: dispatch_error_label(&error).to_string(),
1739                        error_msg: Some(error.to_string()),
1740                    };
1741                    attempts.push(attempt_record.clone());
1742                    self.append_attempt_record(
1743                        &event,
1744                        binding,
1745                        &attempt_record,
1746                        replay_of_event_id.as_ref(),
1747                    )
1748                    .await?;
1749                    if let DispatchError::Waiting(message) = &error {
1750                        self.append_lifecycle_event(
1751                            "DispatchWaiting",
1752                            &event,
1753                            binding,
1754                            serde_json::json!({
1755                                "event_id": event.id.0,
1756                                "attempt": attempt,
1757                                "handler_kind": route.kind(),
1758                                "target_uri": route.target_uri(),
1759                                "message": message,
1760                                "replay_of_event_id": replay_of_event_id,
1761                            }),
1762                            replay_of_event_id.as_ref(),
1763                        )
1764                        .await?;
1765                        self.append_topic_event(
1766                            TRIGGER_OUTBOX_TOPIC,
1767                            "dispatch_waiting",
1768                            &event,
1769                            Some(binding),
1770                            Some(attempt),
1771                            serde_json::json!({
1772                                "event_id": event.id.0,
1773                                "attempt": attempt,
1774                                "trigger_id": binding.id.as_str(),
1775                                "binding_key": binding.binding_key(),
1776                                "handler_kind": route.kind(),
1777                                "target_uri": route.target_uri(),
1778                                "message": message,
1779                                "replay_of_event_id": replay_of_event_id,
1780                            }),
1781                            replay_of_event_id.as_ref(),
1782                        )
1783                        .await?;
1784                        finish_in_flight(
1785                            binding.id.as_str(),
1786                            binding.version,
1787                            TriggerDispatchOutcome::Dispatched,
1788                        )
1789                        .await
1790                        .map_err(|registry_error| {
1791                            DispatchError::Registry(registry_error.to_string())
1792                        })?;
1793                        self.release_flow_control(&acquired_flow).await?;
1794                        decrement_in_flight(&self.state);
1795                        return Ok(DispatchOutcome {
1796                            trigger_id: binding.id.as_str().to_string(),
1797                            binding_key: binding.binding_key(),
1798                            event_id: event.id.0,
1799                            attempt_count: attempt,
1800                            status: DispatchStatus::Waiting,
1801                            handler_kind: route.kind().to_string(),
1802                            target_uri: route.target_uri(),
1803                            replay_of_event_id,
1804                            result: Some(serde_json::json!({
1805                                "waiting": true,
1806                                "message": message,
1807                            })),
1808                            error: None,
1809                        });
1810                    }
1811
1812                    self.append_lifecycle_event(
1813                        "DispatchFailed",
1814                        &event,
1815                        binding,
1816                        serde_json::json!({
1817                            "event_id": event.id.0,
1818                            "attempt": attempt,
1819                            "handler_kind": route.kind(),
1820                            "target_uri": route.target_uri(),
1821                            "error": error.to_string(),
1822                            "replay_of_event_id": replay_of_event_id,
1823                        }),
1824                        replay_of_event_id.as_ref(),
1825                    )
1826                    .await?;
1827                    self.append_topic_event(
1828                        TRIGGER_OUTBOX_TOPIC,
1829                        "dispatch_failed",
1830                        &event,
1831                        Some(binding),
1832                        Some(attempt),
1833                        serde_json::json!({
1834                            "event_id": event.id.0,
1835                            "attempt": attempt,
1836                            "trigger_id": binding.id.as_str(),
1837                            "binding_key": binding.binding_key(),
1838                            "handler_kind": route.kind(),
1839                            "target_uri": route.target_uri(),
1840                            "error": error.to_string(),
1841                            "replay_of_event_id": replay_of_event_id,
1842                        }),
1843                        replay_of_event_id.as_ref(),
1844                    )
1845                    .await?;
1846                    self.emit_action_graph(
1847                        &event,
1848                        vec![RunActionGraphNodeRecord {
1849                            id: attempt_node_id.clone(),
1850                            label: dispatch_node_label(&route),
1851                            kind: dispatch_node_kind(&route).to_string(),
1852                            status: if matches!(error, DispatchError::Cancelled(_)) {
1853                                "cancelled".to_string()
1854                            } else {
1855                                "failed".to_string()
1856                            },
1857                            outcome: dispatch_error_label(&error).to_string(),
1858                            trace_id: Some(event.trace_id.0.clone()),
1859                            stage_id: None,
1860                            node_id: None,
1861                            worker_id: None,
1862                            run_id: None,
1863                            run_path: None,
1864                            metadata: dispatch_error_metadata(
1865                                &route, binding, &event, attempt, &error,
1866                            ),
1867                        }],
1868                        Vec::new(),
1869                        serde_json::json!({
1870                            "source": "dispatcher",
1871                            "trigger_id": binding.id.as_str(),
1872                            "binding_key": binding.binding_key(),
1873                            "event_id": event.id.0,
1874                            "attempt": attempt,
1875                            "handler_kind": route.kind(),
1876                            "target_uri": route.target_uri(),
1877                            "error": error.to_string(),
1878                            "replay_of_event_id": replay_of_event_id,
1879                        }),
1880                    )
1881                    .await?;
1882
1883                    let circuit_opened = if error.retryable() {
1884                        self.state
1885                            .destination_circuits
1886                            .record_failure(&destination_key)
1887                    } else {
1888                        false
1889                    };
1890                    if circuit_opened {
1891                        if let Some(metrics) = self.metrics.as_ref() {
1892                            metrics.record_backpressure_event("circuit", "opened");
1893                            metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1894                            metrics.record_trigger_accepted_to_dlq(
1895                                binding.id.as_str(),
1896                                &binding_key,
1897                                event.provider.as_str(),
1898                                tenant_id(&event),
1899                                "circuit_open",
1900                                duration_between_ms(
1901                                    current_unix_ms(),
1902                                    accepted_at_ms(parent_headers.as_ref(), &event),
1903                                ),
1904                            );
1905                        }
1906                        let final_error = format!(
1907                            "destination circuit opened for {} after {} consecutive failures: {}",
1908                            destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
1909                        );
1910                        let dlq_entry = DlqEntry {
1911                            trigger_id: binding.id.as_str().to_string(),
1912                            binding_key: binding.binding_key(),
1913                            event: event.clone(),
1914                            attempt_count: attempt,
1915                            final_error: final_error.clone(),
1916                            error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
1917                                .to_string(),
1918                            attempts: attempts.clone(),
1919                        };
1920                        self.state
1921                            .dlq
1922                            .lock()
1923                            .expect("dispatcher dlq poisoned")
1924                            .push(dlq_entry.clone());
1925                        self.append_lifecycle_event(
1926                            "DlqMoved",
1927                            &event,
1928                            binding,
1929                            serde_json::json!({
1930                                "event_id": event.id.0,
1931                                "attempt_count": attempt,
1932                                "final_error": dlq_entry.final_error,
1933                                "reason": "circuit_open",
1934                                "destination": destination_key,
1935                                "replay_of_event_id": replay_of_event_id,
1936                            }),
1937                            replay_of_event_id.as_ref(),
1938                        )
1939                        .await?;
1940                        self.append_topic_event(
1941                            TRIGGER_DLQ_TOPIC,
1942                            "dlq_moved",
1943                            &event,
1944                            Some(binding),
1945                            Some(attempt),
1946                            serde_json::to_value(&dlq_entry).map_err(|serde_error| {
1947                                DispatchError::Serde(serde_error.to_string())
1948                            })?,
1949                            replay_of_event_id.as_ref(),
1950                        )
1951                        .await?;
1952                        finish_in_flight(
1953                            binding.id.as_str(),
1954                            binding.version,
1955                            TriggerDispatchOutcome::Dlq,
1956                        )
1957                        .await
1958                        .map_err(|registry_error| {
1959                            DispatchError::Registry(registry_error.to_string())
1960                        })?;
1961                        self.release_flow_control(&acquired_flow).await?;
1962                        decrement_in_flight(&self.state);
1963                        self.append_dispatch_trust_record(
1964                            binding,
1965                            &route,
1966                            &event,
1967                            replay_of_event_id.as_ref(),
1968                            autonomy_tier,
1969                            TrustOutcome::Failure,
1970                            "dlq",
1971                            attempt,
1972                            Some(final_error.clone()),
1973                        )
1974                        .await?;
1975                        return Ok(DispatchOutcome {
1976                            trigger_id: binding.id.as_str().to_string(),
1977                            binding_key: binding.binding_key(),
1978                            event_id: event.id.0,
1979                            attempt_count: attempt,
1980                            status: DispatchStatus::Dlq,
1981                            handler_kind: route.kind().to_string(),
1982                            target_uri: route.target_uri(),
1983                            replay_of_event_id,
1984                            result: None,
1985                            error: Some(final_error),
1986                        });
1987                    }
1988
1989                    if !error.retryable() {
1990                        finish_in_flight(
1991                            binding.id.as_str(),
1992                            binding.version,
1993                            TriggerDispatchOutcome::Failed,
1994                        )
1995                        .await
1996                        .map_err(|registry_error| {
1997                            DispatchError::Registry(registry_error.to_string())
1998                        })?;
1999                        self.release_flow_control(&acquired_flow).await?;
2000                        decrement_in_flight(&self.state);
2001                        let trust_outcome = match error {
2002                            DispatchError::Denied(_) => TrustOutcome::Denied,
2003                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
2004                            _ => TrustOutcome::Failure,
2005                        };
2006                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2007                            "cancelled"
2008                        } else {
2009                            "failed"
2010                        };
2011                        self.append_dispatch_trust_record(
2012                            binding,
2013                            &route,
2014                            &event,
2015                            replay_of_event_id.as_ref(),
2016                            autonomy_tier,
2017                            trust_outcome,
2018                            terminal_status,
2019                            attempt,
2020                            Some(error.to_string()),
2021                        )
2022                        .await?;
2023                        return Ok(DispatchOutcome {
2024                            trigger_id: binding.id.as_str().to_string(),
2025                            binding_key: binding.binding_key(),
2026                            event_id: event.id.0,
2027                            attempt_count: attempt,
2028                            status: if matches!(error, DispatchError::Cancelled(_)) {
2029                                DispatchStatus::Cancelled
2030                            } else {
2031                                DispatchStatus::Failed
2032                            },
2033                            handler_kind: route.kind().to_string(),
2034                            target_uri: route.target_uri(),
2035                            replay_of_event_id,
2036                            result: None,
2037                            error: Some(error.to_string()),
2038                        });
2039                    }
2040
2041                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2042                        if let Some(metrics) = self.metrics.as_ref() {
2043                            metrics.record_retry_scheduled();
2044                            metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2045                            metrics.record_trigger_retry_delay(
2046                                binding.id.as_str(),
2047                                &binding_key,
2048                                event.provider.as_str(),
2049                                tenant_id(&event),
2050                                "scheduled",
2051                                delay,
2052                            );
2053                        }
2054                        tracing::info!(
2055                            component = "dispatcher",
2056                            lifecycle = "retry_scheduled",
2057                            trigger_id = %binding.id.as_str(),
2058                            binding_key = %binding_key,
2059                            event_id = %event.id.0,
2060                            attempt = attempt + 1,
2061                            delay_ms = delay.as_millis(),
2062                            trace_id = %event.trace_id.0
2063                        );
2064                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2065                        previous_retry_node = Some(retry_node_id.clone());
2066                        self.emit_action_graph(
2067                            &event,
2068                            vec![RunActionGraphNodeRecord {
2069                                id: retry_node_id.clone(),
2070                                label: format!("retry in {}ms", delay.as_millis()),
2071                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2072                                status: "scheduled".to_string(),
2073                                outcome: format!("attempt_{}", attempt + 1),
2074                                trace_id: Some(event.trace_id.0.clone()),
2075                                stage_id: None,
2076                                node_id: None,
2077                                worker_id: None,
2078                                run_id: None,
2079                                run_path: None,
2080                                metadata: retry_node_metadata(
2081                                    binding,
2082                                    &event,
2083                                    attempt + 1,
2084                                    delay,
2085                                    &error,
2086                                ),
2087                            }],
2088                            vec![RunActionGraphEdgeRecord {
2089                                from_id: attempt_node_id,
2090                                to_id: retry_node_id.clone(),
2091                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2092                                label: Some(format!("attempt {}", attempt + 1)),
2093                            }],
2094                            serde_json::json!({
2095                                "source": "dispatcher",
2096                                "trigger_id": binding.id.as_str(),
2097                                "binding_key": binding.binding_key(),
2098                                "event_id": event.id.0,
2099                                "attempt": attempt + 1,
2100                                "delay_ms": delay.as_millis(),
2101                                "replay_of_event_id": replay_of_event_id,
2102                            }),
2103                        )
2104                        .await?;
2105                        self.append_lifecycle_event(
2106                            "RetryScheduled",
2107                            &event,
2108                            binding,
2109                            serde_json::json!({
2110                                "event_id": event.id.0,
2111                                "attempt": attempt + 1,
2112                                "delay_ms": delay.as_millis(),
2113                                "error": error.to_string(),
2114                                "replay_of_event_id": replay_of_event_id,
2115                            }),
2116                            replay_of_event_id.as_ref(),
2117                        )
2118                        .await?;
2119                        self.append_topic_event(
2120                            TRIGGER_ATTEMPTS_TOPIC,
2121                            "retry_scheduled",
2122                            &event,
2123                            Some(binding),
2124                            Some(attempt + 1),
2125                            serde_json::json!({
2126                                "event_id": event.id.0,
2127                                "attempt": attempt + 1,
2128                                "trigger_id": binding.id.as_str(),
2129                                "binding_key": binding.binding_key(),
2130                                "delay_ms": delay.as_millis(),
2131                                "error": error.to_string(),
2132                                "replay_of_event_id": replay_of_event_id,
2133                            }),
2134                            replay_of_event_id.as_ref(),
2135                        )
2136                        .await?;
2137                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2138                        let sleep_result = sleep_or_cancel_or_request(
2139                            &self.event_log,
2140                            delay,
2141                            &binding_key,
2142                            &event.id.0,
2143                            replay_of_event_id.as_ref(),
2144                            &mut self.cancel_tx.subscribe(),
2145                        )
2146                        .await;
2147                        decrement_retry_queue_depth(&self.state);
2148                        if sleep_result.is_err() {
2149                            finish_in_flight(
2150                                binding.id.as_str(),
2151                                binding.version,
2152                                TriggerDispatchOutcome::Failed,
2153                            )
2154                            .await
2155                            .map_err(|registry_error| {
2156                                DispatchError::Registry(registry_error.to_string())
2157                            })?;
2158                            self.release_flow_control(&acquired_flow).await?;
2159                            decrement_in_flight(&self.state);
2160                            self.append_dispatch_trust_record(
2161                                binding,
2162                                &route,
2163                                &event,
2164                                replay_of_event_id.as_ref(),
2165                                autonomy_tier,
2166                                TrustOutcome::Failure,
2167                                "cancelled",
2168                                attempt,
2169                                Some("dispatcher shutdown cancelled retry wait".to_string()),
2170                            )
2171                            .await?;
2172                            return Ok(DispatchOutcome {
2173                                trigger_id: binding.id.as_str().to_string(),
2174                                binding_key: binding.binding_key(),
2175                                event_id: event.id.0,
2176                                attempt_count: attempt,
2177                                status: DispatchStatus::Cancelled,
2178                                handler_kind: route.kind().to_string(),
2179                                target_uri: route.target_uri(),
2180                                replay_of_event_id,
2181                                result: None,
2182                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2183                            });
2184                        }
2185                        continue;
2186                    }
2187
2188                    let final_error = error.to_string();
2189                    let dlq_entry = DlqEntry {
2190                        trigger_id: binding.id.as_str().to_string(),
2191                        binding_key: binding.binding_key(),
2192                        event: event.clone(),
2193                        attempt_count: attempt,
2194                        final_error: final_error.clone(),
2195                        error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2196                            .to_string(),
2197                        attempts: attempts.clone(),
2198                    };
2199                    self.state
2200                        .dlq
2201                        .lock()
2202                        .expect("dispatcher dlq poisoned")
2203                        .push(dlq_entry.clone());
2204                    if let Some(metrics) = self.metrics.as_ref() {
2205                        metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2206                        metrics.record_trigger_accepted_to_dlq(
2207                            binding.id.as_str(),
2208                            &binding_key,
2209                            event.provider.as_str(),
2210                            tenant_id(&event),
2211                            "retry_exhausted",
2212                            duration_between_ms(
2213                                current_unix_ms(),
2214                                accepted_at_ms(parent_headers.as_ref(), &event),
2215                            ),
2216                        );
2217                    }
2218                    tracing::info!(
2219                        component = "dispatcher",
2220                        lifecycle = "dlq_moved",
2221                        trigger_id = %binding.id.as_str(),
2222                        binding_key = %binding_key,
2223                        event_id = %event.id.0,
2224                        attempt_count = attempt,
2225                        reason = "retry_exhausted",
2226                        trace_id = %event.trace_id.0
2227                    );
2228                    self.emit_action_graph(
2229                        &event,
2230                        vec![RunActionGraphNodeRecord {
2231                            id: format!("dlq:{binding_key}:{}", event.id.0),
2232                            label: binding.id.as_str().to_string(),
2233                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2234                            status: "queued".to_string(),
2235                            outcome: "retry_exhausted".to_string(),
2236                            trace_id: Some(event.trace_id.0.clone()),
2237                            stage_id: None,
2238                            node_id: None,
2239                            worker_id: None,
2240                            run_id: None,
2241                            run_path: None,
2242                            metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2243                        }],
2244                        vec![RunActionGraphEdgeRecord {
2245                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2246                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
2247                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2248                            label: Some(format!("{attempt} attempts")),
2249                        }],
2250                        serde_json::json!({
2251                            "source": "dispatcher",
2252                            "trigger_id": binding.id.as_str(),
2253                            "binding_key": binding.binding_key(),
2254                            "event_id": event.id.0,
2255                            "attempt_count": attempt,
2256                            "final_error": final_error,
2257                            "replay_of_event_id": replay_of_event_id,
2258                        }),
2259                    )
2260                    .await?;
2261                    self.append_lifecycle_event(
2262                        "DlqMoved",
2263                        &event,
2264                        binding,
2265                        serde_json::json!({
2266                            "event_id": event.id.0,
2267                            "attempt_count": attempt,
2268                            "final_error": dlq_entry.final_error,
2269                            "replay_of_event_id": replay_of_event_id,
2270                        }),
2271                        replay_of_event_id.as_ref(),
2272                    )
2273                    .await?;
2274                    self.append_topic_event(
2275                        TRIGGER_DLQ_TOPIC,
2276                        "dlq_moved",
2277                        &event,
2278                        Some(binding),
2279                        Some(attempt),
2280                        serde_json::to_value(&dlq_entry)
2281                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2282                        replay_of_event_id.as_ref(),
2283                    )
2284                    .await?;
2285                    finish_in_flight(
2286                        binding.id.as_str(),
2287                        binding.version,
2288                        TriggerDispatchOutcome::Dlq,
2289                    )
2290                    .await
2291                    .map_err(|registry_error| {
2292                        DispatchError::Registry(registry_error.to_string())
2293                    })?;
2294                    self.release_flow_control(&acquired_flow).await?;
2295                    decrement_in_flight(&self.state);
2296                    self.append_dispatch_trust_record(
2297                        binding,
2298                        &route,
2299                        &event,
2300                        replay_of_event_id.as_ref(),
2301                        autonomy_tier,
2302                        TrustOutcome::Failure,
2303                        "dlq",
2304                        attempt,
2305                        Some(error.to_string()),
2306                    )
2307                    .await?;
2308                    return Ok(DispatchOutcome {
2309                        trigger_id: binding.id.as_str().to_string(),
2310                        binding_key: binding.binding_key(),
2311                        event_id: event.id.0,
2312                        attempt_count: attempt,
2313                        status: DispatchStatus::Dlq,
2314                        handler_kind: route.kind().to_string(),
2315                        target_uri: route.target_uri(),
2316                        replay_of_event_id,
2317                        result: None,
2318                        error: Some(error.to_string()),
2319                    });
2320                }
2321            }
2322        }
2323
2324        finish_in_flight(
2325            binding.id.as_str(),
2326            binding.version,
2327            TriggerDispatchOutcome::Failed,
2328        )
2329        .await
2330        .map_err(|error| DispatchError::Registry(error.to_string()))?;
2331        self.release_flow_control(&acquired_flow).await?;
2332        decrement_in_flight(&self.state);
2333        self.append_dispatch_trust_record(
2334            binding,
2335            &route,
2336            &event,
2337            replay_of_event_id.as_ref(),
2338            autonomy_tier,
2339            TrustOutcome::Failure,
2340            "failed",
2341            max_attempts,
2342            Some("dispatch exhausted without terminal outcome".to_string()),
2343        )
2344        .await?;
2345        Ok(DispatchOutcome {
2346            trigger_id: binding.id.as_str().to_string(),
2347            binding_key: binding.binding_key(),
2348            event_id: event.id.0,
2349            attempt_count: max_attempts,
2350            status: DispatchStatus::Failed,
2351            handler_kind: route.kind().to_string(),
2352            target_uri: route.target_uri(),
2353            replay_of_event_id,
2354            result: None,
2355            error: Some("dispatch exhausted without terminal outcome".to_string()),
2356        })
2357    }
2358
2359    async fn dispatch_once(
2360        &self,
2361        binding: &TriggerBinding,
2362        route: &DispatchUri,
2363        event: &TriggerEvent,
2364        autonomy_tier: AutonomyTier,
2365        wait_lease: Option<DispatchWaitLease>,
2366        cancel_rx: &mut broadcast::Receiver<()>,
2367    ) -> Result<serde_json::Value, DispatchError> {
2368        match route {
2369            DispatchUri::Local { .. } => {
2370                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2371                    return Err(DispatchError::Local(format!(
2372                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2373                        binding.id.as_str()
2374                    )));
2375                };
2376                let value = self
2377                    .invoke_vm_callable(
2378                        closure,
2379                        &binding.binding_key(),
2380                        event,
2381                        None,
2382                        binding.id.as_str(),
2383                        &format!("{}.{}", event.provider.as_str(), event.kind),
2384                        autonomy_tier,
2385                        wait_lease,
2386                        cancel_rx,
2387                    )
2388                    .await?;
2389                Ok(vm_value_to_json(&value))
2390            }
2391            DispatchUri::A2a {
2392                target,
2393                allow_cleartext,
2394            } => {
2395                if self.state.shutting_down.load(Ordering::SeqCst) {
2396                    return Err(DispatchError::Cancelled(
2397                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
2398                    ));
2399                }
2400                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2401                    target,
2402                    *allow_cleartext,
2403                    binding.id.as_str(),
2404                    &binding.binding_key(),
2405                    event,
2406                    cancel_rx,
2407                )
2408                .await
2409                .map_err(|error| match error {
2410                    crate::a2a::A2aClientError::Cancelled(message) => {
2411                        DispatchError::Cancelled(message)
2412                    }
2413                    other => DispatchError::A2a(other.to_string()),
2414                })?;
2415                match ack {
2416                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2417                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2418                }
2419            }
2420            DispatchUri::Worker { queue } => {
2421                let receipt = crate::WorkerQueue::new(self.event_log.clone())
2422                    .enqueue(&crate::WorkerQueueJob {
2423                        queue: queue.clone(),
2424                        trigger_id: binding.id.as_str().to_string(),
2425                        binding_key: binding.binding_key(),
2426                        binding_version: binding.version,
2427                        event: event.clone(),
2428                        replay_of_event_id: current_dispatch_context()
2429                            .and_then(|context| context.replay_of_event_id),
2430                        priority: worker_queue_priority(binding, event),
2431                    })
2432                    .await
2433                    .map_err(DispatchError::from)?;
2434                Ok(serde_json::to_value(receipt)
2435                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2436            }
2437            DispatchUri::Persona { .. } => {
2438                let TriggerHandlerSpec::Persona {
2439                    binding: persona_binding,
2440                } = &binding.handler
2441                else {
2442                    return Err(DispatchError::Local(format!(
2443                        "trigger '{}' resolved to a persona dispatch URI but does not carry a persona binding",
2444                        binding.id.as_str()
2445                    )));
2446                };
2447                let receipt = crate::fire_persona_trigger(
2448                    &self.event_log,
2449                    persona_binding,
2450                    event.provider.as_str(),
2451                    &event.kind,
2452                    trigger_event_persona_metadata(event),
2453                    crate::PersonaRunCost::default(),
2454                    crate::persona_now_ms(),
2455                )
2456                .await
2457                .map_err(DispatchError::Local)?;
2458                Ok(serde_json::to_value(receipt)
2459                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2460            }
2461        }
2462    }
2463
2464    async fn apply_flow_control(
2465        &self,
2466        binding: &TriggerBinding,
2467        event: &TriggerEvent,
2468        replay_of_event_id: Option<&String>,
2469    ) -> Result<FlowControlOutcome, DispatchError> {
2470        let flow = &binding.flow_control;
2471        let mut managed_event = event.clone();
2472
2473        if let Some(batch) = &flow.batch {
2474            let gate = self
2475                .resolve_flow_gate(
2476                    &binding.binding_key(),
2477                    batch.key.as_ref(),
2478                    &managed_event,
2479                    replay_of_event_id,
2480                )
2481                .await?;
2482            match self
2483                .state
2484                .flow_control
2485                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2486                .await
2487                .map_err(DispatchError::from)?
2488            {
2489                BatchDecision::Dispatch(events) => {
2490                    managed_event = build_batched_event(events)?;
2491                }
2492                BatchDecision::Merged => {
2493                    return Ok(FlowControlOutcome::Skip {
2494                        reason: "batch_merged".to_string(),
2495                    })
2496                }
2497            }
2498        }
2499
2500        if let Some(debounce) = &flow.debounce {
2501            let gate = self
2502                .resolve_flow_gate(
2503                    &binding.binding_key(),
2504                    Some(&debounce.key),
2505                    &managed_event,
2506                    replay_of_event_id,
2507                )
2508                .await?;
2509            let latest = self
2510                .state
2511                .flow_control
2512                .debounce(&gate, debounce.period)
2513                .await
2514                .map_err(DispatchError::from)?;
2515            if !latest {
2516                return Ok(FlowControlOutcome::Skip {
2517                    reason: "debounced".to_string(),
2518                });
2519            }
2520        }
2521
2522        if let Some(rate_limit) = &flow.rate_limit {
2523            let gate = self
2524                .resolve_flow_gate(
2525                    &binding.binding_key(),
2526                    rate_limit.key.as_ref(),
2527                    &managed_event,
2528                    replay_of_event_id,
2529                )
2530                .await?;
2531            let allowed = self
2532                .state
2533                .flow_control
2534                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2535                .await
2536                .map_err(DispatchError::from)?;
2537            if !allowed {
2538                return Ok(FlowControlOutcome::Skip {
2539                    reason: "rate_limited".to_string(),
2540                });
2541            }
2542        }
2543
2544        if let Some(throttle) = &flow.throttle {
2545            let gate = self
2546                .resolve_flow_gate(
2547                    &binding.binding_key(),
2548                    throttle.key.as_ref(),
2549                    &managed_event,
2550                    replay_of_event_id,
2551                )
2552                .await?;
2553            self.state
2554                .flow_control
2555                .wait_for_throttle(&gate, throttle.period, throttle.max)
2556                .await
2557                .map_err(DispatchError::from)?;
2558        }
2559
2560        let mut acquired = AcquiredFlowControl::default();
2561        if let Some(singleton) = &flow.singleton {
2562            let gate = self
2563                .resolve_flow_gate(
2564                    &binding.binding_key(),
2565                    singleton.key.as_ref(),
2566                    &managed_event,
2567                    replay_of_event_id,
2568                )
2569                .await?;
2570            let acquired_singleton = self
2571                .state
2572                .flow_control
2573                .try_acquire_singleton(&gate)
2574                .await
2575                .map_err(DispatchError::from)?;
2576            if !acquired_singleton {
2577                return Ok(FlowControlOutcome::Skip {
2578                    reason: "singleton_active".to_string(),
2579                });
2580            }
2581            acquired.singleton = Some(SingletonLease { gate, held: true });
2582        }
2583
2584        if let Some(concurrency) = &flow.concurrency {
2585            let gate = self
2586                .resolve_flow_gate(
2587                    &binding.binding_key(),
2588                    concurrency.key.as_ref(),
2589                    &managed_event,
2590                    replay_of_event_id,
2591                )
2592                .await?;
2593            let priority_rank = self
2594                .resolve_priority_rank(
2595                    &binding.binding_key(),
2596                    flow.priority.as_ref(),
2597                    &managed_event,
2598                    replay_of_event_id,
2599                )
2600                .await?;
2601            let permit = self
2602                .state
2603                .flow_control
2604                .acquire_concurrency(&gate, concurrency.max, priority_rank)
2605                .await
2606                .map_err(DispatchError::from)?;
2607            acquired.concurrency = Some(ConcurrencyLease {
2608                gate,
2609                max: concurrency.max,
2610                priority_rank,
2611                permit: Some(permit),
2612            });
2613        }
2614
2615        Ok(FlowControlOutcome::Dispatch {
2616            event: Box::new(managed_event),
2617            acquired,
2618        })
2619    }
2620
2621    async fn release_flow_control(
2622        &self,
2623        acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2624    ) -> Result<(), DispatchError> {
2625        let (singleton_gate, concurrency_permit) = {
2626            let mut acquired = acquired.lock().await;
2627            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2628                if lease.held {
2629                    lease.held = false;
2630                    Some(lease.gate.clone())
2631                } else {
2632                    None
2633                }
2634            });
2635            let concurrency_permit = acquired
2636                .concurrency
2637                .as_mut()
2638                .and_then(|lease| lease.permit.take());
2639            (singleton_gate, concurrency_permit)
2640        };
2641        if let Some(gate) = singleton_gate {
2642            self.state
2643                .flow_control
2644                .release_singleton(&gate)
2645                .await
2646                .map_err(DispatchError::from)?;
2647        }
2648        if let Some(permit) = concurrency_permit {
2649            self.state
2650                .flow_control
2651                .release_concurrency(permit)
2652                .await
2653                .map_err(DispatchError::from)?;
2654        }
2655        Ok(())
2656    }
2657
2658    async fn resolve_flow_gate(
2659        &self,
2660        binding_key: &str,
2661        expr: Option<&crate::triggers::TriggerExpressionSpec>,
2662        event: &TriggerEvent,
2663        replay_of_event_id: Option<&String>,
2664    ) -> Result<String, DispatchError> {
2665        let key = match expr {
2666            Some(expr) => {
2667                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2668                    .await?
2669            }
2670            None => "_global".to_string(),
2671        };
2672        Ok(format!("{binding_key}:{key}"))
2673    }
2674
2675    async fn resolve_priority_rank(
2676        &self,
2677        binding_key: &str,
2678        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2679        event: &TriggerEvent,
2680        replay_of_event_id: Option<&String>,
2681    ) -> Result<usize, DispatchError> {
2682        let Some(priority) = priority else {
2683            return Ok(0);
2684        };
2685        let value = self
2686            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2687            .await?;
2688        Ok(priority
2689            .order
2690            .iter()
2691            .position(|candidate| candidate == &value)
2692            .unwrap_or(priority.order.len()))
2693    }
2694
2695    async fn evaluate_flow_expression(
2696        &self,
2697        binding_key: &str,
2698        expr: &crate::triggers::TriggerExpressionSpec,
2699        event: &TriggerEvent,
2700        replay_of_event_id: Option<&String>,
2701    ) -> Result<String, DispatchError> {
2702        let value = self
2703            .invoke_vm_callable(
2704                &expr.closure,
2705                binding_key,
2706                event,
2707                replay_of_event_id,
2708                "",
2709                "flow_control",
2710                AutonomyTier::Suggest,
2711                None,
2712                &mut self.cancel_tx.subscribe(),
2713            )
2714            .await?;
2715        Ok(json_value_to_gate(&vm_value_to_json(&value)))
2716    }
2717
2718    #[allow(clippy::too_many_arguments)]
2719    async fn invoke_vm_callable(
2720        &self,
2721        closure: &crate::value::VmClosure,
2722        binding_key: &str,
2723        event: &TriggerEvent,
2724        replay_of_event_id: Option<&String>,
2725        agent_id: &str,
2726        action: &str,
2727        autonomy_tier: AutonomyTier,
2728        wait_lease: Option<DispatchWaitLease>,
2729        cancel_rx: &mut broadcast::Receiver<()>,
2730    ) -> Result<VmValue, DispatchError> {
2731        let mut vm = self.base_vm.child_vm();
2732        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2733        if self.state.shutting_down.load(Ordering::SeqCst) {
2734            cancel_token.store(true, Ordering::SeqCst);
2735        }
2736        self.state
2737            .cancel_tokens
2738            .lock()
2739            .expect("dispatcher cancel tokens poisoned")
2740            .push(cancel_token.clone());
2741        vm.install_cancel_token(cancel_token.clone());
2742        let arg = event_to_handler_value(event)?;
2743        let args = [arg];
2744        let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2745        let effective_policy = match crate::orchestration::current_execution_policy() {
2746            Some(parent) => parent
2747                .intersect(&tier_policy)
2748                .map_err(|error| DispatchError::Local(error.to_string()))?,
2749            None => tier_policy,
2750        };
2751        crate::orchestration::push_execution_policy(effective_policy);
2752        let _policy_guard = DispatchExecutionPolicyGuard;
2753        let future = vm.call_closure_pub(closure, &args);
2754        pin_mut!(future);
2755        let (binding_id, binding_version) = split_binding_key(binding_key);
2756        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2757            slot.borrow_mut().replace(DispatchContext {
2758                trigger_event: event.clone(),
2759                replay_of_event_id: replay_of_event_id.cloned(),
2760                binding_id,
2761                binding_version,
2762                agent_id: agent_id.to_string(),
2763                action: action.to_string(),
2764                autonomy_tier,
2765            })
2766        });
2767        let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2768            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2769        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2770        crate::stdlib::hitl::reset_hitl_state();
2771        let mut poll = tokio::time::interval(Duration::from_millis(100));
2772        let result = loop {
2773            tokio::select! {
2774                result = &mut future => break result,
2775                _ = recv_cancel(cancel_rx) => {
2776                    cancel_token.store(true, Ordering::SeqCst);
2777                }
2778                _ = poll.tick() => {
2779                    if dispatch_cancel_requested(
2780                        &self.event_log,
2781                        binding_key,
2782                        &event.id.0,
2783                        replay_of_event_id,
2784                    )
2785                    .await? {
2786                        cancel_token.store(true, Ordering::SeqCst);
2787                    }
2788                }
2789            }
2790        };
2791        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2792            *slot.borrow_mut() = prior_context;
2793        });
2794        ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2795            *slot.borrow_mut() = prior_wait_lease;
2796        });
2797        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2798        {
2799            let mut tokens = self
2800                .state
2801                .cancel_tokens
2802                .lock()
2803                .expect("dispatcher cancel tokens poisoned");
2804            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2805        }
2806
2807        if cancel_token.load(Ordering::SeqCst) {
2808            if dispatch_cancel_requested(
2809                &self.event_log,
2810                binding_key,
2811                &event.id.0,
2812                replay_of_event_id,
2813            )
2814            .await?
2815            {
2816                Err(DispatchError::Cancelled(
2817                    "trigger cancel request cancelled local handler".to_string(),
2818                ))
2819            } else {
2820                Err(DispatchError::Cancelled(
2821                    "dispatcher shutdown cancelled local handler".to_string(),
2822                ))
2823            }
2824        } else {
2825            result.map_err(dispatch_error_from_vm_error)
2826        }
2827    }
2828
2829    #[allow(clippy::too_many_arguments)]
2830    async fn invoke_vm_callable_with_timeout(
2831        &self,
2832        closure: &crate::value::VmClosure,
2833        binding_key: &str,
2834        event: &TriggerEvent,
2835        replay_of_event_id: Option<&String>,
2836        agent_id: &str,
2837        action: &str,
2838        autonomy_tier: AutonomyTier,
2839        cancel_rx: &mut broadcast::Receiver<()>,
2840        timeout: Option<Duration>,
2841    ) -> Result<VmValue, DispatchError> {
2842        let future = self.invoke_vm_callable(
2843            closure,
2844            binding_key,
2845            event,
2846            replay_of_event_id,
2847            agent_id,
2848            action,
2849            autonomy_tier,
2850            None,
2851            cancel_rx,
2852        );
2853        pin_mut!(future);
2854        if let Some(timeout) = timeout {
2855            match tokio::time::timeout(timeout, future).await {
2856                Ok(result) => result,
2857                Err(_) => Err(DispatchError::Local(
2858                    "predicate evaluation timed out".to_string(),
2859                )),
2860            }
2861        } else {
2862            future.await
2863        }
2864    }
2865
2866    #[allow(clippy::too_many_arguments)]
2867    async fn append_dispatch_trust_record(
2868        &self,
2869        binding: &TriggerBinding,
2870        route: &DispatchUri,
2871        event: &TriggerEvent,
2872        replay_of_event_id: Option<&String>,
2873        autonomy_tier: AutonomyTier,
2874        outcome: TrustOutcome,
2875        terminal_status: &str,
2876        attempt_count: u32,
2877        error: Option<String>,
2878    ) -> Result<(), DispatchError> {
2879        let mut record = TrustRecord::new(
2880            binding.id.as_str().to_string(),
2881            format!("{}.{}", event.provider.as_str(), event.kind),
2882            None,
2883            outcome,
2884            event.trace_id.0.clone(),
2885            autonomy_tier,
2886        );
2887        record.metadata.insert(
2888            "binding_key".to_string(),
2889            serde_json::json!(binding.binding_key()),
2890        );
2891        record.metadata.insert(
2892            "binding_version".to_string(),
2893            serde_json::json!(binding.version),
2894        );
2895        record.metadata.insert(
2896            "provider".to_string(),
2897            serde_json::json!(event.provider.as_str()),
2898        );
2899        record
2900            .metadata
2901            .insert("event_kind".to_string(), serde_json::json!(event.kind));
2902        record
2903            .metadata
2904            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2905        record.metadata.insert(
2906            "target_uri".to_string(),
2907            serde_json::json!(route.target_uri()),
2908        );
2909        record.metadata.insert(
2910            "terminal_status".to_string(),
2911            serde_json::json!(terminal_status),
2912        );
2913        record.metadata.insert(
2914            "attempt_count".to_string(),
2915            serde_json::json!(attempt_count),
2916        );
2917        if let Some(replay_of_event_id) = replay_of_event_id {
2918            record.metadata.insert(
2919                "replay_of_event_id".to_string(),
2920                serde_json::json!(replay_of_event_id),
2921            );
2922        }
2923        if let Some(error) = error {
2924            record
2925                .metadata
2926                .insert("error".to_string(), serde_json::json!(error));
2927        }
2928        append_trust_record(&self.event_log, &record)
2929            .await
2930            .map(|_| ())
2931            .map_err(DispatchError::from)
2932    }
2933
2934    async fn append_attempt_record(
2935        &self,
2936        event: &TriggerEvent,
2937        binding: &TriggerBinding,
2938        attempt: &DispatchAttemptRecord,
2939        replay_of_event_id: Option<&String>,
2940    ) -> Result<(), DispatchError> {
2941        self.append_topic_event(
2942            TRIGGER_ATTEMPTS_TOPIC,
2943            "attempt_recorded",
2944            event,
2945            Some(binding),
2946            Some(attempt.attempt),
2947            serde_json::to_value(attempt)
2948                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2949            replay_of_event_id,
2950        )
2951        .await
2952    }
2953
2954    async fn append_lifecycle_event(
2955        &self,
2956        kind: &str,
2957        event: &TriggerEvent,
2958        binding: &TriggerBinding,
2959        payload: serde_json::Value,
2960        replay_of_event_id: Option<&String>,
2961    ) -> Result<(), DispatchError> {
2962        self.append_topic_event(
2963            TRIGGERS_LIFECYCLE_TOPIC,
2964            kind,
2965            event,
2966            Some(binding),
2967            None,
2968            payload,
2969            replay_of_event_id,
2970        )
2971        .await
2972    }
2973
2974    async fn append_autonomy_budget_approval_request(
2975        &self,
2976        binding: &TriggerBinding,
2977        route: &DispatchUri,
2978        event: &TriggerEvent,
2979        replay_of_event_id: Option<&String>,
2980        reason: &str,
2981    ) -> Result<String, DispatchError> {
2982        let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
2983        let detail = serde_json::json!({
2984            "trigger_id": binding.id.as_str(),
2985            "binding_key": binding.binding_key(),
2986            "event_id": event.id.0,
2987            "reason": reason,
2988            "from_tier": AutonomyTier::ActAuto.as_str(),
2989            "requested_tier": AutonomyTier::ActWithApproval.as_str(),
2990            "handler_kind": route.kind(),
2991            "target_uri": route.target_uri(),
2992            "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
2993            "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
2994            "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
2995            "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
2996            "replay_of_event_id": replay_of_event_id,
2997        });
2998        let request_id = crate::stdlib::hitl::append_approval_request_on(
2999            &self.event_log,
3000            binding.id.as_str().to_string(),
3001            event.trace_id.0.clone(),
3002            format!(
3003                "approve autonomous dispatch for trigger '{}' after {}",
3004                binding.id.as_str(),
3005                reason
3006            ),
3007            detail.clone(),
3008            reviewers.clone(),
3009        )
3010        .await
3011        .map_err(dispatch_error_from_vm_error)?;
3012        self.append_lifecycle_event(
3013            "autonomy.budget_exceeded",
3014            event,
3015            binding,
3016            serde_json::json!({
3017                "trigger_id": binding.id.as_str(),
3018                "event_id": event.id.0,
3019                "reason": reason,
3020                "request_id": request_id,
3021                "reviewers": reviewers,
3022                "from_tier": AutonomyTier::ActAuto.as_str(),
3023                "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3024                "replay_of_event_id": replay_of_event_id,
3025            }),
3026            replay_of_event_id,
3027        )
3028        .await?;
3029        Ok(request_id)
3030    }
3031
3032    #[allow(clippy::too_many_arguments)]
3033    async fn emit_autonomy_budget_approval_action_graph(
3034        &self,
3035        binding: &TriggerBinding,
3036        route: &DispatchUri,
3037        event: &TriggerEvent,
3038        source_node_id: &str,
3039        replay_of_event_id: Option<&String>,
3040        reason: &str,
3041        request_id: &str,
3042    ) -> Result<(), DispatchError> {
3043        let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3044        self.emit_action_graph(
3045            event,
3046            vec![RunActionGraphNodeRecord {
3047                id: approval_node_id.clone(),
3048                label: format!("approval required: {reason}"),
3049                kind: "approval".to_string(),
3050                status: "waiting".to_string(),
3051                outcome: "request_approval".to_string(),
3052                trace_id: Some(event.trace_id.0.clone()),
3053                stage_id: None,
3054                node_id: None,
3055                worker_id: None,
3056                run_id: None,
3057                run_path: None,
3058                metadata: BTreeMap::from([
3059                    (
3060                        "trigger_id".to_string(),
3061                        serde_json::json!(binding.id.as_str()),
3062                    ),
3063                    (
3064                        "binding_key".to_string(),
3065                        serde_json::json!(binding.binding_key()),
3066                    ),
3067                    ("event_id".to_string(), serde_json::json!(event.id.0)),
3068                    ("reason".to_string(), serde_json::json!(reason)),
3069                    ("request_id".to_string(), serde_json::json!(request_id)),
3070                    (
3071                        "reviewers".to_string(),
3072                        serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3073                    ),
3074                    ("handler_kind".to_string(), serde_json::json!(route.kind())),
3075                    (
3076                        "target_uri".to_string(),
3077                        serde_json::json!(route.target_uri()),
3078                    ),
3079                    (
3080                        "from_tier".to_string(),
3081                        serde_json::json!(AutonomyTier::ActAuto.as_str()),
3082                    ),
3083                    (
3084                        "requested_tier".to_string(),
3085                        serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3086                    ),
3087                    (
3088                        "replay_of_event_id".to_string(),
3089                        serde_json::json!(replay_of_event_id),
3090                    ),
3091                ]),
3092            }],
3093            vec![RunActionGraphEdgeRecord {
3094                from_id: source_node_id.to_string(),
3095                to_id: approval_node_id,
3096                kind: "approval_gate".to_string(),
3097                label: Some("autonomy budget".to_string()),
3098            }],
3099            serde_json::json!({
3100                "source": "dispatcher",
3101                "trigger_id": binding.id.as_str(),
3102                "binding_key": binding.binding_key(),
3103                "event_id": event.id.0,
3104                "reason": reason,
3105                "request_id": request_id,
3106                "replay_of_event_id": replay_of_event_id,
3107            }),
3108        )
3109        .await
3110    }
3111
3112    #[allow(clippy::too_many_arguments)]
3113    async fn append_tier_transition_trust_record(
3114        &self,
3115        binding: &TriggerBinding,
3116        event: &TriggerEvent,
3117        replay_of_event_id: Option<&String>,
3118        from_tier: AutonomyTier,
3119        to_tier: AutonomyTier,
3120        reason: &str,
3121        request_id: &str,
3122    ) -> Result<(), DispatchError> {
3123        let mut record = TrustRecord::new(
3124            binding.id.as_str().to_string(),
3125            "autonomy.tier_transition",
3126            Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3127            TrustOutcome::Denied,
3128            event.trace_id.0.clone(),
3129            to_tier,
3130        );
3131        record.metadata.insert(
3132            "binding_key".to_string(),
3133            serde_json::json!(binding.binding_key()),
3134        );
3135        record
3136            .metadata
3137            .insert("event_id".to_string(), serde_json::json!(event.id.0));
3138        record.metadata.insert(
3139            "from_tier".to_string(),
3140            serde_json::json!(from_tier.as_str()),
3141        );
3142        record
3143            .metadata
3144            .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3145        record
3146            .metadata
3147            .insert("reason".to_string(), serde_json::json!(reason));
3148        record
3149            .metadata
3150            .insert("request_id".to_string(), serde_json::json!(request_id));
3151        if let Some(replay_of_event_id) = replay_of_event_id {
3152            record.metadata.insert(
3153                "replay_of_event_id".to_string(),
3154                serde_json::json!(replay_of_event_id),
3155            );
3156        }
3157        append_trust_record(&self.event_log, &record)
3158            .await
3159            .map(|_| ())
3160            .map_err(DispatchError::from)
3161    }
3162
3163    async fn append_budget_deferred_event(
3164        &self,
3165        binding: &TriggerBinding,
3166        route: &DispatchUri,
3167        event: &TriggerEvent,
3168        replay_of_event_id: Option<&String>,
3169        reason: &str,
3170    ) -> Result<(), DispatchError> {
3171        self.append_topic_event(
3172            TRIGGER_ATTEMPTS_TOPIC,
3173            "budget_deferred",
3174            event,
3175            Some(binding),
3176            None,
3177            serde_json::json!({
3178                "event_id": event.id.0,
3179                "trigger_id": binding.id.as_str(),
3180                "binding_key": binding.binding_key(),
3181                "handler_kind": route.kind(),
3182                "target_uri": route.target_uri(),
3183                "reason": reason,
3184                "retry_at": next_budget_reset_rfc3339(binding),
3185                "replay_of_event_id": replay_of_event_id,
3186            }),
3187            replay_of_event_id,
3188        )
3189        .await?;
3190        self.append_skipped_outbox_event(
3191            binding,
3192            route,
3193            event,
3194            replay_of_event_id,
3195            DispatchSkipStage::Predicate,
3196            serde_json::json!({
3197                "deferred": true,
3198                "reason": reason,
3199                "retry_at": next_budget_reset_rfc3339(binding),
3200            }),
3201        )
3202        .await
3203    }
3204
3205    async fn append_skipped_outbox_event(
3206        &self,
3207        binding: &TriggerBinding,
3208        route: &DispatchUri,
3209        event: &TriggerEvent,
3210        replay_of_event_id: Option<&String>,
3211        stage: DispatchSkipStage,
3212        detail: serde_json::Value,
3213    ) -> Result<(), DispatchError> {
3214        self.append_topic_event(
3215            TRIGGER_OUTBOX_TOPIC,
3216            "dispatch_skipped",
3217            event,
3218            Some(binding),
3219            None,
3220            serde_json::json!({
3221                "event_id": event.id.0,
3222                "trigger_id": binding.id.as_str(),
3223                "binding_key": binding.binding_key(),
3224                "handler_kind": route.kind(),
3225                "target_uri": route.target_uri(),
3226                "skip_stage": stage.as_str(),
3227                "detail": detail,
3228                "replay_of_event_id": replay_of_event_id,
3229            }),
3230            replay_of_event_id,
3231        )
3232        .await
3233    }
3234
3235    async fn append_topic_event(
3236        &self,
3237        topic_name: &str,
3238        kind: &str,
3239        event: &TriggerEvent,
3240        binding: Option<&TriggerBinding>,
3241        attempt: Option<u32>,
3242        payload: serde_json::Value,
3243        replay_of_event_id: Option<&String>,
3244    ) -> Result<(), DispatchError> {
3245        let topic = topic_for_event(event, topic_name)?;
3246        let headers = event_headers(event, binding, attempt, replay_of_event_id);
3247        self.event_log
3248            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3249            .await
3250            .map_err(DispatchError::from)
3251            .map(|_| ())
3252    }
3253
3254    async fn emit_action_graph(
3255        &self,
3256        event: &TriggerEvent,
3257        nodes: Vec<RunActionGraphNodeRecord>,
3258        edges: Vec<RunActionGraphEdgeRecord>,
3259        extra: serde_json::Value,
3260    ) -> Result<(), DispatchError> {
3261        let mut headers = BTreeMap::new();
3262        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3263        headers.insert("event_id".to_string(), event.id.0.clone());
3264        let observability = RunObservabilityRecord {
3265            schema_version: 1,
3266            action_graph_nodes: nodes,
3267            action_graph_edges: edges,
3268            ..Default::default()
3269        };
3270        append_action_graph_update(
3271            headers,
3272            serde_json::json!({
3273                "source": "dispatcher",
3274                "trace_id": event.trace_id.0,
3275                "event_id": event.id.0,
3276                "observability": observability,
3277                "context": extra,
3278            }),
3279        )
3280        .await
3281        .map_err(DispatchError::from)
3282    }
3283}
3284
3285async fn dispatch_cancel_requested(
3286    event_log: &Arc<AnyEventLog>,
3287    binding_key: &str,
3288    event_id: &str,
3289    replay_of_event_id: Option<&String>,
3290) -> Result<bool, DispatchError> {
3291    if replay_of_event_id.is_some() {
3292        return Ok(false);
3293    }
3294    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3295        .expect("static trigger cancel topic should always be valid");
3296    let events = event_log.read_range(&topic, None, usize::MAX).await?;
3297    let requested = events
3298        .into_iter()
3299        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3300        .filter_map(|(_, event)| {
3301            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3302        })
3303        .collect::<BTreeSet<_>>();
3304    Ok(requested
3305        .iter()
3306        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3307}
3308
3309async fn sleep_or_cancel_or_request(
3310    event_log: &Arc<AnyEventLog>,
3311    delay: Duration,
3312    binding_key: &str,
3313    event_id: &str,
3314    replay_of_event_id: Option<&String>,
3315    cancel_rx: &mut broadcast::Receiver<()>,
3316) -> Result<(), DispatchError> {
3317    let sleep = tokio::time::sleep(delay);
3318    pin_mut!(sleep);
3319    let mut poll = tokio::time::interval(Duration::from_millis(100));
3320    loop {
3321        tokio::select! {
3322            _ = &mut sleep => return Ok(()),
3323            _ = recv_cancel(cancel_rx) => {
3324                return Err(DispatchError::Cancelled(
3325                    "dispatcher shutdown cancelled retry wait".to_string(),
3326                ));
3327            }
3328            _ = poll.tick() => {
3329                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
3330                    return Err(DispatchError::Cancelled(
3331                        "trigger cancel request cancelled retry wait".to_string(),
3332                    ));
3333                }
3334            }
3335        }
3336    }
3337}
3338
3339fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
3340    let mut iter = events.into_iter();
3341    let Some(mut root) = iter.next() else {
3342        return Err(DispatchError::Registry(
3343            "batch dispatch produced an empty event list".to_string(),
3344        ));
3345    };
3346    let mut batch = Vec::new();
3347    batch.push(
3348        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
3349    );
3350    for event in iter {
3351        batch.push(
3352            serde_json::to_value(&event)
3353                .map_err(|error| DispatchError::Serde(error.to_string()))?,
3354        );
3355    }
3356    root.batch = Some(batch);
3357    Ok(root)
3358}
3359
3360fn json_value_to_gate(value: &serde_json::Value) -> String {
3361    match value {
3362        serde_json::Value::Null => "null".to_string(),
3363        serde_json::Value::String(text) => text.clone(),
3364        serde_json::Value::Bool(value) => value.to_string(),
3365        serde_json::Value::Number(value) => value.to_string(),
3366        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
3367    }
3368}
3369
3370fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
3371    let json =
3372        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3373    let value = json_to_vm_value(&json);
3374    match (&event.raw_body, value) {
3375        (Some(raw_body), VmValue::Dict(dict)) => {
3376            let mut map = (*dict).clone();
3377            map.insert(
3378                "raw_body".to_string(),
3379                VmValue::Bytes(Rc::new(raw_body.clone())),
3380            );
3381            Ok(VmValue::Dict(Rc::new(map)))
3382        }
3383        (_, other) => Ok(other),
3384    }
3385}
3386
3387fn decrement_in_flight(state: &DispatcherRuntimeState) {
3388    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
3389    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
3390        state.idle_notify.notify_waiters();
3391    }
3392}
3393
3394fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
3395    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
3396    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
3397        state.idle_notify.notify_waiters();
3398    }
3399}
3400
/// Test hook: installs a one-shot sender that `notify_test_inbox_dequeued`
/// fires once. Any previously installed, unfired sender is dropped.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| drop(slot.replace(Some(tx))));
}
3407
/// No-op in non-test builds. The `#[cfg(test)]` variant fires the one-shot
/// signal installed by `install_test_inbox_dequeued_signal`.
// NOTE(review): presumably called right after an inbox envelope is dequeued — confirm at call sites.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}
3410
/// Test hook: takes the installed one-shot sender (if any) and fires it.
/// Subsequent calls are no-ops until a new sender is installed. A receiver
/// that was already dropped is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(sender) = pending {
        let _ = sender.send(());
    }
}
3419
3420pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
3421    event_log: &L,
3422    event: &TriggerEvent,
3423) -> Result<u64, DispatchError> {
3424    let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
3425    let headers = event_headers(event, None, None, None);
3426    let payload =
3427        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3428    event_log
3429        .append(
3430            &topic,
3431            LogEvent::new("event_ingested", payload).with_headers(headers),
3432        )
3433        .await
3434        .map_err(DispatchError::from)
3435}
3436
3437pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
3438    ACTIVE_DISPATCHER_STATE.with(|slot| {
3439        slot.borrow()
3440            .as_ref()
3441            .map(|state| DispatcherStatsSnapshot {
3442                in_flight: state.in_flight.load(Ordering::Relaxed),
3443                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
3444                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
3445            })
3446            .unwrap_or_default()
3447    })
3448}
3449
3450pub fn clear_dispatcher_state() {
3451    ACTIVE_DISPATCHER_STATE.with(|slot| {
3452        *slot.borrow_mut() = None;
3453    });
3454    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
3455        *slot.borrow_mut() = None;
3456    });
3457}
3458
3459fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
3460    if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
3461        return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
3462    }
3463    if is_cancelled_vm_error(&error) {
3464        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
3465    }
3466    if let VmError::Thrown(VmValue::String(message)) = &error {
3467        return DispatchError::Local(message.to_string());
3468    }
3469    match error_to_category(&error) {
3470        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
3471        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
3472        ErrorCategory::Cancelled => {
3473            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
3474        }
3475        _ => DispatchError::Local(error.to_string()),
3476    }
3477}
3478
3479fn dispatch_error_label(error: &DispatchError) -> &'static str {
3480    match error {
3481        DispatchError::Denied(_) => "denied",
3482        DispatchError::Timeout(_) => "timeout",
3483        DispatchError::Waiting(_) => "waiting",
3484        DispatchError::Cancelled(_) => "cancelled",
3485        _ => "failed",
3486    }
3487}
3488
3489fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
3490    match route {
3491        DispatchUri::Worker { .. } => "enqueued",
3492        DispatchUri::Persona { .. } => "recorded",
3493        DispatchUri::A2a { .. }
3494            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
3495        {
3496            "pending"
3497        }
3498        DispatchUri::A2a { .. } => "completed",
3499        DispatchUri::Local { .. } => "success",
3500    }
3501}
3502
3503fn dispatch_node_id(
3504    route: &DispatchUri,
3505    binding_key: &str,
3506    event_id: &str,
3507    attempt: u32,
3508) -> String {
3509    let prefix = match route {
3510        DispatchUri::A2a { .. } => "a2a",
3511        _ => "dispatch",
3512    };
3513    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
3514}
3515
3516fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3517    match route {
3518        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3519        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3520        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3521    }
3522}
3523
3524fn dispatch_node_label(route: &DispatchUri) -> String {
3525    match route {
3526        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3527        _ => route.target_uri(),
3528    }
3529}
3530
3531fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3532    match route {
3533        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3534        _ => None,
3535    }
3536}
3537
3538fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3539    match route {
3540        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3541        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3542        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3543    }
3544}
3545
3546fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3547    match status {
3548        crate::triggers::SignatureStatus::Verified => "verified",
3549        crate::triggers::SignatureStatus::Unsigned => "unsigned",
3550        crate::triggers::SignatureStatus::Failed { .. } => "failed",
3551    }
3552}
3553
3554fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3555    let mut metadata = BTreeMap::new();
3556    metadata.insert(
3557        "provider".to_string(),
3558        serde_json::json!(event.provider.as_str()),
3559    );
3560    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3561    metadata.insert(
3562        "dedupe_key".to_string(),
3563        serde_json::json!(event.dedupe_key),
3564    );
3565    metadata.insert(
3566        "signature_status".to_string(),
3567        serde_json::json!(signature_status_label(&event.signature_status)),
3568    );
3569    metadata
3570}
3571
3572fn trigger_event_persona_metadata(event: &TriggerEvent) -> BTreeMap<String, String> {
3573    let mut metadata = BTreeMap::new();
3574    metadata.insert("trigger_event_id".to_string(), event.id.0.clone());
3575    metadata.insert("event_id".to_string(), event.id.0.clone());
3576    metadata.insert("dedupe_key".to_string(), event.dedupe_key.clone());
3577    metadata.insert("trace_id".to_string(), event.trace_id.0.clone());
3578    if let Some(tenant_id) = &event.tenant_id {
3579        metadata.insert("tenant_id".to_string(), tenant_id.0.clone());
3580    }
3581    for (key, value) in &event.headers {
3582        metadata.insert(format!("header.{key}"), value.clone());
3583    }
3584    if let Ok(payload) = serde_json::to_value(&event.provider_payload) {
3585        collect_persona_payload_metadata("", &payload, &mut metadata);
3586        if let Some(raw) = payload.get("raw") {
3587            collect_persona_payload_metadata("", raw, &mut metadata);
3588        }
3589    }
3590    metadata
3591}
3592
3593fn collect_persona_payload_metadata(
3594    prefix: &str,
3595    value: &serde_json::Value,
3596    metadata: &mut BTreeMap<String, String>,
3597) {
3598    let serde_json::Value::Object(object) = value else {
3599        return;
3600    };
3601    for (key, value) in object {
3602        let path = if prefix.is_empty() {
3603            key.clone()
3604        } else {
3605            format!("{prefix}.{key}")
3606        };
3607        match value {
3608            serde_json::Value::String(text) => {
3609                metadata.insert(path, text.clone());
3610            }
3611            serde_json::Value::Number(number) => {
3612                metadata.insert(path, number.to_string());
3613            }
3614            serde_json::Value::Bool(flag) => {
3615                metadata.insert(path, flag.to_string());
3616            }
3617            serde_json::Value::Object(_) if prefix.is_empty() => {
3618                collect_persona_payload_metadata(&path, value, metadata);
3619            }
3620            _ => {}
3621        }
3622    }
3623}
3624
3625fn dispatch_node_metadata(
3626    route: &DispatchUri,
3627    binding: &TriggerBinding,
3628    event: &TriggerEvent,
3629    attempt: u32,
3630) -> BTreeMap<String, serde_json::Value> {
3631    let mut metadata = BTreeMap::new();
3632    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3633    metadata.insert(
3634        "target_uri".to_string(),
3635        serde_json::json!(route.target_uri()),
3636    );
3637    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3638    metadata.insert(
3639        "trigger_id".to_string(),
3640        serde_json::json!(binding.id.as_str()),
3641    );
3642    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3643    if let Some(target_agent) = dispatch_target_agent(route) {
3644        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3645    }
3646    if let DispatchUri::Worker { queue } = route {
3647        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3648    }
3649    if let DispatchUri::Persona { name } = route {
3650        metadata.insert("persona".to_string(), serde_json::json!(name));
3651    }
3652    metadata
3653}
3654
3655fn dispatch_success_metadata(
3656    route: &DispatchUri,
3657    binding: &TriggerBinding,
3658    event: &TriggerEvent,
3659    attempt: u32,
3660    result: &serde_json::Value,
3661) -> BTreeMap<String, serde_json::Value> {
3662    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3663    match route {
3664        DispatchUri::A2a { .. } => {
3665            if let Some(task_id) = result
3666                .get("task_id")
3667                .or_else(|| result.get("id"))
3668                .and_then(|value| value.as_str())
3669            {
3670                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3671            }
3672            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3673                metadata.insert("state".to_string(), serde_json::json!(state));
3674            }
3675        }
3676        DispatchUri::Worker { .. } => {
3677            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3678            {
3679                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3680            }
3681            if let Some(response_topic) = result
3682                .get("response_topic")
3683                .and_then(|value| value.as_str())
3684            {
3685                metadata.insert(
3686                    "response_topic".to_string(),
3687                    serde_json::json!(response_topic),
3688                );
3689            }
3690        }
3691        DispatchUri::Persona { .. } => {
3692            if let Some(receipt_id) = result.get("receipt_id").and_then(|value| value.as_str()) {
3693                metadata.insert("receipt_id".to_string(), serde_json::json!(receipt_id));
3694            }
3695            if let Some(status) = result.get("status").and_then(|value| value.as_str()) {
3696                metadata.insert("status".to_string(), serde_json::json!(status));
3697            }
3698        }
3699        DispatchUri::Local { .. } => {}
3700    }
3701    metadata
3702}
3703
3704fn dispatch_error_metadata(
3705    route: &DispatchUri,
3706    binding: &TriggerBinding,
3707    event: &TriggerEvent,
3708    attempt: u32,
3709    error: &DispatchError,
3710) -> BTreeMap<String, serde_json::Value> {
3711    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3712    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3713    metadata
3714}
3715
3716fn retry_node_metadata(
3717    binding: &TriggerBinding,
3718    event: &TriggerEvent,
3719    attempt: u32,
3720    delay: Duration,
3721    error: &DispatchError,
3722) -> BTreeMap<String, serde_json::Value> {
3723    let mut metadata = BTreeMap::new();
3724    metadata.insert(
3725        "trigger_id".to_string(),
3726        serde_json::json!(binding.id.as_str()),
3727    );
3728    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3729    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3730    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3731    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3732    metadata
3733}
3734
/// Splits a binding key of the form `{trigger_id}@v{version}` into its parts.
///
/// The split happens at the last `@v`. If there is no `@v`, or the text after
/// it is not a valid `u32`, the whole key is treated as the trigger id with
/// version `0`. This is the inverse of `binding_key_from_parts(id, None)` even
/// for ids that themselves contain `@v` (e.g. `"deploy@vercel"`), which the
/// previous version truncated.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    if let Some((binding_id, suffix)) = binding_key.rsplit_once("@v") {
        if let Ok(version) = suffix.parse::<u32>() {
            return (binding_id.to_string(), version);
        }
    }
    (binding_key.to_string(), 0)
}
3742
/// Builds a binding key: `{trigger_id}@v{version}` when a version is given,
/// otherwise the bare trigger id.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    binding_version.map_or_else(
        || trigger_id.to_string(),
        |version| format!("{trigger_id}@v{version}"),
    )
}
3749
3750fn tenant_id(event: &TriggerEvent) -> Option<&str> {
3751    event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
3752}
3753
3754fn current_unix_ms() -> i64 {
3755    unix_ms(time::OffsetDateTime::now_utc())
3756}
3757
3758fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
3759    (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
3760}
3761
3762fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3763    lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
3764        .unwrap_or_else(|| unix_ms(event.received_at))
3765}
3766
3767fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3768    lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
3769        .unwrap_or_else(|| accepted_at_ms(headers, event))
3770}
3771
/// Parses header `name` as an `i64` millisecond timestamp. Returns `None` when
/// there are no headers, the header is missing, or it is not an integer.
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw = headers?.get(name)?;
    raw.parse::<i64>().ok()
}
3777
/// Non-negative duration from `earlier_ms` to `later_ms`. Negative gaps (for
/// example from clock skew) clamp to zero; the subtraction saturates.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let gap_ms = later_ms.saturating_sub(earlier_ms);
    Duration::from_millis(u64::try_from(gap_ms).unwrap_or(0))
}
3781
3782fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
3783    match result {
3784        Ok(_) => "succeeded",
3785        Err(DispatchError::Waiting(_)) => "waiting",
3786        Err(DispatchError::Cancelled(_)) => "cancelled",
3787        Err(DispatchError::Denied(_)) => "denied",
3788        Err(DispatchError::Timeout(_)) => "timeout",
3789        Err(_) => "failed",
3790    }
3791}
3792
3793fn is_cancelled_vm_error(error: &VmError) -> bool {
3794    matches!(
3795        error,
3796        VmError::Thrown(VmValue::String(message))
3797            if message.starts_with("kind:cancelled:")
3798    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3799}
3800
3801fn event_headers(
3802    event: &TriggerEvent,
3803    binding: Option<&TriggerBinding>,
3804    attempt: Option<u32>,
3805    replay_of_event_id: Option<&String>,
3806) -> BTreeMap<String, String> {
3807    let mut headers = BTreeMap::new();
3808    headers.insert("event_id".to_string(), event.id.0.clone());
3809    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3810    headers.insert("provider".to_string(), event.provider.as_str().to_string());
3811    headers.insert("kind".to_string(), event.kind.clone());
3812    if let Some(replay_of_event_id) = replay_of_event_id {
3813        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3814    }
3815    if let Some(tenant_id) = event.tenant_id.as_ref() {
3816        headers.insert("tenant_id".to_string(), tenant_id.0.clone());
3817    }
3818    if let Some(binding) = binding {
3819        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3820        headers.insert("binding_key".to_string(), binding.binding_key());
3821        headers.insert(
3822            "handler_kind".to_string(),
3823            DispatchUri::from(&binding.handler).kind().to_string(),
3824        );
3825    }
3826    if let Some(attempt) = attempt {
3827        headers.insert("attempt".to_string(), attempt.to_string());
3828    }
3829    headers
3830}
3831
3832fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
3833    let topic = Topic::new(topic_name)
3834        .expect("static trigger dispatcher topic names should always be valid");
3835    match event.tenant_id.as_ref() {
3836        Some(tenant_id) => crate::tenant_topic(tenant_id, &topic).map_err(DispatchError::from),
3837        None => Ok(topic),
3838    }
3839}
3840
3841fn worker_queue_priority(
3842    binding: &super::registry::TriggerBinding,
3843    event: &TriggerEvent,
3844) -> crate::WorkerQueuePriority {
3845    match event
3846        .headers
3847        .get("priority")
3848        .map(|value| value.trim().to_ascii_lowercase())
3849        .as_deref()
3850    {
3851        Some("high") => crate::WorkerQueuePriority::High,
3852        Some("low") => crate::WorkerQueuePriority::Low,
3853        _ => binding.dispatch_priority,
3854    }
3855}
3856
/// Environment variable that arms [`maybe_fail_before_outbox`]. Any value, including an empty
/// one, counts as set.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Test hook that terminates the whole process with exit code 86 when
/// [`TEST_FAIL_BEFORE_OUTBOX_ENV`] is present in the environment. Otherwise it does nothing.
///
/// NOTE(review): the name suggests call sites invoke this just before an outbox write so tests
/// can simulate a crash at that point. Confirm against the callers.
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_none() {
        return;
    }
    std::process::exit(86);
}
3864
3865fn now_rfc3339() -> String {
3866    time::OffsetDateTime::now_utc()
3867        .format(&time::format_description::well_known::Rfc3339)
3868        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3869}
3870
3871fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
3872    let now = time::OffsetDateTime::now_utc();
3873    let reset = if binding.hourly_cost_usd.is_some() {
3874        let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
3875        time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
3876    } else {
3877        let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
3878        time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
3879    };
3880    reset
3881        .format(&time::format_description::well_known::Rfc3339)
3882        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3883}
3884
3885fn now_unix_ms() -> i64 {
3886    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3887}
3888
3889fn cancelled_dispatch_outcome(
3890    binding: &TriggerBinding,
3891    route: &DispatchUri,
3892    event: &TriggerEvent,
3893    replay_of_event_id: Option<String>,
3894    attempt_count: u32,
3895    error: String,
3896) -> DispatchOutcome {
3897    DispatchOutcome {
3898        trigger_id: binding.id.as_str().to_string(),
3899        binding_key: binding.binding_key(),
3900        event_id: event.id.0.clone(),
3901        attempt_count,
3902        status: DispatchStatus::Cancelled,
3903        handler_kind: route.kind().to_string(),
3904        target_uri: route.target_uri(),
3905        replay_of_event_id,
3906        result: None,
3907        error: Some(error),
3908    }
3909}
3910
3911async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3912    let _ = cancel_rx.recv().await;
3913}
3914
3915#[cfg(test)]
3916mod tests;