Skip to main content

harn_vm/triggers/dispatcher/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::{Mutex as AsyncMutex, Notify};
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25    ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26    ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{
30    append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
31};
32use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
33use crate::vm::Vm;
34
35use self::uri::DispatchUri;
36use super::registry::{
37    binding_autonomy_budget_would_exceed, binding_budget_would_exceed,
38    expected_predicate_cost_usd_micros, matching_bindings, micros_to_usd, note_autonomous_decision,
39    note_orchestrator_budget_cost, orchestrator_budget_would_exceed, record_predicate_cost_sample,
40    reset_binding_budget_windows, usd_to_micros, TriggerBinding, TriggerBudgetExhaustionStrategy,
41    TriggerHandlerSpec,
42};
43use super::{
44    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
45    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
46    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
47    TRIGGER_OUTBOX_TOPIC,
48};
49use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
50
51mod flow_control;
52pub mod retry;
53pub mod uri;
54
55pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
56
thread_local! {
    // Runtime state of the most recently constructed `Dispatcher` on this thread;
    // written by `Dispatcher::with_event_log_and_metrics`.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context of the dispatch currently running on this thread, read by
    // `current_dispatch_context()`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Flow-control lease of the dispatch currently running on this thread, read by
    // `current_dispatch_wait_lease()` so the gates can be suspended/resumed.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only hook; presumably fired by `notify_test_inbox_dequeued()` (called from
    // `run` after an inbox envelope is decoded) — confirm at its definition.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}

tokio::task_local! {
    // Whether the current task is running a replay dispatch; scoped by
    // `Dispatcher::dispatch_with_replay` and read by `current_dispatch_is_replay()`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
68
/// Identity of the dispatch currently running, exposed through
/// `current_dispatch_context()`.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay, otherwise `None`.
    pub replay_of_event_id: Option<String>,
    pub binding_id: String,
    pub binding_version: u32,
    // NOTE(review): `agent_id` / `action` semantics are set outside this view — confirm.
    pub agent_id: String,
    pub action: String,
    /// Autonomy tier in effect for this dispatch.
    pub autonomy_tier: AutonomyTier,
}
79
/// Scope guard that calls `crate::orchestration::pop_execution_policy()` when dropped.
///
/// NOTE(review): presumably constructed right after a matching execution-policy
/// push so the policy is popped on every exit path — confirm at the construction site.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
87
/// Serializable set of predicate-cache entries keyed by trigger and event.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    trigger_id: String,
    event_id: String,
    entries: Vec<PredicateCacheEntry>,
}

/// Outcome of evaluating a binding predicate for one event.
///
/// NOTE(review): field meanings below are inferred from names; the code that
/// fills this record lies outside this view — confirm there.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    // Whether the predicate passed (presumably).
    result: bool,
    // Evaluation cost in USD.
    cost_usd: f64,
    tokens: u64,
    latency_ms: u64,
    // Presumably true when served from the predicate cache.
    cached: bool,
    reason: Option<String>,
    exhaustion_strategy: Option<TriggerBudgetExhaustionStrategy>,
}

/// Reviewer name used by default for autonomy-budget decisions (use site is outside this view).
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
107
108pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
109    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
110}
111
112pub(crate) fn current_dispatch_is_replay() -> bool {
113    ACTIVE_DISPATCH_IS_REPLAY
114        .try_with(|is_replay| *is_replay)
115        .unwrap_or(false)
116}
117
118pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
119    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
120}
121
/// Trigger dispatcher: accepts trigger events through the event-log inbox and
/// dispatches them to their bindings.
///
/// Clones share the runtime state, event log, metrics registry and cancel
/// channel, so every clone observes the same counters and `shutdown`.
#[derive(Clone)]
pub struct Dispatcher {
    /// Base VM, reference-counted with `Rc` (which makes `Dispatcher` `!Send`).
    base_vm: Rc<Vm>,
    event_log: Arc<AnyEventLog>,
    /// Broadcast by `shutdown`; `run` breaks out of its loop when it fires.
    cancel_tx: broadcast::Sender<()>,
    state: Arc<DispatcherRuntimeState>,
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
130
/// Runtime bookkeeping shared (via `Arc`) by a dispatcher, its clones, and the
/// thread-local `ACTIVE_DISPATCHER_STATE`.
#[derive(Debug)]
struct DispatcherRuntimeState {
    /// Dispatches currently executing (incremented when a dispatch starts).
    in_flight: AtomicU64,
    // Presumably the number of dispatches waiting for a retry — confirm.
    retry_queue_depth: AtomicU64,
    /// In-memory dead-letter entries, reported by `snapshot` and `dlq_entries`.
    dlq: Mutex<Vec<DlqEntry>>,
    /// Cancellation flags; `shutdown` sets every one to `true`.
    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
    /// Set by `shutdown`.
    shutting_down: std::sync::atomic::AtomicBool,
    /// Awaited by `drain` to be woken when the dispatcher may have become idle.
    idle_notify: Notify,
    flow_control: FlowControlManager,
}
141
142impl DispatcherRuntimeState {
143    fn new(event_log: Arc<AnyEventLog>) -> Self {
144        Self {
145            in_flight: AtomicU64::new(0),
146            retry_queue_depth: AtomicU64::new(0),
147            dlq: Mutex::new(Vec::new()),
148            cancel_tokens: Mutex::new(Vec::new()),
149            shutting_down: std::sync::atomic::AtomicBool::new(false),
150            idle_notify: Notify::new(),
151            flow_control: FlowControlManager::new(event_log),
152        }
153    }
154}
155
/// Point-in-time dispatcher counters returned by `Dispatcher::snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    pub in_flight: u64,
    pub retry_queue_depth: u64,
    /// Number of entries in the in-memory dead-letter queue.
    pub dlq_depth: u64,
}

/// Final status of a single dispatch; serialized in `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    Succeeded,
    Failed,
    /// Moved to the dead-letter queue.
    Dlq,
    Skipped,
    Waiting,
    Cancelled,
}
174
175impl DispatchStatus {
176    pub fn as_str(&self) -> &'static str {
177        match self {
178            Self::Succeeded => "succeeded",
179            Self::Failed => "failed",
180            Self::Dlq => "dlq",
181            Self::Skipped => "skipped",
182            Self::Waiting => "waiting",
183            Self::Cancelled => "cancelled",
184        }
185    }
186}
187
/// Result of dispatching one event to one binding.
///
/// Missing fields deserialize from the `Default` impl (status `Failed`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    pub attempt_count: u32,
    pub status: DispatchStatus,
    pub handler_kind: String,
    pub target_uri: String,
    /// Id of the original event when this outcome belongs to a replay.
    pub replay_of_event_id: Option<String>,
    pub result: Option<serde_json::Value>,
    pub error: Option<String>,
}

/// Payload of an `event_ingested` entry on the inbox envelopes topic.
///
/// When `trigger_id` is set, the event is routed only to that trigger's live
/// binding (resolved with `binding_version`); otherwise routing falls back to the
/// cron id in the payload or to binding matching.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    pub trigger_id: Option<String>,
    pub binding_version: Option<u32>,
    pub event: TriggerEvent,
}

/// Report returned by `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// True when in-flight and retry-queue counts reached zero.
    pub drained: bool,
    pub in_flight: u64,
    pub retry_queue_depth: u64,
    pub dlq_depth: u64,
}
218
219impl Default for DispatchOutcome {
220    fn default() -> Self {
221        Self {
222            trigger_id: String::new(),
223            binding_key: String::new(),
224            event_id: String::new(),
225            attempt_count: 0,
226            status: DispatchStatus::Failed,
227            handler_kind: String::new(),
228            target_uri: String::new(),
229            replay_of_event_id: None,
230            result: None,
231            error: None,
232        }
233    }
234}
235
/// Record of one handler attempt for a dispatch.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    pub attempt: u32,
    pub handler_kind: String,
    // NOTE(review): timestamp format is set by the writer outside this view — confirm.
    pub started_at: String,
    pub completed_at: String,
    pub outcome: String,
    pub error_msg: Option<String>,
}

/// Request to cancel the dispatch of `event_id` on `binding_key`.
///
/// Appended by `append_dispatch_cancel_request`; `requested_at` is
/// (de)serialized as RFC 3339.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    pub binding_key: String,
    pub event_id: String,
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    #[serde(default)]
    pub requested_by: Option<String>,
    #[serde(default)]
    pub audit_id: Option<String>,
}

/// Dead-letter record for a dispatch, with its attempt history and final error.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    pub trigger_id: String,
    pub binding_key: String,
    pub event: TriggerEvent,
    pub attempt_count: u32,
    pub final_error: String,
    pub attempts: Vec<DispatchAttemptRecord>,
}
271
/// Singleton gate tracked for a dispatch; `held` is cleared while
/// `DispatchWaitLease::suspend` has released the gate.
#[derive(Clone, Debug)]
struct SingletonLease {
    gate: String,
    held: bool,
}

/// Concurrency gate tracked for a dispatch. `permit` is `None` while released;
/// `max` and `priority_rank` are passed back when `resume` re-acquires it.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    gate: String,
    max: u32,
    priority_rank: usize,
    permit: Option<ConcurrencyPermit>,
}

/// Flow-control gates acquired for one dispatch (either may be absent).
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    singleton: Option<SingletonLease>,
    concurrency: Option<ConcurrencyLease>,
}

/// Handle that lets a running dispatch release (`suspend`) and later
/// re-acquire (`resume`) its flow-control gates. Clones share the same
/// `acquired` gates and `suspended` flag.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    state: Arc<DispatcherRuntimeState>,
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    suspended: Arc<AtomicBool>,
}
298
299impl DispatchWaitLease {
300    fn new(
301        state: Arc<DispatcherRuntimeState>,
302        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
303    ) -> Self {
304        Self {
305            state,
306            acquired,
307            suspended: Arc::new(AtomicBool::new(false)),
308        }
309    }
310
311    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
312        if self.suspended.swap(true, Ordering::SeqCst) {
313            return Ok(());
314        }
315        let (singleton_gate, concurrency_permit) = {
316            let mut acquired = self.acquired.lock().await;
317            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
318                if lease.held {
319                    lease.held = false;
320                    Some(lease.gate.clone())
321                } else {
322                    None
323                }
324            });
325            let concurrency_permit = acquired
326                .concurrency
327                .as_mut()
328                .and_then(|lease| lease.permit.take());
329            (singleton_gate, concurrency_permit)
330        };
331
332        if let Some(gate) = singleton_gate {
333            self.state
334                .flow_control
335                .release_singleton(&gate)
336                .await
337                .map_err(DispatchError::from)?;
338        }
339        if let Some(permit) = concurrency_permit {
340            self.state
341                .flow_control
342                .release_concurrency(permit)
343                .await
344                .map_err(DispatchError::from)?;
345        }
346        Ok(())
347    }
348
349    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
350        if !self.suspended.swap(false, Ordering::SeqCst) {
351            return Ok(());
352        }
353
354        let singleton_gate = {
355            let acquired = self.acquired.lock().await;
356            acquired.singleton.as_ref().and_then(|lease| {
357                if lease.held {
358                    None
359                } else {
360                    Some(lease.gate.clone())
361                }
362            })
363        };
364        if let Some(gate) = singleton_gate {
365            self.state
366                .flow_control
367                .acquire_singleton(&gate)
368                .await
369                .map_err(DispatchError::from)?;
370            let mut acquired = self.acquired.lock().await;
371            if let Some(lease) = acquired.singleton.as_mut() {
372                lease.held = true;
373            }
374        }
375
376        let concurrency_spec = {
377            let acquired = self.acquired.lock().await;
378            acquired.concurrency.as_ref().and_then(|lease| {
379                if lease.permit.is_some() {
380                    None
381                } else {
382                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
383                }
384            })
385        };
386        if let Some((gate, max, priority_rank)) = concurrency_spec {
387            let permit = self
388                .state
389                .flow_control
390                .acquire_concurrency(&gate, max, priority_rank)
391                .await
392                .map_err(DispatchError::from)?;
393            let mut acquired = self.acquired.lock().await;
394            if let Some(lease) = acquired.concurrency.as_mut() {
395                lease.permit = Some(permit);
396            }
397        }
398        Ok(())
399    }
400}
401
/// Result of applying flow control to an event: proceed with `event` and the
/// gates acquired for it, or skip with a human-readable `reason`.
enum FlowControlOutcome {
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    Skip {
        reason: String,
    },
}

/// Stage that decided to skip a dispatch; rendered by `as_str`.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    Predicate,
    FlowControl,
}
417
/// Errors produced while enqueuing or dispatching trigger events.
///
/// Every variant carries a human-readable message, and `Display` renders
/// exactly that message. `retryable` decides whether a failure may be retried.
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log failure (see the `From<LogError>` conversion).
    EventLog(String),
    /// Trigger-registry failure, e.g. an unresolvable binding.
    Registry(String),
    /// JSON (de)serialization failure.
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            DispatchError::EventLog(text)
            | DispatchError::Registry(text)
            | DispatchError::Serde(text)
            | DispatchError::Local(text)
            | DispatchError::A2a(text)
            | DispatchError::Denied(text)
            | DispatchError::Timeout(text)
            | DispatchError::Waiting(text)
            | DispatchError::Cancelled(text)
            | DispatchError::NotImplemented(text) => text,
        };
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Whether a failure of this kind may be retried.
    ///
    /// Cancelled, denied, not-implemented and waiting dispatches are never
    /// retried; every other variant is.
    fn retryable(&self) -> bool {
        match self {
            DispatchError::Cancelled(_)
            | DispatchError::Denied(_)
            | DispatchError::NotImplemented(_)
            | DispatchError::Waiting(_) => false,
            DispatchError::EventLog(_)
            | DispatchError::Registry(_)
            | DispatchError::Serde(_)
            | DispatchError::Local(_)
            | DispatchError::A2a(_)
            | DispatchError::Timeout(_) => true,
        }
    }
}
459
460impl DispatchSkipStage {
461    fn as_str(&self) -> &'static str {
462        match self {
463            Self::Predicate => "predicate",
464            Self::FlowControl => "flow_control",
465        }
466    }
467}
468
469impl From<LogError> for DispatchError {
470    fn from(value: LogError) -> Self {
471        Self::EventLog(value.to_string())
472    }
473}
474
475pub async fn append_dispatch_cancel_request(
476    event_log: &Arc<AnyEventLog>,
477    request: &DispatchCancelRequest,
478) -> Result<u64, DispatchError> {
479    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
480        .expect("static trigger cancel topic should always be valid");
481    event_log
482        .append(
483            &topic,
484            LogEvent::new(
485                "dispatch_cancel_requested",
486                serde_json::to_value(request)
487                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
488            ),
489        )
490        .await
491        .map_err(DispatchError::from)
492}
493
494impl Dispatcher {
495    pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
496        self.event_log.clone()
497    }
498
499    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
500        let event_log = active_event_log().ok_or_else(|| {
501            DispatchError::EventLog("dispatcher requires an active event log".to_string())
502        })?;
503        Ok(Self::with_event_log(base_vm, event_log))
504    }
505
506    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
507        Self::with_event_log_and_metrics(base_vm, event_log, None)
508    }
509
510    pub fn with_event_log_and_metrics(
511        base_vm: Vm,
512        event_log: Arc<AnyEventLog>,
513        metrics: Option<Arc<crate::MetricsRegistry>>,
514    ) -> Self {
515        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
516        ACTIVE_DISPATCHER_STATE.with(|slot| {
517            *slot.borrow_mut() = Some(state.clone());
518        });
519        let (cancel_tx, _) = broadcast::channel(32);
520        Self {
521            base_vm: Rc::new(base_vm),
522            event_log,
523            cancel_tx,
524            state,
525            metrics,
526        }
527    }
528
529    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
530        DispatcherStatsSnapshot {
531            in_flight: self.state.in_flight.load(Ordering::Relaxed),
532            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
533            dlq_depth: self
534                .state
535                .dlq
536                .lock()
537                .expect("dispatcher dlq poisoned")
538                .len() as u64,
539        }
540    }
541
542    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
543        self.state
544            .dlq
545            .lock()
546            .expect("dispatcher dlq poisoned")
547            .clone()
548    }
549
550    pub fn shutdown(&self) {
551        self.state.shutting_down.store(true, Ordering::SeqCst);
552        for token in self
553            .state
554            .cancel_tokens
555            .lock()
556            .expect("dispatcher cancel tokens poisoned")
557            .iter()
558        {
559            token.store(true, Ordering::SeqCst);
560        }
561        let _ = self.cancel_tx.send(());
562    }
563
564    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
565        self.enqueue_targeted(None, None, event).await
566    }
567
568    pub async fn enqueue_targeted(
569        &self,
570        trigger_id: Option<String>,
571        binding_version: Option<u32>,
572        event: TriggerEvent,
573    ) -> Result<u64, DispatchError> {
574        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
575            .expect("static trigger inbox envelopes topic is valid");
576        let headers = event_headers(&event, None, None, None);
577        let payload = serde_json::to_value(InboxEnvelope {
578            trigger_id,
579            binding_version,
580            event,
581        })
582        .map_err(|error| DispatchError::Serde(error.to_string()))?;
583        self.event_log
584            .append(
585                &topic,
586                LogEvent::new("event_ingested", payload).with_headers(headers),
587            )
588            .await
589            .map_err(DispatchError::from)
590    }
591
592    pub async fn run(&self) -> Result<(), DispatchError> {
593        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
594            .expect("static trigger inbox envelopes topic is valid");
595        let start_from = self.event_log.latest(&topic).await?;
596        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
597        pin_mut!(stream);
598        let mut cancel_rx = self.cancel_tx.subscribe();
599
600        loop {
601            tokio::select! {
602                received = stream.next() => {
603                    let Some(received) = received else {
604                        break;
605                    };
606                    let (_, event) = received.map_err(DispatchError::from)?;
607                    if event.kind != "event_ingested" {
608                        continue;
609                    }
610                    let parent_headers = event.headers.clone();
611                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
612                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
613                    notify_test_inbox_dequeued();
614                    let _ = self
615                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
616                        .await;
617                }
618                _ = recv_cancel(&mut cancel_rx) => break,
619            }
620        }
621
622        Ok(())
623    }
624
625    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
626        let deadline = tokio::time::Instant::now() + timeout;
627        loop {
628            let snapshot = self.snapshot();
629            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
630                return Ok(DispatcherDrainReport {
631                    drained: true,
632                    in_flight: snapshot.in_flight,
633                    retry_queue_depth: snapshot.retry_queue_depth,
634                    dlq_depth: snapshot.dlq_depth,
635                });
636            }
637
638            let notified = self.state.idle_notify.notified();
639            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
640            if remaining.is_zero() {
641                return Ok(DispatcherDrainReport {
642                    drained: false,
643                    in_flight: snapshot.in_flight,
644                    retry_queue_depth: snapshot.retry_queue_depth,
645                    dlq_depth: snapshot.dlq_depth,
646                });
647            }
648            if tokio::time::timeout(remaining, notified).await.is_err() {
649                let snapshot = self.snapshot();
650                return Ok(DispatcherDrainReport {
651                    drained: false,
652                    in_flight: snapshot.in_flight,
653                    retry_queue_depth: snapshot.retry_queue_depth,
654                    dlq_depth: snapshot.dlq_depth,
655                });
656            }
657        }
658    }
659
660    pub async fn dispatch_inbox_envelope(
661        &self,
662        envelope: InboxEnvelope,
663    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
664        self.dispatch_inbox_envelope_with_headers(envelope, None)
665            .await
666    }
667
668    async fn dispatch_inbox_envelope_with_headers(
669        &self,
670        envelope: InboxEnvelope,
671        parent_headers: Option<&BTreeMap<String, String>>,
672    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
673        if let Some(trigger_id) = envelope.trigger_id {
674            let binding = super::registry::resolve_live_trigger_binding(
675                &trigger_id,
676                envelope.binding_version,
677            )
678            .map_err(|error| DispatchError::Registry(error.to_string()))?;
679            return Ok(vec![
680                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
681                    .await?,
682            ]);
683        }
684
685        let cron_target = match &envelope.event.provider_payload {
686            crate::triggers::ProviderPayload::Known(
687                crate::triggers::event::KnownProviderPayload::Cron(payload),
688            ) => payload.cron_id.clone(),
689            _ => None,
690        };
691        if let Some(trigger_id) = cron_target {
692            let binding = super::registry::resolve_live_trigger_binding(
693                &trigger_id,
694                envelope.binding_version,
695            )
696            .map_err(|error| DispatchError::Registry(error.to_string()))?;
697            return Ok(vec![
698                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
699                    .await?,
700            ]);
701        }
702
703        self.dispatch_event(envelope.event).await
704    }
705
706    pub async fn dispatch_event(
707        &self,
708        event: TriggerEvent,
709    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
710        let bindings = matching_bindings(&event);
711        let mut outcomes = Vec::new();
712        for binding in bindings {
713            outcomes.push(self.dispatch(&binding, event.clone()).await?);
714        }
715        Ok(outcomes)
716    }
717
718    pub async fn dispatch(
719        &self,
720        binding: &TriggerBinding,
721        event: TriggerEvent,
722    ) -> Result<DispatchOutcome, DispatchError> {
723        self.dispatch_with_replay(binding, event, None, None, None)
724            .await
725    }
726
727    pub async fn dispatch_replay(
728        &self,
729        binding: &TriggerBinding,
730        event: TriggerEvent,
731        replay_of_event_id: String,
732    ) -> Result<DispatchOutcome, DispatchError> {
733        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
734            .await
735    }
736
737    pub async fn dispatch_with_parent_span_id(
738        &self,
739        binding: &TriggerBinding,
740        event: TriggerEvent,
741        parent_span_id: Option<String>,
742    ) -> Result<DispatchOutcome, DispatchError> {
743        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
744            .await
745    }
746
747    async fn dispatch_with_replay(
748        &self,
749        binding: &TriggerBinding,
750        event: TriggerEvent,
751        replay_of_event_id: Option<String>,
752        parent_span_id: Option<String>,
753        parent_headers: Option<&BTreeMap<String, String>>,
754    ) -> Result<DispatchOutcome, DispatchError> {
755        let span = tracing::info_span!(
756            "dispatch",
757            trigger_id = %binding.id.as_str(),
758            binding_version = binding.version,
759            trace_id = %event.trace_id.0
760        );
761        #[cfg(feature = "otel")]
762        let span_for_otel = span.clone();
763        let _ = if let Some(headers) = parent_headers {
764            crate::observability::otel::set_span_parent_from_headers(
765                &span,
766                headers,
767                &event.trace_id,
768                parent_span_id.as_deref(),
769            )
770        } else {
771            crate::observability::otel::set_span_parent(
772                &span,
773                &event.trace_id,
774                parent_span_id.as_deref(),
775            )
776        };
777        #[cfg(feature = "otel")]
778        let started_at = Instant::now();
779        let metrics = self.metrics.clone();
780        let outcome = ACTIVE_DISPATCH_IS_REPLAY
781            .scope(
782                replay_of_event_id.is_some(),
783                self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
784                    .instrument(span),
785            )
786            .await;
787        if let Some(metrics) = metrics.as_ref() {
788            match &outcome {
789                Ok(dispatch_outcome) => match dispatch_outcome.status {
790                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
791                        metrics.record_dispatch_succeeded();
792                    }
793                    DispatchStatus::Waiting => {}
794                    _ => metrics.record_dispatch_failed(),
795                },
796                Err(_) => metrics.record_dispatch_failed(),
797            }
798            let outcome_label = match &outcome {
799                Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
800                Err(DispatchError::Cancelled(_)) => "cancelled",
801                Err(_) => "failed",
802            };
803            metrics.record_trigger_dispatched(
804                binding.id.as_str(),
805                binding.handler.kind(),
806                outcome_label,
807            );
808            metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
809        }
810        #[cfg(feature = "otel")]
811        {
812            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
813
814            let duration_ms = started_at.elapsed().as_millis() as i64;
815            let status = match &outcome {
816                Ok(dispatch_outcome) => match dispatch_outcome.status {
817                    DispatchStatus::Succeeded => "succeeded",
818                    DispatchStatus::Skipped => "skipped",
819                    DispatchStatus::Waiting => "waiting",
820                    DispatchStatus::Cancelled => "cancelled",
821                    DispatchStatus::Failed => "failed",
822                    DispatchStatus::Dlq => "dlq",
823                },
824                Err(DispatchError::Cancelled(_)) => "cancelled",
825                Err(_) => "failed",
826            };
827            span_for_otel.set_attribute("result.status", status);
828            span_for_otel.set_attribute("result.duration_ms", duration_ms);
829        }
830        outcome
831    }
832
833    async fn dispatch_with_replay_inner(
834        &self,
835        binding: &TriggerBinding,
836        event: TriggerEvent,
837        replay_of_event_id: Option<String>,
838    ) -> Result<DispatchOutcome, DispatchError> {
839        let autonomy_tier = crate::resolve_agent_autonomy_tier(
840            &self.event_log,
841            binding.id.as_str(),
842            binding.autonomy_tier,
843        )
844        .await
845        .unwrap_or(binding.autonomy_tier);
846        let binding_key = binding.binding_key();
847        let route = DispatchUri::from(&binding.handler);
848        let trigger_id = binding.id.as_str().to_string();
849        let event_id = event.id.0.clone();
850        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
851        let begin = if replay_of_event_id.is_some() {
852            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
853        } else {
854            begin_in_flight(binding.id.as_str(), binding.version)
855        };
856        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
857
858        let mut attempts = Vec::new();
859        let mut source_node_id = format!("trigger:{}", event.id.0);
860        let mut initial_nodes = Vec::new();
861        let mut initial_edges = Vec::new();
862        if let Some(original_event_id) = replay_of_event_id.as_ref() {
863            let original_node_id = format!("trigger:{original_event_id}");
864            initial_nodes.push(RunActionGraphNodeRecord {
865                id: original_node_id.clone(),
866                label: format!(
867                    "{}:{} (original {})",
868                    event.provider.as_str(),
869                    event.kind,
870                    original_event_id
871                ),
872                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
873                status: "historical".to_string(),
874                outcome: "replayed_from".to_string(),
875                trace_id: Some(event.trace_id.0.clone()),
876                stage_id: None,
877                node_id: None,
878                worker_id: None,
879                run_id: None,
880                run_path: None,
881                metadata: trigger_node_metadata(&event),
882            });
883            initial_edges.push(RunActionGraphEdgeRecord {
884                from_id: original_node_id,
885                to_id: source_node_id.clone(),
886                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
887                label: Some("replay chain".to_string()),
888            });
889        }
890        initial_nodes.push(RunActionGraphNodeRecord {
891            id: source_node_id.clone(),
892            label: format!("{}:{}", event.provider.as_str(), event.kind),
893            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
894            status: "received".to_string(),
895            outcome: "received".to_string(),
896            trace_id: Some(event.trace_id.0.clone()),
897            stage_id: None,
898            node_id: None,
899            worker_id: None,
900            run_id: None,
901            run_path: None,
902            metadata: trigger_node_metadata(&event),
903        });
904        self.emit_action_graph(
905            &event,
906            initial_nodes,
907            initial_edges,
908            serde_json::json!({
909                "source": "dispatcher",
910                "trigger_id": trigger_id,
911                "binding_key": binding_key,
912                "event_id": event_id,
913                "replay_of_event_id": replay_of_event_id,
914            }),
915        )
916        .await?;
917
918        if dispatch_cancel_requested(
919            &self.event_log,
920            &binding_key,
921            &event.id.0,
922            replay_of_event_id.as_ref(),
923        )
924        .await?
925        {
926            finish_in_flight(
927                binding.id.as_str(),
928                binding.version,
929                TriggerDispatchOutcome::Failed,
930            )
931            .await
932            .map_err(|error| DispatchError::Registry(error.to_string()))?;
933            decrement_in_flight(&self.state);
934            return Ok(cancelled_dispatch_outcome(
935                binding,
936                &route,
937                &event,
938                replay_of_event_id,
939                0,
940                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
941            ));
942        }
943
944        if let Some(predicate) = binding.when.as_ref() {
945            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
946            let evaluation = self
947                .evaluate_predicate(
948                    binding,
949                    predicate,
950                    &event,
951                    replay_of_event_id.as_ref(),
952                    autonomy_tier,
953                )
954                .await?;
955            let passed = evaluation.result;
956            self.emit_action_graph(
957                &event,
958                vec![RunActionGraphNodeRecord {
959                    id: predicate_node_id.clone(),
960                    label: predicate.raw.clone(),
961                    kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
962                    status: "completed".to_string(),
963                    outcome: passed.to_string(),
964                    trace_id: Some(event.trace_id.0.clone()),
965                    stage_id: None,
966                    node_id: None,
967                    worker_id: None,
968                    run_id: None,
969                    run_path: None,
970                    metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
971                }],
972                vec![RunActionGraphEdgeRecord {
973                    from_id: source_node_id.clone(),
974                    to_id: predicate_node_id.clone(),
975                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
976                    label: None,
977                }],
978                serde_json::json!({
979                    "source": "dispatcher",
980                    "trigger_id": binding.id.as_str(),
981                    "binding_key": binding.binding_key(),
982                    "event_id": event.id.0,
983                    "predicate": predicate.raw,
984                    "reason": evaluation.reason,
985                    "cached": evaluation.cached,
986                    "cost_usd": evaluation.cost_usd,
987                    "tokens": evaluation.tokens,
988                    "latency_ms": evaluation.latency_ms,
989                    "replay_of_event_id": replay_of_event_id,
990                }),
991            )
992            .await?;
993
994            if !passed {
995                if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
996                    let final_error = format!(
997                        "trigger budget exhausted: {}",
998                        evaluation.reason.as_deref().unwrap_or("budget_exhausted")
999                    );
1000                    self.move_budget_exhausted_to_dlq(
1001                        binding,
1002                        &route,
1003                        &event,
1004                        replay_of_event_id.as_ref(),
1005                        &final_error,
1006                    )
1007                    .await?;
1008                    finish_in_flight(
1009                        binding.id.as_str(),
1010                        binding.version,
1011                        TriggerDispatchOutcome::Dlq,
1012                    )
1013                    .await
1014                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1015                    decrement_in_flight(&self.state);
1016                    self.append_dispatch_trust_record(
1017                        binding,
1018                        &route,
1019                        &event,
1020                        replay_of_event_id.as_ref(),
1021                        autonomy_tier,
1022                        TrustOutcome::Failure,
1023                        "dlq",
1024                        0,
1025                        Some(final_error.clone()),
1026                    )
1027                    .await?;
1028                    return Ok(DispatchOutcome {
1029                        trigger_id: binding.id.as_str().to_string(),
1030                        binding_key: binding.binding_key(),
1031                        event_id: event.id.0,
1032                        attempt_count: 0,
1033                        status: DispatchStatus::Dlq,
1034                        handler_kind: route.kind().to_string(),
1035                        target_uri: route.target_uri(),
1036                        replay_of_event_id,
1037                        result: None,
1038                        error: Some(final_error),
1039                    });
1040                }
1041
1042                if evaluation.exhaustion_strategy
1043                    == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1044                {
1045                    self.append_budget_deferred_event(
1046                        binding,
1047                        &route,
1048                        &event,
1049                        replay_of_event_id.as_ref(),
1050                        evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1051                    )
1052                    .await?;
1053                    finish_in_flight(
1054                        binding.id.as_str(),
1055                        binding.version,
1056                        TriggerDispatchOutcome::Dispatched,
1057                    )
1058                    .await
1059                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1060                    decrement_in_flight(&self.state);
1061                    self.append_dispatch_trust_record(
1062                        binding,
1063                        &route,
1064                        &event,
1065                        replay_of_event_id.as_ref(),
1066                        autonomy_tier,
1067                        TrustOutcome::Denied,
1068                        "waiting",
1069                        0,
1070                        evaluation.reason.clone(),
1071                    )
1072                    .await?;
1073                    return Ok(DispatchOutcome {
1074                        trigger_id: binding.id.as_str().to_string(),
1075                        binding_key: binding.binding_key(),
1076                        event_id: event.id.0,
1077                        attempt_count: 0,
1078                        status: DispatchStatus::Waiting,
1079                        handler_kind: route.kind().to_string(),
1080                        target_uri: route.target_uri(),
1081                        replay_of_event_id,
1082                        result: Some(serde_json::json!({
1083                            "deferred": true,
1084                            "predicate": predicate.raw,
1085                            "reason": evaluation.reason,
1086                        })),
1087                        error: None,
1088                    });
1089                }
1090
1091                self.append_skipped_outbox_event(
1092                    binding,
1093                    &route,
1094                    &event,
1095                    replay_of_event_id.as_ref(),
1096                    DispatchSkipStage::Predicate,
1097                    serde_json::json!({
1098                        "predicate": predicate.raw,
1099                        "reason": evaluation.reason,
1100                    }),
1101                )
1102                .await?;
1103                finish_in_flight(
1104                    binding.id.as_str(),
1105                    binding.version,
1106                    TriggerDispatchOutcome::Dispatched,
1107                )
1108                .await
1109                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1110                decrement_in_flight(&self.state);
1111                self.append_dispatch_trust_record(
1112                    binding,
1113                    &route,
1114                    &event,
1115                    replay_of_event_id.as_ref(),
1116                    autonomy_tier,
1117                    TrustOutcome::Denied,
1118                    "skipped",
1119                    0,
1120                    None,
1121                )
1122                .await?;
1123                return Ok(DispatchOutcome {
1124                    trigger_id: binding.id.as_str().to_string(),
1125                    binding_key: binding.binding_key(),
1126                    event_id: event.id.0,
1127                    attempt_count: 0,
1128                    status: DispatchStatus::Skipped,
1129                    handler_kind: route.kind().to_string(),
1130                    target_uri: route.target_uri(),
1131                    replay_of_event_id,
1132                    result: Some(serde_json::json!({
1133                        "skipped": true,
1134                        "predicate": predicate.raw,
1135                        "reason": evaluation.reason,
1136                    })),
1137                    error: None,
1138                });
1139            }
1140
1141            source_node_id = predicate_node_id;
1142        }
1143
1144        if autonomy_tier == AutonomyTier::ActAuto {
1145            if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1146                let request_id = self
1147                    .append_autonomy_budget_approval_request(
1148                        binding,
1149                        &route,
1150                        &event,
1151                        replay_of_event_id.as_ref(),
1152                        reason,
1153                    )
1154                    .await?;
1155                self.emit_autonomy_budget_approval_action_graph(
1156                    binding,
1157                    &route,
1158                    &event,
1159                    &source_node_id,
1160                    replay_of_event_id.as_ref(),
1161                    reason,
1162                    &request_id,
1163                )
1164                .await?;
1165                finish_in_flight(
1166                    binding.id.as_str(),
1167                    binding.version,
1168                    TriggerDispatchOutcome::Dispatched,
1169                )
1170                .await
1171                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1172                decrement_in_flight(&self.state);
1173                self.append_tier_transition_trust_record(
1174                    binding,
1175                    &event,
1176                    replay_of_event_id.as_ref(),
1177                    autonomy_tier,
1178                    AutonomyTier::ActWithApproval,
1179                    reason,
1180                    &request_id,
1181                )
1182                .await?;
1183                self.append_dispatch_trust_record(
1184                    binding,
1185                    &route,
1186                    &event,
1187                    replay_of_event_id.as_ref(),
1188                    autonomy_tier,
1189                    TrustOutcome::Denied,
1190                    "waiting",
1191                    0,
1192                    Some(reason.to_string()),
1193                )
1194                .await?;
1195                return Ok(DispatchOutcome {
1196                    trigger_id: binding.id.as_str().to_string(),
1197                    binding_key: binding.binding_key(),
1198                    event_id: event.id.0,
1199                    attempt_count: 0,
1200                    status: DispatchStatus::Waiting,
1201                    handler_kind: route.kind().to_string(),
1202                    target_uri: route.target_uri(),
1203                    replay_of_event_id,
1204                    result: Some(serde_json::json!({
1205                        "approval_required": true,
1206                        "request_id": request_id,
1207                        "reason": reason,
1208                        "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1209                    })),
1210                    error: None,
1211                });
1212            }
1213            note_autonomous_decision(binding);
1214        }
1215
1216        let (event, acquired_flow) = match self
1217            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1218            .await?
1219        {
1220            FlowControlOutcome::Dispatch { event, acquired } => {
1221                (*event, Arc::new(AsyncMutex::new(acquired)))
1222            }
1223            FlowControlOutcome::Skip { reason } => {
1224                self.append_skipped_outbox_event(
1225                    binding,
1226                    &route,
1227                    &event,
1228                    replay_of_event_id.as_ref(),
1229                    DispatchSkipStage::FlowControl,
1230                    serde_json::json!({
1231                        "flow_control": reason,
1232                    }),
1233                )
1234                .await?;
1235                finish_in_flight(
1236                    binding.id.as_str(),
1237                    binding.version,
1238                    TriggerDispatchOutcome::Dispatched,
1239                )
1240                .await
1241                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1242                decrement_in_flight(&self.state);
1243                return Ok(DispatchOutcome {
1244                    trigger_id: binding.id.as_str().to_string(),
1245                    binding_key: binding.binding_key(),
1246                    event_id: event.id.0,
1247                    attempt_count: 0,
1248                    status: DispatchStatus::Skipped,
1249                    handler_kind: route.kind().to_string(),
1250                    target_uri: route.target_uri(),
1251                    replay_of_event_id,
1252                    result: Some(serde_json::json!({
1253                        "skipped": true,
1254                        "flow_control": reason,
1255                    })),
1256                    error: None,
1257                });
1258            }
1259        };
1260
1261        let mut previous_retry_node = None;
1262        let max_attempts = binding.retry.max_attempts();
1263        for attempt in 1..=max_attempts {
1264            if dispatch_cancel_requested(
1265                &self.event_log,
1266                &binding_key,
1267                &event.id.0,
1268                replay_of_event_id.as_ref(),
1269            )
1270            .await?
1271            {
1272                finish_in_flight(
1273                    binding.id.as_str(),
1274                    binding.version,
1275                    TriggerDispatchOutcome::Failed,
1276                )
1277                .await
1278                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1279                decrement_in_flight(&self.state);
1280                return Ok(cancelled_dispatch_outcome(
1281                    binding,
1282                    &route,
1283                    &event,
1284                    replay_of_event_id,
1285                    attempt.saturating_sub(1),
1286                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1287                ));
1288            }
1289            maybe_fail_before_outbox();
1290            let started_at = now_rfc3339();
1291            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1292            self.append_lifecycle_event(
1293                "DispatchStarted",
1294                &event,
1295                binding,
1296                serde_json::json!({
1297                    "event_id": event.id.0,
1298                    "attempt": attempt,
1299                    "handler_kind": route.kind(),
1300                    "target_uri": route.target_uri(),
1301                    "replay_of_event_id": replay_of_event_id,
1302                }),
1303                replay_of_event_id.as_ref(),
1304            )
1305            .await?;
1306            self.append_topic_event(
1307                TRIGGER_OUTBOX_TOPIC,
1308                "dispatch_started",
1309                &event,
1310                Some(binding),
1311                Some(attempt),
1312                serde_json::json!({
1313                    "event_id": event.id.0,
1314                    "attempt": attempt,
1315                    "trigger_id": binding.id.as_str(),
1316                    "binding_key": binding.binding_key(),
1317                    "handler_kind": route.kind(),
1318                    "target_uri": route.target_uri(),
1319                    "replay_of_event_id": replay_of_event_id,
1320                }),
1321                replay_of_event_id.as_ref(),
1322            )
1323            .await?;
1324
1325            let mut dispatch_edges = Vec::new();
1326            if attempt == 1 {
1327                dispatch_edges.push(RunActionGraphEdgeRecord {
1328                    from_id: source_node_id.clone(),
1329                    to_id: attempt_node_id.clone(),
1330                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1331                    label: binding.when.as_ref().map(|_| "true".to_string()),
1332                });
1333            } else if let Some(retry_node_id) = previous_retry_node.take() {
1334                dispatch_edges.push(RunActionGraphEdgeRecord {
1335                    from_id: retry_node_id,
1336                    to_id: attempt_node_id.clone(),
1337                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1338                    label: Some(format!("attempt {attempt}")),
1339                });
1340            }
1341
1342            self.emit_action_graph(
1343                &event,
1344                vec![RunActionGraphNodeRecord {
1345                    id: attempt_node_id.clone(),
1346                    label: dispatch_node_label(&route),
1347                    kind: dispatch_node_kind(&route).to_string(),
1348                    status: "running".to_string(),
1349                    outcome: format!("attempt_{attempt}"),
1350                    trace_id: Some(event.trace_id.0.clone()),
1351                    stage_id: None,
1352                    node_id: None,
1353                    worker_id: None,
1354                    run_id: None,
1355                    run_path: None,
1356                    metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1357                }],
1358                dispatch_edges,
1359                serde_json::json!({
1360                    "source": "dispatcher",
1361                    "trigger_id": binding.id.as_str(),
1362                    "binding_key": binding.binding_key(),
1363                    "event_id": event.id.0,
1364                    "attempt": attempt,
1365                    "handler_kind": route.kind(),
1366                    "target_uri": route.target_uri(),
1367                    "target_agent": dispatch_target_agent(&route),
1368                    "replay_of_event_id": replay_of_event_id,
1369                }),
1370            )
1371            .await?;
1372
1373            let result = self
1374                .dispatch_once(
1375                    binding,
1376                    &route,
1377                    &event,
1378                    autonomy_tier,
1379                    Some(DispatchWaitLease::new(
1380                        self.state.clone(),
1381                        acquired_flow.clone(),
1382                    )),
1383                    &mut self.cancel_tx.subscribe(),
1384                )
1385                .await;
1386            let completed_at = now_rfc3339();
1387
1388            match result {
1389                Ok(result) => {
1390                    let attempt_record = DispatchAttemptRecord {
1391                        trigger_id: binding.id.as_str().to_string(),
1392                        binding_key: binding.binding_key(),
1393                        event_id: event.id.0.clone(),
1394                        attempt,
1395                        handler_kind: route.kind().to_string(),
1396                        started_at,
1397                        completed_at,
1398                        outcome: "success".to_string(),
1399                        error_msg: None,
1400                    };
1401                    attempts.push(attempt_record.clone());
1402                    self.append_attempt_record(
1403                        &event,
1404                        binding,
1405                        &attempt_record,
1406                        replay_of_event_id.as_ref(),
1407                    )
1408                    .await?;
1409                    self.append_lifecycle_event(
1410                        "DispatchSucceeded",
1411                        &event,
1412                        binding,
1413                        serde_json::json!({
1414                            "event_id": event.id.0,
1415                            "attempt": attempt,
1416                            "handler_kind": route.kind(),
1417                            "target_uri": route.target_uri(),
1418                            "result": result,
1419                            "replay_of_event_id": replay_of_event_id,
1420                        }),
1421                        replay_of_event_id.as_ref(),
1422                    )
1423                    .await?;
1424                    self.append_topic_event(
1425                        TRIGGER_OUTBOX_TOPIC,
1426                        "dispatch_succeeded",
1427                        &event,
1428                        Some(binding),
1429                        Some(attempt),
1430                        serde_json::json!({
1431                            "event_id": event.id.0,
1432                            "attempt": attempt,
1433                            "trigger_id": binding.id.as_str(),
1434                            "binding_key": binding.binding_key(),
1435                            "handler_kind": route.kind(),
1436                            "target_uri": route.target_uri(),
1437                            "result": result,
1438                            "replay_of_event_id": replay_of_event_id,
1439                        }),
1440                        replay_of_event_id.as_ref(),
1441                    )
1442                    .await?;
1443                    self.emit_action_graph(
1444                        &event,
1445                        vec![RunActionGraphNodeRecord {
1446                            id: attempt_node_id.clone(),
1447                            label: dispatch_node_label(&route),
1448                            kind: dispatch_node_kind(&route).to_string(),
1449                            status: "completed".to_string(),
1450                            outcome: dispatch_success_outcome(&route, &result).to_string(),
1451                            trace_id: Some(event.trace_id.0.clone()),
1452                            stage_id: None,
1453                            node_id: None,
1454                            worker_id: None,
1455                            run_id: None,
1456                            run_path: None,
1457                            metadata: dispatch_success_metadata(
1458                                &route, binding, &event, attempt, &result,
1459                            ),
1460                        }],
1461                        Vec::new(),
1462                        serde_json::json!({
1463                            "source": "dispatcher",
1464                            "trigger_id": binding.id.as_str(),
1465                            "binding_key": binding.binding_key(),
1466                            "event_id": event.id.0,
1467                            "attempt": attempt,
1468                            "handler_kind": route.kind(),
1469                            "target_uri": route.target_uri(),
1470                            "result": result,
1471                            "replay_of_event_id": replay_of_event_id,
1472                        }),
1473                    )
1474                    .await?;
1475                    finish_in_flight(
1476                        binding.id.as_str(),
1477                        binding.version,
1478                        TriggerDispatchOutcome::Dispatched,
1479                    )
1480                    .await
1481                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1482                    self.release_flow_control(&acquired_flow).await?;
1483                    decrement_in_flight(&self.state);
1484                    self.append_dispatch_trust_record(
1485                        binding,
1486                        &route,
1487                        &event,
1488                        replay_of_event_id.as_ref(),
1489                        autonomy_tier,
1490                        TrustOutcome::Success,
1491                        "succeeded",
1492                        attempt,
1493                        None,
1494                    )
1495                    .await?;
1496                    return Ok(DispatchOutcome {
1497                        trigger_id: binding.id.as_str().to_string(),
1498                        binding_key: binding.binding_key(),
1499                        event_id: event.id.0,
1500                        attempt_count: attempt,
1501                        status: DispatchStatus::Succeeded,
1502                        handler_kind: route.kind().to_string(),
1503                        target_uri: route.target_uri(),
1504                        replay_of_event_id,
1505                        result: Some(result),
1506                        error: None,
1507                    });
1508                }
1509                Err(error) => {
1510                    let attempt_record = DispatchAttemptRecord {
1511                        trigger_id: binding.id.as_str().to_string(),
1512                        binding_key: binding.binding_key(),
1513                        event_id: event.id.0.clone(),
1514                        attempt,
1515                        handler_kind: route.kind().to_string(),
1516                        started_at,
1517                        completed_at,
1518                        outcome: dispatch_error_label(&error).to_string(),
1519                        error_msg: Some(error.to_string()),
1520                    };
1521                    attempts.push(attempt_record.clone());
1522                    self.append_attempt_record(
1523                        &event,
1524                        binding,
1525                        &attempt_record,
1526                        replay_of_event_id.as_ref(),
1527                    )
1528                    .await?;
1529                    if let DispatchError::Waiting(message) = &error {
1530                        self.append_lifecycle_event(
1531                            "DispatchWaiting",
1532                            &event,
1533                            binding,
1534                            serde_json::json!({
1535                                "event_id": event.id.0,
1536                                "attempt": attempt,
1537                                "handler_kind": route.kind(),
1538                                "target_uri": route.target_uri(),
1539                                "message": message,
1540                                "replay_of_event_id": replay_of_event_id,
1541                            }),
1542                            replay_of_event_id.as_ref(),
1543                        )
1544                        .await?;
1545                        self.append_topic_event(
1546                            TRIGGER_OUTBOX_TOPIC,
1547                            "dispatch_waiting",
1548                            &event,
1549                            Some(binding),
1550                            Some(attempt),
1551                            serde_json::json!({
1552                                "event_id": event.id.0,
1553                                "attempt": attempt,
1554                                "trigger_id": binding.id.as_str(),
1555                                "binding_key": binding.binding_key(),
1556                                "handler_kind": route.kind(),
1557                                "target_uri": route.target_uri(),
1558                                "message": message,
1559                                "replay_of_event_id": replay_of_event_id,
1560                            }),
1561                            replay_of_event_id.as_ref(),
1562                        )
1563                        .await?;
1564                        finish_in_flight(
1565                            binding.id.as_str(),
1566                            binding.version,
1567                            TriggerDispatchOutcome::Dispatched,
1568                        )
1569                        .await
1570                        .map_err(|registry_error| {
1571                            DispatchError::Registry(registry_error.to_string())
1572                        })?;
1573                        self.release_flow_control(&acquired_flow).await?;
1574                        decrement_in_flight(&self.state);
1575                        return Ok(DispatchOutcome {
1576                            trigger_id: binding.id.as_str().to_string(),
1577                            binding_key: binding.binding_key(),
1578                            event_id: event.id.0,
1579                            attempt_count: attempt,
1580                            status: DispatchStatus::Waiting,
1581                            handler_kind: route.kind().to_string(),
1582                            target_uri: route.target_uri(),
1583                            replay_of_event_id,
1584                            result: Some(serde_json::json!({
1585                                "waiting": true,
1586                                "message": message,
1587                            })),
1588                            error: None,
1589                        });
1590                    }
1591
1592                    self.append_lifecycle_event(
1593                        "DispatchFailed",
1594                        &event,
1595                        binding,
1596                        serde_json::json!({
1597                            "event_id": event.id.0,
1598                            "attempt": attempt,
1599                            "handler_kind": route.kind(),
1600                            "target_uri": route.target_uri(),
1601                            "error": error.to_string(),
1602                            "replay_of_event_id": replay_of_event_id,
1603                        }),
1604                        replay_of_event_id.as_ref(),
1605                    )
1606                    .await?;
1607                    self.append_topic_event(
1608                        TRIGGER_OUTBOX_TOPIC,
1609                        "dispatch_failed",
1610                        &event,
1611                        Some(binding),
1612                        Some(attempt),
1613                        serde_json::json!({
1614                            "event_id": event.id.0,
1615                            "attempt": attempt,
1616                            "trigger_id": binding.id.as_str(),
1617                            "binding_key": binding.binding_key(),
1618                            "handler_kind": route.kind(),
1619                            "target_uri": route.target_uri(),
1620                            "error": error.to_string(),
1621                            "replay_of_event_id": replay_of_event_id,
1622                        }),
1623                        replay_of_event_id.as_ref(),
1624                    )
1625                    .await?;
1626                    self.emit_action_graph(
1627                        &event,
1628                        vec![RunActionGraphNodeRecord {
1629                            id: attempt_node_id.clone(),
1630                            label: dispatch_node_label(&route),
1631                            kind: dispatch_node_kind(&route).to_string(),
1632                            status: if matches!(error, DispatchError::Cancelled(_)) {
1633                                "cancelled".to_string()
1634                            } else {
1635                                "failed".to_string()
1636                            },
1637                            outcome: dispatch_error_label(&error).to_string(),
1638                            trace_id: Some(event.trace_id.0.clone()),
1639                            stage_id: None,
1640                            node_id: None,
1641                            worker_id: None,
1642                            run_id: None,
1643                            run_path: None,
1644                            metadata: dispatch_error_metadata(
1645                                &route, binding, &event, attempt, &error,
1646                            ),
1647                        }],
1648                        Vec::new(),
1649                        serde_json::json!({
1650                            "source": "dispatcher",
1651                            "trigger_id": binding.id.as_str(),
1652                            "binding_key": binding.binding_key(),
1653                            "event_id": event.id.0,
1654                            "attempt": attempt,
1655                            "handler_kind": route.kind(),
1656                            "target_uri": route.target_uri(),
1657                            "error": error.to_string(),
1658                            "replay_of_event_id": replay_of_event_id,
1659                        }),
1660                    )
1661                    .await?;
1662
1663                    if !error.retryable() {
1664                        finish_in_flight(
1665                            binding.id.as_str(),
1666                            binding.version,
1667                            TriggerDispatchOutcome::Failed,
1668                        )
1669                        .await
1670                        .map_err(|registry_error| {
1671                            DispatchError::Registry(registry_error.to_string())
1672                        })?;
1673                        self.release_flow_control(&acquired_flow).await?;
1674                        decrement_in_flight(&self.state);
1675                        let trust_outcome = match error {
1676                            DispatchError::Denied(_) => TrustOutcome::Denied,
1677                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
1678                            _ => TrustOutcome::Failure,
1679                        };
1680                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1681                            "cancelled"
1682                        } else {
1683                            "failed"
1684                        };
1685                        self.append_dispatch_trust_record(
1686                            binding,
1687                            &route,
1688                            &event,
1689                            replay_of_event_id.as_ref(),
1690                            autonomy_tier,
1691                            trust_outcome,
1692                            terminal_status,
1693                            attempt,
1694                            Some(error.to_string()),
1695                        )
1696                        .await?;
1697                        return Ok(DispatchOutcome {
1698                            trigger_id: binding.id.as_str().to_string(),
1699                            binding_key: binding.binding_key(),
1700                            event_id: event.id.0,
1701                            attempt_count: attempt,
1702                            status: if matches!(error, DispatchError::Cancelled(_)) {
1703                                DispatchStatus::Cancelled
1704                            } else {
1705                                DispatchStatus::Failed
1706                            },
1707                            handler_kind: route.kind().to_string(),
1708                            target_uri: route.target_uri(),
1709                            replay_of_event_id,
1710                            result: None,
1711                            error: Some(error.to_string()),
1712                        });
1713                    }
1714
1715                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1716                        if let Some(metrics) = self.metrics.as_ref() {
1717                            metrics.record_retry_scheduled();
1718                            metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
1719                        }
1720                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1721                        previous_retry_node = Some(retry_node_id.clone());
1722                        self.emit_action_graph(
1723                            &event,
1724                            vec![RunActionGraphNodeRecord {
1725                                id: retry_node_id.clone(),
1726                                label: format!("retry in {}ms", delay.as_millis()),
1727                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1728                                status: "scheduled".to_string(),
1729                                outcome: format!("attempt_{}", attempt + 1),
1730                                trace_id: Some(event.trace_id.0.clone()),
1731                                stage_id: None,
1732                                node_id: None,
1733                                worker_id: None,
1734                                run_id: None,
1735                                run_path: None,
1736                                metadata: retry_node_metadata(
1737                                    binding,
1738                                    &event,
1739                                    attempt + 1,
1740                                    delay,
1741                                    &error,
1742                                ),
1743                            }],
1744                            vec![RunActionGraphEdgeRecord {
1745                                from_id: attempt_node_id,
1746                                to_id: retry_node_id.clone(),
1747                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1748                                label: Some(format!("attempt {}", attempt + 1)),
1749                            }],
1750                            serde_json::json!({
1751                                "source": "dispatcher",
1752                                "trigger_id": binding.id.as_str(),
1753                                "binding_key": binding.binding_key(),
1754                                "event_id": event.id.0,
1755                                "attempt": attempt + 1,
1756                                "delay_ms": delay.as_millis(),
1757                                "replay_of_event_id": replay_of_event_id,
1758                            }),
1759                        )
1760                        .await?;
1761                        self.append_lifecycle_event(
1762                            "RetryScheduled",
1763                            &event,
1764                            binding,
1765                            serde_json::json!({
1766                                "event_id": event.id.0,
1767                                "attempt": attempt + 1,
1768                                "delay_ms": delay.as_millis(),
1769                                "error": error.to_string(),
1770                                "replay_of_event_id": replay_of_event_id,
1771                            }),
1772                            replay_of_event_id.as_ref(),
1773                        )
1774                        .await?;
1775                        self.append_topic_event(
1776                            TRIGGER_ATTEMPTS_TOPIC,
1777                            "retry_scheduled",
1778                            &event,
1779                            Some(binding),
1780                            Some(attempt + 1),
1781                            serde_json::json!({
1782                                "event_id": event.id.0,
1783                                "attempt": attempt + 1,
1784                                "trigger_id": binding.id.as_str(),
1785                                "binding_key": binding.binding_key(),
1786                                "delay_ms": delay.as_millis(),
1787                                "error": error.to_string(),
1788                                "replay_of_event_id": replay_of_event_id,
1789                            }),
1790                            replay_of_event_id.as_ref(),
1791                        )
1792                        .await?;
1793                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1794                        let sleep_result = sleep_or_cancel_or_request(
1795                            &self.event_log,
1796                            delay,
1797                            &binding_key,
1798                            &event.id.0,
1799                            replay_of_event_id.as_ref(),
1800                            &mut self.cancel_tx.subscribe(),
1801                        )
1802                        .await;
1803                        decrement_retry_queue_depth(&self.state);
1804                        if sleep_result.is_err() {
1805                            finish_in_flight(
1806                                binding.id.as_str(),
1807                                binding.version,
1808                                TriggerDispatchOutcome::Failed,
1809                            )
1810                            .await
1811                            .map_err(|registry_error| {
1812                                DispatchError::Registry(registry_error.to_string())
1813                            })?;
1814                            self.release_flow_control(&acquired_flow).await?;
1815                            decrement_in_flight(&self.state);
1816                            self.append_dispatch_trust_record(
1817                                binding,
1818                                &route,
1819                                &event,
1820                                replay_of_event_id.as_ref(),
1821                                autonomy_tier,
1822                                TrustOutcome::Failure,
1823                                "cancelled",
1824                                attempt,
1825                                Some("dispatcher shutdown cancelled retry wait".to_string()),
1826                            )
1827                            .await?;
1828                            return Ok(DispatchOutcome {
1829                                trigger_id: binding.id.as_str().to_string(),
1830                                binding_key: binding.binding_key(),
1831                                event_id: event.id.0,
1832                                attempt_count: attempt,
1833                                status: DispatchStatus::Cancelled,
1834                                handler_kind: route.kind().to_string(),
1835                                target_uri: route.target_uri(),
1836                                replay_of_event_id,
1837                                result: None,
1838                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1839                            });
1840                        }
1841                        continue;
1842                    }
1843
1844                    let final_error = error.to_string();
1845                    let dlq_entry = DlqEntry {
1846                        trigger_id: binding.id.as_str().to_string(),
1847                        binding_key: binding.binding_key(),
1848                        event: event.clone(),
1849                        attempt_count: attempt,
1850                        final_error: final_error.clone(),
1851                        attempts: attempts.clone(),
1852                    };
1853                    self.state
1854                        .dlq
1855                        .lock()
1856                        .expect("dispatcher dlq poisoned")
1857                        .push(dlq_entry.clone());
1858                    if let Some(metrics) = self.metrics.as_ref() {
1859                        metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
1860                    }
1861                    self.emit_action_graph(
1862                        &event,
1863                        vec![RunActionGraphNodeRecord {
1864                            id: format!("dlq:{binding_key}:{}", event.id.0),
1865                            label: binding.id.as_str().to_string(),
1866                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1867                            status: "queued".to_string(),
1868                            outcome: "retry_exhausted".to_string(),
1869                            trace_id: Some(event.trace_id.0.clone()),
1870                            stage_id: None,
1871                            node_id: None,
1872                            worker_id: None,
1873                            run_id: None,
1874                            run_path: None,
1875                            metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
1876                        }],
1877                        vec![RunActionGraphEdgeRecord {
1878                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1879                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
1880                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1881                            label: Some(format!("{attempt} attempts")),
1882                        }],
1883                        serde_json::json!({
1884                            "source": "dispatcher",
1885                            "trigger_id": binding.id.as_str(),
1886                            "binding_key": binding.binding_key(),
1887                            "event_id": event.id.0,
1888                            "attempt_count": attempt,
1889                            "final_error": final_error,
1890                            "replay_of_event_id": replay_of_event_id,
1891                        }),
1892                    )
1893                    .await?;
1894                    self.append_lifecycle_event(
1895                        "DlqMoved",
1896                        &event,
1897                        binding,
1898                        serde_json::json!({
1899                            "event_id": event.id.0,
1900                            "attempt_count": attempt,
1901                            "final_error": dlq_entry.final_error,
1902                            "replay_of_event_id": replay_of_event_id,
1903                        }),
1904                        replay_of_event_id.as_ref(),
1905                    )
1906                    .await?;
1907                    self.append_topic_event(
1908                        TRIGGER_DLQ_TOPIC,
1909                        "dlq_moved",
1910                        &event,
1911                        Some(binding),
1912                        Some(attempt),
1913                        serde_json::to_value(&dlq_entry)
1914                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1915                        replay_of_event_id.as_ref(),
1916                    )
1917                    .await?;
1918                    finish_in_flight(
1919                        binding.id.as_str(),
1920                        binding.version,
1921                        TriggerDispatchOutcome::Dlq,
1922                    )
1923                    .await
1924                    .map_err(|registry_error| {
1925                        DispatchError::Registry(registry_error.to_string())
1926                    })?;
1927                    self.release_flow_control(&acquired_flow).await?;
1928                    decrement_in_flight(&self.state);
1929                    self.append_dispatch_trust_record(
1930                        binding,
1931                        &route,
1932                        &event,
1933                        replay_of_event_id.as_ref(),
1934                        autonomy_tier,
1935                        TrustOutcome::Failure,
1936                        "dlq",
1937                        attempt,
1938                        Some(error.to_string()),
1939                    )
1940                    .await?;
1941                    return Ok(DispatchOutcome {
1942                        trigger_id: binding.id.as_str().to_string(),
1943                        binding_key: binding.binding_key(),
1944                        event_id: event.id.0,
1945                        attempt_count: attempt,
1946                        status: DispatchStatus::Dlq,
1947                        handler_kind: route.kind().to_string(),
1948                        target_uri: route.target_uri(),
1949                        replay_of_event_id,
1950                        result: None,
1951                        error: Some(error.to_string()),
1952                    });
1953                }
1954            }
1955        }
1956
1957        finish_in_flight(
1958            binding.id.as_str(),
1959            binding.version,
1960            TriggerDispatchOutcome::Failed,
1961        )
1962        .await
1963        .map_err(|error| DispatchError::Registry(error.to_string()))?;
1964        self.release_flow_control(&acquired_flow).await?;
1965        decrement_in_flight(&self.state);
1966        self.append_dispatch_trust_record(
1967            binding,
1968            &route,
1969            &event,
1970            replay_of_event_id.as_ref(),
1971            autonomy_tier,
1972            TrustOutcome::Failure,
1973            "failed",
1974            max_attempts,
1975            Some("dispatch exhausted without terminal outcome".to_string()),
1976        )
1977        .await?;
1978        Ok(DispatchOutcome {
1979            trigger_id: binding.id.as_str().to_string(),
1980            binding_key: binding.binding_key(),
1981            event_id: event.id.0,
1982            attempt_count: max_attempts,
1983            status: DispatchStatus::Failed,
1984            handler_kind: route.kind().to_string(),
1985            target_uri: route.target_uri(),
1986            replay_of_event_id,
1987            result: None,
1988            error: Some("dispatch exhausted without terminal outcome".to_string()),
1989        })
1990    }
1991
1992    async fn dispatch_once(
1993        &self,
1994        binding: &TriggerBinding,
1995        route: &DispatchUri,
1996        event: &TriggerEvent,
1997        autonomy_tier: AutonomyTier,
1998        wait_lease: Option<DispatchWaitLease>,
1999        cancel_rx: &mut broadcast::Receiver<()>,
2000    ) -> Result<serde_json::Value, DispatchError> {
2001        match route {
2002            DispatchUri::Local { .. } => {
2003                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2004                    return Err(DispatchError::Local(format!(
2005                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2006                        binding.id.as_str()
2007                    )));
2008                };
2009                let value = self
2010                    .invoke_vm_callable(
2011                        closure,
2012                        &binding.binding_key(),
2013                        event,
2014                        None,
2015                        binding.id.as_str(),
2016                        &format!("{}.{}", event.provider.as_str(), event.kind),
2017                        autonomy_tier,
2018                        wait_lease,
2019                        cancel_rx,
2020                    )
2021                    .await?;
2022                Ok(vm_value_to_json(&value))
2023            }
2024            DispatchUri::A2a {
2025                target,
2026                allow_cleartext,
2027            } => {
2028                if self.state.shutting_down.load(Ordering::SeqCst) {
2029                    return Err(DispatchError::Cancelled(
2030                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
2031                    ));
2032                }
2033                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2034                    target,
2035                    *allow_cleartext,
2036                    binding.id.as_str(),
2037                    &binding.binding_key(),
2038                    event,
2039                    cancel_rx,
2040                )
2041                .await
2042                .map_err(|error| match error {
2043                    crate::a2a::A2aClientError::Cancelled(message) => {
2044                        DispatchError::Cancelled(message)
2045                    }
2046                    other => DispatchError::A2a(other.to_string()),
2047                })?;
2048                match ack {
2049                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2050                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2051                }
2052            }
2053            DispatchUri::Worker { queue } => {
2054                let receipt = crate::WorkerQueue::new(self.event_log.clone())
2055                    .enqueue(&crate::WorkerQueueJob {
2056                        queue: queue.clone(),
2057                        trigger_id: binding.id.as_str().to_string(),
2058                        binding_key: binding.binding_key(),
2059                        binding_version: binding.version,
2060                        event: event.clone(),
2061                        replay_of_event_id: current_dispatch_context()
2062                            .and_then(|context| context.replay_of_event_id),
2063                        priority: worker_queue_priority(binding, event),
2064                    })
2065                    .await
2066                    .map_err(DispatchError::from)?;
2067                Ok(serde_json::to_value(receipt)
2068                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
2069            }
2070        }
2071    }
2072
2073    async fn apply_flow_control(
2074        &self,
2075        binding: &TriggerBinding,
2076        event: &TriggerEvent,
2077        replay_of_event_id: Option<&String>,
2078    ) -> Result<FlowControlOutcome, DispatchError> {
2079        let flow = &binding.flow_control;
2080        let mut managed_event = event.clone();
2081
2082        if let Some(batch) = &flow.batch {
2083            let gate = self
2084                .resolve_flow_gate(
2085                    &binding.binding_key(),
2086                    batch.key.as_ref(),
2087                    &managed_event,
2088                    replay_of_event_id,
2089                )
2090                .await?;
2091            match self
2092                .state
2093                .flow_control
2094                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2095                .await
2096                .map_err(DispatchError::from)?
2097            {
2098                BatchDecision::Dispatch(events) => {
2099                    managed_event = build_batched_event(events)?;
2100                }
2101                BatchDecision::Merged => {
2102                    return Ok(FlowControlOutcome::Skip {
2103                        reason: "batch_merged".to_string(),
2104                    })
2105                }
2106            }
2107        }
2108
2109        if let Some(debounce) = &flow.debounce {
2110            let gate = self
2111                .resolve_flow_gate(
2112                    &binding.binding_key(),
2113                    Some(&debounce.key),
2114                    &managed_event,
2115                    replay_of_event_id,
2116                )
2117                .await?;
2118            let latest = self
2119                .state
2120                .flow_control
2121                .debounce(&gate, debounce.period)
2122                .await
2123                .map_err(DispatchError::from)?;
2124            if !latest {
2125                return Ok(FlowControlOutcome::Skip {
2126                    reason: "debounced".to_string(),
2127                });
2128            }
2129        }
2130
2131        if let Some(rate_limit) = &flow.rate_limit {
2132            let gate = self
2133                .resolve_flow_gate(
2134                    &binding.binding_key(),
2135                    rate_limit.key.as_ref(),
2136                    &managed_event,
2137                    replay_of_event_id,
2138                )
2139                .await?;
2140            let allowed = self
2141                .state
2142                .flow_control
2143                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2144                .await
2145                .map_err(DispatchError::from)?;
2146            if !allowed {
2147                return Ok(FlowControlOutcome::Skip {
2148                    reason: "rate_limited".to_string(),
2149                });
2150            }
2151        }
2152
2153        if let Some(throttle) = &flow.throttle {
2154            let gate = self
2155                .resolve_flow_gate(
2156                    &binding.binding_key(),
2157                    throttle.key.as_ref(),
2158                    &managed_event,
2159                    replay_of_event_id,
2160                )
2161                .await?;
2162            self.state
2163                .flow_control
2164                .wait_for_throttle(&gate, throttle.period, throttle.max)
2165                .await
2166                .map_err(DispatchError::from)?;
2167        }
2168
2169        let mut acquired = AcquiredFlowControl::default();
2170        if let Some(singleton) = &flow.singleton {
2171            let gate = self
2172                .resolve_flow_gate(
2173                    &binding.binding_key(),
2174                    singleton.key.as_ref(),
2175                    &managed_event,
2176                    replay_of_event_id,
2177                )
2178                .await?;
2179            let acquired_singleton = self
2180                .state
2181                .flow_control
2182                .try_acquire_singleton(&gate)
2183                .await
2184                .map_err(DispatchError::from)?;
2185            if !acquired_singleton {
2186                return Ok(FlowControlOutcome::Skip {
2187                    reason: "singleton_active".to_string(),
2188                });
2189            }
2190            acquired.singleton = Some(SingletonLease { gate, held: true });
2191        }
2192
2193        if let Some(concurrency) = &flow.concurrency {
2194            let gate = self
2195                .resolve_flow_gate(
2196                    &binding.binding_key(),
2197                    concurrency.key.as_ref(),
2198                    &managed_event,
2199                    replay_of_event_id,
2200                )
2201                .await?;
2202            let priority_rank = self
2203                .resolve_priority_rank(
2204                    &binding.binding_key(),
2205                    flow.priority.as_ref(),
2206                    &managed_event,
2207                    replay_of_event_id,
2208                )
2209                .await?;
2210            let permit = self
2211                .state
2212                .flow_control
2213                .acquire_concurrency(&gate, concurrency.max, priority_rank)
2214                .await
2215                .map_err(DispatchError::from)?;
2216            acquired.concurrency = Some(ConcurrencyLease {
2217                gate,
2218                max: concurrency.max,
2219                priority_rank,
2220                permit: Some(permit),
2221            });
2222        }
2223
2224        Ok(FlowControlOutcome::Dispatch {
2225            event: Box::new(managed_event),
2226            acquired,
2227        })
2228    }
2229
2230    async fn release_flow_control(
2231        &self,
2232        acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2233    ) -> Result<(), DispatchError> {
2234        let (singleton_gate, concurrency_permit) = {
2235            let mut acquired = acquired.lock().await;
2236            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2237                if lease.held {
2238                    lease.held = false;
2239                    Some(lease.gate.clone())
2240                } else {
2241                    None
2242                }
2243            });
2244            let concurrency_permit = acquired
2245                .concurrency
2246                .as_mut()
2247                .and_then(|lease| lease.permit.take());
2248            (singleton_gate, concurrency_permit)
2249        };
2250        if let Some(gate) = singleton_gate {
2251            self.state
2252                .flow_control
2253                .release_singleton(&gate)
2254                .await
2255                .map_err(DispatchError::from)?;
2256        }
2257        if let Some(permit) = concurrency_permit {
2258            self.state
2259                .flow_control
2260                .release_concurrency(permit)
2261                .await
2262                .map_err(DispatchError::from)?;
2263        }
2264        Ok(())
2265    }
2266
2267    async fn resolve_flow_gate(
2268        &self,
2269        binding_key: &str,
2270        expr: Option<&crate::triggers::TriggerExpressionSpec>,
2271        event: &TriggerEvent,
2272        replay_of_event_id: Option<&String>,
2273    ) -> Result<String, DispatchError> {
2274        let key = match expr {
2275            Some(expr) => {
2276                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2277                    .await?
2278            }
2279            None => "_global".to_string(),
2280        };
2281        Ok(format!("{binding_key}:{key}"))
2282    }
2283
2284    async fn resolve_priority_rank(
2285        &self,
2286        binding_key: &str,
2287        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2288        event: &TriggerEvent,
2289        replay_of_event_id: Option<&String>,
2290    ) -> Result<usize, DispatchError> {
2291        let Some(priority) = priority else {
2292            return Ok(0);
2293        };
2294        let value = self
2295            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2296            .await?;
2297        Ok(priority
2298            .order
2299            .iter()
2300            .position(|candidate| candidate == &value)
2301            .unwrap_or(priority.order.len()))
2302    }
2303
2304    async fn evaluate_flow_expression(
2305        &self,
2306        binding_key: &str,
2307        expr: &crate::triggers::TriggerExpressionSpec,
2308        event: &TriggerEvent,
2309        replay_of_event_id: Option<&String>,
2310    ) -> Result<String, DispatchError> {
2311        let value = self
2312            .invoke_vm_callable(
2313                &expr.closure,
2314                binding_key,
2315                event,
2316                replay_of_event_id,
2317                "",
2318                "flow_control",
2319                AutonomyTier::Suggest,
2320                None,
2321                &mut self.cancel_tx.subscribe(),
2322            )
2323            .await?;
2324        Ok(json_value_to_gate(&vm_value_to_json(&value)))
2325    }
2326
2327    #[allow(clippy::too_many_arguments)]
2328    async fn invoke_vm_callable(
2329        &self,
2330        closure: &crate::value::VmClosure,
2331        binding_key: &str,
2332        event: &TriggerEvent,
2333        replay_of_event_id: Option<&String>,
2334        agent_id: &str,
2335        action: &str,
2336        autonomy_tier: AutonomyTier,
2337        wait_lease: Option<DispatchWaitLease>,
2338        cancel_rx: &mut broadcast::Receiver<()>,
2339    ) -> Result<VmValue, DispatchError> {
2340        let mut vm = self.base_vm.child_vm();
2341        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2342        if self.state.shutting_down.load(Ordering::SeqCst) {
2343            cancel_token.store(true, Ordering::SeqCst);
2344        }
2345        self.state
2346            .cancel_tokens
2347            .lock()
2348            .expect("dispatcher cancel tokens poisoned")
2349            .push(cancel_token.clone());
2350        vm.install_cancel_token(cancel_token.clone());
2351        let arg = event_to_handler_value(event)?;
2352        let args = [arg];
2353        let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2354        let effective_policy = match crate::orchestration::current_execution_policy() {
2355            Some(parent) => parent
2356                .intersect(&tier_policy)
2357                .map_err(|error| DispatchError::Local(error.to_string()))?,
2358            None => tier_policy,
2359        };
2360        crate::orchestration::push_execution_policy(effective_policy);
2361        let _policy_guard = DispatchExecutionPolicyGuard;
2362        let future = vm.call_closure_pub(closure, &args);
2363        pin_mut!(future);
2364        let (binding_id, binding_version) = split_binding_key(binding_key);
2365        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2366            slot.borrow_mut().replace(DispatchContext {
2367                trigger_event: event.clone(),
2368                replay_of_event_id: replay_of_event_id.cloned(),
2369                binding_id,
2370                binding_version,
2371                agent_id: agent_id.to_string(),
2372                action: action.to_string(),
2373                autonomy_tier,
2374            })
2375        });
2376        let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2377            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2378        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2379        crate::stdlib::hitl::reset_hitl_state();
2380        let mut poll = tokio::time::interval(Duration::from_millis(100));
2381        let result = loop {
2382            tokio::select! {
2383                result = &mut future => break result,
2384                _ = recv_cancel(cancel_rx) => {
2385                    cancel_token.store(true, Ordering::SeqCst);
2386                }
2387                _ = poll.tick() => {
2388                    if dispatch_cancel_requested(
2389                        &self.event_log,
2390                        binding_key,
2391                        &event.id.0,
2392                        replay_of_event_id,
2393                    )
2394                    .await? {
2395                        cancel_token.store(true, Ordering::SeqCst);
2396                    }
2397                }
2398            }
2399        };
2400        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2401            *slot.borrow_mut() = prior_context;
2402        });
2403        ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2404            *slot.borrow_mut() = prior_wait_lease;
2405        });
2406        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2407        {
2408            let mut tokens = self
2409                .state
2410                .cancel_tokens
2411                .lock()
2412                .expect("dispatcher cancel tokens poisoned");
2413            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2414        }
2415
2416        if cancel_token.load(Ordering::SeqCst) {
2417            if dispatch_cancel_requested(
2418                &self.event_log,
2419                binding_key,
2420                &event.id.0,
2421                replay_of_event_id,
2422            )
2423            .await?
2424            {
2425                Err(DispatchError::Cancelled(
2426                    "trigger cancel request cancelled local handler".to_string(),
2427                ))
2428            } else {
2429                Err(DispatchError::Cancelled(
2430                    "dispatcher shutdown cancelled local handler".to_string(),
2431                ))
2432            }
2433        } else {
2434            result.map_err(dispatch_error_from_vm_error)
2435        }
2436    }
2437
2438    async fn evaluate_predicate(
2439        &self,
2440        binding: &TriggerBinding,
2441        predicate: &super::registry::TriggerPredicateSpec,
2442        event: &TriggerEvent,
2443        replay_of_event_id: Option<&String>,
2444        autonomy_tier: AutonomyTier,
2445    ) -> Result<PredicateEvaluationRecord, DispatchError> {
2446        let event_id = event.id.0.clone();
2447        let trigger_id = binding.id.as_str().to_string();
2448        let now_ms = now_unix_ms();
2449        reset_binding_budget_windows(binding);
2450
2451        let breaker_open_until = {
2452            let state = binding
2453                .predicate_state
2454                .lock()
2455                .expect("trigger predicate state poisoned");
2456            if state
2457                .breaker_open_until_ms
2458                .is_some_and(|until_ms| until_ms > now_ms)
2459            {
2460                state.breaker_open_until_ms
2461            } else {
2462                None
2463            }
2464        };
2465
2466        if breaker_open_until.is_some() {
2467            let mut metadata = BTreeMap::new();
2468            metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2469            metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2470            metadata.insert(
2471                "breaker_open_until_ms".to_string(),
2472                serde_json::json!(breaker_open_until),
2473            );
2474            crate::events::log_warn_meta(
2475                "trigger.predicate.circuit_breaker",
2476                "trigger predicate circuit breaker is open; short-circuiting to false",
2477                metadata,
2478            );
2479            let record = PredicateEvaluationRecord {
2480                result: false,
2481                reason: Some("circuit_open".to_string()),
2482                ..Default::default()
2483            };
2484            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2485                .await?;
2486            return Ok(record);
2487        }
2488
2489        let expected_cost = expected_predicate_cost_usd_micros(binding);
2490        if let Some(reason) = binding_budget_would_exceed(binding, expected_cost)
2491            .or_else(|| orchestrator_budget_would_exceed(expected_cost))
2492        {
2493            self.append_budget_exhausted_event(
2494                binding,
2495                event,
2496                reason,
2497                expected_cost,
2498                None,
2499                replay_of_event_id,
2500            )
2501            .await?;
2502            let record = PredicateEvaluationRecord {
2503                result: binding.on_budget_exhausted == TriggerBudgetExhaustionStrategy::Warn,
2504                reason: Some(reason.to_string()),
2505                exhaustion_strategy: Some(binding.on_budget_exhausted),
2506                ..Default::default()
2507            };
2508            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2509                .await?;
2510            return Ok(record);
2511        }
2512
2513        let replay_cache = self
2514            .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2515            .await?;
2516        let guard = start_predicate_evaluation(
2517            binding.when_budget.clone().unwrap_or_default(),
2518            replay_cache,
2519        );
2520        let started = std::time::Instant::now();
2521        let eval = self
2522            .invoke_vm_callable_with_timeout(
2523                &predicate.closure,
2524                &binding.binding_key(),
2525                event,
2526                replay_of_event_id,
2527                binding.id.as_str(),
2528                &format!("{}.{}", event.provider.as_str(), event.kind),
2529                autonomy_tier,
2530                &mut self.cancel_tx.subscribe(),
2531                binding
2532                    .when_budget
2533                    .as_ref()
2534                    .and_then(|budget| budget.timeout()),
2535            )
2536            .await;
2537        let capture = guard.finish();
2538        let latency_ms = started.elapsed().as_millis() as u64;
2539        if replay_of_event_id.is_none() && !capture.entries.is_empty() {
2540            self.append_predicate_cache_record(binding, event, &capture.entries)
2541                .await?;
2542        }
2543
2544        let mut record = PredicateEvaluationRecord {
2545            result: false,
2546            cost_usd: capture.total_cost_usd,
2547            tokens: capture.total_tokens,
2548            latency_ms,
2549            cached: capture.cached,
2550            reason: None,
2551            exhaustion_strategy: None,
2552        };
2553
2554        let mut count_failure = false;
2555        let mut opened_breaker = false;
2556
2557        match eval {
2558            Ok(value) => match predicate_value_as_bool(value) {
2559                Ok(result) => {
2560                    record.result = result;
2561                }
2562                Err(reason) => {
2563                    count_failure = true;
2564                    record.reason = Some(reason);
2565                }
2566            },
2567            Err(error) => {
2568                count_failure = true;
2569                record.reason = Some(error.to_string());
2570            }
2571        }
2572
2573        let cost_usd_micros = usd_to_micros(record.cost_usd);
2574        if cost_usd_micros > 0 {
2575            binding
2576                .metrics
2577                .cost_total_usd_micros
2578                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2579            binding
2580                .metrics
2581                .cost_today_usd_micros
2582                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2583            binding
2584                .metrics
2585                .cost_hour_usd_micros
2586                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2587            note_orchestrator_budget_cost(cost_usd_micros);
2588            record_predicate_cost_sample(binding, cost_usd_micros);
2589        }
2590
2591        let timed_out = matches!(
2592            record.reason.as_deref(),
2593            Some("predicate evaluation timed out")
2594        );
2595        if capture.budget_exceeded || timed_out {
2596            if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
2597                record.result = false;
2598            } else if timed_out {
2599                record.result = true;
2600            }
2601            record.reason = Some("budget_exceeded".to_string());
2602            record.exhaustion_strategy = Some(binding.on_budget_exhausted);
2603            self.append_budget_exhausted_event(
2604                binding,
2605                event,
2606                "budget_exceeded",
2607                cost_usd_micros,
2608                Some(record.tokens),
2609                replay_of_event_id,
2610            )
2611            .await?;
2612            self.append_lifecycle_event(
2613                "predicate.invocation_budget_exceeded",
2614                event,
2615                binding,
2616                serde_json::json!({
2617                    "trigger_id": binding.id.as_str(),
2618                    "event_id": event.id.0,
2619                    "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2620                    "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2621                    "cost_usd": record.cost_usd,
2622                    "tokens": record.tokens,
2623                    "replay_of_event_id": replay_of_event_id,
2624                }),
2625                replay_of_event_id,
2626            )
2627            .await?;
2628        }
2629
2630        if let Some(reason) =
2631            binding_budget_would_exceed(binding, 0).or_else(|| orchestrator_budget_would_exceed(0))
2632        {
2633            if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
2634                record.result = false;
2635            }
2636            record.reason = Some(reason.to_string());
2637            record.exhaustion_strategy = Some(binding.on_budget_exhausted);
2638            self.append_budget_exhausted_event(binding, event, reason, 0, None, replay_of_event_id)
2639                .await?;
2640        }
2641
2642        {
2643            let mut state = binding
2644                .predicate_state
2645                .lock()
2646                .expect("trigger predicate state poisoned");
2647            if count_failure {
2648                state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2649                if state.consecutive_failures >= 3 {
2650                    state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2651                    opened_breaker = true;
2652                }
2653            } else {
2654                state.consecutive_failures = 0;
2655                state.breaker_open_until_ms = None;
2656            }
2657        }
2658
2659        if opened_breaker {
2660            let mut metadata = BTreeMap::new();
2661            metadata.insert(
2662                "trigger_id".to_string(),
2663                serde_json::json!(binding.id.as_str()),
2664            );
2665            metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2666            metadata.insert("failure_count".to_string(), serde_json::json!(3));
2667            metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2668            crate::events::log_warn_meta(
2669                "trigger.predicate.circuit_breaker",
2670                "trigger predicate circuit breaker opened for 5 minutes",
2671                metadata,
2672            );
2673        }
2674
2675        self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2676            .await?;
2677        Ok(record)
2678    }
2679
2680    #[allow(clippy::too_many_arguments)]
2681    #[allow(clippy::too_many_arguments)]
2682    async fn invoke_vm_callable_with_timeout(
2683        &self,
2684        closure: &crate::value::VmClosure,
2685        binding_key: &str,
2686        event: &TriggerEvent,
2687        replay_of_event_id: Option<&String>,
2688        agent_id: &str,
2689        action: &str,
2690        autonomy_tier: AutonomyTier,
2691        cancel_rx: &mut broadcast::Receiver<()>,
2692        timeout: Option<Duration>,
2693    ) -> Result<VmValue, DispatchError> {
2694        let future = self.invoke_vm_callable(
2695            closure,
2696            binding_key,
2697            event,
2698            replay_of_event_id,
2699            agent_id,
2700            action,
2701            autonomy_tier,
2702            None,
2703            cancel_rx,
2704        );
2705        pin_mut!(future);
2706        if let Some(timeout) = timeout {
2707            match tokio::time::timeout(timeout, future).await {
2708                Ok(result) => result,
2709                Err(_) => Err(DispatchError::Local(
2710                    "predicate evaluation timed out".to_string(),
2711                )),
2712            }
2713        } else {
2714            future.await
2715        }
2716    }
2717
2718    async fn append_predicate_evaluated_event(
2719        &self,
2720        binding: &TriggerBinding,
2721        event: &TriggerEvent,
2722        record: &PredicateEvaluationRecord,
2723        replay_of_event_id: Option<&String>,
2724    ) -> Result<(), DispatchError> {
2725        if let Some(metrics) = self.metrics.as_ref() {
2726            metrics.record_trigger_predicate_evaluation(
2727                binding.id.as_str(),
2728                record.result,
2729                record.cost_usd,
2730            );
2731            metrics.set_trigger_budget_cost_today(
2732                binding.id.as_str(),
2733                current_predicate_daily_cost(binding),
2734            );
2735            if matches!(
2736                record.reason.as_deref(),
2737                Some(
2738                    "budget_exceeded"
2739                        | "daily_budget_exceeded"
2740                        | "hourly_budget_exceeded"
2741                        | "orchestrator_daily_budget_exceeded"
2742                        | "orchestrator_hourly_budget_exceeded"
2743                )
2744            ) {
2745                metrics.record_trigger_budget_exhausted(
2746                    binding.id.as_str(),
2747                    record
2748                        .exhaustion_strategy
2749                        .map(TriggerBudgetExhaustionStrategy::as_str)
2750                        .unwrap_or_else(|| record.reason.as_deref().unwrap_or("predicate")),
2751                );
2752            }
2753        }
2754        self.append_lifecycle_event(
2755            "predicate.evaluated",
2756            event,
2757            binding,
2758            serde_json::json!({
2759                "trigger_id": binding.id.as_str(),
2760                "event_id": event.id.0,
2761                "result": record.result,
2762                "cost_usd": record.cost_usd,
2763                "tokens": record.tokens,
2764                "latency_ms": record.latency_ms,
2765                "cached": record.cached,
2766                "reason": record.reason,
2767                "on_budget_exhausted": record.exhaustion_strategy.map(TriggerBudgetExhaustionStrategy::as_str),
2768                "replay_of_event_id": replay_of_event_id,
2769            }),
2770            replay_of_event_id,
2771        )
2772        .await
2773    }
2774
2775    async fn append_predicate_cache_record(
2776        &self,
2777        binding: &TriggerBinding,
2778        event: &TriggerEvent,
2779        entries: &[PredicateCacheEntry],
2780    ) -> Result<(), DispatchError> {
2781        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2782            .expect("static trigger inbox legacy topic name is valid");
2783        let payload = serde_json::to_value(PredicateCacheRecord {
2784            trigger_id: binding.id.as_str().to_string(),
2785            event_id: event.id.0.clone(),
2786            entries: entries.to_vec(),
2787        })
2788        .map_err(|error| DispatchError::Serde(error.to_string()))?;
2789        self.event_log
2790            .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2791            .await
2792            .map_err(DispatchError::from)
2793            .map(|_| ())
2794    }
2795
2796    async fn read_predicate_cache_record(
2797        &self,
2798        event_id: &str,
2799    ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2800        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2801            .expect("static trigger inbox legacy topic name is valid");
2802        let records = self
2803            .event_log
2804            .read_range(&topic, None, usize::MAX)
2805            .await
2806            .map_err(DispatchError::from)?;
2807        Ok(records
2808            .into_iter()
2809            .filter(|(_, event)| event.kind == "predicate_llm_cache")
2810            .filter_map(|(_, event)| {
2811                serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2812            })
2813            .filter(|record| record.event_id == event_id)
2814            .flat_map(|record| record.entries)
2815            .collect())
2816    }
2817
2818    #[allow(clippy::too_many_arguments)]
2819    async fn append_dispatch_trust_record(
2820        &self,
2821        binding: &TriggerBinding,
2822        route: &DispatchUri,
2823        event: &TriggerEvent,
2824        replay_of_event_id: Option<&String>,
2825        autonomy_tier: AutonomyTier,
2826        outcome: TrustOutcome,
2827        terminal_status: &str,
2828        attempt_count: u32,
2829        error: Option<String>,
2830    ) -> Result<(), DispatchError> {
2831        let mut record = TrustRecord::new(
2832            binding.id.as_str().to_string(),
2833            format!("{}.{}", event.provider.as_str(), event.kind),
2834            None,
2835            outcome,
2836            event.trace_id.0.clone(),
2837            autonomy_tier,
2838        );
2839        record.metadata.insert(
2840            "binding_key".to_string(),
2841            serde_json::json!(binding.binding_key()),
2842        );
2843        record.metadata.insert(
2844            "binding_version".to_string(),
2845            serde_json::json!(binding.version),
2846        );
2847        record.metadata.insert(
2848            "provider".to_string(),
2849            serde_json::json!(event.provider.as_str()),
2850        );
2851        record
2852            .metadata
2853            .insert("event_kind".to_string(), serde_json::json!(event.kind));
2854        record
2855            .metadata
2856            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2857        record.metadata.insert(
2858            "target_uri".to_string(),
2859            serde_json::json!(route.target_uri()),
2860        );
2861        record.metadata.insert(
2862            "terminal_status".to_string(),
2863            serde_json::json!(terminal_status),
2864        );
2865        record.metadata.insert(
2866            "attempt_count".to_string(),
2867            serde_json::json!(attempt_count),
2868        );
2869        if let Some(replay_of_event_id) = replay_of_event_id {
2870            record.metadata.insert(
2871                "replay_of_event_id".to_string(),
2872                serde_json::json!(replay_of_event_id),
2873            );
2874        }
2875        if let Some(error) = error {
2876            record
2877                .metadata
2878                .insert("error".to_string(), serde_json::json!(error));
2879        }
2880        append_trust_record(&self.event_log, &record)
2881            .await
2882            .map(|_| ())
2883            .map_err(DispatchError::from)
2884    }
2885
2886    async fn append_attempt_record(
2887        &self,
2888        event: &TriggerEvent,
2889        binding: &TriggerBinding,
2890        attempt: &DispatchAttemptRecord,
2891        replay_of_event_id: Option<&String>,
2892    ) -> Result<(), DispatchError> {
2893        self.append_topic_event(
2894            TRIGGER_ATTEMPTS_TOPIC,
2895            "attempt_recorded",
2896            event,
2897            Some(binding),
2898            Some(attempt.attempt),
2899            serde_json::to_value(attempt)
2900                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2901            replay_of_event_id,
2902        )
2903        .await
2904    }
2905
2906    async fn append_lifecycle_event(
2907        &self,
2908        kind: &str,
2909        event: &TriggerEvent,
2910        binding: &TriggerBinding,
2911        payload: serde_json::Value,
2912        replay_of_event_id: Option<&String>,
2913    ) -> Result<(), DispatchError> {
2914        self.append_topic_event(
2915            TRIGGERS_LIFECYCLE_TOPIC,
2916            kind,
2917            event,
2918            Some(binding),
2919            None,
2920            payload,
2921            replay_of_event_id,
2922        )
2923        .await
2924    }
2925
2926    async fn append_budget_exhausted_event(
2927        &self,
2928        binding: &TriggerBinding,
2929        event: &TriggerEvent,
2930        reason: &str,
2931        expected_cost_usd_micros: u64,
2932        tokens: Option<u64>,
2933        replay_of_event_id: Option<&String>,
2934    ) -> Result<(), DispatchError> {
2935        let payload = serde_json::json!({
2936            "trigger_id": binding.id.as_str(),
2937            "event_id": event.id.0,
2938            "reason": reason,
2939            "strategy": binding.on_budget_exhausted.as_str(),
2940            "expected_cost_usd": micros_to_usd(expected_cost_usd_micros),
2941            "cost_usd": micros_to_usd(expected_cost_usd_micros),
2942            "tokens": tokens,
2943            "daily_limit_usd": binding.daily_cost_usd,
2944            "hourly_limit_usd": binding.hourly_cost_usd,
2945            "cost_today_usd": current_predicate_daily_cost(binding),
2946            "cost_hour_usd": current_predicate_hourly_cost(binding),
2947            "replay_of_event_id": replay_of_event_id,
2948        });
2949        self.append_lifecycle_event(
2950            "predicate.budget_exceeded",
2951            event,
2952            binding,
2953            payload.clone(),
2954            replay_of_event_id,
2955        )
2956        .await?;
2957        let legacy_kind = match reason {
2958            "daily_budget_exceeded" => Some("predicate.daily_budget_exceeded"),
2959            "hourly_budget_exceeded" => Some("predicate.hourly_budget_exceeded"),
2960            "orchestrator_daily_budget_exceeded" => {
2961                Some("predicate.orchestrator_daily_budget_exceeded")
2962            }
2963            "orchestrator_hourly_budget_exceeded" => {
2964                Some("predicate.orchestrator_hourly_budget_exceeded")
2965            }
2966            _ => None,
2967        };
2968        if let Some(kind) = legacy_kind {
2969            self.append_lifecycle_event(kind, event, binding, payload, replay_of_event_id)
2970                .await?;
2971        }
2972        Ok(())
2973    }
2974
2975    async fn append_autonomy_budget_approval_request(
2976        &self,
2977        binding: &TriggerBinding,
2978        route: &DispatchUri,
2979        event: &TriggerEvent,
2980        replay_of_event_id: Option<&String>,
2981        reason: &str,
2982    ) -> Result<String, DispatchError> {
2983        let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
2984        let detail = serde_json::json!({
2985            "trigger_id": binding.id.as_str(),
2986            "binding_key": binding.binding_key(),
2987            "event_id": event.id.0,
2988            "reason": reason,
2989            "from_tier": AutonomyTier::ActAuto.as_str(),
2990            "requested_tier": AutonomyTier::ActWithApproval.as_str(),
2991            "handler_kind": route.kind(),
2992            "target_uri": route.target_uri(),
2993            "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
2994            "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
2995            "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
2996            "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
2997            "replay_of_event_id": replay_of_event_id,
2998        });
2999        let request_id = crate::stdlib::hitl::append_approval_request_on(
3000            &self.event_log,
3001            binding.id.as_str().to_string(),
3002            event.trace_id.0.clone(),
3003            format!(
3004                "approve autonomous dispatch for trigger '{}' after {}",
3005                binding.id.as_str(),
3006                reason
3007            ),
3008            detail.clone(),
3009            reviewers.clone(),
3010        )
3011        .await
3012        .map_err(dispatch_error_from_vm_error)?;
3013        self.append_lifecycle_event(
3014            "autonomy.budget_exceeded",
3015            event,
3016            binding,
3017            serde_json::json!({
3018                "trigger_id": binding.id.as_str(),
3019                "event_id": event.id.0,
3020                "reason": reason,
3021                "request_id": request_id,
3022                "reviewers": reviewers,
3023                "from_tier": AutonomyTier::ActAuto.as_str(),
3024                "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3025                "replay_of_event_id": replay_of_event_id,
3026            }),
3027            replay_of_event_id,
3028        )
3029        .await?;
3030        Ok(request_id)
3031    }
3032
3033    #[allow(clippy::too_many_arguments)]
3034    async fn emit_autonomy_budget_approval_action_graph(
3035        &self,
3036        binding: &TriggerBinding,
3037        route: &DispatchUri,
3038        event: &TriggerEvent,
3039        source_node_id: &str,
3040        replay_of_event_id: Option<&String>,
3041        reason: &str,
3042        request_id: &str,
3043    ) -> Result<(), DispatchError> {
3044        let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3045        self.emit_action_graph(
3046            event,
3047            vec![RunActionGraphNodeRecord {
3048                id: approval_node_id.clone(),
3049                label: format!("approval required: {reason}"),
3050                kind: "approval".to_string(),
3051                status: "waiting".to_string(),
3052                outcome: "request_approval".to_string(),
3053                trace_id: Some(event.trace_id.0.clone()),
3054                stage_id: None,
3055                node_id: None,
3056                worker_id: None,
3057                run_id: None,
3058                run_path: None,
3059                metadata: BTreeMap::from([
3060                    (
3061                        "trigger_id".to_string(),
3062                        serde_json::json!(binding.id.as_str()),
3063                    ),
3064                    (
3065                        "binding_key".to_string(),
3066                        serde_json::json!(binding.binding_key()),
3067                    ),
3068                    ("event_id".to_string(), serde_json::json!(event.id.0)),
3069                    ("reason".to_string(), serde_json::json!(reason)),
3070                    ("request_id".to_string(), serde_json::json!(request_id)),
3071                    (
3072                        "reviewers".to_string(),
3073                        serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3074                    ),
3075                    ("handler_kind".to_string(), serde_json::json!(route.kind())),
3076                    (
3077                        "target_uri".to_string(),
3078                        serde_json::json!(route.target_uri()),
3079                    ),
3080                    (
3081                        "from_tier".to_string(),
3082                        serde_json::json!(AutonomyTier::ActAuto.as_str()),
3083                    ),
3084                    (
3085                        "requested_tier".to_string(),
3086                        serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3087                    ),
3088                    (
3089                        "replay_of_event_id".to_string(),
3090                        serde_json::json!(replay_of_event_id),
3091                    ),
3092                ]),
3093            }],
3094            vec![RunActionGraphEdgeRecord {
3095                from_id: source_node_id.to_string(),
3096                to_id: approval_node_id,
3097                kind: "approval_gate".to_string(),
3098                label: Some("autonomy budget".to_string()),
3099            }],
3100            serde_json::json!({
3101                "source": "dispatcher",
3102                "trigger_id": binding.id.as_str(),
3103                "binding_key": binding.binding_key(),
3104                "event_id": event.id.0,
3105                "reason": reason,
3106                "request_id": request_id,
3107                "replay_of_event_id": replay_of_event_id,
3108            }),
3109        )
3110        .await
3111    }
3112
3113    #[allow(clippy::too_many_arguments)]
3114    async fn append_tier_transition_trust_record(
3115        &self,
3116        binding: &TriggerBinding,
3117        event: &TriggerEvent,
3118        replay_of_event_id: Option<&String>,
3119        from_tier: AutonomyTier,
3120        to_tier: AutonomyTier,
3121        reason: &str,
3122        request_id: &str,
3123    ) -> Result<(), DispatchError> {
3124        let mut record = TrustRecord::new(
3125            binding.id.as_str().to_string(),
3126            "autonomy.tier_transition",
3127            Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3128            TrustOutcome::Denied,
3129            event.trace_id.0.clone(),
3130            to_tier,
3131        );
3132        record.metadata.insert(
3133            "binding_key".to_string(),
3134            serde_json::json!(binding.binding_key()),
3135        );
3136        record
3137            .metadata
3138            .insert("event_id".to_string(), serde_json::json!(event.id.0));
3139        record.metadata.insert(
3140            "from_tier".to_string(),
3141            serde_json::json!(from_tier.as_str()),
3142        );
3143        record
3144            .metadata
3145            .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3146        record
3147            .metadata
3148            .insert("reason".to_string(), serde_json::json!(reason));
3149        record
3150            .metadata
3151            .insert("request_id".to_string(), serde_json::json!(request_id));
3152        if let Some(replay_of_event_id) = replay_of_event_id {
3153            record.metadata.insert(
3154                "replay_of_event_id".to_string(),
3155                serde_json::json!(replay_of_event_id),
3156            );
3157        }
3158        append_trust_record(&self.event_log, &record)
3159            .await
3160            .map(|_| ())
3161            .map_err(DispatchError::from)
3162    }
3163
3164    async fn append_budget_deferred_event(
3165        &self,
3166        binding: &TriggerBinding,
3167        route: &DispatchUri,
3168        event: &TriggerEvent,
3169        replay_of_event_id: Option<&String>,
3170        reason: &str,
3171    ) -> Result<(), DispatchError> {
3172        self.append_topic_event(
3173            TRIGGER_ATTEMPTS_TOPIC,
3174            "budget_deferred",
3175            event,
3176            Some(binding),
3177            None,
3178            serde_json::json!({
3179                "event_id": event.id.0,
3180                "trigger_id": binding.id.as_str(),
3181                "binding_key": binding.binding_key(),
3182                "handler_kind": route.kind(),
3183                "target_uri": route.target_uri(),
3184                "reason": reason,
3185                "retry_at": next_budget_reset_rfc3339(binding),
3186                "replay_of_event_id": replay_of_event_id,
3187            }),
3188            replay_of_event_id,
3189        )
3190        .await?;
3191        self.append_skipped_outbox_event(
3192            binding,
3193            route,
3194            event,
3195            replay_of_event_id,
3196            DispatchSkipStage::Predicate,
3197            serde_json::json!({
3198                "deferred": true,
3199                "reason": reason,
3200                "retry_at": next_budget_reset_rfc3339(binding),
3201            }),
3202        )
3203        .await
3204    }
3205
3206    async fn move_budget_exhausted_to_dlq(
3207        &self,
3208        binding: &TriggerBinding,
3209        route: &DispatchUri,
3210        event: &TriggerEvent,
3211        replay_of_event_id: Option<&String>,
3212        final_error: &str,
3213    ) -> Result<(), DispatchError> {
3214        let dlq_entry = DlqEntry {
3215            trigger_id: binding.id.as_str().to_string(),
3216            binding_key: binding.binding_key(),
3217            event: event.clone(),
3218            attempt_count: 0,
3219            final_error: final_error.to_string(),
3220            attempts: Vec::new(),
3221        };
3222        self.state
3223            .dlq
3224            .lock()
3225            .expect("dispatcher dlq poisoned")
3226            .push(dlq_entry.clone());
3227        if let Some(metrics) = self.metrics.as_ref() {
3228            metrics.record_trigger_dlq(binding.id.as_str(), "budget_exhausted");
3229        }
3230        self.emit_action_graph(
3231            event,
3232            vec![RunActionGraphNodeRecord {
3233                id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3234                label: binding.id.as_str().to_string(),
3235                kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3236                status: "queued".to_string(),
3237                outcome: "budget_exhausted".to_string(),
3238                trace_id: Some(event.trace_id.0.clone()),
3239                stage_id: None,
3240                node_id: None,
3241                worker_id: None,
3242                run_id: None,
3243                run_path: None,
3244                metadata: dlq_node_metadata(binding, event, 0, final_error),
3245            }],
3246            vec![RunActionGraphEdgeRecord {
3247                from_id: format!("predicate:{}:{}", binding.binding_key(), event.id.0),
3248                to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3249                kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3250                label: Some("budget exhausted".to_string()),
3251            }],
3252            serde_json::json!({
3253                "source": "dispatcher",
3254                "trigger_id": binding.id.as_str(),
3255                "binding_key": binding.binding_key(),
3256                "event_id": event.id.0,
3257                "handler_kind": route.kind(),
3258                "target_uri": route.target_uri(),
3259                "final_error": final_error,
3260                "replay_of_event_id": replay_of_event_id,
3261            }),
3262        )
3263        .await?;
3264        self.append_lifecycle_event(
3265            "DlqMoved",
3266            event,
3267            binding,
3268            serde_json::json!({
3269                "event_id": event.id.0,
3270                "attempt_count": 0,
3271                "final_error": final_error,
3272                "reason": "budget_exhausted",
3273                "replay_of_event_id": replay_of_event_id,
3274            }),
3275            replay_of_event_id,
3276        )
3277        .await?;
3278        self.append_topic_event(
3279            TRIGGER_DLQ_TOPIC,
3280            "dlq_moved",
3281            event,
3282            Some(binding),
3283            None,
3284            serde_json::to_value(&dlq_entry)
3285                .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3286            replay_of_event_id,
3287        )
3288        .await
3289    }
3290
3291    async fn append_skipped_outbox_event(
3292        &self,
3293        binding: &TriggerBinding,
3294        route: &DispatchUri,
3295        event: &TriggerEvent,
3296        replay_of_event_id: Option<&String>,
3297        stage: DispatchSkipStage,
3298        detail: serde_json::Value,
3299    ) -> Result<(), DispatchError> {
3300        self.append_topic_event(
3301            TRIGGER_OUTBOX_TOPIC,
3302            "dispatch_skipped",
3303            event,
3304            Some(binding),
3305            None,
3306            serde_json::json!({
3307                "event_id": event.id.0,
3308                "trigger_id": binding.id.as_str(),
3309                "binding_key": binding.binding_key(),
3310                "handler_kind": route.kind(),
3311                "target_uri": route.target_uri(),
3312                "skip_stage": stage.as_str(),
3313                "detail": detail,
3314                "replay_of_event_id": replay_of_event_id,
3315            }),
3316            replay_of_event_id,
3317        )
3318        .await
3319    }
3320
3321    async fn append_topic_event(
3322        &self,
3323        topic_name: &str,
3324        kind: &str,
3325        event: &TriggerEvent,
3326        binding: Option<&TriggerBinding>,
3327        attempt: Option<u32>,
3328        payload: serde_json::Value,
3329        replay_of_event_id: Option<&String>,
3330    ) -> Result<(), DispatchError> {
3331        let topic = Topic::new(topic_name)
3332            .expect("static trigger dispatcher topic names should always be valid");
3333        let headers = event_headers(event, binding, attempt, replay_of_event_id);
3334        self.event_log
3335            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3336            .await
3337            .map_err(DispatchError::from)
3338            .map(|_| ())
3339    }
3340
3341    async fn emit_action_graph(
3342        &self,
3343        event: &TriggerEvent,
3344        nodes: Vec<RunActionGraphNodeRecord>,
3345        edges: Vec<RunActionGraphEdgeRecord>,
3346        extra: serde_json::Value,
3347    ) -> Result<(), DispatchError> {
3348        let mut headers = BTreeMap::new();
3349        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3350        headers.insert("event_id".to_string(), event.id.0.clone());
3351        let observability = RunObservabilityRecord {
3352            schema_version: 1,
3353            action_graph_nodes: nodes,
3354            action_graph_edges: edges,
3355            ..Default::default()
3356        };
3357        append_action_graph_update(
3358            headers,
3359            serde_json::json!({
3360                "source": "dispatcher",
3361                "trace_id": event.trace_id.0,
3362                "event_id": event.id.0,
3363                "observability": observability,
3364                "context": extra,
3365            }),
3366        )
3367        .await
3368        .map_err(DispatchError::from)
3369    }
3370}
3371
3372async fn dispatch_cancel_requested(
3373    event_log: &Arc<AnyEventLog>,
3374    binding_key: &str,
3375    event_id: &str,
3376    replay_of_event_id: Option<&String>,
3377) -> Result<bool, DispatchError> {
3378    if replay_of_event_id.is_some() {
3379        return Ok(false);
3380    }
3381    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3382        .expect("static trigger cancel topic should always be valid");
3383    let events = event_log.read_range(&topic, None, usize::MAX).await?;
3384    let requested = events
3385        .into_iter()
3386        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3387        .filter_map(|(_, event)| {
3388            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3389        })
3390        .collect::<BTreeSet<_>>();
3391    Ok(requested
3392        .iter()
3393        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3394}
3395
3396async fn sleep_or_cancel_or_request(
3397    event_log: &Arc<AnyEventLog>,
3398    delay: Duration,
3399    binding_key: &str,
3400    event_id: &str,
3401    replay_of_event_id: Option<&String>,
3402    cancel_rx: &mut broadcast::Receiver<()>,
3403) -> Result<(), DispatchError> {
3404    let sleep = tokio::time::sleep(delay);
3405    pin_mut!(sleep);
3406    let mut poll = tokio::time::interval(Duration::from_millis(100));
3407    loop {
3408        tokio::select! {
3409            _ = &mut sleep => return Ok(()),
3410            _ = recv_cancel(cancel_rx) => {
3411                return Err(DispatchError::Cancelled(
3412                    "dispatcher shutdown cancelled retry wait".to_string(),
3413                ));
3414            }
3415            _ = poll.tick() => {
3416                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
3417                    return Err(DispatchError::Cancelled(
3418                        "trigger cancel request cancelled retry wait".to_string(),
3419                    ));
3420                }
3421            }
3422        }
3423    }
3424}
3425
3426fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
3427    let mut iter = events.into_iter();
3428    let Some(mut root) = iter.next() else {
3429        return Err(DispatchError::Registry(
3430            "batch dispatch produced an empty event list".to_string(),
3431        ));
3432    };
3433    let mut batch = Vec::new();
3434    batch.push(
3435        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
3436    );
3437    for event in iter {
3438        batch.push(
3439            serde_json::to_value(&event)
3440                .map_err(|error| DispatchError::Serde(error.to_string()))?,
3441        );
3442    }
3443    root.batch = Some(batch);
3444    Ok(root)
3445}
3446
3447fn json_value_to_gate(value: &serde_json::Value) -> String {
3448    match value {
3449        serde_json::Value::Null => "null".to_string(),
3450        serde_json::Value::String(text) => text.clone(),
3451        serde_json::Value::Bool(value) => value.to_string(),
3452        serde_json::Value::Number(value) => value.to_string(),
3453        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
3454    }
3455}
3456
3457fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
3458    let json =
3459        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3460    let value = json_to_vm_value(&json);
3461    match (&event.raw_body, value) {
3462        (Some(raw_body), VmValue::Dict(dict)) => {
3463            let mut map = (*dict).clone();
3464            map.insert(
3465                "raw_body".to_string(),
3466                VmValue::Bytes(Rc::new(raw_body.clone())),
3467            );
3468            Ok(VmValue::Dict(Rc::new(map)))
3469        }
3470        (_, other) => Ok(other),
3471    }
3472}
3473
3474fn decrement_in_flight(state: &DispatcherRuntimeState) {
3475    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
3476    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
3477        state.idle_notify.notify_waiters();
3478    }
3479}
3480
3481fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
3482    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
3483    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
3484        state.idle_notify.notify_waiters();
3485    }
3486}
3487
/// Test-only: installs a one-shot sender to be fired the next time an inbox
/// dequeue is reported. Any previously installed sender is replaced.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        // The previous sender, if any, is dropped here.
        let _ = slot.replace(Some(tx));
    });
}

/// Non-test builds compile the dequeue hook to a no-op.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test-only: fires and clears the installed dequeue signal, if any. A
/// receiver that has already been dropped is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(tx) = pending {
        let _ = tx.send(());
    }
}
3506
3507pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
3508    event_log: &L,
3509    event: &TriggerEvent,
3510) -> Result<u64, DispatchError> {
3511    let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
3512        .expect("static trigger.inbox.envelopes topic is valid");
3513    let headers = event_headers(event, None, None, None);
3514    let payload =
3515        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3516    event_log
3517        .append(
3518            &topic,
3519            LogEvent::new("event_ingested", payload).with_headers(headers),
3520        )
3521        .await
3522        .map_err(DispatchError::from)
3523}
3524
3525pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
3526    ACTIVE_DISPATCHER_STATE.with(|slot| {
3527        slot.borrow()
3528            .as_ref()
3529            .map(|state| DispatcherStatsSnapshot {
3530                in_flight: state.in_flight.load(Ordering::Relaxed),
3531                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
3532                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
3533            })
3534            .unwrap_or_default()
3535    })
3536}
3537
3538pub fn clear_dispatcher_state() {
3539    ACTIVE_DISPATCHER_STATE.with(|slot| {
3540        *slot.borrow_mut() = None;
3541    });
3542    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
3543        *slot.borrow_mut() = None;
3544    });
3545}
3546
3547fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
3548    if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
3549        return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
3550    }
3551    if is_cancelled_vm_error(&error) {
3552        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
3553    }
3554    if let VmError::Thrown(VmValue::String(message)) = &error {
3555        return DispatchError::Local(message.to_string());
3556    }
3557    match error_to_category(&error) {
3558        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
3559        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
3560        ErrorCategory::Cancelled => {
3561            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
3562        }
3563        _ => DispatchError::Local(error.to_string()),
3564    }
3565}
3566
3567fn dispatch_error_label(error: &DispatchError) -> &'static str {
3568    match error {
3569        DispatchError::Denied(_) => "denied",
3570        DispatchError::Timeout(_) => "timeout",
3571        DispatchError::Waiting(_) => "waiting",
3572        DispatchError::Cancelled(_) => "cancelled",
3573        _ => "failed",
3574    }
3575}
3576
3577fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
3578    match route {
3579        DispatchUri::Worker { .. } => "enqueued",
3580        DispatchUri::A2a { .. }
3581            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
3582        {
3583            "pending"
3584        }
3585        DispatchUri::A2a { .. } => "completed",
3586        DispatchUri::Local { .. } => "success",
3587    }
3588}
3589
3590fn dispatch_node_id(
3591    route: &DispatchUri,
3592    binding_key: &str,
3593    event_id: &str,
3594    attempt: u32,
3595) -> String {
3596    let prefix = match route {
3597        DispatchUri::A2a { .. } => "a2a",
3598        _ => "dispatch",
3599    };
3600    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
3601}
3602
3603fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3604    match route {
3605        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3606        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3607        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3608    }
3609}
3610
3611fn dispatch_node_label(route: &DispatchUri) -> String {
3612    match route {
3613        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3614        _ => route.target_uri(),
3615    }
3616}
3617
3618fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3619    match route {
3620        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3621        _ => None,
3622    }
3623}
3624
3625fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3626    match route {
3627        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3628        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3629        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3630    }
3631}
3632
3633fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3634    match status {
3635        crate::triggers::SignatureStatus::Verified => "verified",
3636        crate::triggers::SignatureStatus::Unsigned => "unsigned",
3637        crate::triggers::SignatureStatus::Failed { .. } => "failed",
3638    }
3639}
3640
3641fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3642    let mut metadata = BTreeMap::new();
3643    metadata.insert(
3644        "provider".to_string(),
3645        serde_json::json!(event.provider.as_str()),
3646    );
3647    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3648    metadata.insert(
3649        "dedupe_key".to_string(),
3650        serde_json::json!(event.dedupe_key),
3651    );
3652    metadata.insert(
3653        "signature_status".to_string(),
3654        serde_json::json!(signature_status_label(&event.signature_status)),
3655    );
3656    metadata
3657}
3658
3659fn predicate_node_metadata(
3660    binding: &TriggerBinding,
3661    predicate: &super::registry::TriggerPredicateSpec,
3662    event: &TriggerEvent,
3663    evaluation: &PredicateEvaluationRecord,
3664) -> BTreeMap<String, serde_json::Value> {
3665    let mut metadata = BTreeMap::new();
3666    metadata.insert(
3667        "trigger_id".to_string(),
3668        serde_json::json!(binding.id.as_str()),
3669    );
3670    metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
3671    metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
3672    metadata.insert(
3673        "cost_usd".to_string(),
3674        serde_json::json!(evaluation.cost_usd),
3675    );
3676    metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
3677    metadata.insert(
3678        "latency_ms".to_string(),
3679        serde_json::json!(evaluation.latency_ms),
3680    );
3681    metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
3682    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3683    if let Some(reason) = evaluation.reason.as_ref() {
3684        metadata.insert("reason".to_string(), serde_json::json!(reason));
3685    }
3686    metadata
3687}
3688
3689fn dispatch_node_metadata(
3690    route: &DispatchUri,
3691    binding: &TriggerBinding,
3692    event: &TriggerEvent,
3693    attempt: u32,
3694) -> BTreeMap<String, serde_json::Value> {
3695    let mut metadata = BTreeMap::new();
3696    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3697    metadata.insert(
3698        "target_uri".to_string(),
3699        serde_json::json!(route.target_uri()),
3700    );
3701    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3702    metadata.insert(
3703        "trigger_id".to_string(),
3704        serde_json::json!(binding.id.as_str()),
3705    );
3706    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3707    if let Some(target_agent) = dispatch_target_agent(route) {
3708        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3709    }
3710    if let DispatchUri::Worker { queue } = route {
3711        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3712    }
3713    metadata
3714}
3715
3716fn dispatch_success_metadata(
3717    route: &DispatchUri,
3718    binding: &TriggerBinding,
3719    event: &TriggerEvent,
3720    attempt: u32,
3721    result: &serde_json::Value,
3722) -> BTreeMap<String, serde_json::Value> {
3723    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3724    match route {
3725        DispatchUri::A2a { .. } => {
3726            if let Some(task_id) = result
3727                .get("task_id")
3728                .or_else(|| result.get("id"))
3729                .and_then(|value| value.as_str())
3730            {
3731                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3732            }
3733            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3734                metadata.insert("state".to_string(), serde_json::json!(state));
3735            }
3736        }
3737        DispatchUri::Worker { .. } => {
3738            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3739            {
3740                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3741            }
3742            if let Some(response_topic) = result
3743                .get("response_topic")
3744                .and_then(|value| value.as_str())
3745            {
3746                metadata.insert(
3747                    "response_topic".to_string(),
3748                    serde_json::json!(response_topic),
3749                );
3750            }
3751        }
3752        DispatchUri::Local { .. } => {}
3753    }
3754    metadata
3755}
3756
3757fn dispatch_error_metadata(
3758    route: &DispatchUri,
3759    binding: &TriggerBinding,
3760    event: &TriggerEvent,
3761    attempt: u32,
3762    error: &DispatchError,
3763) -> BTreeMap<String, serde_json::Value> {
3764    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3765    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3766    metadata
3767}
3768
3769fn retry_node_metadata(
3770    binding: &TriggerBinding,
3771    event: &TriggerEvent,
3772    attempt: u32,
3773    delay: Duration,
3774    error: &DispatchError,
3775) -> BTreeMap<String, serde_json::Value> {
3776    let mut metadata = BTreeMap::new();
3777    metadata.insert(
3778        "trigger_id".to_string(),
3779        serde_json::json!(binding.id.as_str()),
3780    );
3781    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3782    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3783    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3784    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3785    metadata
3786}
3787
3788fn dlq_node_metadata(
3789    binding: &TriggerBinding,
3790    event: &TriggerEvent,
3791    attempt_count: u32,
3792    final_error: &str,
3793) -> BTreeMap<String, serde_json::Value> {
3794    let mut metadata = BTreeMap::new();
3795    metadata.insert(
3796        "trigger_id".to_string(),
3797        serde_json::json!(binding.id.as_str()),
3798    );
3799    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3800    metadata.insert(
3801        "attempt_count".to_string(),
3802        serde_json::json!(attempt_count),
3803    );
3804    metadata.insert("final_error".to_string(), serde_json::json!(final_error));
3805    metadata
3806}
3807
3808fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
3809    match value {
3810        VmValue::Bool(result) => Ok(result),
3811        VmValue::EnumVariant {
3812            enum_name,
3813            variant,
3814            fields,
3815        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Ok" => match fields.first() {
3816            Some(VmValue::Bool(result)) => Ok(*result),
3817            Some(other) => Err(format!(
3818                "predicate Result.Ok payload must be bool, got {}",
3819                other.type_name()
3820            )),
3821            None => Err("predicate Result.Ok payload is missing".to_string()),
3822        },
3823        VmValue::EnumVariant {
3824            enum_name,
3825            variant,
3826            fields,
3827        } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => Err(fields
3828            .first()
3829            .map(VmValue::display)
3830            .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
3831        other => Err(format!(
3832            "predicate must return bool or Result<bool, _>, got {}",
3833            other.type_name()
3834        )),
3835    }
3836}
3837
3838fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
3839    micros_to_usd(
3840        binding
3841            .metrics
3842            .cost_today_usd_micros
3843            .load(Ordering::Relaxed),
3844    )
3845}
3846
3847fn current_predicate_hourly_cost(binding: &TriggerBinding) -> f64 {
3848    micros_to_usd(binding.metrics.cost_hour_usd_micros.load(Ordering::Relaxed))
3849}
3850
/// Splits a binding key of the form `"{binding_id}@v{version}"` into the
/// binding id and its numeric version.
///
/// The split happens at the *last* `@v`, so ids that themselves contain
/// `@v` still parse correctly when a version suffix is present.
///
/// A key whose text after the last `@v` is not a valid `u32` is treated as
/// unversioned, exactly like a key with no `@v` at all: the whole key is
/// returned as the id with version `0`. This keeps an id such as
/// `"deploy@vercel"` from being truncated to `"deploy"`.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    binding_key
        .rsplit_once("@v")
        .and_then(|(binding_id, suffix)| {
            suffix
                .parse::<u32>()
                .ok()
                .map(|version| (binding_id.to_string(), version))
        })
        .unwrap_or_else(|| (binding_key.to_string(), 0))
}
3858
3859fn is_cancelled_vm_error(error: &VmError) -> bool {
3860    matches!(
3861        error,
3862        VmError::Thrown(VmValue::String(message))
3863            if message.starts_with("kind:cancelled:")
3864    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3865}
3866
3867fn event_headers(
3868    event: &TriggerEvent,
3869    binding: Option<&TriggerBinding>,
3870    attempt: Option<u32>,
3871    replay_of_event_id: Option<&String>,
3872) -> BTreeMap<String, String> {
3873    let mut headers = BTreeMap::new();
3874    headers.insert("event_id".to_string(), event.id.0.clone());
3875    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3876    headers.insert("provider".to_string(), event.provider.as_str().to_string());
3877    headers.insert("kind".to_string(), event.kind.clone());
3878    if let Some(replay_of_event_id) = replay_of_event_id {
3879        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3880    }
3881    if let Some(binding) = binding {
3882        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3883        headers.insert("binding_key".to_string(), binding.binding_key());
3884        headers.insert(
3885            "handler_kind".to_string(),
3886            DispatchUri::from(&binding.handler).kind().to_string(),
3887        );
3888    }
3889    if let Some(attempt) = attempt {
3890        headers.insert("attempt".to_string(), attempt.to_string());
3891    }
3892    headers
3893}
3894
3895fn worker_queue_priority(
3896    binding: &super::registry::TriggerBinding,
3897    event: &TriggerEvent,
3898) -> crate::WorkerQueuePriority {
3899    match event
3900        .headers
3901        .get("priority")
3902        .map(|value| value.trim().to_ascii_lowercase())
3903        .as_deref()
3904    {
3905        Some("high") => crate::WorkerQueuePriority::High,
3906        Some("low") => crate::WorkerQueuePriority::Low,
3907        _ => binding.dispatch_priority,
3908    }
3909}
3910
/// Environment variable that arms the crash hook in [`maybe_fail_before_outbox`].
///
/// Only its presence matters; the value is never read. Judging by the
/// `HARN_TEST_` prefix, this is intended for tests only.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Terminates the process with exit status 86 if [`TEST_FAIL_BEFORE_OUTBOX_ENV`]
/// is set, and otherwise does nothing.
///
/// NOTE(review): based on its name, this is presumably a fault-injection point
/// just before the outbox write. Confirm against the call site.
fn maybe_fail_before_outbox() {
    let crash_hook_armed = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if !crash_hook_armed {
        return;
    }
    std::process::exit(86);
}
3918
3919fn now_rfc3339() -> String {
3920    time::OffsetDateTime::now_utc()
3921        .format(&time::format_description::well_known::Rfc3339)
3922        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3923}
3924
3925fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
3926    let now = time::OffsetDateTime::now_utc();
3927    let reset = if binding.hourly_cost_usd.is_some() {
3928        let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
3929        time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
3930    } else {
3931        let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
3932        time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
3933    };
3934    reset
3935        .format(&time::format_description::well_known::Rfc3339)
3936        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3937}
3938
3939fn now_unix_ms() -> i64 {
3940    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3941}
3942
3943fn cancelled_dispatch_outcome(
3944    binding: &TriggerBinding,
3945    route: &DispatchUri,
3946    event: &TriggerEvent,
3947    replay_of_event_id: Option<String>,
3948    attempt_count: u32,
3949    error: String,
3950) -> DispatchOutcome {
3951    DispatchOutcome {
3952        trigger_id: binding.id.as_str().to_string(),
3953        binding_key: binding.binding_key(),
3954        event_id: event.id.0.clone(),
3955        attempt_count,
3956        status: DispatchStatus::Cancelled,
3957        handler_kind: route.kind().to_string(),
3958        target_uri: route.target_uri(),
3959        replay_of_event_id,
3960        result: None,
3961        error: Some(error),
3962    }
3963}
3964
3965async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3966    let _ = cancel_rx.recv().await;
3967}
3968
3969#[cfg(test)]
3970mod tests;