harn_vm/triggers/dispatcher/mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::vm_value_to_json;
16use crate::orchestration::{
17    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
18    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
19    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
20    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
21    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
22    ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
23    ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
24};
25use crate::stdlib::json_to_vm_value;
26use crate::trust_graph::{
27    append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
28};
29use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
30use crate::vm::Vm;
31
32use self::uri::DispatchUri;
33use super::registry::{
34    binding_autonomy_budget_would_exceed, matching_bindings, note_autonomous_decision,
35    TriggerBinding, TriggerBudgetExhaustionStrategy, TriggerHandlerSpec,
36};
37use super::{
38    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
39    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
40    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_OUTBOX_TOPIC,
41};
42use circuits::{
43    destination_circuit_key, dlq_node_metadata, DestinationCircuitProbe,
44    DestinationCircuitRegistry, DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
45};
46use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
47use predicate_eval::predicate_node_metadata;
48
49mod circuits;
50mod flow_control;
51mod predicate_eval;
52pub mod retry;
53pub mod uri;
54
55pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
56
/// Header carrying the time (Unix epoch milliseconds) a trigger event was accepted.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Header carrying the time (Unix epoch milliseconds) a trigger event was normalized.
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Header carrying the time (Unix epoch milliseconds) a trigger event was appended
/// to the inbox queue.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
60
61thread_local! {
62    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
63    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
64    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
65    #[cfg(test)]
66    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
67}
68
69tokio::task_local! {
70    static ACTIVE_DISPATCH_IS_REPLAY: bool;
71}
72
73#[derive(Clone, Debug)]
74pub(crate) struct DispatchContext {
75    pub trigger_event: TriggerEvent,
76    pub replay_of_event_id: Option<String>,
77    pub binding_id: String,
78    pub binding_version: u32,
79    pub agent_id: String,
80    pub action: String,
81    pub autonomy_tier: AutonomyTier,
82}
83
84struct DispatchExecutionPolicyGuard;
85
86impl Drop for DispatchExecutionPolicyGuard {
87    fn drop(&mut self) {
88        crate::orchestration::pop_execution_policy();
89    }
90}
91
/// Reviewer name used when an autonomy-budget decision names no reviewer.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
93
94pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
95    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
96}
97
98pub(crate) fn current_dispatch_is_replay() -> bool {
99    ACTIVE_DISPATCH_IS_REPLAY
100        .try_with(|is_replay| *is_replay)
101        .unwrap_or(false)
102}
103
104pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
105    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
106}
107
108#[derive(Clone)]
109pub struct Dispatcher {
110    base_vm: Rc<Vm>,
111    event_log: Arc<AnyEventLog>,
112    cancel_tx: broadcast::Sender<()>,
113    state: Arc<DispatcherRuntimeState>,
114    metrics: Option<Arc<crate::MetricsRegistry>>,
115    a2a_client: Arc<dyn crate::a2a::A2aClient>,
116}
117
118#[derive(Debug)]
119struct DispatcherRuntimeState {
120    in_flight: AtomicU64,
121    retry_queue_depth: AtomicU64,
122    dlq: Mutex<Vec<DlqEntry>>,
123    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
124    shutting_down: std::sync::atomic::AtomicBool,
125    idle_notify: Notify,
126    flow_control: FlowControlManager,
127    destination_circuits: DestinationCircuitRegistry,
128}
129
130impl DispatcherRuntimeState {
131    fn new(event_log: Arc<AnyEventLog>) -> Self {
132        Self {
133            in_flight: AtomicU64::new(0),
134            retry_queue_depth: AtomicU64::new(0),
135            dlq: Mutex::new(Vec::new()),
136            cancel_tokens: Mutex::new(Vec::new()),
137            shutting_down: std::sync::atomic::AtomicBool::new(false),
138            idle_notify: Notify::new(),
139            flow_control: FlowControlManager::new(event_log),
140            destination_circuits: DestinationCircuitRegistry::default(),
141        }
142    }
143}
144
145#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
146#[serde(default)]
147pub struct DispatcherStatsSnapshot {
148    pub in_flight: u64,
149    pub retry_queue_depth: u64,
150    pub dlq_depth: u64,
151}
152
153#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
154#[serde(rename_all = "snake_case")]
155pub enum DispatchStatus {
156    Succeeded,
157    Failed,
158    Dlq,
159    Skipped,
160    Waiting,
161    Cancelled,
162}
163
164impl DispatchStatus {
165    pub fn as_str(&self) -> &'static str {
166        match self {
167            Self::Succeeded => "succeeded",
168            Self::Failed => "failed",
169            Self::Dlq => "dlq",
170            Self::Skipped => "skipped",
171            Self::Waiting => "waiting",
172            Self::Cancelled => "cancelled",
173        }
174    }
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178#[serde(default)]
179pub struct DispatchOutcome {
180    pub trigger_id: String,
181    pub binding_key: String,
182    pub event_id: String,
183    pub attempt_count: u32,
184    pub status: DispatchStatus,
185    pub handler_kind: String,
186    pub target_uri: String,
187    pub replay_of_event_id: Option<String>,
188    pub result: Option<serde_json::Value>,
189    pub error: Option<String>,
190}
191
192#[derive(Clone, Debug, Serialize, Deserialize)]
193pub struct InboxEnvelope {
194    pub trigger_id: Option<String>,
195    pub binding_version: Option<u32>,
196    pub event: TriggerEvent,
197}
198
199#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
200#[serde(default)]
201pub struct DispatcherDrainReport {
202    pub drained: bool,
203    pub in_flight: u64,
204    pub retry_queue_depth: u64,
205    pub dlq_depth: u64,
206}
207
208impl Default for DispatchOutcome {
209    fn default() -> Self {
210        Self {
211            trigger_id: String::new(),
212            binding_key: String::new(),
213            event_id: String::new(),
214            attempt_count: 0,
215            status: DispatchStatus::Failed,
216            handler_kind: String::new(),
217            target_uri: String::new(),
218            replay_of_event_id: None,
219            result: None,
220            error: None,
221        }
222    }
223}
224
225#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
226#[serde(default)]
227pub struct DispatchAttemptRecord {
228    pub trigger_id: String,
229    pub binding_key: String,
230    pub event_id: String,
231    pub attempt: u32,
232    pub handler_kind: String,
233    pub started_at: String,
234    pub completed_at: String,
235    pub outcome: String,
236    pub error_msg: Option<String>,
237}
238
239#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
240pub struct DispatchCancelRequest {
241    pub binding_key: String,
242    pub event_id: String,
243    #[serde(with = "time::serde::rfc3339")]
244    pub requested_at: time::OffsetDateTime,
245    #[serde(default)]
246    pub requested_by: Option<String>,
247    #[serde(default)]
248    pub audit_id: Option<String>,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct DlqEntry {
253    pub trigger_id: String,
254    pub binding_key: String,
255    pub event: TriggerEvent,
256    pub attempt_count: u32,
257    pub final_error: String,
258    #[serde(default = "default_dlq_error_class")]
259    pub error_class: String,
260    pub attempts: Vec<DispatchAttemptRecord>,
261}
262
263fn default_dlq_error_class() -> String {
264    "unknown".to_string()
265}
266
267#[derive(Clone, Debug)]
268struct SingletonLease {
269    gate: String,
270    held: bool,
271}
272
273#[derive(Clone, Debug)]
274struct ConcurrencyLease {
275    gate: String,
276    max: u32,
277    priority_rank: usize,
278    permit: Option<ConcurrencyPermit>,
279}
280
281#[derive(Default, Debug)]
282struct AcquiredFlowControl {
283    singleton: Option<SingletonLease>,
284    concurrency: Option<ConcurrencyLease>,
285}
286
287#[derive(Clone)]
288pub(crate) struct DispatchWaitLease {
289    state: Arc<DispatcherRuntimeState>,
290    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
291    suspended: Arc<AtomicBool>,
292}
293
294impl DispatchWaitLease {
295    fn new(
296        state: Arc<DispatcherRuntimeState>,
297        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
298    ) -> Self {
299        Self {
300            state,
301            acquired,
302            suspended: Arc::new(AtomicBool::new(false)),
303        }
304    }
305
306    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
307        if self.suspended.swap(true, Ordering::SeqCst) {
308            return Ok(());
309        }
310        let (singleton_gate, concurrency_permit) = {
311            let mut acquired = self.acquired.lock().await;
312            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
313                if lease.held {
314                    lease.held = false;
315                    Some(lease.gate.clone())
316                } else {
317                    None
318                }
319            });
320            let concurrency_permit = acquired
321                .concurrency
322                .as_mut()
323                .and_then(|lease| lease.permit.take());
324            (singleton_gate, concurrency_permit)
325        };
326
327        if let Some(gate) = singleton_gate {
328            self.state
329                .flow_control
330                .release_singleton(&gate)
331                .await
332                .map_err(DispatchError::from)?;
333        }
334        if let Some(permit) = concurrency_permit {
335            self.state
336                .flow_control
337                .release_concurrency(permit)
338                .await
339                .map_err(DispatchError::from)?;
340        }
341        Ok(())
342    }
343
344    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
345        if !self.suspended.swap(false, Ordering::SeqCst) {
346            return Ok(());
347        }
348
349        let singleton_gate = {
350            let acquired = self.acquired.lock().await;
351            acquired.singleton.as_ref().and_then(|lease| {
352                if lease.held {
353                    None
354                } else {
355                    Some(lease.gate.clone())
356                }
357            })
358        };
359        if let Some(gate) = singleton_gate {
360            self.state
361                .flow_control
362                .acquire_singleton(&gate)
363                .await
364                .map_err(DispatchError::from)?;
365            let mut acquired = self.acquired.lock().await;
366            if let Some(lease) = acquired.singleton.as_mut() {
367                lease.held = true;
368            }
369        }
370
371        let concurrency_spec = {
372            let acquired = self.acquired.lock().await;
373            acquired.concurrency.as_ref().and_then(|lease| {
374                if lease.permit.is_some() {
375                    None
376                } else {
377                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
378                }
379            })
380        };
381        if let Some((gate, max, priority_rank)) = concurrency_spec {
382            let permit = self
383                .state
384                .flow_control
385                .acquire_concurrency(&gate, max, priority_rank)
386                .await
387                .map_err(DispatchError::from)?;
388            let mut acquired = self.acquired.lock().await;
389            if let Some(lease) = acquired.concurrency.as_mut() {
390                lease.permit = Some(permit);
391            }
392        }
393        Ok(())
394    }
395}
396
397enum FlowControlOutcome {
398    Dispatch {
399        event: Box<TriggerEvent>,
400        acquired: AcquiredFlowControl,
401    },
402    Skip {
403        reason: String,
404    },
405}
406
407#[derive(Clone, Debug)]
408enum DispatchSkipStage {
409    Predicate,
410    FlowControl,
411}
412
413#[derive(Debug)]
414pub enum DispatchError {
415    EventLog(String),
416    Registry(String),
417    Serde(String),
418    Local(String),
419    A2a(String),
420    Denied(String),
421    Timeout(String),
422    Waiting(String),
423    Cancelled(String),
424    NotImplemented(String),
425}
426
427impl std::fmt::Display for DispatchError {
428    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
429        match self {
430            Self::EventLog(message)
431            | Self::Registry(message)
432            | Self::Serde(message)
433            | Self::Local(message)
434            | Self::A2a(message)
435            | Self::Denied(message)
436            | Self::Timeout(message)
437            | Self::Waiting(message)
438            | Self::Cancelled(message)
439            | Self::NotImplemented(message) => f.write_str(message),
440        }
441    }
442}
443
444impl std::error::Error for DispatchError {}
445
446impl DispatchError {
447    fn retryable(&self) -> bool {
448        !matches!(
449            self,
450            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_)
451        )
452    }
453}
454
455impl DispatchSkipStage {
456    fn as_str(&self) -> &'static str {
457        match self {
458            Self::Predicate => "predicate",
459            Self::FlowControl => "flow_control",
460        }
461    }
462}
463
464impl From<LogError> for DispatchError {
465    fn from(value: LogError) -> Self {
466        Self::EventLog(value.to_string())
467    }
468}
469
470pub async fn append_dispatch_cancel_request(
471    event_log: &Arc<AnyEventLog>,
472    request: &DispatchCancelRequest,
473) -> Result<u64, DispatchError> {
474    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
475        .expect("static trigger cancel topic should always be valid");
476    event_log
477        .append(
478            &topic,
479            LogEvent::new(
480                "dispatch_cancel_requested",
481                serde_json::to_value(request)
482                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
483            ),
484        )
485        .await
486        .map_err(DispatchError::from)
487}
488
489impl Dispatcher {
490    pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
491        self.event_log.clone()
492    }
493
494    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
495        let event_log = active_event_log().ok_or_else(|| {
496            DispatchError::EventLog("dispatcher requires an active event log".to_string())
497        })?;
498        Ok(Self::with_event_log(base_vm, event_log))
499    }
500
501    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
502        Self::with_event_log_and_metrics(base_vm, event_log, None)
503    }
504
505    pub fn with_event_log_and_metrics(
506        base_vm: Vm,
507        event_log: Arc<AnyEventLog>,
508        metrics: Option<Arc<crate::MetricsRegistry>>,
509    ) -> Self {
510        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
511        ACTIVE_DISPATCHER_STATE.with(|slot| {
512            *slot.borrow_mut() = Some(state.clone());
513        });
514        let (cancel_tx, _) = broadcast::channel(32);
515        Self {
516            base_vm: Rc::new(base_vm),
517            event_log,
518            cancel_tx,
519            state,
520            metrics,
521            a2a_client: Arc::new(crate::a2a::RealA2aClient),
522        }
523    }
524
525    #[cfg(test)]
526    pub fn with_a2a_client(mut self, client: Arc<dyn crate::a2a::A2aClient>) -> Self {
527        self.a2a_client = client;
528        self
529    }
530
531    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
532        DispatcherStatsSnapshot {
533            in_flight: self.state.in_flight.load(Ordering::Relaxed),
534            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
535            dlq_depth: self
536                .state
537                .dlq
538                .lock()
539                .expect("dispatcher dlq poisoned")
540                .len() as u64,
541        }
542    }
543
544    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
545        self.state
546            .dlq
547            .lock()
548            .expect("dispatcher dlq poisoned")
549            .clone()
550    }
551
552    pub fn shutdown(&self) {
553        self.state.shutting_down.store(true, Ordering::SeqCst);
554        for token in self
555            .state
556            .cancel_tokens
557            .lock()
558            .expect("dispatcher cancel tokens poisoned")
559            .iter()
560        {
561            token.store(true, Ordering::SeqCst);
562        }
563        let _ = self.cancel_tx.send(());
564    }
565
566    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
567        self.enqueue_targeted(None, None, event).await
568    }
569
570    pub async fn enqueue_targeted(
571        &self,
572        trigger_id: Option<String>,
573        binding_version: Option<u32>,
574        event: TriggerEvent,
575    ) -> Result<u64, DispatchError> {
576        self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
577            .await
578    }
579
580    pub async fn enqueue_targeted_with_headers(
581        &self,
582        trigger_id: Option<String>,
583        binding_version: Option<u32>,
584        event: TriggerEvent,
585        parent_headers: Option<&BTreeMap<String, String>>,
586    ) -> Result<u64, DispatchError> {
587        let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
588        let trigger_id_for_metrics = trigger_id.clone();
589        let mut headers = parent_headers.cloned().unwrap_or_default();
590        headers.extend(event_headers(&event, None, None, None));
591        if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
592            headers.insert("trigger_id".to_string(), trigger_id.clone());
593            headers.insert(
594                "binding_key".to_string(),
595                binding_key_from_parts(trigger_id, binding_version),
596            );
597        }
598        headers
599            .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
600            .or_insert_with(|| unix_ms(event.received_at).to_string());
601        let payload = serde_json::to_value(InboxEnvelope {
602            trigger_id,
603            binding_version,
604            event: event.clone(),
605        })
606        .map_err(|error| DispatchError::Serde(error.to_string()))?;
607        let mut log_event = LogEvent::new("event_ingested", payload);
608        let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
609        let queue_appended_at_ms = headers
610            .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
611            .and_then(|value| value.parse::<i64>().ok())
612            .unwrap_or(log_event.occurred_at_ms);
613        headers
614            .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
615            .or_insert_with(|| log_event.occurred_at_ms.to_string());
616        if let (Some(metrics), Some(trigger_id)) =
617            (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
618        {
619            let binding_key = binding_key_from_parts(trigger_id, binding_version);
620            let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
621            if !had_queue_appended_at {
622                metrics.record_trigger_accepted_to_queue_append(
623                    trigger_id,
624                    &binding_key,
625                    event.provider.as_str(),
626                    tenant_id(&event),
627                    "queued",
628                    duration_between_ms(queue_appended_at_ms, accepted_at_ms),
629                );
630            }
631            metrics.note_trigger_pending_event(
632                event.id.0.as_str(),
633                trigger_id,
634                &binding_key,
635                event.provider.as_str(),
636                tenant_id(&event),
637                accepted_at_ms,
638                queue_appended_at_ms,
639            );
640        }
641        log_event.headers = headers;
642        self.event_log
643            .append(&topic, log_event)
644            .await
645            .map_err(DispatchError::from)
646    }
647
648    pub async fn run(&self) -> Result<(), DispatchError> {
649        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
650            .expect("static trigger inbox envelopes topic is valid");
651        let start_from = self.event_log.latest(&topic).await?;
652        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
653        pin_mut!(stream);
654        let mut cancel_rx = self.cancel_tx.subscribe();
655
656        loop {
657            tokio::select! {
658                received = stream.next() => {
659                    let Some(received) = received else {
660                        break;
661                    };
662                    let (_, event) = received.map_err(DispatchError::from)?;
663                    if event.kind != "event_ingested" {
664                        continue;
665                    }
666                    let parent_headers = event.headers.clone();
667                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
668                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
669                    notify_test_inbox_dequeued();
670                    let _ = self
671                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
672                        .await;
673                }
674                _ = recv_cancel(&mut cancel_rx) => break,
675            }
676        }
677
678        Ok(())
679    }
680
681    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
682        let deadline = tokio::time::Instant::now() + timeout;
683        loop {
684            let snapshot = self.snapshot();
685            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
686                return Ok(DispatcherDrainReport {
687                    drained: true,
688                    in_flight: snapshot.in_flight,
689                    retry_queue_depth: snapshot.retry_queue_depth,
690                    dlq_depth: snapshot.dlq_depth,
691                });
692            }
693
694            let notified = self.state.idle_notify.notified();
695            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
696            if remaining.is_zero() {
697                return Ok(DispatcherDrainReport {
698                    drained: false,
699                    in_flight: snapshot.in_flight,
700                    retry_queue_depth: snapshot.retry_queue_depth,
701                    dlq_depth: snapshot.dlq_depth,
702                });
703            }
704            if tokio::time::timeout(remaining, notified).await.is_err() {
705                let snapshot = self.snapshot();
706                return Ok(DispatcherDrainReport {
707                    drained: false,
708                    in_flight: snapshot.in_flight,
709                    retry_queue_depth: snapshot.retry_queue_depth,
710                    dlq_depth: snapshot.dlq_depth,
711                });
712            }
713        }
714    }
715
716    pub async fn dispatch_inbox_envelope(
717        &self,
718        envelope: InboxEnvelope,
719    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
720        self.dispatch_inbox_envelope_with_headers(envelope, None)
721            .await
722    }
723
724    pub async fn dispatch_inbox_envelope_with_parent_headers(
725        &self,
726        envelope: InboxEnvelope,
727        parent_headers: &BTreeMap<String, String>,
728    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
729        self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
730            .await
731    }
732
733    async fn dispatch_inbox_envelope_with_headers(
734        &self,
735        envelope: InboxEnvelope,
736        parent_headers: Option<&BTreeMap<String, String>>,
737    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
738        if let Some(trigger_id) = envelope.trigger_id {
739            let binding = super::registry::resolve_live_trigger_binding(
740                &trigger_id,
741                envelope.binding_version,
742            )
743            .map_err(|error| DispatchError::Registry(error.to_string()))?;
744            return Ok(vec![
745                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
746                    .await?,
747            ]);
748        }
749
750        let cron_target = match &envelope.event.provider_payload {
751            crate::triggers::ProviderPayload::Known(
752                crate::triggers::event::KnownProviderPayload::Cron(payload),
753            ) => payload.cron_id.clone(),
754            _ => None,
755        };
756        if let Some(trigger_id) = cron_target {
757            let binding = super::registry::resolve_live_trigger_binding(
758                &trigger_id,
759                envelope.binding_version,
760            )
761            .map_err(|error| DispatchError::Registry(error.to_string()))?;
762            return Ok(vec![
763                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
764                    .await?,
765            ]);
766        }
767
768        self.dispatch_event_with_headers(envelope.event, parent_headers)
769            .await
770    }
771
772    pub async fn dispatch_event(
773        &self,
774        event: TriggerEvent,
775    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
776        self.dispatch_event_with_headers(event, None).await
777    }
778
779    async fn dispatch_event_with_headers(
780        &self,
781        event: TriggerEvent,
782        parent_headers: Option<&BTreeMap<String, String>>,
783    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
784        let bindings = matching_bindings(&event);
785        let mut outcomes = Vec::new();
786        for binding in bindings {
787            outcomes.push(
788                self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
789                    .await?,
790            );
791        }
792        Ok(outcomes)
793    }
794
795    pub async fn dispatch(
796        &self,
797        binding: &TriggerBinding,
798        event: TriggerEvent,
799    ) -> Result<DispatchOutcome, DispatchError> {
800        self.dispatch_with_replay(binding, event, None, None, None)
801            .await
802    }
803
804    pub async fn dispatch_replay(
805        &self,
806        binding: &TriggerBinding,
807        event: TriggerEvent,
808        replay_of_event_id: String,
809    ) -> Result<DispatchOutcome, DispatchError> {
810        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
811            .await
812    }
813
814    pub async fn dispatch_with_parent_span_id(
815        &self,
816        binding: &TriggerBinding,
817        event: TriggerEvent,
818        parent_span_id: Option<String>,
819    ) -> Result<DispatchOutcome, DispatchError> {
820        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
821            .await
822    }
823
824    async fn dispatch_with_replay(
825        &self,
826        binding: &TriggerBinding,
827        event: TriggerEvent,
828        replay_of_event_id: Option<String>,
829        parent_span_id: Option<String>,
830        parent_headers: Option<&BTreeMap<String, String>>,
831    ) -> Result<DispatchOutcome, DispatchError> {
832        let parent_headers_for_metrics = parent_headers.cloned();
833        let admitted_at_ms = current_unix_ms();
834        if let Some(metrics) = self.metrics.as_ref() {
835            let binding_key = binding.binding_key();
836            let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
837            metrics.record_trigger_queue_age_at_dispatch_admission(
838                binding.id.as_str(),
839                &binding_key,
840                event.provider.as_str(),
841                tenant_id(&event),
842                "admitted",
843                duration_between_ms(admitted_at_ms, queue_appended_at_ms),
844            );
845            metrics.clear_trigger_pending_event(
846                event.id.0.as_str(),
847                binding.id.as_str(),
848                &binding_key,
849                event.provider.as_str(),
850                tenant_id(&event),
851                admitted_at_ms,
852            );
853        }
854        let span = tracing::info_span!(
855            "dispatch",
856            trigger_id = %binding.id.as_str(),
857            binding_version = binding.version,
858            trace_id = %event.trace_id.0
859        );
860        #[cfg(feature = "otel")]
861        let span_for_otel = span.clone();
862        let _ = if let Some(headers) = parent_headers {
863            crate::observability::otel::set_span_parent_from_headers(
864                &span,
865                headers,
866                &event.trace_id,
867                parent_span_id.as_deref(),
868            )
869        } else {
870            crate::observability::otel::set_span_parent(
871                &span,
872                &event.trace_id,
873                parent_span_id.as_deref(),
874            )
875        };
876        #[cfg(feature = "otel")]
877        let started_at = Instant::now();
878        let metrics = self.metrics.clone();
879        let outcome = ACTIVE_DISPATCH_IS_REPLAY
880            .scope(
881                replay_of_event_id.is_some(),
882                self.dispatch_with_replay_inner(
883                    binding,
884                    event,
885                    replay_of_event_id,
886                    parent_headers_for_metrics,
887                )
888                .instrument(span),
889            )
890            .await;
891        if let Some(metrics) = metrics.as_ref() {
892            match &outcome {
893                Ok(dispatch_outcome) => match dispatch_outcome.status {
894                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
895                        metrics.record_dispatch_succeeded();
896                    }
897                    DispatchStatus::Waiting => {}
898                    _ => metrics.record_dispatch_failed(),
899                },
900                Err(_) => metrics.record_dispatch_failed(),
901            }
902            let outcome_label = match &outcome {
903                Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
904                Err(DispatchError::Cancelled(_)) => "cancelled",
905                Err(_) => "failed",
906            };
907            metrics.record_trigger_dispatched(
908                binding.id.as_str(),
909                binding.handler.kind(),
910                outcome_label,
911            );
912            metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
913        }
914        #[cfg(feature = "otel")]
915        {
916            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
917
918            let duration_ms = started_at.elapsed().as_millis() as i64;
919            let status = match &outcome {
920                Ok(dispatch_outcome) => match dispatch_outcome.status {
921                    DispatchStatus::Succeeded => "succeeded",
922                    DispatchStatus::Skipped => "skipped",
923                    DispatchStatus::Waiting => "waiting",
924                    DispatchStatus::Cancelled => "cancelled",
925                    DispatchStatus::Failed => "failed",
926                    DispatchStatus::Dlq => "dlq",
927                },
928                Err(DispatchError::Cancelled(_)) => "cancelled",
929                Err(_) => "failed",
930            };
931            span_for_otel.set_attribute("result.status", status);
932            span_for_otel.set_attribute("result.duration_ms", duration_ms);
933        }
934        outcome
935    }
936
937    async fn dispatch_with_replay_inner(
938        &self,
939        binding: &TriggerBinding,
940        event: TriggerEvent,
941        replay_of_event_id: Option<String>,
942        parent_headers: Option<BTreeMap<String, String>>,
943    ) -> Result<DispatchOutcome, DispatchError> {
944        let autonomy_tier = crate::resolve_agent_autonomy_tier(
945            &self.event_log,
946            binding.id.as_str(),
947            binding.autonomy_tier,
948        )
949        .await
950        .unwrap_or(binding.autonomy_tier);
951        let binding_key = binding.binding_key();
952        let route = DispatchUri::from(&binding.handler);
953        let trigger_id = binding.id.as_str().to_string();
954        let event_id = event.id.0.clone();
955        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
956        let begin = if replay_of_event_id.is_some() {
957            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
958        } else {
959            begin_in_flight(binding.id.as_str(), binding.version)
960        };
961        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
962
963        let mut attempts = Vec::new();
964        let mut source_node_id = format!("trigger:{}", event.id.0);
965        let mut initial_nodes = Vec::new();
966        let mut initial_edges = Vec::new();
967        if let Some(original_event_id) = replay_of_event_id.as_ref() {
968            let original_node_id = format!("trigger:{original_event_id}");
969            initial_nodes.push(RunActionGraphNodeRecord {
970                id: original_node_id.clone(),
971                label: format!(
972                    "{}:{} (original {})",
973                    event.provider.as_str(),
974                    event.kind,
975                    original_event_id
976                ),
977                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
978                status: "historical".to_string(),
979                outcome: "replayed_from".to_string(),
980                trace_id: Some(event.trace_id.0.clone()),
981                stage_id: None,
982                node_id: None,
983                worker_id: None,
984                run_id: None,
985                run_path: None,
986                metadata: trigger_node_metadata(&event),
987            });
988            initial_edges.push(RunActionGraphEdgeRecord {
989                from_id: original_node_id,
990                to_id: source_node_id.clone(),
991                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
992                label: Some("replay chain".to_string()),
993            });
994        }
995        initial_nodes.push(RunActionGraphNodeRecord {
996            id: source_node_id.clone(),
997            label: format!("{}:{}", event.provider.as_str(), event.kind),
998            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
999            status: "received".to_string(),
1000            outcome: "received".to_string(),
1001            trace_id: Some(event.trace_id.0.clone()),
1002            stage_id: None,
1003            node_id: None,
1004            worker_id: None,
1005            run_id: None,
1006            run_path: None,
1007            metadata: trigger_node_metadata(&event),
1008        });
1009        self.emit_action_graph(
1010            &event,
1011            initial_nodes,
1012            initial_edges,
1013            serde_json::json!({
1014                "source": "dispatcher",
1015                "trigger_id": trigger_id,
1016                "binding_key": binding_key,
1017                "event_id": event_id,
1018                "replay_of_event_id": replay_of_event_id,
1019            }),
1020        )
1021        .await?;
1022
1023        if dispatch_cancel_requested(
1024            &self.event_log,
1025            &binding_key,
1026            &event.id.0,
1027            replay_of_event_id.as_ref(),
1028        )
1029        .await?
1030        {
1031            finish_in_flight(
1032                binding.id.as_str(),
1033                binding.version,
1034                TriggerDispatchOutcome::Failed,
1035            )
1036            .await
1037            .map_err(|error| DispatchError::Registry(error.to_string()))?;
1038            decrement_in_flight(&self.state);
1039            return Ok(cancelled_dispatch_outcome(
1040                binding,
1041                &route,
1042                &event,
1043                replay_of_event_id,
1044                0,
1045                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1046            ));
1047        }
1048
1049        if let Some(predicate) = binding.when.as_ref() {
1050            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1051            let evaluation = self
1052                .evaluate_predicate(
1053                    binding,
1054                    predicate,
1055                    &event,
1056                    replay_of_event_id.as_ref(),
1057                    autonomy_tier,
1058                )
1059                .await?;
1060            let passed = evaluation.result;
1061            self.emit_action_graph(
1062                &event,
1063                vec![RunActionGraphNodeRecord {
1064                    id: predicate_node_id.clone(),
1065                    label: predicate.raw.clone(),
1066                    kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1067                    status: "completed".to_string(),
1068                    outcome: passed.to_string(),
1069                    trace_id: Some(event.trace_id.0.clone()),
1070                    stage_id: None,
1071                    node_id: None,
1072                    worker_id: None,
1073                    run_id: None,
1074                    run_path: None,
1075                    metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1076                }],
1077                vec![RunActionGraphEdgeRecord {
1078                    from_id: source_node_id.clone(),
1079                    to_id: predicate_node_id.clone(),
1080                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1081                    label: None,
1082                }],
1083                serde_json::json!({
1084                    "source": "dispatcher",
1085                    "trigger_id": binding.id.as_str(),
1086                    "binding_key": binding.binding_key(),
1087                    "event_id": event.id.0,
1088                    "predicate": predicate.raw,
1089                    "reason": evaluation.reason,
1090                    "cached": evaluation.cached,
1091                    "cost_usd": evaluation.cost_usd,
1092                    "tokens": evaluation.tokens,
1093                    "latency_ms": evaluation.latency_ms,
1094                    "replay_of_event_id": replay_of_event_id,
1095                }),
1096            )
1097            .await?;
1098
1099            if !passed {
1100                if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1101                    let final_error = format!(
1102                        "trigger budget exhausted: {}",
1103                        evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1104                    );
1105                    self.move_budget_exhausted_to_dlq(
1106                        binding,
1107                        &route,
1108                        &event,
1109                        replay_of_event_id.as_ref(),
1110                        &final_error,
1111                    )
1112                    .await?;
1113                    finish_in_flight(
1114                        binding.id.as_str(),
1115                        binding.version,
1116                        TriggerDispatchOutcome::Dlq,
1117                    )
1118                    .await
1119                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1120                    decrement_in_flight(&self.state);
1121                    self.append_dispatch_trust_record(
1122                        binding,
1123                        &route,
1124                        &event,
1125                        replay_of_event_id.as_ref(),
1126                        autonomy_tier,
1127                        TrustOutcome::Failure,
1128                        "dlq",
1129                        0,
1130                        Some(final_error.clone()),
1131                    )
1132                    .await?;
1133                    return Ok(DispatchOutcome {
1134                        trigger_id: binding.id.as_str().to_string(),
1135                        binding_key: binding.binding_key(),
1136                        event_id: event.id.0,
1137                        attempt_count: 0,
1138                        status: DispatchStatus::Dlq,
1139                        handler_kind: route.kind().to_string(),
1140                        target_uri: route.target_uri(),
1141                        replay_of_event_id,
1142                        result: None,
1143                        error: Some(final_error),
1144                    });
1145                }
1146
1147                if evaluation.exhaustion_strategy
1148                    == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1149                {
1150                    self.append_budget_deferred_event(
1151                        binding,
1152                        &route,
1153                        &event,
1154                        replay_of_event_id.as_ref(),
1155                        evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1156                    )
1157                    .await?;
1158                    finish_in_flight(
1159                        binding.id.as_str(),
1160                        binding.version,
1161                        TriggerDispatchOutcome::Dispatched,
1162                    )
1163                    .await
1164                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1165                    decrement_in_flight(&self.state);
1166                    self.append_dispatch_trust_record(
1167                        binding,
1168                        &route,
1169                        &event,
1170                        replay_of_event_id.as_ref(),
1171                        autonomy_tier,
1172                        TrustOutcome::Denied,
1173                        "waiting",
1174                        0,
1175                        evaluation.reason.clone(),
1176                    )
1177                    .await?;
1178                    return Ok(DispatchOutcome {
1179                        trigger_id: binding.id.as_str().to_string(),
1180                        binding_key: binding.binding_key(),
1181                        event_id: event.id.0,
1182                        attempt_count: 0,
1183                        status: DispatchStatus::Waiting,
1184                        handler_kind: route.kind().to_string(),
1185                        target_uri: route.target_uri(),
1186                        replay_of_event_id,
1187                        result: Some(serde_json::json!({
1188                            "deferred": true,
1189                            "predicate": predicate.raw,
1190                            "reason": evaluation.reason,
1191                        })),
1192                        error: None,
1193                    });
1194                }
1195
1196                self.append_skipped_outbox_event(
1197                    binding,
1198                    &route,
1199                    &event,
1200                    replay_of_event_id.as_ref(),
1201                    DispatchSkipStage::Predicate,
1202                    serde_json::json!({
1203                        "predicate": predicate.raw,
1204                        "reason": evaluation.reason,
1205                    }),
1206                )
1207                .await?;
1208                finish_in_flight(
1209                    binding.id.as_str(),
1210                    binding.version,
1211                    TriggerDispatchOutcome::Dispatched,
1212                )
1213                .await
1214                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1215                decrement_in_flight(&self.state);
1216                self.append_dispatch_trust_record(
1217                    binding,
1218                    &route,
1219                    &event,
1220                    replay_of_event_id.as_ref(),
1221                    autonomy_tier,
1222                    TrustOutcome::Denied,
1223                    "skipped",
1224                    0,
1225                    None,
1226                )
1227                .await?;
1228                return Ok(DispatchOutcome {
1229                    trigger_id: binding.id.as_str().to_string(),
1230                    binding_key: binding.binding_key(),
1231                    event_id: event.id.0,
1232                    attempt_count: 0,
1233                    status: DispatchStatus::Skipped,
1234                    handler_kind: route.kind().to_string(),
1235                    target_uri: route.target_uri(),
1236                    replay_of_event_id,
1237                    result: Some(serde_json::json!({
1238                        "skipped": true,
1239                        "predicate": predicate.raw,
1240                        "reason": evaluation.reason,
1241                    })),
1242                    error: None,
1243                });
1244            }
1245
1246            source_node_id = predicate_node_id;
1247        }
1248
1249        if autonomy_tier == AutonomyTier::ActAuto {
1250            if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1251                let request_id = self
1252                    .append_autonomy_budget_approval_request(
1253                        binding,
1254                        &route,
1255                        &event,
1256                        replay_of_event_id.as_ref(),
1257                        reason,
1258                    )
1259                    .await?;
1260                self.emit_autonomy_budget_approval_action_graph(
1261                    binding,
1262                    &route,
1263                    &event,
1264                    &source_node_id,
1265                    replay_of_event_id.as_ref(),
1266                    reason,
1267                    &request_id,
1268                )
1269                .await?;
1270                finish_in_flight(
1271                    binding.id.as_str(),
1272                    binding.version,
1273                    TriggerDispatchOutcome::Dispatched,
1274                )
1275                .await
1276                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1277                decrement_in_flight(&self.state);
1278                self.append_tier_transition_trust_record(
1279                    binding,
1280                    &event,
1281                    replay_of_event_id.as_ref(),
1282                    autonomy_tier,
1283                    AutonomyTier::ActWithApproval,
1284                    reason,
1285                    &request_id,
1286                )
1287                .await?;
1288                self.append_dispatch_trust_record(
1289                    binding,
1290                    &route,
1291                    &event,
1292                    replay_of_event_id.as_ref(),
1293                    autonomy_tier,
1294                    TrustOutcome::Denied,
1295                    "waiting",
1296                    0,
1297                    Some(reason.to_string()),
1298                )
1299                .await?;
1300                return Ok(DispatchOutcome {
1301                    trigger_id: binding.id.as_str().to_string(),
1302                    binding_key: binding.binding_key(),
1303                    event_id: event.id.0,
1304                    attempt_count: 0,
1305                    status: DispatchStatus::Waiting,
1306                    handler_kind: route.kind().to_string(),
1307                    target_uri: route.target_uri(),
1308                    replay_of_event_id,
1309                    result: Some(serde_json::json!({
1310                        "approval_required": true,
1311                        "request_id": request_id,
1312                        "reason": reason,
1313                        "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1314                    })),
1315                    error: None,
1316                });
1317            }
1318            note_autonomous_decision(binding);
1319        }
1320
1321        let (event, acquired_flow) = match self
1322            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1323            .await?
1324        {
1325            FlowControlOutcome::Dispatch { event, acquired } => {
1326                (*event, Arc::new(AsyncMutex::new(acquired)))
1327            }
1328            FlowControlOutcome::Skip { reason } => {
1329                self.append_skipped_outbox_event(
1330                    binding,
1331                    &route,
1332                    &event,
1333                    replay_of_event_id.as_ref(),
1334                    DispatchSkipStage::FlowControl,
1335                    serde_json::json!({
1336                        "flow_control": reason,
1337                    }),
1338                )
1339                .await?;
1340                finish_in_flight(
1341                    binding.id.as_str(),
1342                    binding.version,
1343                    TriggerDispatchOutcome::Dispatched,
1344                )
1345                .await
1346                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1347                decrement_in_flight(&self.state);
1348                return Ok(DispatchOutcome {
1349                    trigger_id: binding.id.as_str().to_string(),
1350                    binding_key: binding.binding_key(),
1351                    event_id: event.id.0,
1352                    attempt_count: 0,
1353                    status: DispatchStatus::Skipped,
1354                    handler_kind: route.kind().to_string(),
1355                    target_uri: route.target_uri(),
1356                    replay_of_event_id,
1357                    result: Some(serde_json::json!({
1358                        "skipped": true,
1359                        "flow_control": reason,
1360                    })),
1361                    error: None,
1362                });
1363            }
1364        };
1365
1366        let destination_key = destination_circuit_key(&route);
1367        let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1368            DestinationCircuitProbe::Allow { half_open } => {
1369                if half_open {
1370                    if let Some(metrics) = self.metrics.as_ref() {
1371                        metrics.record_backpressure_event("circuit", "half_open_probe");
1372                    }
1373                }
1374                half_open
1375            }
1376            DestinationCircuitProbe::Block { retry_after } => {
1377                if let Some(metrics) = self.metrics.as_ref() {
1378                    metrics.record_backpressure_event("circuit", "fail_fast");
1379                }
1380                let final_error = format!(
1381                    "destination circuit open for {}; retry after {}s",
1382                    destination_key,
1383                    retry_after.as_secs().max(1)
1384                );
1385                self.move_circuit_open_to_dlq(
1386                    binding,
1387                    &route,
1388                    &event,
1389                    replay_of_event_id.as_ref(),
1390                    &final_error,
1391                    &destination_key,
1392                )
1393                .await?;
1394                finish_in_flight(
1395                    binding.id.as_str(),
1396                    binding.version,
1397                    TriggerDispatchOutcome::Dlq,
1398                )
1399                .await
1400                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1401                self.release_flow_control(&acquired_flow).await?;
1402                decrement_in_flight(&self.state);
1403                self.append_dispatch_trust_record(
1404                    binding,
1405                    &route,
1406                    &event,
1407                    replay_of_event_id.as_ref(),
1408                    autonomy_tier,
1409                    TrustOutcome::Failure,
1410                    "dlq",
1411                    0,
1412                    Some(final_error.clone()),
1413                )
1414                .await?;
1415                return Ok(DispatchOutcome {
1416                    trigger_id: binding.id.as_str().to_string(),
1417                    binding_key: binding.binding_key(),
1418                    event_id: event.id.0,
1419                    attempt_count: 0,
1420                    status: DispatchStatus::Dlq,
1421                    handler_kind: route.kind().to_string(),
1422                    target_uri: route.target_uri(),
1423                    replay_of_event_id,
1424                    result: None,
1425                    error: Some(final_error),
1426                });
1427            }
1428        };
1429
1430        let mut previous_retry_node = None;
1431        let max_attempts = binding.retry.max_attempts();
1432        for attempt in 1..=max_attempts {
1433            if dispatch_cancel_requested(
1434                &self.event_log,
1435                &binding_key,
1436                &event.id.0,
1437                replay_of_event_id.as_ref(),
1438            )
1439            .await?
1440            {
1441                finish_in_flight(
1442                    binding.id.as_str(),
1443                    binding.version,
1444                    TriggerDispatchOutcome::Failed,
1445                )
1446                .await
1447                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1448                decrement_in_flight(&self.state);
1449                return Ok(cancelled_dispatch_outcome(
1450                    binding,
1451                    &route,
1452                    &event,
1453                    replay_of_event_id,
1454                    attempt.saturating_sub(1),
1455                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1456                ));
1457            }
1458            maybe_fail_before_outbox();
1459            let attempt_started_instant = Instant::now();
1460            let attempt_started_at_ms = current_unix_ms();
1461            let queue_age_at_start = duration_between_ms(
1462                attempt_started_at_ms,
1463                queue_appended_at_ms(parent_headers.as_ref(), &event),
1464            );
1465            if attempt == 1 {
1466                if let Some(metrics) = self.metrics.as_ref() {
1467                    metrics.record_trigger_queue_age_at_dispatch_start(
1468                        binding.id.as_str(),
1469                        &binding_key,
1470                        event.provider.as_str(),
1471                        tenant_id(&event),
1472                        "started",
1473                        queue_age_at_start,
1474                    );
1475                }
1476            }
1477            tracing::info!(
1478                component = "dispatcher",
1479                lifecycle = "dispatch_started",
1480                trigger_id = %binding.id.as_str(),
1481                binding_key = %binding_key,
1482                event_id = %event.id.0,
1483                attempt,
1484                queue_age_ms = queue_age_at_start.as_millis(),
1485                trace_id = %event.trace_id.0
1486            );
1487            let started_at = now_rfc3339();
1488            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1489            self.append_lifecycle_event(
1490                "DispatchStarted",
1491                &event,
1492                binding,
1493                serde_json::json!({
1494                    "event_id": event.id.0,
1495                    "attempt": attempt,
1496                    "handler_kind": route.kind(),
1497                    "target_uri": route.target_uri(),
1498                    "replay_of_event_id": replay_of_event_id,
1499                }),
1500                replay_of_event_id.as_ref(),
1501            )
1502            .await?;
1503            self.append_topic_event(
1504                TRIGGER_OUTBOX_TOPIC,
1505                "dispatch_started",
1506                &event,
1507                Some(binding),
1508                Some(attempt),
1509                serde_json::json!({
1510                    "event_id": event.id.0,
1511                    "attempt": attempt,
1512                    "trigger_id": binding.id.as_str(),
1513                    "binding_key": binding.binding_key(),
1514                    "handler_kind": route.kind(),
1515                    "target_uri": route.target_uri(),
1516                    "replay_of_event_id": replay_of_event_id,
1517                }),
1518                replay_of_event_id.as_ref(),
1519            )
1520            .await?;
1521
1522            let mut dispatch_edges = Vec::new();
1523            if attempt == 1 {
1524                dispatch_edges.push(RunActionGraphEdgeRecord {
1525                    from_id: source_node_id.clone(),
1526                    to_id: attempt_node_id.clone(),
1527                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1528                    label: binding.when.as_ref().map(|_| "true".to_string()),
1529                });
1530            } else if let Some(retry_node_id) = previous_retry_node.take() {
1531                dispatch_edges.push(RunActionGraphEdgeRecord {
1532                    from_id: retry_node_id,
1533                    to_id: attempt_node_id.clone(),
1534                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1535                    label: Some(format!("attempt {attempt}")),
1536                });
1537            }
1538
1539            self.emit_action_graph(
1540                &event,
1541                vec![RunActionGraphNodeRecord {
1542                    id: attempt_node_id.clone(),
1543                    label: dispatch_node_label(&route),
1544                    kind: dispatch_node_kind(&route).to_string(),
1545                    status: "running".to_string(),
1546                    outcome: format!("attempt_{attempt}"),
1547                    trace_id: Some(event.trace_id.0.clone()),
1548                    stage_id: None,
1549                    node_id: None,
1550                    worker_id: None,
1551                    run_id: None,
1552                    run_path: None,
1553                    metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1554                }],
1555                dispatch_edges,
1556                serde_json::json!({
1557                    "source": "dispatcher",
1558                    "trigger_id": binding.id.as_str(),
1559                    "binding_key": binding.binding_key(),
1560                    "event_id": event.id.0,
1561                    "attempt": attempt,
1562                    "handler_kind": route.kind(),
1563                    "target_uri": route.target_uri(),
1564                    "target_agent": dispatch_target_agent(&route),
1565                    "replay_of_event_id": replay_of_event_id,
1566                }),
1567            )
1568            .await?;
1569
1570            let result = self
1571                .dispatch_once(
1572                    binding,
1573                    &route,
1574                    &event,
1575                    autonomy_tier,
1576                    Some(DispatchWaitLease::new(
1577                        self.state.clone(),
1578                        acquired_flow.clone(),
1579                    )),
1580                    &mut self.cancel_tx.subscribe(),
1581                )
1582                .await;
1583            let attempt_runtime = attempt_started_instant.elapsed();
1584            let attempt_status = dispatch_result_status(&result);
1585            if let Some(metrics) = self.metrics.as_ref() {
1586                metrics.record_trigger_dispatch_runtime(
1587                    binding.id.as_str(),
1588                    &binding_key,
1589                    event.provider.as_str(),
1590                    tenant_id(&event),
1591                    attempt_status,
1592                    attempt_runtime,
1593                );
1594            }
1595            tracing::info!(
1596                component = "dispatcher",
1597                lifecycle = "handler_completed",
1598                trigger_id = %binding.id.as_str(),
1599                binding_key = %binding_key,
1600                event_id = %event.id.0,
1601                attempt,
1602                status = attempt_status,
1603                runtime_ms = attempt_runtime.as_millis(),
1604                trace_id = %event.trace_id.0
1605            );
1606            let completed_at = now_rfc3339();
1607
1608            match result {
1609                Ok(result) => {
1610                    let attempt_record = DispatchAttemptRecord {
1611                        trigger_id: binding.id.as_str().to_string(),
1612                        binding_key: binding.binding_key(),
1613                        event_id: event.id.0.clone(),
1614                        attempt,
1615                        handler_kind: route.kind().to_string(),
1616                        started_at,
1617                        completed_at,
1618                        outcome: "success".to_string(),
1619                        error_msg: None,
1620                    };
1621                    attempts.push(attempt_record.clone());
1622                    self.append_attempt_record(
1623                        &event,
1624                        binding,
1625                        &attempt_record,
1626                        replay_of_event_id.as_ref(),
1627                    )
1628                    .await?;
1629                    self.append_lifecycle_event(
1630                        "DispatchSucceeded",
1631                        &event,
1632                        binding,
1633                        serde_json::json!({
1634                            "event_id": event.id.0,
1635                            "attempt": attempt,
1636                            "handler_kind": route.kind(),
1637                            "target_uri": route.target_uri(),
1638                            "result": result,
1639                            "replay_of_event_id": replay_of_event_id,
1640                        }),
1641                        replay_of_event_id.as_ref(),
1642                    )
1643                    .await?;
1644                    self.append_topic_event(
1645                        TRIGGER_OUTBOX_TOPIC,
1646                        "dispatch_succeeded",
1647                        &event,
1648                        Some(binding),
1649                        Some(attempt),
1650                        serde_json::json!({
1651                            "event_id": event.id.0,
1652                            "attempt": attempt,
1653                            "trigger_id": binding.id.as_str(),
1654                            "binding_key": binding.binding_key(),
1655                            "handler_kind": route.kind(),
1656                            "target_uri": route.target_uri(),
1657                            "result": result,
1658                            "replay_of_event_id": replay_of_event_id,
1659                        }),
1660                        replay_of_event_id.as_ref(),
1661                    )
1662                    .await?;
1663                    self.emit_action_graph(
1664                        &event,
1665                        vec![RunActionGraphNodeRecord {
1666                            id: attempt_node_id.clone(),
1667                            label: dispatch_node_label(&route),
1668                            kind: dispatch_node_kind(&route).to_string(),
1669                            status: "completed".to_string(),
1670                            outcome: dispatch_success_outcome(&route, &result).to_string(),
1671                            trace_id: Some(event.trace_id.0.clone()),
1672                            stage_id: None,
1673                            node_id: None,
1674                            worker_id: None,
1675                            run_id: None,
1676                            run_path: None,
1677                            metadata: dispatch_success_metadata(
1678                                &route, binding, &event, attempt, &result,
1679                            ),
1680                        }],
1681                        Vec::new(),
1682                        serde_json::json!({
1683                            "source": "dispatcher",
1684                            "trigger_id": binding.id.as_str(),
1685                            "binding_key": binding.binding_key(),
1686                            "event_id": event.id.0,
1687                            "attempt": attempt,
1688                            "handler_kind": route.kind(),
1689                            "target_uri": route.target_uri(),
1690                            "result": result,
1691                            "replay_of_event_id": replay_of_event_id,
1692                        }),
1693                    )
1694                    .await?;
1695                    self.state
1696                        .destination_circuits
1697                        .record_success(&destination_key);
1698                    if half_open_probe {
1699                        if let Some(metrics) = self.metrics.as_ref() {
1700                            metrics.record_backpressure_event("circuit", "closed");
1701                        }
1702                    }
1703                    finish_in_flight(
1704                        binding.id.as_str(),
1705                        binding.version,
1706                        TriggerDispatchOutcome::Dispatched,
1707                    )
1708                    .await
1709                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1710                    self.release_flow_control(&acquired_flow).await?;
1711                    decrement_in_flight(&self.state);
1712                    self.append_dispatch_trust_record(
1713                        binding,
1714                        &route,
1715                        &event,
1716                        replay_of_event_id.as_ref(),
1717                        autonomy_tier,
1718                        TrustOutcome::Success,
1719                        "succeeded",
1720                        attempt,
1721                        None,
1722                    )
1723                    .await?;
1724                    return Ok(DispatchOutcome {
1725                        trigger_id: binding.id.as_str().to_string(),
1726                        binding_key: binding.binding_key(),
1727                        event_id: event.id.0,
1728                        attempt_count: attempt,
1729                        status: DispatchStatus::Succeeded,
1730                        handler_kind: route.kind().to_string(),
1731                        target_uri: route.target_uri(),
1732                        replay_of_event_id,
1733                        result: Some(result),
1734                        error: None,
1735                    });
1736                }
1737                Err(error) => {
1738                    let attempt_record = DispatchAttemptRecord {
1739                        trigger_id: binding.id.as_str().to_string(),
1740                        binding_key: binding.binding_key(),
1741                        event_id: event.id.0.clone(),
1742                        attempt,
1743                        handler_kind: route.kind().to_string(),
1744                        started_at,
1745                        completed_at,
1746                        outcome: dispatch_error_label(&error).to_string(),
1747                        error_msg: Some(error.to_string()),
1748                    };
1749                    attempts.push(attempt_record.clone());
1750                    self.append_attempt_record(
1751                        &event,
1752                        binding,
1753                        &attempt_record,
1754                        replay_of_event_id.as_ref(),
1755                    )
1756                    .await?;
1757                    if let DispatchError::Waiting(message) = &error {
1758                        self.append_lifecycle_event(
1759                            "DispatchWaiting",
1760                            &event,
1761                            binding,
1762                            serde_json::json!({
1763                                "event_id": event.id.0,
1764                                "attempt": attempt,
1765                                "handler_kind": route.kind(),
1766                                "target_uri": route.target_uri(),
1767                                "message": message,
1768                                "replay_of_event_id": replay_of_event_id,
1769                            }),
1770                            replay_of_event_id.as_ref(),
1771                        )
1772                        .await?;
1773                        self.append_topic_event(
1774                            TRIGGER_OUTBOX_TOPIC,
1775                            "dispatch_waiting",
1776                            &event,
1777                            Some(binding),
1778                            Some(attempt),
1779                            serde_json::json!({
1780                                "event_id": event.id.0,
1781                                "attempt": attempt,
1782                                "trigger_id": binding.id.as_str(),
1783                                "binding_key": binding.binding_key(),
1784                                "handler_kind": route.kind(),
1785                                "target_uri": route.target_uri(),
1786                                "message": message,
1787                                "replay_of_event_id": replay_of_event_id,
1788                            }),
1789                            replay_of_event_id.as_ref(),
1790                        )
1791                        .await?;
1792                        finish_in_flight(
1793                            binding.id.as_str(),
1794                            binding.version,
1795                            TriggerDispatchOutcome::Dispatched,
1796                        )
1797                        .await
1798                        .map_err(|registry_error| {
1799                            DispatchError::Registry(registry_error.to_string())
1800                        })?;
1801                        self.release_flow_control(&acquired_flow).await?;
1802                        decrement_in_flight(&self.state);
1803                        return Ok(DispatchOutcome {
1804                            trigger_id: binding.id.as_str().to_string(),
1805                            binding_key: binding.binding_key(),
1806                            event_id: event.id.0,
1807                            attempt_count: attempt,
1808                            status: DispatchStatus::Waiting,
1809                            handler_kind: route.kind().to_string(),
1810                            target_uri: route.target_uri(),
1811                            replay_of_event_id,
1812                            result: Some(serde_json::json!({
1813                                "waiting": true,
1814                                "message": message,
1815                            })),
1816                            error: None,
1817                        });
1818                    }
1819
1820                    self.append_lifecycle_event(
1821                        "DispatchFailed",
1822                        &event,
1823                        binding,
1824                        serde_json::json!({
1825                            "event_id": event.id.0,
1826                            "attempt": attempt,
1827                            "handler_kind": route.kind(),
1828                            "target_uri": route.target_uri(),
1829                            "error": error.to_string(),
1830                            "replay_of_event_id": replay_of_event_id,
1831                        }),
1832                        replay_of_event_id.as_ref(),
1833                    )
1834                    .await?;
1835                    self.append_topic_event(
1836                        TRIGGER_OUTBOX_TOPIC,
1837                        "dispatch_failed",
1838                        &event,
1839                        Some(binding),
1840                        Some(attempt),
1841                        serde_json::json!({
1842                            "event_id": event.id.0,
1843                            "attempt": attempt,
1844                            "trigger_id": binding.id.as_str(),
1845                            "binding_key": binding.binding_key(),
1846                            "handler_kind": route.kind(),
1847                            "target_uri": route.target_uri(),
1848                            "error": error.to_string(),
1849                            "replay_of_event_id": replay_of_event_id,
1850                        }),
1851                        replay_of_event_id.as_ref(),
1852                    )
1853                    .await?;
1854                    self.emit_action_graph(
1855                        &event,
1856                        vec![RunActionGraphNodeRecord {
1857                            id: attempt_node_id.clone(),
1858                            label: dispatch_node_label(&route),
1859                            kind: dispatch_node_kind(&route).to_string(),
1860                            status: if matches!(error, DispatchError::Cancelled(_)) {
1861                                "cancelled".to_string()
1862                            } else {
1863                                "failed".to_string()
1864                            },
1865                            outcome: dispatch_error_label(&error).to_string(),
1866                            trace_id: Some(event.trace_id.0.clone()),
1867                            stage_id: None,
1868                            node_id: None,
1869                            worker_id: None,
1870                            run_id: None,
1871                            run_path: None,
1872                            metadata: dispatch_error_metadata(
1873                                &route, binding, &event, attempt, &error,
1874                            ),
1875                        }],
1876                        Vec::new(),
1877                        serde_json::json!({
1878                            "source": "dispatcher",
1879                            "trigger_id": binding.id.as_str(),
1880                            "binding_key": binding.binding_key(),
1881                            "event_id": event.id.0,
1882                            "attempt": attempt,
1883                            "handler_kind": route.kind(),
1884                            "target_uri": route.target_uri(),
1885                            "error": error.to_string(),
1886                            "replay_of_event_id": replay_of_event_id,
1887                        }),
1888                    )
1889                    .await?;
1890
1891                    let circuit_opened = if error.retryable() {
1892                        self.state
1893                            .destination_circuits
1894                            .record_failure(&destination_key)
1895                    } else {
1896                        false
1897                    };
1898                    if circuit_opened {
1899                        if let Some(metrics) = self.metrics.as_ref() {
1900                            metrics.record_backpressure_event("circuit", "opened");
1901                            metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1902                            metrics.record_trigger_accepted_to_dlq(
1903                                binding.id.as_str(),
1904                                &binding_key,
1905                                event.provider.as_str(),
1906                                tenant_id(&event),
1907                                "circuit_open",
1908                                duration_between_ms(
1909                                    current_unix_ms(),
1910                                    accepted_at_ms(parent_headers.as_ref(), &event),
1911                                ),
1912                            );
1913                        }
1914                        let final_error = format!(
1915                            "destination circuit opened for {} after {} consecutive failures: {}",
1916                            destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
1917                        );
1918                        let dlq_entry = DlqEntry {
1919                            trigger_id: binding.id.as_str().to_string(),
1920                            binding_key: binding.binding_key(),
1921                            event: event.clone(),
1922                            attempt_count: attempt,
1923                            final_error: final_error.clone(),
1924                            error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
1925                                .to_string(),
1926                            attempts: attempts.clone(),
1927                        };
1928                        self.state
1929                            .dlq
1930                            .lock()
1931                            .expect("dispatcher dlq poisoned")
1932                            .push(dlq_entry.clone());
1933                        self.append_lifecycle_event(
1934                            "DlqMoved",
1935                            &event,
1936                            binding,
1937                            serde_json::json!({
1938                                "event_id": event.id.0,
1939                                "attempt_count": attempt,
1940                                "final_error": dlq_entry.final_error,
1941                                "reason": "circuit_open",
1942                                "destination": destination_key,
1943                                "replay_of_event_id": replay_of_event_id,
1944                            }),
1945                            replay_of_event_id.as_ref(),
1946                        )
1947                        .await?;
1948                        self.append_topic_event(
1949                            TRIGGER_DLQ_TOPIC,
1950                            "dlq_moved",
1951                            &event,
1952                            Some(binding),
1953                            Some(attempt),
1954                            serde_json::to_value(&dlq_entry).map_err(|serde_error| {
1955                                DispatchError::Serde(serde_error.to_string())
1956                            })?,
1957                            replay_of_event_id.as_ref(),
1958                        )
1959                        .await?;
1960                        finish_in_flight(
1961                            binding.id.as_str(),
1962                            binding.version,
1963                            TriggerDispatchOutcome::Dlq,
1964                        )
1965                        .await
1966                        .map_err(|registry_error| {
1967                            DispatchError::Registry(registry_error.to_string())
1968                        })?;
1969                        self.release_flow_control(&acquired_flow).await?;
1970                        decrement_in_flight(&self.state);
1971                        self.append_dispatch_trust_record(
1972                            binding,
1973                            &route,
1974                            &event,
1975                            replay_of_event_id.as_ref(),
1976                            autonomy_tier,
1977                            TrustOutcome::Failure,
1978                            "dlq",
1979                            attempt,
1980                            Some(final_error.clone()),
1981                        )
1982                        .await?;
1983                        return Ok(DispatchOutcome {
1984                            trigger_id: binding.id.as_str().to_string(),
1985                            binding_key: binding.binding_key(),
1986                            event_id: event.id.0,
1987                            attempt_count: attempt,
1988                            status: DispatchStatus::Dlq,
1989                            handler_kind: route.kind().to_string(),
1990                            target_uri: route.target_uri(),
1991                            replay_of_event_id,
1992                            result: None,
1993                            error: Some(final_error),
1994                        });
1995                    }
1996
1997                    if !error.retryable() {
1998                        finish_in_flight(
1999                            binding.id.as_str(),
2000                            binding.version,
2001                            TriggerDispatchOutcome::Failed,
2002                        )
2003                        .await
2004                        .map_err(|registry_error| {
2005                            DispatchError::Registry(registry_error.to_string())
2006                        })?;
2007                        self.release_flow_control(&acquired_flow).await?;
2008                        decrement_in_flight(&self.state);
2009                        let trust_outcome = match error {
2010                            DispatchError::Denied(_) => TrustOutcome::Denied,
2011                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
2012                            _ => TrustOutcome::Failure,
2013                        };
2014                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2015                            "cancelled"
2016                        } else {
2017                            "failed"
2018                        };
2019                        self.append_dispatch_trust_record(
2020                            binding,
2021                            &route,
2022                            &event,
2023                            replay_of_event_id.as_ref(),
2024                            autonomy_tier,
2025                            trust_outcome,
2026                            terminal_status,
2027                            attempt,
2028                            Some(error.to_string()),
2029                        )
2030                        .await?;
2031                        return Ok(DispatchOutcome {
2032                            trigger_id: binding.id.as_str().to_string(),
2033                            binding_key: binding.binding_key(),
2034                            event_id: event.id.0,
2035                            attempt_count: attempt,
2036                            status: if matches!(error, DispatchError::Cancelled(_)) {
2037                                DispatchStatus::Cancelled
2038                            } else {
2039                                DispatchStatus::Failed
2040                            },
2041                            handler_kind: route.kind().to_string(),
2042                            target_uri: route.target_uri(),
2043                            replay_of_event_id,
2044                            result: None,
2045                            error: Some(error.to_string()),
2046                        });
2047                    }
2048
2049                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2050                        if let Some(metrics) = self.metrics.as_ref() {
2051                            metrics.record_retry_scheduled();
2052                            metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2053                            metrics.record_trigger_retry_delay(
2054                                binding.id.as_str(),
2055                                &binding_key,
2056                                event.provider.as_str(),
2057                                tenant_id(&event),
2058                                "scheduled",
2059                                delay,
2060                            );
2061                        }
2062                        tracing::info!(
2063                            component = "dispatcher",
2064                            lifecycle = "retry_scheduled",
2065                            trigger_id = %binding.id.as_str(),
2066                            binding_key = %binding_key,
2067                            event_id = %event.id.0,
2068                            attempt = attempt + 1,
2069                            delay_ms = delay.as_millis(),
2070                            trace_id = %event.trace_id.0
2071                        );
2072                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2073                        previous_retry_node = Some(retry_node_id.clone());
2074                        self.emit_action_graph(
2075                            &event,
2076                            vec![RunActionGraphNodeRecord {
2077                                id: retry_node_id.clone(),
2078                                label: format!("retry in {}ms", delay.as_millis()),
2079                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2080                                status: "scheduled".to_string(),
2081                                outcome: format!("attempt_{}", attempt + 1),
2082                                trace_id: Some(event.trace_id.0.clone()),
2083                                stage_id: None,
2084                                node_id: None,
2085                                worker_id: None,
2086                                run_id: None,
2087                                run_path: None,
2088                                metadata: retry_node_metadata(
2089                                    binding,
2090                                    &event,
2091                                    attempt + 1,
2092                                    delay,
2093                                    &error,
2094                                ),
2095                            }],
2096                            vec![RunActionGraphEdgeRecord {
2097                                from_id: attempt_node_id,
2098                                to_id: retry_node_id.clone(),
2099                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2100                                label: Some(format!("attempt {}", attempt + 1)),
2101                            }],
2102                            serde_json::json!({
2103                                "source": "dispatcher",
2104                                "trigger_id": binding.id.as_str(),
2105                                "binding_key": binding.binding_key(),
2106                                "event_id": event.id.0,
2107                                "attempt": attempt + 1,
2108                                "delay_ms": delay.as_millis(),
2109                                "replay_of_event_id": replay_of_event_id,
2110                            }),
2111                        )
2112                        .await?;
2113                        self.append_lifecycle_event(
2114                            "RetryScheduled",
2115                            &event,
2116                            binding,
2117                            serde_json::json!({
2118                                "event_id": event.id.0,
2119                                "attempt": attempt + 1,
2120                                "delay_ms": delay.as_millis(),
2121                                "error": error.to_string(),
2122                                "replay_of_event_id": replay_of_event_id,
2123                            }),
2124                            replay_of_event_id.as_ref(),
2125                        )
2126                        .await?;
2127                        self.append_topic_event(
2128                            TRIGGER_ATTEMPTS_TOPIC,
2129                            "retry_scheduled",
2130                            &event,
2131                            Some(binding),
2132                            Some(attempt + 1),
2133                            serde_json::json!({
2134                                "event_id": event.id.0,
2135                                "attempt": attempt + 1,
2136                                "trigger_id": binding.id.as_str(),
2137                                "binding_key": binding.binding_key(),
2138                                "delay_ms": delay.as_millis(),
2139                                "error": error.to_string(),
2140                                "replay_of_event_id": replay_of_event_id,
2141                            }),
2142                            replay_of_event_id.as_ref(),
2143                        )
2144                        .await?;
2145                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2146                        let sleep_result = sleep_or_cancel_or_request(
2147                            &self.event_log,
2148                            delay,
2149                            &binding_key,
2150                            &event.id.0,
2151                            replay_of_event_id.as_ref(),
2152                            &mut self.cancel_tx.subscribe(),
2153                        )
2154                        .await;
2155                        decrement_retry_queue_depth(&self.state);
2156                        if sleep_result.is_err() {
2157                            finish_in_flight(
2158                                binding.id.as_str(),
2159                                binding.version,
2160                                TriggerDispatchOutcome::Failed,
2161                            )
2162                            .await
2163                            .map_err(|registry_error| {
2164                                DispatchError::Registry(registry_error.to_string())
2165                            })?;
2166                            self.release_flow_control(&acquired_flow).await?;
2167                            decrement_in_flight(&self.state);
2168                            self.append_dispatch_trust_record(
2169                                binding,
2170                                &route,
2171                                &event,
2172                                replay_of_event_id.as_ref(),
2173                                autonomy_tier,
2174                                TrustOutcome::Failure,
2175                                "cancelled",
2176                                attempt,
2177                                Some("dispatcher shutdown cancelled retry wait".to_string()),
2178                            )
2179                            .await?;
2180                            return Ok(DispatchOutcome {
2181                                trigger_id: binding.id.as_str().to_string(),
2182                                binding_key: binding.binding_key(),
2183                                event_id: event.id.0,
2184                                attempt_count: attempt,
2185                                status: DispatchStatus::Cancelled,
2186                                handler_kind: route.kind().to_string(),
2187                                target_uri: route.target_uri(),
2188                                replay_of_event_id,
2189                                result: None,
2190                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2191                            });
2192                        }
2193                        continue;
2194                    }
2195
2196                    let final_error = error.to_string();
2197                    let dlq_entry = DlqEntry {
2198                        trigger_id: binding.id.as_str().to_string(),
2199                        binding_key: binding.binding_key(),
2200                        event: event.clone(),
2201                        attempt_count: attempt,
2202                        final_error: final_error.clone(),
2203                        error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2204                            .to_string(),
2205                        attempts: attempts.clone(),
2206                    };
2207                    self.state
2208                        .dlq
2209                        .lock()
2210                        .expect("dispatcher dlq poisoned")
2211                        .push(dlq_entry.clone());
2212                    if let Some(metrics) = self.metrics.as_ref() {
2213                        metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2214                        metrics.record_trigger_accepted_to_dlq(
2215                            binding.id.as_str(),
2216                            &binding_key,
2217                            event.provider.as_str(),
2218                            tenant_id(&event),
2219                            "retry_exhausted",
2220                            duration_between_ms(
2221                                current_unix_ms(),
2222                                accepted_at_ms(parent_headers.as_ref(), &event),
2223                            ),
2224                        );
2225                    }
2226                    tracing::info!(
2227                        component = "dispatcher",
2228                        lifecycle = "dlq_moved",
2229                        trigger_id = %binding.id.as_str(),
2230                        binding_key = %binding_key,
2231                        event_id = %event.id.0,
2232                        attempt_count = attempt,
2233                        reason = "retry_exhausted",
2234                        trace_id = %event.trace_id.0
2235                    );
2236                    self.emit_action_graph(
2237                        &event,
2238                        vec![RunActionGraphNodeRecord {
2239                            id: format!("dlq:{binding_key}:{}", event.id.0),
2240                            label: binding.id.as_str().to_string(),
2241                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2242                            status: "queued".to_string(),
2243                            outcome: "retry_exhausted".to_string(),
2244                            trace_id: Some(event.trace_id.0.clone()),
2245                            stage_id: None,
2246                            node_id: None,
2247                            worker_id: None,
2248                            run_id: None,
2249                            run_path: None,
2250                            metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2251                        }],
2252                        vec![RunActionGraphEdgeRecord {
2253                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2254                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
2255                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2256                            label: Some(format!("{attempt} attempts")),
2257                        }],
2258                        serde_json::json!({
2259                            "source": "dispatcher",
2260                            "trigger_id": binding.id.as_str(),
2261                            "binding_key": binding.binding_key(),
2262                            "event_id": event.id.0,
2263                            "attempt_count": attempt,
2264                            "final_error": final_error,
2265                            "replay_of_event_id": replay_of_event_id,
2266                        }),
2267                    )
2268                    .await?;
2269                    self.append_lifecycle_event(
2270                        "DlqMoved",
2271                        &event,
2272                        binding,
2273                        serde_json::json!({
2274                            "event_id": event.id.0,
2275                            "attempt_count": attempt,
2276                            "final_error": dlq_entry.final_error,
2277                            "replay_of_event_id": replay_of_event_id,
2278                        }),
2279                        replay_of_event_id.as_ref(),
2280                    )
2281                    .await?;
2282                    self.append_topic_event(
2283                        TRIGGER_DLQ_TOPIC,
2284                        "dlq_moved",
2285                        &event,
2286                        Some(binding),
2287                        Some(attempt),
2288                        serde_json::to_value(&dlq_entry)
2289                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2290                        replay_of_event_id.as_ref(),
2291                    )
2292                    .await?;
2293                    finish_in_flight(
2294                        binding.id.as_str(),
2295                        binding.version,
2296                        TriggerDispatchOutcome::Dlq,
2297                    )
2298                    .await
2299                    .map_err(|registry_error| {
2300                        DispatchError::Registry(registry_error.to_string())
2301                    })?;
2302                    self.release_flow_control(&acquired_flow).await?;
2303                    decrement_in_flight(&self.state);
2304                    self.append_dispatch_trust_record(
2305                        binding,
2306                        &route,
2307                        &event,
2308                        replay_of_event_id.as_ref(),
2309                        autonomy_tier,
2310                        TrustOutcome::Failure,
2311                        "dlq",
2312                        attempt,
2313                        Some(error.to_string()),
2314                    )
2315                    .await?;
2316                    return Ok(DispatchOutcome {
2317                        trigger_id: binding.id.as_str().to_string(),
2318                        binding_key: binding.binding_key(),
2319                        event_id: event.id.0,
2320                        attempt_count: attempt,
2321                        status: DispatchStatus::Dlq,
2322                        handler_kind: route.kind().to_string(),
2323                        target_uri: route.target_uri(),
2324                        replay_of_event_id,
2325                        result: None,
2326                        error: Some(error.to_string()),
2327                    });
2328                }
2329            }
2330        }
2331
2332        finish_in_flight(
2333            binding.id.as_str(),
2334            binding.version,
2335            TriggerDispatchOutcome::Failed,
2336        )
2337        .await
2338        .map_err(|error| DispatchError::Registry(error.to_string()))?;
2339        self.release_flow_control(&acquired_flow).await?;
2340        decrement_in_flight(&self.state);
2341        self.append_dispatch_trust_record(
2342            binding,
2343            &route,
2344            &event,
2345            replay_of_event_id.as_ref(),
2346            autonomy_tier,
2347            TrustOutcome::Failure,
2348            "failed",
2349            max_attempts,
2350            Some("dispatch exhausted without terminal outcome".to_string()),
2351        )
2352        .await?;
2353        Ok(DispatchOutcome {
2354            trigger_id: binding.id.as_str().to_string(),
2355            binding_key: binding.binding_key(),
2356            event_id: event.id.0,
2357            attempt_count: max_attempts,
2358            status: DispatchStatus::Failed,
2359            handler_kind: route.kind().to_string(),
2360            target_uri: route.target_uri(),
2361            replay_of_event_id,
2362            result: None,
2363            error: Some("dispatch exhausted without terminal outcome".to_string()),
2364        })
2365    }
2366
2367    async fn dispatch_once(
2368        &self,
2369        binding: &TriggerBinding,
2370        route: &DispatchUri,
2371        event: &TriggerEvent,
2372        autonomy_tier: AutonomyTier,
2373        wait_lease: Option<DispatchWaitLease>,
2374        cancel_rx: &mut broadcast::Receiver<()>,
2375    ) -> Result<serde_json::Value, DispatchError> {
2376        match route {
2377            DispatchUri::Local { .. } => {
2378                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2379                    return Err(DispatchError::Local(format!(
2380                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2381                        binding.id.as_str()
2382                    )));
2383                };
2384                let value = self
2385                    .invoke_vm_callable(
2386                        closure,
2387                        &binding.binding_key(),
2388                        event,
2389                        None,
2390                        binding.id.as_str(),
2391                        &format!("{}.{}", event.provider.as_str(), event.kind),
2392                        autonomy_tier,
2393                        wait_lease,
2394                        cancel_rx,
2395                    )
2396                    .await?;
2397                Ok(vm_value_to_json(&value))
2398            }
2399            DispatchUri::A2a {
2400                target,
2401                allow_cleartext,
2402            } => {
2403                if self.state.shutting_down.load(Ordering::SeqCst) {
2404                    return Err(DispatchError::Cancelled(
2405                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
2406                    ));
2407                }
2408                let (_endpoint, ack) = self
2409                    .a2a_client
2410                    .dispatch(
2411                        target,
2412                        *allow_cleartext,
2413                        binding.id.as_str(),
2414                        &binding.binding_key(),
2415                        event,
2416                        cancel_rx,
2417                    )
2418                    .await
2419                    .map_err(|error| match error {
2420                        crate::a2a::A2aClientError::Cancelled(message) => {
2421                            DispatchError::Cancelled(message)
2422                        }
2423                        other => DispatchError::A2a(other.to_string()),
2424                    })?;
2425                match ack {
2426                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2427                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2428                }
2429            }
2430            DispatchUri::Worker { queue } => {
2431                let receipt = crate::WorkerQueue::new(self.event_log.clone())
2432                    .enqueue(&crate::WorkerQueueJob {
2433                        queue: queue.clone(),
2434                        trigger_id: binding.id.as_str().to_string(),
2435                        binding_key: binding.binding_key(),
2436                        binding_version: binding.version,
2437                        event: event.clone(),
2438                        replay_of_event_id: current_dispatch_context()
2439                            .and_then(|context| context.replay_of_event_id),
2440                        priority: worker_queue_priority(binding, event),
2441                    })
2442                    .await
2443                    .map_err(DispatchError::from)?;
2444                Ok(serde_json::to_value(receipt)
2445                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2446            }
2447            DispatchUri::Persona { .. } => {
2448                let TriggerHandlerSpec::Persona {
2449                    binding: persona_binding,
2450                } = &binding.handler
2451                else {
2452                    return Err(DispatchError::Local(format!(
2453                        "trigger '{}' resolved to a persona dispatch URI but does not carry a persona binding",
2454                        binding.id.as_str()
2455                    )));
2456                };
2457                let receipt = crate::fire_persona_trigger(
2458                    &self.event_log,
2459                    persona_binding,
2460                    event.provider.as_str(),
2461                    &event.kind,
2462                    trigger_event_persona_metadata(event),
2463                    crate::PersonaRunCost::default(),
2464                    crate::persona_now_ms(),
2465                )
2466                .await
2467                .map_err(DispatchError::Local)?;
2468                Ok(serde_json::to_value(receipt)
2469                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2470            }
2471        }
2472    }
2473
2474    async fn apply_flow_control(
2475        &self,
2476        binding: &TriggerBinding,
2477        event: &TriggerEvent,
2478        replay_of_event_id: Option<&String>,
2479    ) -> Result<FlowControlOutcome, DispatchError> {
2480        let flow = &binding.flow_control;
2481        let mut managed_event = event.clone();
2482
2483        if let Some(batch) = &flow.batch {
2484            let gate = self
2485                .resolve_flow_gate(
2486                    &binding.binding_key(),
2487                    batch.key.as_ref(),
2488                    &managed_event,
2489                    replay_of_event_id,
2490                )
2491                .await?;
2492            match self
2493                .state
2494                .flow_control
2495                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2496                .await
2497                .map_err(DispatchError::from)?
2498            {
2499                BatchDecision::Dispatch(events) => {
2500                    managed_event = build_batched_event(events)?;
2501                }
2502                BatchDecision::Merged => {
2503                    return Ok(FlowControlOutcome::Skip {
2504                        reason: "batch_merged".to_string(),
2505                    })
2506                }
2507            }
2508        }
2509
2510        if let Some(debounce) = &flow.debounce {
2511            let gate = self
2512                .resolve_flow_gate(
2513                    &binding.binding_key(),
2514                    Some(&debounce.key),
2515                    &managed_event,
2516                    replay_of_event_id,
2517                )
2518                .await?;
2519            let latest = self
2520                .state
2521                .flow_control
2522                .debounce(&gate, debounce.period)
2523                .await
2524                .map_err(DispatchError::from)?;
2525            if !latest {
2526                return Ok(FlowControlOutcome::Skip {
2527                    reason: "debounced".to_string(),
2528                });
2529            }
2530        }
2531
2532        if let Some(rate_limit) = &flow.rate_limit {
2533            let gate = self
2534                .resolve_flow_gate(
2535                    &binding.binding_key(),
2536                    rate_limit.key.as_ref(),
2537                    &managed_event,
2538                    replay_of_event_id,
2539                )
2540                .await?;
2541            let allowed = self
2542                .state
2543                .flow_control
2544                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2545                .await
2546                .map_err(DispatchError::from)?;
2547            if !allowed {
2548                return Ok(FlowControlOutcome::Skip {
2549                    reason: "rate_limited".to_string(),
2550                });
2551            }
2552        }
2553
2554        if let Some(throttle) = &flow.throttle {
2555            let gate = self
2556                .resolve_flow_gate(
2557                    &binding.binding_key(),
2558                    throttle.key.as_ref(),
2559                    &managed_event,
2560                    replay_of_event_id,
2561                )
2562                .await?;
2563            self.state
2564                .flow_control
2565                .wait_for_throttle(&gate, throttle.period, throttle.max)
2566                .await
2567                .map_err(DispatchError::from)?;
2568        }
2569
2570        let mut acquired = AcquiredFlowControl::default();
2571        if let Some(singleton) = &flow.singleton {
2572            let gate = self
2573                .resolve_flow_gate(
2574                    &binding.binding_key(),
2575                    singleton.key.as_ref(),
2576                    &managed_event,
2577                    replay_of_event_id,
2578                )
2579                .await?;
2580            let acquired_singleton = self
2581                .state
2582                .flow_control
2583                .try_acquire_singleton(&gate)
2584                .await
2585                .map_err(DispatchError::from)?;
2586            if !acquired_singleton {
2587                return Ok(FlowControlOutcome::Skip {
2588                    reason: "singleton_active".to_string(),
2589                });
2590            }
2591            acquired.singleton = Some(SingletonLease { gate, held: true });
2592        }
2593
2594        if let Some(concurrency) = &flow.concurrency {
2595            let gate = self
2596                .resolve_flow_gate(
2597                    &binding.binding_key(),
2598                    concurrency.key.as_ref(),
2599                    &managed_event,
2600                    replay_of_event_id,
2601                )
2602                .await?;
2603            let priority_rank = self
2604                .resolve_priority_rank(
2605                    &binding.binding_key(),
2606                    flow.priority.as_ref(),
2607                    &managed_event,
2608                    replay_of_event_id,
2609                )
2610                .await?;
2611            let permit = self
2612                .state
2613                .flow_control
2614                .acquire_concurrency(&gate, concurrency.max, priority_rank)
2615                .await
2616                .map_err(DispatchError::from)?;
2617            acquired.concurrency = Some(ConcurrencyLease {
2618                gate,
2619                max: concurrency.max,
2620                priority_rank,
2621                permit: Some(permit),
2622            });
2623        }
2624
2625        Ok(FlowControlOutcome::Dispatch {
2626            event: Box::new(managed_event),
2627            acquired,
2628        })
2629    }
2630
2631    async fn release_flow_control(
2632        &self,
2633        acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2634    ) -> Result<(), DispatchError> {
2635        let (singleton_gate, concurrency_permit) = {
2636            let mut acquired = acquired.lock().await;
2637            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2638                if lease.held {
2639                    lease.held = false;
2640                    Some(lease.gate.clone())
2641                } else {
2642                    None
2643                }
2644            });
2645            let concurrency_permit = acquired
2646                .concurrency
2647                .as_mut()
2648                .and_then(|lease| lease.permit.take());
2649            (singleton_gate, concurrency_permit)
2650        };
2651        if let Some(gate) = singleton_gate {
2652            self.state
2653                .flow_control
2654                .release_singleton(&gate)
2655                .await
2656                .map_err(DispatchError::from)?;
2657        }
2658        if let Some(permit) = concurrency_permit {
2659            self.state
2660                .flow_control
2661                .release_concurrency(permit)
2662                .await
2663                .map_err(DispatchError::from)?;
2664        }
2665        Ok(())
2666    }
2667
2668    async fn resolve_flow_gate(
2669        &self,
2670        binding_key: &str,
2671        expr: Option<&crate::triggers::TriggerExpressionSpec>,
2672        event: &TriggerEvent,
2673        replay_of_event_id: Option<&String>,
2674    ) -> Result<String, DispatchError> {
2675        let key = match expr {
2676            Some(expr) => {
2677                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2678                    .await?
2679            }
2680            None => "_global".to_string(),
2681        };
2682        Ok(format!("{binding_key}:{key}"))
2683    }
2684
2685    async fn resolve_priority_rank(
2686        &self,
2687        binding_key: &str,
2688        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2689        event: &TriggerEvent,
2690        replay_of_event_id: Option<&String>,
2691    ) -> Result<usize, DispatchError> {
2692        let Some(priority) = priority else {
2693            return Ok(0);
2694        };
2695        let value = self
2696            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2697            .await?;
2698        Ok(priority
2699            .order
2700            .iter()
2701            .position(|candidate| candidate == &value)
2702            .unwrap_or(priority.order.len()))
2703    }
2704
2705    async fn evaluate_flow_expression(
2706        &self,
2707        binding_key: &str,
2708        expr: &crate::triggers::TriggerExpressionSpec,
2709        event: &TriggerEvent,
2710        replay_of_event_id: Option<&String>,
2711    ) -> Result<String, DispatchError> {
2712        let value = self
2713            .invoke_vm_callable(
2714                &expr.closure,
2715                binding_key,
2716                event,
2717                replay_of_event_id,
2718                "",
2719                "flow_control",
2720                AutonomyTier::Suggest,
2721                None,
2722                &mut self.cancel_tx.subscribe(),
2723            )
2724            .await?;
2725        Ok(json_value_to_gate(&vm_value_to_json(&value)))
2726    }
2727
2728    #[allow(clippy::too_many_arguments)]
2729    async fn invoke_vm_callable(
2730        &self,
2731        closure: &crate::value::VmClosure,
2732        binding_key: &str,
2733        event: &TriggerEvent,
2734        replay_of_event_id: Option<&String>,
2735        agent_id: &str,
2736        action: &str,
2737        autonomy_tier: AutonomyTier,
2738        wait_lease: Option<DispatchWaitLease>,
2739        cancel_rx: &mut broadcast::Receiver<()>,
2740    ) -> Result<VmValue, DispatchError> {
2741        let mut vm = self.base_vm.child_vm();
2742        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2743        if self.state.shutting_down.load(Ordering::SeqCst) {
2744            cancel_token.store(true, Ordering::SeqCst);
2745        }
2746        self.state
2747            .cancel_tokens
2748            .lock()
2749            .expect("dispatcher cancel tokens poisoned")
2750            .push(cancel_token.clone());
2751        vm.install_cancel_token(cancel_token.clone());
2752        let arg = event_to_handler_value(event)?;
2753        let args = [arg];
2754        let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2755        let effective_policy = match crate::orchestration::current_execution_policy() {
2756            Some(parent) => parent
2757                .intersect(&tier_policy)
2758                .map_err(|error| DispatchError::Local(error.to_string()))?,
2759            None => tier_policy,
2760        };
2761        crate::orchestration::push_execution_policy(effective_policy);
2762        let _policy_guard = DispatchExecutionPolicyGuard;
2763        let future = vm.call_closure_pub(closure, &args);
2764        pin_mut!(future);
2765        let (binding_id, binding_version) = split_binding_key(binding_key);
2766        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2767            slot.borrow_mut().replace(DispatchContext {
2768                trigger_event: event.clone(),
2769                replay_of_event_id: replay_of_event_id.cloned(),
2770                binding_id,
2771                binding_version,
2772                agent_id: agent_id.to_string(),
2773                action: action.to_string(),
2774                autonomy_tier,
2775            })
2776        });
2777        let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2778            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2779        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2780        crate::stdlib::hitl::reset_hitl_state();
2781        let mut poll = tokio::time::interval(Duration::from_millis(100));
2782        let result = loop {
2783            tokio::select! {
2784                result = &mut future => break result,
2785                _ = recv_cancel(cancel_rx) => {
2786                    cancel_token.store(true, Ordering::SeqCst);
2787                }
2788                _ = poll.tick() => {
2789                    if dispatch_cancel_requested(
2790                        &self.event_log,
2791                        binding_key,
2792                        &event.id.0,
2793                        replay_of_event_id,
2794                    )
2795                    .await? {
2796                        cancel_token.store(true, Ordering::SeqCst);
2797                    }
2798                }
2799            }
2800        };
2801        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2802            *slot.borrow_mut() = prior_context;
2803        });
2804        ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2805            *slot.borrow_mut() = prior_wait_lease;
2806        });
2807        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2808        {
2809            let mut tokens = self
2810                .state
2811                .cancel_tokens
2812                .lock()
2813                .expect("dispatcher cancel tokens poisoned");
2814            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2815        }
2816
2817        if cancel_token.load(Ordering::SeqCst) {
2818            if dispatch_cancel_requested(
2819                &self.event_log,
2820                binding_key,
2821                &event.id.0,
2822                replay_of_event_id,
2823            )
2824            .await?
2825            {
2826                Err(DispatchError::Cancelled(
2827                    "trigger cancel request cancelled local handler".to_string(),
2828                ))
2829            } else {
2830                Err(DispatchError::Cancelled(
2831                    "dispatcher shutdown cancelled local handler".to_string(),
2832                ))
2833            }
2834        } else {
2835            result.map_err(dispatch_error_from_vm_error)
2836        }
2837    }
2838
2839    #[allow(clippy::too_many_arguments)]
2840    async fn invoke_vm_callable_with_timeout(
2841        &self,
2842        closure: &crate::value::VmClosure,
2843        binding_key: &str,
2844        event: &TriggerEvent,
2845        replay_of_event_id: Option<&String>,
2846        agent_id: &str,
2847        action: &str,
2848        autonomy_tier: AutonomyTier,
2849        cancel_rx: &mut broadcast::Receiver<()>,
2850        timeout: Option<Duration>,
2851    ) -> Result<VmValue, DispatchError> {
2852        let future = self.invoke_vm_callable(
2853            closure,
2854            binding_key,
2855            event,
2856            replay_of_event_id,
2857            agent_id,
2858            action,
2859            autonomy_tier,
2860            None,
2861            cancel_rx,
2862        );
2863        pin_mut!(future);
2864        if let Some(timeout) = timeout {
2865            match tokio::time::timeout(timeout, future).await {
2866                Ok(result) => result,
2867                Err(_) => Err(DispatchError::Local(
2868                    "predicate evaluation timed out".to_string(),
2869                )),
2870            }
2871        } else {
2872            future.await
2873        }
2874    }
2875
2876    #[allow(clippy::too_many_arguments)]
2877    async fn append_dispatch_trust_record(
2878        &self,
2879        binding: &TriggerBinding,
2880        route: &DispatchUri,
2881        event: &TriggerEvent,
2882        replay_of_event_id: Option<&String>,
2883        autonomy_tier: AutonomyTier,
2884        outcome: TrustOutcome,
2885        terminal_status: &str,
2886        attempt_count: u32,
2887        error: Option<String>,
2888    ) -> Result<(), DispatchError> {
2889        let mut record = TrustRecord::new(
2890            binding.id.as_str().to_string(),
2891            format!("{}.{}", event.provider.as_str(), event.kind),
2892            None,
2893            outcome,
2894            event.trace_id.0.clone(),
2895            autonomy_tier,
2896        );
2897        record.metadata.insert(
2898            "binding_key".to_string(),
2899            serde_json::json!(binding.binding_key()),
2900        );
2901        record.metadata.insert(
2902            "binding_version".to_string(),
2903            serde_json::json!(binding.version),
2904        );
2905        record.metadata.insert(
2906            "provider".to_string(),
2907            serde_json::json!(event.provider.as_str()),
2908        );
2909        record
2910            .metadata
2911            .insert("event_kind".to_string(), serde_json::json!(event.kind));
2912        record
2913            .metadata
2914            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2915        record.metadata.insert(
2916            "target_uri".to_string(),
2917            serde_json::json!(route.target_uri()),
2918        );
2919        record.metadata.insert(
2920            "terminal_status".to_string(),
2921            serde_json::json!(terminal_status),
2922        );
2923        record.metadata.insert(
2924            "attempt_count".to_string(),
2925            serde_json::json!(attempt_count),
2926        );
2927        if let Some(replay_of_event_id) = replay_of_event_id {
2928            record.metadata.insert(
2929                "replay_of_event_id".to_string(),
2930                serde_json::json!(replay_of_event_id),
2931            );
2932        }
2933        if let Some(error) = error {
2934            record
2935                .metadata
2936                .insert("error".to_string(), serde_json::json!(error));
2937        }
2938        append_trust_record(&self.event_log, &record)
2939            .await
2940            .map(|_| ())
2941            .map_err(DispatchError::from)
2942    }
2943
2944    async fn append_attempt_record(
2945        &self,
2946        event: &TriggerEvent,
2947        binding: &TriggerBinding,
2948        attempt: &DispatchAttemptRecord,
2949        replay_of_event_id: Option<&String>,
2950    ) -> Result<(), DispatchError> {
2951        self.append_topic_event(
2952            TRIGGER_ATTEMPTS_TOPIC,
2953            "attempt_recorded",
2954            event,
2955            Some(binding),
2956            Some(attempt.attempt),
2957            serde_json::to_value(attempt)
2958                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2959            replay_of_event_id,
2960        )
2961        .await
2962    }
2963
2964    async fn append_lifecycle_event(
2965        &self,
2966        kind: &str,
2967        event: &TriggerEvent,
2968        binding: &TriggerBinding,
2969        payload: serde_json::Value,
2970        replay_of_event_id: Option<&String>,
2971    ) -> Result<(), DispatchError> {
2972        self.append_topic_event(
2973            TRIGGERS_LIFECYCLE_TOPIC,
2974            kind,
2975            event,
2976            Some(binding),
2977            None,
2978            payload,
2979            replay_of_event_id,
2980        )
2981        .await
2982    }
2983
2984    async fn append_autonomy_budget_approval_request(
2985        &self,
2986        binding: &TriggerBinding,
2987        route: &DispatchUri,
2988        event: &TriggerEvent,
2989        replay_of_event_id: Option<&String>,
2990        reason: &str,
2991    ) -> Result<String, DispatchError> {
2992        let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
2993        let detail = serde_json::json!({
2994            "trigger_id": binding.id.as_str(),
2995            "binding_key": binding.binding_key(),
2996            "event_id": event.id.0,
2997            "reason": reason,
2998            "from_tier": AutonomyTier::ActAuto.as_str(),
2999            "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3000            "handler_kind": route.kind(),
3001            "target_uri": route.target_uri(),
3002            "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
3003            "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
3004            "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
3005            "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
3006            "replay_of_event_id": replay_of_event_id,
3007        });
3008        let request_id = crate::stdlib::hitl::append_approval_request_on(
3009            &self.event_log,
3010            binding.id.as_str().to_string(),
3011            event.trace_id.0.clone(),
3012            format!(
3013                "approve autonomous dispatch for trigger '{}' after {}",
3014                binding.id.as_str(),
3015                reason
3016            ),
3017            detail.clone(),
3018            reviewers.clone(),
3019        )
3020        .await
3021        .map_err(dispatch_error_from_vm_error)?;
3022        self.append_lifecycle_event(
3023            "autonomy.budget_exceeded",
3024            event,
3025            binding,
3026            serde_json::json!({
3027                "trigger_id": binding.id.as_str(),
3028                "event_id": event.id.0,
3029                "reason": reason,
3030                "request_id": request_id,
3031                "reviewers": reviewers,
3032                "from_tier": AutonomyTier::ActAuto.as_str(),
3033                "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3034                "replay_of_event_id": replay_of_event_id,
3035            }),
3036            replay_of_event_id,
3037        )
3038        .await?;
3039        Ok(request_id)
3040    }
3041
3042    #[allow(clippy::too_many_arguments)]
3043    async fn emit_autonomy_budget_approval_action_graph(
3044        &self,
3045        binding: &TriggerBinding,
3046        route: &DispatchUri,
3047        event: &TriggerEvent,
3048        source_node_id: &str,
3049        replay_of_event_id: Option<&String>,
3050        reason: &str,
3051        request_id: &str,
3052    ) -> Result<(), DispatchError> {
3053        let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3054        self.emit_action_graph(
3055            event,
3056            vec![RunActionGraphNodeRecord {
3057                id: approval_node_id.clone(),
3058                label: format!("approval required: {reason}"),
3059                kind: "approval".to_string(),
3060                status: "waiting".to_string(),
3061                outcome: "request_approval".to_string(),
3062                trace_id: Some(event.trace_id.0.clone()),
3063                stage_id: None,
3064                node_id: None,
3065                worker_id: None,
3066                run_id: None,
3067                run_path: None,
3068                metadata: BTreeMap::from([
3069                    (
3070                        "trigger_id".to_string(),
3071                        serde_json::json!(binding.id.as_str()),
3072                    ),
3073                    (
3074                        "binding_key".to_string(),
3075                        serde_json::json!(binding.binding_key()),
3076                    ),
3077                    ("event_id".to_string(), serde_json::json!(event.id.0)),
3078                    ("reason".to_string(), serde_json::json!(reason)),
3079                    ("request_id".to_string(), serde_json::json!(request_id)),
3080                    (
3081                        "reviewers".to_string(),
3082                        serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3083                    ),
3084                    ("handler_kind".to_string(), serde_json::json!(route.kind())),
3085                    (
3086                        "target_uri".to_string(),
3087                        serde_json::json!(route.target_uri()),
3088                    ),
3089                    (
3090                        "from_tier".to_string(),
3091                        serde_json::json!(AutonomyTier::ActAuto.as_str()),
3092                    ),
3093                    (
3094                        "requested_tier".to_string(),
3095                        serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3096                    ),
3097                    (
3098                        "replay_of_event_id".to_string(),
3099                        serde_json::json!(replay_of_event_id),
3100                    ),
3101                ]),
3102            }],
3103            vec![RunActionGraphEdgeRecord {
3104                from_id: source_node_id.to_string(),
3105                to_id: approval_node_id,
3106                kind: "approval_gate".to_string(),
3107                label: Some("autonomy budget".to_string()),
3108            }],
3109            serde_json::json!({
3110                "source": "dispatcher",
3111                "trigger_id": binding.id.as_str(),
3112                "binding_key": binding.binding_key(),
3113                "event_id": event.id.0,
3114                "reason": reason,
3115                "request_id": request_id,
3116                "replay_of_event_id": replay_of_event_id,
3117            }),
3118        )
3119        .await
3120    }
3121
3122    #[allow(clippy::too_many_arguments)]
3123    async fn append_tier_transition_trust_record(
3124        &self,
3125        binding: &TriggerBinding,
3126        event: &TriggerEvent,
3127        replay_of_event_id: Option<&String>,
3128        from_tier: AutonomyTier,
3129        to_tier: AutonomyTier,
3130        reason: &str,
3131        request_id: &str,
3132    ) -> Result<(), DispatchError> {
3133        let mut record = TrustRecord::new(
3134            binding.id.as_str().to_string(),
3135            "autonomy.tier_transition",
3136            Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3137            TrustOutcome::Denied,
3138            event.trace_id.0.clone(),
3139            to_tier,
3140        );
3141        record.metadata.insert(
3142            "binding_key".to_string(),
3143            serde_json::json!(binding.binding_key()),
3144        );
3145        record
3146            .metadata
3147            .insert("event_id".to_string(), serde_json::json!(event.id.0));
3148        record.metadata.insert(
3149            "from_tier".to_string(),
3150            serde_json::json!(from_tier.as_str()),
3151        );
3152        record
3153            .metadata
3154            .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3155        record
3156            .metadata
3157            .insert("reason".to_string(), serde_json::json!(reason));
3158        record
3159            .metadata
3160            .insert("request_id".to_string(), serde_json::json!(request_id));
3161        if let Some(replay_of_event_id) = replay_of_event_id {
3162            record.metadata.insert(
3163                "replay_of_event_id".to_string(),
3164                serde_json::json!(replay_of_event_id),
3165            );
3166        }
3167        append_trust_record(&self.event_log, &record)
3168            .await
3169            .map(|_| ())
3170            .map_err(DispatchError::from)
3171    }
3172
3173    async fn append_budget_deferred_event(
3174        &self,
3175        binding: &TriggerBinding,
3176        route: &DispatchUri,
3177        event: &TriggerEvent,
3178        replay_of_event_id: Option<&String>,
3179        reason: &str,
3180    ) -> Result<(), DispatchError> {
3181        self.append_topic_event(
3182            TRIGGER_ATTEMPTS_TOPIC,
3183            "budget_deferred",
3184            event,
3185            Some(binding),
3186            None,
3187            serde_json::json!({
3188                "event_id": event.id.0,
3189                "trigger_id": binding.id.as_str(),
3190                "binding_key": binding.binding_key(),
3191                "handler_kind": route.kind(),
3192                "target_uri": route.target_uri(),
3193                "reason": reason,
3194                "retry_at": next_budget_reset_rfc3339(binding),
3195                "replay_of_event_id": replay_of_event_id,
3196            }),
3197            replay_of_event_id,
3198        )
3199        .await?;
3200        self.append_skipped_outbox_event(
3201            binding,
3202            route,
3203            event,
3204            replay_of_event_id,
3205            DispatchSkipStage::Predicate,
3206            serde_json::json!({
3207                "deferred": true,
3208                "reason": reason,
3209                "retry_at": next_budget_reset_rfc3339(binding),
3210            }),
3211        )
3212        .await
3213    }
3214
3215    async fn append_skipped_outbox_event(
3216        &self,
3217        binding: &TriggerBinding,
3218        route: &DispatchUri,
3219        event: &TriggerEvent,
3220        replay_of_event_id: Option<&String>,
3221        stage: DispatchSkipStage,
3222        detail: serde_json::Value,
3223    ) -> Result<(), DispatchError> {
3224        self.append_topic_event(
3225            TRIGGER_OUTBOX_TOPIC,
3226            "dispatch_skipped",
3227            event,
3228            Some(binding),
3229            None,
3230            serde_json::json!({
3231                "event_id": event.id.0,
3232                "trigger_id": binding.id.as_str(),
3233                "binding_key": binding.binding_key(),
3234                "handler_kind": route.kind(),
3235                "target_uri": route.target_uri(),
3236                "skip_stage": stage.as_str(),
3237                "detail": detail,
3238                "replay_of_event_id": replay_of_event_id,
3239            }),
3240            replay_of_event_id,
3241        )
3242        .await
3243    }
3244
3245    async fn append_topic_event(
3246        &self,
3247        topic_name: &str,
3248        kind: &str,
3249        event: &TriggerEvent,
3250        binding: Option<&TriggerBinding>,
3251        attempt: Option<u32>,
3252        payload: serde_json::Value,
3253        replay_of_event_id: Option<&String>,
3254    ) -> Result<(), DispatchError> {
3255        let topic = topic_for_event(event, topic_name)?;
3256        let headers = event_headers(event, binding, attempt, replay_of_event_id);
3257        self.event_log
3258            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3259            .await
3260            .map_err(DispatchError::from)
3261            .map(|_| ())
3262    }
3263
3264    async fn emit_action_graph(
3265        &self,
3266        event: &TriggerEvent,
3267        nodes: Vec<RunActionGraphNodeRecord>,
3268        edges: Vec<RunActionGraphEdgeRecord>,
3269        extra: serde_json::Value,
3270    ) -> Result<(), DispatchError> {
3271        let mut headers = BTreeMap::new();
3272        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3273        headers.insert("event_id".to_string(), event.id.0.clone());
3274        let observability = RunObservabilityRecord {
3275            schema_version: 1,
3276            action_graph_nodes: nodes,
3277            action_graph_edges: edges,
3278            ..Default::default()
3279        };
3280        append_action_graph_update(
3281            headers,
3282            serde_json::json!({
3283                "source": "dispatcher",
3284                "trace_id": event.trace_id.0,
3285                "event_id": event.id.0,
3286                "observability": observability,
3287                "context": extra,
3288            }),
3289        )
3290        .await
3291        .map_err(DispatchError::from)
3292    }
3293}
3294
3295async fn dispatch_cancel_requested(
3296    event_log: &Arc<AnyEventLog>,
3297    binding_key: &str,
3298    event_id: &str,
3299    replay_of_event_id: Option<&String>,
3300) -> Result<bool, DispatchError> {
3301    if replay_of_event_id.is_some() {
3302        return Ok(false);
3303    }
3304    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3305        .expect("static trigger cancel topic should always be valid");
3306    let events = event_log.read_range(&topic, None, usize::MAX).await?;
3307    let requested = events
3308        .into_iter()
3309        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3310        .filter_map(|(_, event)| {
3311            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3312        })
3313        .collect::<BTreeSet<_>>();
3314    Ok(requested
3315        .iter()
3316        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3317}
3318
3319async fn sleep_or_cancel_or_request(
3320    event_log: &Arc<AnyEventLog>,
3321    delay: Duration,
3322    binding_key: &str,
3323    event_id: &str,
3324    replay_of_event_id: Option<&String>,
3325    cancel_rx: &mut broadcast::Receiver<()>,
3326) -> Result<(), DispatchError> {
3327    let sleep = tokio::time::sleep(delay);
3328    pin_mut!(sleep);
3329    let mut poll = tokio::time::interval(Duration::from_millis(100));
3330    loop {
3331        tokio::select! {
3332            _ = &mut sleep => return Ok(()),
3333            _ = recv_cancel(cancel_rx) => {
3334                return Err(DispatchError::Cancelled(
3335                    "dispatcher shutdown cancelled retry wait".to_string(),
3336                ));
3337            }
3338            _ = poll.tick() => {
3339                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
3340                    return Err(DispatchError::Cancelled(
3341                        "trigger cancel request cancelled retry wait".to_string(),
3342                    ));
3343                }
3344            }
3345        }
3346    }
3347}
3348
3349fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
3350    let mut iter = events.into_iter();
3351    let Some(mut root) = iter.next() else {
3352        return Err(DispatchError::Registry(
3353            "batch dispatch produced an empty event list".to_string(),
3354        ));
3355    };
3356    let mut batch = Vec::new();
3357    batch.push(
3358        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
3359    );
3360    for event in iter {
3361        batch.push(
3362            serde_json::to_value(&event)
3363                .map_err(|error| DispatchError::Serde(error.to_string()))?,
3364        );
3365    }
3366    root.batch = Some(batch);
3367    Ok(root)
3368}
3369
3370fn json_value_to_gate(value: &serde_json::Value) -> String {
3371    match value {
3372        serde_json::Value::Null => "null".to_string(),
3373        serde_json::Value::String(text) => text.clone(),
3374        serde_json::Value::Bool(value) => value.to_string(),
3375        serde_json::Value::Number(value) => value.to_string(),
3376        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
3377    }
3378}
3379
3380fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
3381    let json =
3382        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3383    let value = json_to_vm_value(&json);
3384    match (&event.raw_body, value) {
3385        (Some(raw_body), VmValue::Dict(dict)) => {
3386            let mut map = (*dict).clone();
3387            map.insert(
3388                "raw_body".to_string(),
3389                VmValue::Bytes(Rc::new(raw_body.clone())),
3390            );
3391            Ok(VmValue::Dict(Rc::new(map)))
3392        }
3393        (_, other) => Ok(other),
3394    }
3395}
3396
3397fn decrement_in_flight(state: &DispatcherRuntimeState) {
3398    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
3399    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
3400        state.idle_notify.notify_waiters();
3401    }
3402}
3403
3404fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
3405    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
3406    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
3407        state.idle_notify.notify_waiters();
3408    }
3409}
3410
3411#[cfg(test)]
3412fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
3413    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
3414        *slot.borrow_mut() = Some(tx);
3415    });
3416}
3417
3418#[cfg(not(test))]
3419fn notify_test_inbox_dequeued() {}
3420
3421#[cfg(test)]
3422fn notify_test_inbox_dequeued() {
3423    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
3424        if let Some(tx) = slot.borrow_mut().take() {
3425            let _ = tx.send(());
3426        }
3427    });
3428}
3429
3430pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
3431    event_log: &L,
3432    event: &TriggerEvent,
3433) -> Result<u64, DispatchError> {
3434    let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
3435    let headers = event_headers(event, None, None, None);
3436    let payload =
3437        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3438    event_log
3439        .append(
3440            &topic,
3441            LogEvent::new("event_ingested", payload).with_headers(headers),
3442        )
3443        .await
3444        .map_err(DispatchError::from)
3445}
3446
3447pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
3448    ACTIVE_DISPATCHER_STATE.with(|slot| {
3449        slot.borrow()
3450            .as_ref()
3451            .map(|state| DispatcherStatsSnapshot {
3452                in_flight: state.in_flight.load(Ordering::Relaxed),
3453                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
3454                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
3455            })
3456            .unwrap_or_default()
3457    })
3458}
3459
3460pub fn clear_dispatcher_state() {
3461    ACTIVE_DISPATCHER_STATE.with(|slot| {
3462        *slot.borrow_mut() = None;
3463    });
3464    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
3465        *slot.borrow_mut() = None;
3466    });
3467}
3468
3469fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
3470    if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
3471        return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
3472    }
3473    if is_cancelled_vm_error(&error) {
3474        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
3475    }
3476    if let VmError::Thrown(VmValue::String(message)) = &error {
3477        return DispatchError::Local(message.to_string());
3478    }
3479    match error_to_category(&error) {
3480        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
3481        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
3482        ErrorCategory::Cancelled => {
3483            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
3484        }
3485        _ => DispatchError::Local(error.to_string()),
3486    }
3487}
3488
3489fn dispatch_error_label(error: &DispatchError) -> &'static str {
3490    match error {
3491        DispatchError::Denied(_) => "denied",
3492        DispatchError::Timeout(_) => "timeout",
3493        DispatchError::Waiting(_) => "waiting",
3494        DispatchError::Cancelled(_) => "cancelled",
3495        _ => "failed",
3496    }
3497}
3498
3499fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
3500    match route {
3501        DispatchUri::Worker { .. } => "enqueued",
3502        DispatchUri::Persona { .. } => "recorded",
3503        DispatchUri::A2a { .. }
3504            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
3505        {
3506            "pending"
3507        }
3508        DispatchUri::A2a { .. } => "completed",
3509        DispatchUri::Local { .. } => "success",
3510    }
3511}
3512
3513fn dispatch_node_id(
3514    route: &DispatchUri,
3515    binding_key: &str,
3516    event_id: &str,
3517    attempt: u32,
3518) -> String {
3519    let prefix = match route {
3520        DispatchUri::A2a { .. } => "a2a",
3521        _ => "dispatch",
3522    };
3523    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
3524}
3525
3526fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3527    match route {
3528        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3529        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3530        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3531    }
3532}
3533
3534fn dispatch_node_label(route: &DispatchUri) -> String {
3535    match route {
3536        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3537        _ => route.target_uri(),
3538    }
3539}
3540
3541fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3542    match route {
3543        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3544        _ => None,
3545    }
3546}
3547
3548fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3549    match route {
3550        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3551        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3552        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3553    }
3554}
3555
3556fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3557    match status {
3558        crate::triggers::SignatureStatus::Verified => "verified",
3559        crate::triggers::SignatureStatus::Unsigned => "unsigned",
3560        crate::triggers::SignatureStatus::Failed { .. } => "failed",
3561    }
3562}
3563
3564fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3565    let mut metadata = BTreeMap::new();
3566    metadata.insert(
3567        "provider".to_string(),
3568        serde_json::json!(event.provider.as_str()),
3569    );
3570    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3571    metadata.insert(
3572        "dedupe_key".to_string(),
3573        serde_json::json!(event.dedupe_key),
3574    );
3575    metadata.insert(
3576        "signature_status".to_string(),
3577        serde_json::json!(signature_status_label(&event.signature_status)),
3578    );
3579    metadata
3580}
3581
3582fn trigger_event_persona_metadata(event: &TriggerEvent) -> BTreeMap<String, String> {
3583    let mut metadata = BTreeMap::new();
3584    metadata.insert("trigger_event_id".to_string(), event.id.0.clone());
3585    metadata.insert("event_id".to_string(), event.id.0.clone());
3586    metadata.insert("dedupe_key".to_string(), event.dedupe_key.clone());
3587    metadata.insert("trace_id".to_string(), event.trace_id.0.clone());
3588    if let Some(tenant_id) = &event.tenant_id {
3589        metadata.insert("tenant_id".to_string(), tenant_id.0.clone());
3590    }
3591    for (key, value) in &event.headers {
3592        metadata.insert(format!("header.{key}"), value.clone());
3593    }
3594    if let Ok(payload) = serde_json::to_value(&event.provider_payload) {
3595        collect_persona_payload_metadata("", &payload, &mut metadata);
3596        if let Some(raw) = payload.get("raw") {
3597            collect_persona_payload_metadata("", raw, &mut metadata);
3598        }
3599    }
3600    metadata
3601}
3602
3603fn collect_persona_payload_metadata(
3604    prefix: &str,
3605    value: &serde_json::Value,
3606    metadata: &mut BTreeMap<String, String>,
3607) {
3608    let serde_json::Value::Object(object) = value else {
3609        return;
3610    };
3611    for (key, value) in object {
3612        let path = if prefix.is_empty() {
3613            key.clone()
3614        } else {
3615            format!("{prefix}.{key}")
3616        };
3617        match value {
3618            serde_json::Value::String(text) => {
3619                metadata.insert(path, text.clone());
3620            }
3621            serde_json::Value::Number(number) => {
3622                metadata.insert(path, number.to_string());
3623            }
3624            serde_json::Value::Bool(flag) => {
3625                metadata.insert(path, flag.to_string());
3626            }
3627            serde_json::Value::Object(_) if prefix.is_empty() => {
3628                collect_persona_payload_metadata(&path, value, metadata);
3629            }
3630            _ => {}
3631        }
3632    }
3633}
3634
3635fn dispatch_node_metadata(
3636    route: &DispatchUri,
3637    binding: &TriggerBinding,
3638    event: &TriggerEvent,
3639    attempt: u32,
3640) -> BTreeMap<String, serde_json::Value> {
3641    let mut metadata = BTreeMap::new();
3642    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3643    metadata.insert(
3644        "target_uri".to_string(),
3645        serde_json::json!(route.target_uri()),
3646    );
3647    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3648    metadata.insert(
3649        "trigger_id".to_string(),
3650        serde_json::json!(binding.id.as_str()),
3651    );
3652    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3653    if let Some(target_agent) = dispatch_target_agent(route) {
3654        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3655    }
3656    if let DispatchUri::Worker { queue } = route {
3657        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3658    }
3659    if let DispatchUri::Persona { name } = route {
3660        metadata.insert("persona".to_string(), serde_json::json!(name));
3661    }
3662    metadata
3663}
3664
3665fn dispatch_success_metadata(
3666    route: &DispatchUri,
3667    binding: &TriggerBinding,
3668    event: &TriggerEvent,
3669    attempt: u32,
3670    result: &serde_json::Value,
3671) -> BTreeMap<String, serde_json::Value> {
3672    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3673    match route {
3674        DispatchUri::A2a { .. } => {
3675            if let Some(task_id) = result
3676                .get("task_id")
3677                .or_else(|| result.get("id"))
3678                .and_then(|value| value.as_str())
3679            {
3680                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3681            }
3682            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3683                metadata.insert("state".to_string(), serde_json::json!(state));
3684            }
3685        }
3686        DispatchUri::Worker { .. } => {
3687            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3688            {
3689                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3690            }
3691            if let Some(response_topic) = result
3692                .get("response_topic")
3693                .and_then(|value| value.as_str())
3694            {
3695                metadata.insert(
3696                    "response_topic".to_string(),
3697                    serde_json::json!(response_topic),
3698                );
3699            }
3700        }
3701        DispatchUri::Persona { .. } => {
3702            if let Some(receipt_id) = result.get("receipt_id").and_then(|value| value.as_str()) {
3703                metadata.insert("receipt_id".to_string(), serde_json::json!(receipt_id));
3704            }
3705            if let Some(status) = result.get("status").and_then(|value| value.as_str()) {
3706                metadata.insert("status".to_string(), serde_json::json!(status));
3707            }
3708        }
3709        DispatchUri::Local { .. } => {}
3710    }
3711    metadata
3712}
3713
3714fn dispatch_error_metadata(
3715    route: &DispatchUri,
3716    binding: &TriggerBinding,
3717    event: &TriggerEvent,
3718    attempt: u32,
3719    error: &DispatchError,
3720) -> BTreeMap<String, serde_json::Value> {
3721    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3722    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3723    metadata
3724}
3725
3726fn retry_node_metadata(
3727    binding: &TriggerBinding,
3728    event: &TriggerEvent,
3729    attempt: u32,
3730    delay: Duration,
3731    error: &DispatchError,
3732) -> BTreeMap<String, serde_json::Value> {
3733    let mut metadata = BTreeMap::new();
3734    metadata.insert(
3735        "trigger_id".to_string(),
3736        serde_json::json!(binding.id.as_str()),
3737    );
3738    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3739    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3740    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3741    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3742    metadata
3743}
3744
3745fn split_binding_key(binding_key: &str) -> (String, u32) {
3746    let Some((binding_id, suffix)) = binding_key.rsplit_once("@v") else {
3747        return (binding_key.to_string(), 0);
3748    };
3749    let version = suffix.parse::<u32>().unwrap_or(0);
3750    (binding_id.to_string(), version)
3751}
3752
3753fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
3754    match binding_version {
3755        Some(version) => format!("{trigger_id}@v{version}"),
3756        None => trigger_id.to_string(),
3757    }
3758}
3759
3760fn tenant_id(event: &TriggerEvent) -> Option<&str> {
3761    event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
3762}
3763
3764fn current_unix_ms() -> i64 {
3765    unix_ms(crate::triggers::test_util::clock::now_utc())
3766}
3767
3768fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
3769    (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
3770}
3771
3772fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3773    lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
3774        .unwrap_or_else(|| unix_ms(event.received_at))
3775}
3776
3777fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3778    lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
3779        .unwrap_or_else(|| accepted_at_ms(headers, event))
3780}
3781
3782fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
3783    headers
3784        .and_then(|headers| headers.get(name))
3785        .and_then(|value| value.parse::<i64>().ok())
3786}
3787
3788fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
3789    Duration::from_millis(later_ms.saturating_sub(earlier_ms).max(0) as u64)
3790}
3791
3792fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
3793    match result {
3794        Ok(_) => "succeeded",
3795        Err(DispatchError::Waiting(_)) => "waiting",
3796        Err(DispatchError::Cancelled(_)) => "cancelled",
3797        Err(DispatchError::Denied(_)) => "denied",
3798        Err(DispatchError::Timeout(_)) => "timeout",
3799        Err(_) => "failed",
3800    }
3801}
3802
3803fn is_cancelled_vm_error(error: &VmError) -> bool {
3804    matches!(
3805        error,
3806        VmError::Thrown(VmValue::String(message))
3807            if message.starts_with("kind:cancelled:")
3808    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3809}
3810
3811fn event_headers(
3812    event: &TriggerEvent,
3813    binding: Option<&TriggerBinding>,
3814    attempt: Option<u32>,
3815    replay_of_event_id: Option<&String>,
3816) -> BTreeMap<String, String> {
3817    let mut headers = BTreeMap::new();
3818    headers.insert("event_id".to_string(), event.id.0.clone());
3819    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3820    headers.insert("provider".to_string(), event.provider.as_str().to_string());
3821    headers.insert("kind".to_string(), event.kind.clone());
3822    if let Some(replay_of_event_id) = replay_of_event_id {
3823        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3824    }
3825    if let Some(tenant_id) = event.tenant_id.as_ref() {
3826        headers.insert("tenant_id".to_string(), tenant_id.0.clone());
3827    }
3828    if let Some(binding) = binding {
3829        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3830        headers.insert("binding_key".to_string(), binding.binding_key());
3831        headers.insert(
3832            "handler_kind".to_string(),
3833            DispatchUri::from(&binding.handler).kind().to_string(),
3834        );
3835    }
3836    if let Some(attempt) = attempt {
3837        headers.insert("attempt".to_string(), attempt.to_string());
3838    }
3839    headers
3840}
3841
3842fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
3843    let topic = Topic::new(topic_name)
3844        .expect("static trigger dispatcher topic names should always be valid");
3845    match event.tenant_id.as_ref() {
3846        Some(tenant_id) => crate::tenant_topic(tenant_id, &topic).map_err(DispatchError::from),
3847        None => Ok(topic),
3848    }
3849}
3850
3851fn worker_queue_priority(
3852    binding: &super::registry::TriggerBinding,
3853    event: &TriggerEvent,
3854) -> crate::WorkerQueuePriority {
3855    match event
3856        .headers
3857        .get("priority")
3858        .map(|value| value.trim().to_ascii_lowercase())
3859        .as_deref()
3860    {
3861        Some("high") => crate::WorkerQueuePriority::High,
3862        Some("low") => crate::WorkerQueuePriority::Low,
3863        _ => binding.dispatch_priority,
3864    }
3865}
3866
/// Environment variable that enables a test-only crash hook in the dispatcher.
/// See `maybe_fail_before_outbox`.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Test fault-injection hook: exits the process with status 86 if
/// `TEST_FAIL_BEFORE_OUTBOX_ENV` is set to any value, including an empty one.
/// Otherwise it does nothing.
///
/// NOTE(review): judging by the name, this is called just before the outbox
/// write to simulate a crash at that point — confirm at the call site.
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_none() {
        return;
    }
    std::process::exit(86);
}
3874
3875fn now_rfc3339() -> String {
3876    crate::triggers::test_util::clock::now_utc()
3877        .format(&time::format_description::well_known::Rfc3339)
3878        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3879}
3880
3881fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
3882    let now = crate::triggers::test_util::clock::now_utc();
3883    let reset = if binding.hourly_cost_usd.is_some() {
3884        let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
3885        time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
3886    } else {
3887        let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
3888        time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
3889    };
3890    reset
3891        .format(&time::format_description::well_known::Rfc3339)
3892        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3893}
3894
3895fn now_unix_ms() -> i64 {
3896    (crate::triggers::test_util::clock::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3897}
3898
3899fn cancelled_dispatch_outcome(
3900    binding: &TriggerBinding,
3901    route: &DispatchUri,
3902    event: &TriggerEvent,
3903    replay_of_event_id: Option<String>,
3904    attempt_count: u32,
3905    error: String,
3906) -> DispatchOutcome {
3907    DispatchOutcome {
3908        trigger_id: binding.id.as_str().to_string(),
3909        binding_key: binding.binding_key(),
3910        event_id: event.id.0.clone(),
3911        attempt_count,
3912        status: DispatchStatus::Cancelled,
3913        handler_kind: route.kind().to_string(),
3914        target_uri: route.target_uri(),
3915        replay_of_event_id,
3916        result: None,
3917        error: Some(error),
3918    }
3919}
3920
3921async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3922    let _ = cancel_rx.recv().await;
3923}
3924
// Unit tests for this dispatcher module. They live in a separate `tests`
// module file beside this one and are compiled only for test builds.
#[cfg(test)]
mod tests;