Skip to main content

harn_vm/triggers/dispatcher/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::{Mutex as AsyncMutex, Notify};
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25    ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26    ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE, ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{append_trust_record, AutonomyTier, TrustOutcome, TrustRecord};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::matching_bindings;
35use super::registry::{TriggerBinding, TriggerHandlerSpec};
36use super::{
37    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
38    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
39    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
40    TRIGGER_OUTBOX_TOPIC,
41};
42use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
43
44mod flow_control;
45pub mod retry;
46pub mod uri;
47
48pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
49
thread_local! {
    // Runtime state of the dispatcher most recently constructed on this thread
    // (written by `Dispatcher::with_event_log_and_metrics`).
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context of the dispatch currently active on this thread; read through
    // `current_dispatch_context`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Wait lease of the dispatch currently active on this thread; read through
    // `current_dispatch_wait_lease`.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only one-shot sender.
    // NOTE(review): presumably fired by `notify_test_inbox_dequeued` — confirm.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
57
tokio::task_local! {
    // Scoped around each dispatch by `dispatch_with_replay`; `true` when the
    // dispatch was started with a `replay_of_event_id`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
61
/// Description of a dispatch, stored in the `ACTIVE_DISPATCH_CONTEXT`
/// thread-local and handed out (cloned) by [`current_dispatch_context`].
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// Event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay.
    pub replay_of_event_id: Option<String>,
    /// Identifier of the binding handling the event.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    // NOTE(review): the meaning of `agent_id` and `action` is defined by the
    // code that populates this context, which is outside this view.
    pub agent_id: String,
    pub action: String,
    /// Autonomy tier associated with the dispatch.
    pub autonomy_tier: AutonomyTier,
}
72
/// Serializable grouping of predicate cache entries under a trigger id and an
/// event id.
// NOTE(review): where this record is persisted is not visible in this section.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    trigger_id: String,
    event_id: String,
    entries: Vec<PredicateCacheEntry>,
}
79
/// Result of evaluating a binding's `when` predicate for one event.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed.
    result: bool,
    // NOTE(review): units below are taken from the field names (USD, token
    // count, milliseconds) — confirm against the code that fills them in.
    cost_usd: f64,
    tokens: u64,
    latency_ms: u64,
    // Presumably true when the result came from a cache — confirm.
    cached: bool,
    reason: Option<String>,
}
89
90pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
91    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
92}
93
94pub(crate) fn current_dispatch_is_replay() -> bool {
95    ACTIVE_DISPATCH_IS_REPLAY
96        .try_with(|is_replay| *is_replay)
97        .unwrap_or(false)
98}
99
100pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
101    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
102}
103
/// Trigger dispatcher: reads inbox envelopes from the event log and dispatches
/// them to trigger bindings.
///
/// Cloning is cheap: every field is a reference-counted handle or a channel
/// sender. Because `base_vm` is an `Rc`, the dispatcher is not `Send`.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM supplied at construction time.
    base_vm: Rc<Vm>,
    /// Event log used for the inbox and for cancel requests.
    event_log: Arc<AnyEventLog>,
    /// Broadcast used by `shutdown` to stop `run` loops.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ and flow-control state shared between clones.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink for dispatch success/failure counts.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
112
113#[derive(Debug)]
114struct DispatcherRuntimeState {
115    in_flight: AtomicU64,
116    retry_queue_depth: AtomicU64,
117    dlq: Mutex<Vec<DlqEntry>>,
118    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
119    shutting_down: std::sync::atomic::AtomicBool,
120    idle_notify: Notify,
121    flow_control: FlowControlManager,
122}
123
124impl DispatcherRuntimeState {
125    fn new(event_log: Arc<AnyEventLog>) -> Self {
126        Self {
127            in_flight: AtomicU64::new(0),
128            retry_queue_depth: AtomicU64::new(0),
129            dlq: Mutex::new(Vec::new()),
130            cancel_tokens: Mutex::new(Vec::new()),
131            shutting_down: std::sync::atomic::AtomicBool::new(false),
132            idle_notify: Notify::new(),
133            flow_control: FlowControlManager::new(event_log),
134        }
135    }
136}
137
/// Point-in-time counters produced by `Dispatcher::snapshot`.
///
/// Fields missing during deserialization fall back to their defaults (zero).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Value of the in-flight counter at snapshot time.
    pub in_flight: u64,
    /// Value of the retry-queue depth counter at snapshot time.
    pub retry_queue_depth: u64,
    /// Number of entries in the in-memory DLQ at snapshot time.
    pub dlq_depth: u64,
}
145
/// Final status of one dispatch.
///
/// Serialized in `snake_case`, which matches the strings returned by
/// `DispatchStatus::as_str`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    Succeeded,
    Failed,
    Dlq,
    Skipped,
    Waiting,
    Cancelled,
}
156
157impl DispatchStatus {
158    pub fn as_str(&self) -> &'static str {
159        match self {
160            Self::Succeeded => "succeeded",
161            Self::Failed => "failed",
162            Self::Dlq => "dlq",
163            Self::Skipped => "skipped",
164            Self::Waiting => "waiting",
165            Self::Cancelled => "cancelled",
166        }
167    }
168}
169
/// Result of dispatching one event to one binding.
///
/// With `#[serde(default)]`, fields missing during deserialization are taken
/// from the `Default` impl, whose `status` is `DispatchStatus::Failed`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// Number of attempts made for this dispatch.
    pub attempt_count: u32,
    pub status: DispatchStatus,
    pub handler_kind: String,
    pub target_uri: String,
    /// Id of the original event when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result as JSON, when one is available.
    pub result: Option<serde_json::Value>,
    /// Error message, when one is available.
    pub error: Option<String>,
}
184
/// Payload of an `event_ingested` record on the trigger inbox topic.
///
/// Written by `Dispatcher::enqueue_targeted` and decoded by `Dispatcher::run`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to this trigger's binding.
    pub trigger_id: Option<String>,
    /// Optional binding version used when resolving the target binding.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
191
/// Outcome of `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when both `in_flight` and `retry_queue_depth` reached zero
    /// before the timeout elapsed.
    pub drained: bool,
    /// In-flight count at the time the report was produced.
    pub in_flight: u64,
    /// Retry-queue depth at the time the report was produced.
    pub retry_queue_depth: u64,
    /// DLQ depth at the time the report was produced.
    pub dlq_depth: u64,
}
200
201impl Default for DispatchOutcome {
202    fn default() -> Self {
203        Self {
204            trigger_id: String::new(),
205            binding_key: String::new(),
206            event_id: String::new(),
207            attempt_count: 0,
208            status: DispatchStatus::Failed,
209            handler_kind: String::new(),
210            target_uri: String::new(),
211            replay_of_event_id: None,
212            result: None,
213            error: None,
214        }
215    }
216}
217
/// Record of a single dispatch attempt.
///
/// Fields missing during deserialization fall back to their defaults.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// Attempt number.
    // NOTE(review): presumably 1-based — confirm against the retry loop.
    pub attempt: u32,
    pub handler_kind: String,
    // NOTE(review): timestamps are plain strings; their format is set by the
    // writer, which is outside this view.
    pub started_at: String,
    pub completed_at: String,
    pub outcome: String,
    pub error_msg: Option<String>,
}
231
/// Request to cancel the dispatch of one event for one binding.
///
/// Appended to the cancel-requests topic by `append_dispatch_cancel_request`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    pub binding_key: String,
    pub event_id: String,
    /// Request time, serialized as an RFC 3339 timestamp.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Optional requester; defaults to `None` when absent.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit identifier; defaults to `None` when absent.
    #[serde(default)]
    pub audit_id: Option<String>,
}
243
/// Dead-letter entry kept in the dispatcher's in-memory DLQ and returned by
/// `Dispatcher::dlq_entries`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    pub trigger_id: String,
    pub binding_key: String,
    /// The event stored with this entry.
    pub event: TriggerEvent,
    /// Number of attempts recorded for the event.
    pub attempt_count: u32,
    /// Final error message recorded for the event.
    pub final_error: String,
    /// Per-attempt records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
253
/// Bookkeeping for a singleton gate taken by a dispatch.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Name of the singleton gate.
    gate: String,
    /// Whether the gate is currently held; cleared by
    /// `DispatchWaitLease::suspend` and set again by `resume`.
    held: bool,
}
259
/// Bookkeeping for a concurrency gate taken by a dispatch.
///
/// `gate`, `max` and `priority_rank` are kept so that
/// `DispatchWaitLease::resume` can re-acquire a permit with the same arguments.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    gate: String,
    max: u32,
    priority_rank: usize,
    /// Current permit; `None` while the lease is suspended.
    permit: Option<ConcurrencyPermit>,
}
267
/// Flow-control resources associated with one dispatch; either part may be
/// absent.
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    singleton: Option<SingletonLease>,
    concurrency: Option<ConcurrencyLease>,
}
273
/// Handle that can release a dispatch's flow-control resources (`suspend`) and
/// take them back (`resume`).
///
/// Clones share the same `acquired` set and `suspended` flag.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state that owns the flow-control manager.
    state: Arc<DispatcherRuntimeState>,
    /// Resources this lease manages.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// `true` between a `suspend` and the matching `resume`.
    suspended: Arc<AtomicBool>,
}
280
281impl DispatchWaitLease {
282    fn new(
283        state: Arc<DispatcherRuntimeState>,
284        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
285    ) -> Self {
286        Self {
287            state,
288            acquired,
289            suspended: Arc::new(AtomicBool::new(false)),
290        }
291    }
292
293    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
294        if self.suspended.swap(true, Ordering::SeqCst) {
295            return Ok(());
296        }
297        let (singleton_gate, concurrency_permit) = {
298            let mut acquired = self.acquired.lock().await;
299            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
300                if lease.held {
301                    lease.held = false;
302                    Some(lease.gate.clone())
303                } else {
304                    None
305                }
306            });
307            let concurrency_permit = acquired
308                .concurrency
309                .as_mut()
310                .and_then(|lease| lease.permit.take());
311            (singleton_gate, concurrency_permit)
312        };
313
314        if let Some(gate) = singleton_gate {
315            self.state
316                .flow_control
317                .release_singleton(&gate)
318                .await
319                .map_err(DispatchError::from)?;
320        }
321        if let Some(permit) = concurrency_permit {
322            self.state
323                .flow_control
324                .release_concurrency(permit)
325                .await
326                .map_err(DispatchError::from)?;
327        }
328        Ok(())
329    }
330
331    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
332        if !self.suspended.swap(false, Ordering::SeqCst) {
333            return Ok(());
334        }
335
336        let singleton_gate = {
337            let acquired = self.acquired.lock().await;
338            acquired.singleton.as_ref().and_then(|lease| {
339                if lease.held {
340                    None
341                } else {
342                    Some(lease.gate.clone())
343                }
344            })
345        };
346        if let Some(gate) = singleton_gate {
347            self.state
348                .flow_control
349                .acquire_singleton(&gate)
350                .await
351                .map_err(DispatchError::from)?;
352            let mut acquired = self.acquired.lock().await;
353            if let Some(lease) = acquired.singleton.as_mut() {
354                lease.held = true;
355            }
356        }
357
358        let concurrency_spec = {
359            let acquired = self.acquired.lock().await;
360            acquired.concurrency.as_ref().and_then(|lease| {
361                if lease.permit.is_some() {
362                    None
363                } else {
364                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
365                }
366            })
367        };
368        if let Some((gate, max, priority_rank)) = concurrency_spec {
369            let permit = self
370                .state
371                .flow_control
372                .acquire_concurrency(&gate, max, priority_rank)
373                .await
374                .map_err(DispatchError::from)?;
375            let mut acquired = self.acquired.lock().await;
376            if let Some(lease) = acquired.concurrency.as_mut() {
377                lease.permit = Some(permit);
378            }
379        }
380        Ok(())
381    }
382}
383
/// Decision produced by the flow-control stage for an event.
enum FlowControlOutcome {
    /// Proceed with `event`, holding the `acquired` flow-control resources.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; `reason` carries the explanation.
    Skip {
        reason: String,
    },
}
393
/// Stage at which a dispatch was skipped; see `as_str` for the string form.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    Predicate,
    FlowControl,
}
399
/// Errors produced by the dispatcher. Every variant carries a human-readable
/// message, which is also the error's `Display` output.
#[derive(Debug)]
pub enum DispatchError {
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    /// Writes the carried message verbatim, without a variant prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant wraps exactly one message, so this pattern is irrefutable.
        let (Self::EventLog(text)
        | Self::Registry(text)
        | Self::Serde(text)
        | Self::Local(text)
        | Self::A2a(text)
        | Self::Denied(text)
        | Self::Timeout(text)
        | Self::Waiting(text)
        | Self::Cancelled(text)
        | Self::NotImplemented(text)) = self;
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Returns `true` for every variant except `Cancelled`, `Denied`,
    /// `NotImplemented` and `Waiting`.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}
441
442impl DispatchSkipStage {
443    fn as_str(&self) -> &'static str {
444        match self {
445            Self::Predicate => "predicate",
446            Self::FlowControl => "flow_control",
447        }
448    }
449}
450
451impl From<LogError> for DispatchError {
452    fn from(value: LogError) -> Self {
453        Self::EventLog(value.to_string())
454    }
455}
456
457pub async fn append_dispatch_cancel_request(
458    event_log: &Arc<AnyEventLog>,
459    request: &DispatchCancelRequest,
460) -> Result<u64, DispatchError> {
461    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
462        .expect("static trigger cancel topic should always be valid");
463    event_log
464        .append(
465            &topic,
466            LogEvent::new(
467                "dispatch_cancel_requested",
468                serde_json::to_value(request)
469                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
470            ),
471        )
472        .await
473        .map_err(DispatchError::from)
474}
475
476impl Dispatcher {
477    pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
478        self.event_log.clone()
479    }
480
481    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
482        let event_log = active_event_log().ok_or_else(|| {
483            DispatchError::EventLog("dispatcher requires an active event log".to_string())
484        })?;
485        Ok(Self::with_event_log(base_vm, event_log))
486    }
487
488    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
489        Self::with_event_log_and_metrics(base_vm, event_log, None)
490    }
491
492    pub fn with_event_log_and_metrics(
493        base_vm: Vm,
494        event_log: Arc<AnyEventLog>,
495        metrics: Option<Arc<crate::MetricsRegistry>>,
496    ) -> Self {
497        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
498        ACTIVE_DISPATCHER_STATE.with(|slot| {
499            *slot.borrow_mut() = Some(state.clone());
500        });
501        let (cancel_tx, _) = broadcast::channel(32);
502        Self {
503            base_vm: Rc::new(base_vm),
504            event_log,
505            cancel_tx,
506            state,
507            metrics,
508        }
509    }
510
511    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
512        DispatcherStatsSnapshot {
513            in_flight: self.state.in_flight.load(Ordering::Relaxed),
514            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
515            dlq_depth: self
516                .state
517                .dlq
518                .lock()
519                .expect("dispatcher dlq poisoned")
520                .len() as u64,
521        }
522    }
523
524    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
525        self.state
526            .dlq
527            .lock()
528            .expect("dispatcher dlq poisoned")
529            .clone()
530    }
531
532    pub fn shutdown(&self) {
533        self.state.shutting_down.store(true, Ordering::SeqCst);
534        for token in self
535            .state
536            .cancel_tokens
537            .lock()
538            .expect("dispatcher cancel tokens poisoned")
539            .iter()
540        {
541            token.store(true, Ordering::SeqCst);
542        }
543        let _ = self.cancel_tx.send(());
544    }
545
546    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
547        self.enqueue_targeted(None, None, event).await
548    }
549
550    pub async fn enqueue_targeted(
551        &self,
552        trigger_id: Option<String>,
553        binding_version: Option<u32>,
554        event: TriggerEvent,
555    ) -> Result<u64, DispatchError> {
556        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
557            .expect("static trigger inbox envelopes topic is valid");
558        let headers = event_headers(&event, None, None, None);
559        let payload = serde_json::to_value(InboxEnvelope {
560            trigger_id,
561            binding_version,
562            event,
563        })
564        .map_err(|error| DispatchError::Serde(error.to_string()))?;
565        self.event_log
566            .append(
567                &topic,
568                LogEvent::new("event_ingested", payload).with_headers(headers),
569            )
570            .await
571            .map_err(DispatchError::from)
572    }
573
574    pub async fn run(&self) -> Result<(), DispatchError> {
575        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
576            .expect("static trigger inbox envelopes topic is valid");
577        let start_from = self.event_log.latest(&topic).await?;
578        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
579        pin_mut!(stream);
580        let mut cancel_rx = self.cancel_tx.subscribe();
581
582        loop {
583            tokio::select! {
584                received = stream.next() => {
585                    let Some(received) = received else {
586                        break;
587                    };
588                    let (_, event) = received.map_err(DispatchError::from)?;
589                    if event.kind != "event_ingested" {
590                        continue;
591                    }
592                    let parent_headers = event.headers.clone();
593                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
594                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
595                    notify_test_inbox_dequeued();
596                    let _ = self
597                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
598                        .await;
599                }
600                _ = recv_cancel(&mut cancel_rx) => break,
601            }
602        }
603
604        Ok(())
605    }
606
607    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
608        let deadline = tokio::time::Instant::now() + timeout;
609        loop {
610            let snapshot = self.snapshot();
611            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
612                return Ok(DispatcherDrainReport {
613                    drained: true,
614                    in_flight: snapshot.in_flight,
615                    retry_queue_depth: snapshot.retry_queue_depth,
616                    dlq_depth: snapshot.dlq_depth,
617                });
618            }
619
620            let notified = self.state.idle_notify.notified();
621            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
622            if remaining.is_zero() {
623                return Ok(DispatcherDrainReport {
624                    drained: false,
625                    in_flight: snapshot.in_flight,
626                    retry_queue_depth: snapshot.retry_queue_depth,
627                    dlq_depth: snapshot.dlq_depth,
628                });
629            }
630            if tokio::time::timeout(remaining, notified).await.is_err() {
631                let snapshot = self.snapshot();
632                return Ok(DispatcherDrainReport {
633                    drained: false,
634                    in_flight: snapshot.in_flight,
635                    retry_queue_depth: snapshot.retry_queue_depth,
636                    dlq_depth: snapshot.dlq_depth,
637                });
638            }
639        }
640    }
641
642    pub async fn dispatch_inbox_envelope(
643        &self,
644        envelope: InboxEnvelope,
645    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
646        self.dispatch_inbox_envelope_with_headers(envelope, None)
647            .await
648    }
649
650    async fn dispatch_inbox_envelope_with_headers(
651        &self,
652        envelope: InboxEnvelope,
653        parent_headers: Option<&BTreeMap<String, String>>,
654    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
655        if let Some(trigger_id) = envelope.trigger_id {
656            let binding = super::registry::resolve_live_trigger_binding(
657                &trigger_id,
658                envelope.binding_version,
659            )
660            .map_err(|error| DispatchError::Registry(error.to_string()))?;
661            return Ok(vec![
662                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
663                    .await?,
664            ]);
665        }
666
667        let cron_target = match &envelope.event.provider_payload {
668            crate::triggers::ProviderPayload::Known(
669                crate::triggers::event::KnownProviderPayload::Cron(payload),
670            ) => payload.cron_id.clone(),
671            _ => None,
672        };
673        if let Some(trigger_id) = cron_target {
674            let binding = super::registry::resolve_live_trigger_binding(
675                &trigger_id,
676                envelope.binding_version,
677            )
678            .map_err(|error| DispatchError::Registry(error.to_string()))?;
679            return Ok(vec![
680                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
681                    .await?,
682            ]);
683        }
684
685        self.dispatch_event(envelope.event).await
686    }
687
688    pub async fn dispatch_event(
689        &self,
690        event: TriggerEvent,
691    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
692        let bindings = matching_bindings(&event);
693        let mut outcomes = Vec::new();
694        for binding in bindings {
695            outcomes.push(self.dispatch(&binding, event.clone()).await?);
696        }
697        Ok(outcomes)
698    }
699
700    pub async fn dispatch(
701        &self,
702        binding: &TriggerBinding,
703        event: TriggerEvent,
704    ) -> Result<DispatchOutcome, DispatchError> {
705        self.dispatch_with_replay(binding, event, None, None, None)
706            .await
707    }
708
709    pub async fn dispatch_replay(
710        &self,
711        binding: &TriggerBinding,
712        event: TriggerEvent,
713        replay_of_event_id: String,
714    ) -> Result<DispatchOutcome, DispatchError> {
715        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
716            .await
717    }
718
719    pub async fn dispatch_with_parent_span_id(
720        &self,
721        binding: &TriggerBinding,
722        event: TriggerEvent,
723        parent_span_id: Option<String>,
724    ) -> Result<DispatchOutcome, DispatchError> {
725        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
726            .await
727    }
728
729    async fn dispatch_with_replay(
730        &self,
731        binding: &TriggerBinding,
732        event: TriggerEvent,
733        replay_of_event_id: Option<String>,
734        parent_span_id: Option<String>,
735        parent_headers: Option<&BTreeMap<String, String>>,
736    ) -> Result<DispatchOutcome, DispatchError> {
737        let span = tracing::info_span!(
738            "dispatch",
739            trigger_id = %binding.id.as_str(),
740            binding_version = binding.version,
741            trace_id = %event.trace_id.0
742        );
743        #[cfg(feature = "otel")]
744        let span_for_otel = span.clone();
745        let _ = if let Some(headers) = parent_headers {
746            crate::observability::otel::set_span_parent_from_headers(
747                &span,
748                headers,
749                &event.trace_id,
750                parent_span_id.as_deref(),
751            )
752        } else {
753            crate::observability::otel::set_span_parent(
754                &span,
755                &event.trace_id,
756                parent_span_id.as_deref(),
757            )
758        };
759        #[cfg(feature = "otel")]
760        let started_at = Instant::now();
761        let metrics = self.metrics.clone();
762        let outcome = ACTIVE_DISPATCH_IS_REPLAY
763            .scope(
764                replay_of_event_id.is_some(),
765                self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
766                    .instrument(span),
767            )
768            .await;
769        if let Some(metrics) = metrics.as_ref() {
770            match &outcome {
771                Ok(dispatch_outcome) => match dispatch_outcome.status {
772                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
773                        metrics.record_dispatch_succeeded();
774                    }
775                    DispatchStatus::Waiting => {}
776                    _ => metrics.record_dispatch_failed(),
777                },
778                Err(_) => metrics.record_dispatch_failed(),
779            }
780        }
781        #[cfg(feature = "otel")]
782        {
783            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
784
785            let duration_ms = started_at.elapsed().as_millis() as i64;
786            let status = match &outcome {
787                Ok(dispatch_outcome) => match dispatch_outcome.status {
788                    DispatchStatus::Succeeded => "succeeded",
789                    DispatchStatus::Skipped => "skipped",
790                    DispatchStatus::Waiting => "waiting",
791                    DispatchStatus::Cancelled => "cancelled",
792                    DispatchStatus::Failed => "failed",
793                    DispatchStatus::Dlq => "dlq",
794                },
795                Err(DispatchError::Cancelled(_)) => "cancelled",
796                Err(_) => "failed",
797            };
798            span_for_otel.set_attribute("result.status", status);
799            span_for_otel.set_attribute("result.duration_ms", duration_ms);
800        }
801        outcome
802    }
803
804    async fn dispatch_with_replay_inner(
805        &self,
806        binding: &TriggerBinding,
807        event: TriggerEvent,
808        replay_of_event_id: Option<String>,
809    ) -> Result<DispatchOutcome, DispatchError> {
810        let autonomy_tier = crate::resolve_agent_autonomy_tier(
811            &self.event_log,
812            binding.id.as_str(),
813            binding.autonomy_tier,
814        )
815        .await
816        .unwrap_or(binding.autonomy_tier);
817        let binding_key = binding.binding_key();
818        let route = DispatchUri::from(&binding.handler);
819        let trigger_id = binding.id.as_str().to_string();
820        let event_id = event.id.0.clone();
821        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
822        let begin = if replay_of_event_id.is_some() {
823            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
824        } else {
825            begin_in_flight(binding.id.as_str(), binding.version)
826        };
827        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
828
829        let mut attempts = Vec::new();
830        let mut source_node_id = format!("trigger:{}", event.id.0);
831        let mut initial_nodes = Vec::new();
832        let mut initial_edges = Vec::new();
833        if let Some(original_event_id) = replay_of_event_id.as_ref() {
834            let original_node_id = format!("trigger:{original_event_id}");
835            initial_nodes.push(RunActionGraphNodeRecord {
836                id: original_node_id.clone(),
837                label: format!(
838                    "{}:{} (original {})",
839                    event.provider.as_str(),
840                    event.kind,
841                    original_event_id
842                ),
843                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
844                status: "historical".to_string(),
845                outcome: "replayed_from".to_string(),
846                trace_id: Some(event.trace_id.0.clone()),
847                stage_id: None,
848                node_id: None,
849                worker_id: None,
850                run_id: None,
851                run_path: None,
852                metadata: trigger_node_metadata(&event),
853            });
854            initial_edges.push(RunActionGraphEdgeRecord {
855                from_id: original_node_id,
856                to_id: source_node_id.clone(),
857                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
858                label: Some("replay chain".to_string()),
859            });
860        }
861        initial_nodes.push(RunActionGraphNodeRecord {
862            id: source_node_id.clone(),
863            label: format!("{}:{}", event.provider.as_str(), event.kind),
864            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
865            status: "received".to_string(),
866            outcome: "received".to_string(),
867            trace_id: Some(event.trace_id.0.clone()),
868            stage_id: None,
869            node_id: None,
870            worker_id: None,
871            run_id: None,
872            run_path: None,
873            metadata: trigger_node_metadata(&event),
874        });
875        self.emit_action_graph(
876            &event,
877            initial_nodes,
878            initial_edges,
879            serde_json::json!({
880                "source": "dispatcher",
881                "trigger_id": trigger_id,
882                "binding_key": binding_key,
883                "event_id": event_id,
884                "replay_of_event_id": replay_of_event_id,
885            }),
886        )
887        .await?;
888
889        if dispatch_cancel_requested(
890            &self.event_log,
891            &binding_key,
892            &event.id.0,
893            replay_of_event_id.as_ref(),
894        )
895        .await?
896        {
897            finish_in_flight(
898                binding.id.as_str(),
899                binding.version,
900                TriggerDispatchOutcome::Failed,
901            )
902            .await
903            .map_err(|error| DispatchError::Registry(error.to_string()))?;
904            decrement_in_flight(&self.state);
905            return Ok(cancelled_dispatch_outcome(
906                binding,
907                &route,
908                &event,
909                replay_of_event_id,
910                0,
911                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
912            ));
913        }
914
915        if let Some(predicate) = binding.when.as_ref() {
916            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
917            let evaluation = self
918                .evaluate_predicate(
919                    binding,
920                    predicate,
921                    &event,
922                    replay_of_event_id.as_ref(),
923                    autonomy_tier,
924                )
925                .await?;
926            let passed = evaluation.result;
927            self.emit_action_graph(
928                &event,
929                vec![RunActionGraphNodeRecord {
930                    id: predicate_node_id.clone(),
931                    label: predicate.raw.clone(),
932                    kind: ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE.to_string(),
933                    status: "completed".to_string(),
934                    outcome: passed.to_string(),
935                    trace_id: Some(event.trace_id.0.clone()),
936                    stage_id: None,
937                    node_id: None,
938                    worker_id: None,
939                    run_id: None,
940                    run_path: None,
941                    metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
942                }],
943                vec![RunActionGraphEdgeRecord {
944                    from_id: source_node_id.clone(),
945                    to_id: predicate_node_id.clone(),
946                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
947                    label: None,
948                }],
949                serde_json::json!({
950                    "source": "dispatcher",
951                    "trigger_id": binding.id.as_str(),
952                    "binding_key": binding.binding_key(),
953                    "event_id": event.id.0,
954                    "predicate": predicate.raw,
955                    "reason": evaluation.reason,
956                    "cached": evaluation.cached,
957                    "cost_usd": evaluation.cost_usd,
958                    "tokens": evaluation.tokens,
959                    "latency_ms": evaluation.latency_ms,
960                    "replay_of_event_id": replay_of_event_id,
961                }),
962            )
963            .await?;
964
965            if !passed {
966                self.append_skipped_outbox_event(
967                    binding,
968                    &route,
969                    &event,
970                    replay_of_event_id.as_ref(),
971                    DispatchSkipStage::Predicate,
972                    serde_json::json!({
973                        "predicate": predicate.raw,
974                        "reason": evaluation.reason,
975                    }),
976                )
977                .await?;
978                finish_in_flight(
979                    binding.id.as_str(),
980                    binding.version,
981                    TriggerDispatchOutcome::Dispatched,
982                )
983                .await
984                .map_err(|error| DispatchError::Registry(error.to_string()))?;
985                decrement_in_flight(&self.state);
986                self.append_dispatch_trust_record(
987                    binding,
988                    &route,
989                    &event,
990                    replay_of_event_id.as_ref(),
991                    autonomy_tier,
992                    TrustOutcome::Denied,
993                    "skipped",
994                    0,
995                    None,
996                )
997                .await?;
998                return Ok(DispatchOutcome {
999                    trigger_id: binding.id.as_str().to_string(),
1000                    binding_key: binding.binding_key(),
1001                    event_id: event.id.0,
1002                    attempt_count: 0,
1003                    status: DispatchStatus::Skipped,
1004                    handler_kind: route.kind().to_string(),
1005                    target_uri: route.target_uri(),
1006                    replay_of_event_id,
1007                    result: Some(serde_json::json!({
1008                        "skipped": true,
1009                        "predicate": predicate.raw,
1010                        "reason": evaluation.reason,
1011                    })),
1012                    error: None,
1013                });
1014            }
1015
1016            source_node_id = predicate_node_id;
1017        }
1018
1019        let (event, acquired_flow) = match self
1020            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1021            .await?
1022        {
1023            FlowControlOutcome::Dispatch { event, acquired } => {
1024                (*event, Arc::new(AsyncMutex::new(acquired)))
1025            }
1026            FlowControlOutcome::Skip { reason } => {
1027                self.append_skipped_outbox_event(
1028                    binding,
1029                    &route,
1030                    &event,
1031                    replay_of_event_id.as_ref(),
1032                    DispatchSkipStage::FlowControl,
1033                    serde_json::json!({
1034                        "flow_control": reason,
1035                    }),
1036                )
1037                .await?;
1038                finish_in_flight(
1039                    binding.id.as_str(),
1040                    binding.version,
1041                    TriggerDispatchOutcome::Dispatched,
1042                )
1043                .await
1044                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1045                decrement_in_flight(&self.state);
1046                return Ok(DispatchOutcome {
1047                    trigger_id: binding.id.as_str().to_string(),
1048                    binding_key: binding.binding_key(),
1049                    event_id: event.id.0,
1050                    attempt_count: 0,
1051                    status: DispatchStatus::Skipped,
1052                    handler_kind: route.kind().to_string(),
1053                    target_uri: route.target_uri(),
1054                    replay_of_event_id,
1055                    result: Some(serde_json::json!({
1056                        "skipped": true,
1057                        "flow_control": reason,
1058                    })),
1059                    error: None,
1060                });
1061            }
1062        };
1063
1064        let mut previous_retry_node = None;
1065        let max_attempts = binding.retry.max_attempts();
1066        for attempt in 1..=max_attempts {
1067            if dispatch_cancel_requested(
1068                &self.event_log,
1069                &binding_key,
1070                &event.id.0,
1071                replay_of_event_id.as_ref(),
1072            )
1073            .await?
1074            {
1075                finish_in_flight(
1076                    binding.id.as_str(),
1077                    binding.version,
1078                    TriggerDispatchOutcome::Failed,
1079                )
1080                .await
1081                .map_err(|error| DispatchError::Registry(error.to_string()))?;
1082                decrement_in_flight(&self.state);
1083                return Ok(cancelled_dispatch_outcome(
1084                    binding,
1085                    &route,
1086                    &event,
1087                    replay_of_event_id,
1088                    attempt.saturating_sub(1),
1089                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1090                ));
1091            }
1092            maybe_fail_before_outbox();
1093            let started_at = now_rfc3339();
1094            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1095            self.append_lifecycle_event(
1096                "DispatchStarted",
1097                &event,
1098                binding,
1099                serde_json::json!({
1100                    "event_id": event.id.0,
1101                    "attempt": attempt,
1102                    "handler_kind": route.kind(),
1103                    "target_uri": route.target_uri(),
1104                    "replay_of_event_id": replay_of_event_id,
1105                }),
1106                replay_of_event_id.as_ref(),
1107            )
1108            .await?;
1109            self.append_topic_event(
1110                TRIGGER_OUTBOX_TOPIC,
1111                "dispatch_started",
1112                &event,
1113                Some(binding),
1114                Some(attempt),
1115                serde_json::json!({
1116                    "event_id": event.id.0,
1117                    "attempt": attempt,
1118                    "trigger_id": binding.id.as_str(),
1119                    "binding_key": binding.binding_key(),
1120                    "handler_kind": route.kind(),
1121                    "target_uri": route.target_uri(),
1122                    "replay_of_event_id": replay_of_event_id,
1123                }),
1124                replay_of_event_id.as_ref(),
1125            )
1126            .await?;
1127
1128            let mut dispatch_edges = Vec::new();
1129            if attempt == 1 {
1130                dispatch_edges.push(RunActionGraphEdgeRecord {
1131                    from_id: source_node_id.clone(),
1132                    to_id: attempt_node_id.clone(),
1133                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1134                    label: binding.when.as_ref().map(|_| "true".to_string()),
1135                });
1136            } else if let Some(retry_node_id) = previous_retry_node.take() {
1137                dispatch_edges.push(RunActionGraphEdgeRecord {
1138                    from_id: retry_node_id,
1139                    to_id: attempt_node_id.clone(),
1140                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1141                    label: Some(format!("attempt {attempt}")),
1142                });
1143            }
1144
1145            self.emit_action_graph(
1146                &event,
1147                vec![RunActionGraphNodeRecord {
1148                    id: attempt_node_id.clone(),
1149                    label: dispatch_node_label(&route),
1150                    kind: dispatch_node_kind(&route).to_string(),
1151                    status: "running".to_string(),
1152                    outcome: format!("attempt_{attempt}"),
1153                    trace_id: Some(event.trace_id.0.clone()),
1154                    stage_id: None,
1155                    node_id: None,
1156                    worker_id: None,
1157                    run_id: None,
1158                    run_path: None,
1159                    metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1160                }],
1161                dispatch_edges,
1162                serde_json::json!({
1163                    "source": "dispatcher",
1164                    "trigger_id": binding.id.as_str(),
1165                    "binding_key": binding.binding_key(),
1166                    "event_id": event.id.0,
1167                    "attempt": attempt,
1168                    "handler_kind": route.kind(),
1169                    "target_uri": route.target_uri(),
1170                    "target_agent": dispatch_target_agent(&route),
1171                    "replay_of_event_id": replay_of_event_id,
1172                }),
1173            )
1174            .await?;
1175
1176            let result = self
1177                .dispatch_once(
1178                    binding,
1179                    &route,
1180                    &event,
1181                    autonomy_tier,
1182                    Some(DispatchWaitLease::new(
1183                        self.state.clone(),
1184                        acquired_flow.clone(),
1185                    )),
1186                    &mut self.cancel_tx.subscribe(),
1187                )
1188                .await;
1189            let completed_at = now_rfc3339();
1190
1191            match result {
1192                Ok(result) => {
1193                    let attempt_record = DispatchAttemptRecord {
1194                        trigger_id: binding.id.as_str().to_string(),
1195                        binding_key: binding.binding_key(),
1196                        event_id: event.id.0.clone(),
1197                        attempt,
1198                        handler_kind: route.kind().to_string(),
1199                        started_at,
1200                        completed_at,
1201                        outcome: "success".to_string(),
1202                        error_msg: None,
1203                    };
1204                    attempts.push(attempt_record.clone());
1205                    self.append_attempt_record(
1206                        &event,
1207                        binding,
1208                        &attempt_record,
1209                        replay_of_event_id.as_ref(),
1210                    )
1211                    .await?;
1212                    self.append_lifecycle_event(
1213                        "DispatchSucceeded",
1214                        &event,
1215                        binding,
1216                        serde_json::json!({
1217                            "event_id": event.id.0,
1218                            "attempt": attempt,
1219                            "handler_kind": route.kind(),
1220                            "target_uri": route.target_uri(),
1221                            "result": result,
1222                            "replay_of_event_id": replay_of_event_id,
1223                        }),
1224                        replay_of_event_id.as_ref(),
1225                    )
1226                    .await?;
1227                    self.append_topic_event(
1228                        TRIGGER_OUTBOX_TOPIC,
1229                        "dispatch_succeeded",
1230                        &event,
1231                        Some(binding),
1232                        Some(attempt),
1233                        serde_json::json!({
1234                            "event_id": event.id.0,
1235                            "attempt": attempt,
1236                            "trigger_id": binding.id.as_str(),
1237                            "binding_key": binding.binding_key(),
1238                            "handler_kind": route.kind(),
1239                            "target_uri": route.target_uri(),
1240                            "result": result,
1241                            "replay_of_event_id": replay_of_event_id,
1242                        }),
1243                        replay_of_event_id.as_ref(),
1244                    )
1245                    .await?;
1246                    self.emit_action_graph(
1247                        &event,
1248                        vec![RunActionGraphNodeRecord {
1249                            id: attempt_node_id.clone(),
1250                            label: dispatch_node_label(&route),
1251                            kind: dispatch_node_kind(&route).to_string(),
1252                            status: "completed".to_string(),
1253                            outcome: dispatch_success_outcome(&route, &result).to_string(),
1254                            trace_id: Some(event.trace_id.0.clone()),
1255                            stage_id: None,
1256                            node_id: None,
1257                            worker_id: None,
1258                            run_id: None,
1259                            run_path: None,
1260                            metadata: dispatch_success_metadata(
1261                                &route, binding, &event, attempt, &result,
1262                            ),
1263                        }],
1264                        Vec::new(),
1265                        serde_json::json!({
1266                            "source": "dispatcher",
1267                            "trigger_id": binding.id.as_str(),
1268                            "binding_key": binding.binding_key(),
1269                            "event_id": event.id.0,
1270                            "attempt": attempt,
1271                            "handler_kind": route.kind(),
1272                            "target_uri": route.target_uri(),
1273                            "result": result,
1274                            "replay_of_event_id": replay_of_event_id,
1275                        }),
1276                    )
1277                    .await?;
1278                    finish_in_flight(
1279                        binding.id.as_str(),
1280                        binding.version,
1281                        TriggerDispatchOutcome::Dispatched,
1282                    )
1283                    .await
1284                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1285                    self.release_flow_control(&acquired_flow).await?;
1286                    decrement_in_flight(&self.state);
1287                    self.append_dispatch_trust_record(
1288                        binding,
1289                        &route,
1290                        &event,
1291                        replay_of_event_id.as_ref(),
1292                        autonomy_tier,
1293                        TrustOutcome::Success,
1294                        "succeeded",
1295                        attempt,
1296                        None,
1297                    )
1298                    .await?;
1299                    return Ok(DispatchOutcome {
1300                        trigger_id: binding.id.as_str().to_string(),
1301                        binding_key: binding.binding_key(),
1302                        event_id: event.id.0,
1303                        attempt_count: attempt,
1304                        status: DispatchStatus::Succeeded,
1305                        handler_kind: route.kind().to_string(),
1306                        target_uri: route.target_uri(),
1307                        replay_of_event_id,
1308                        result: Some(result),
1309                        error: None,
1310                    });
1311                }
1312                Err(error) => {
1313                    let attempt_record = DispatchAttemptRecord {
1314                        trigger_id: binding.id.as_str().to_string(),
1315                        binding_key: binding.binding_key(),
1316                        event_id: event.id.0.clone(),
1317                        attempt,
1318                        handler_kind: route.kind().to_string(),
1319                        started_at,
1320                        completed_at,
1321                        outcome: dispatch_error_label(&error).to_string(),
1322                        error_msg: Some(error.to_string()),
1323                    };
1324                    attempts.push(attempt_record.clone());
1325                    self.append_attempt_record(
1326                        &event,
1327                        binding,
1328                        &attempt_record,
1329                        replay_of_event_id.as_ref(),
1330                    )
1331                    .await?;
1332                    if let DispatchError::Waiting(message) = &error {
1333                        self.append_lifecycle_event(
1334                            "DispatchWaiting",
1335                            &event,
1336                            binding,
1337                            serde_json::json!({
1338                                "event_id": event.id.0,
1339                                "attempt": attempt,
1340                                "handler_kind": route.kind(),
1341                                "target_uri": route.target_uri(),
1342                                "message": message,
1343                                "replay_of_event_id": replay_of_event_id,
1344                            }),
1345                            replay_of_event_id.as_ref(),
1346                        )
1347                        .await?;
1348                        self.append_topic_event(
1349                            TRIGGER_OUTBOX_TOPIC,
1350                            "dispatch_waiting",
1351                            &event,
1352                            Some(binding),
1353                            Some(attempt),
1354                            serde_json::json!({
1355                                "event_id": event.id.0,
1356                                "attempt": attempt,
1357                                "trigger_id": binding.id.as_str(),
1358                                "binding_key": binding.binding_key(),
1359                                "handler_kind": route.kind(),
1360                                "target_uri": route.target_uri(),
1361                                "message": message,
1362                                "replay_of_event_id": replay_of_event_id,
1363                            }),
1364                            replay_of_event_id.as_ref(),
1365                        )
1366                        .await?;
1367                        finish_in_flight(
1368                            binding.id.as_str(),
1369                            binding.version,
1370                            TriggerDispatchOutcome::Dispatched,
1371                        )
1372                        .await
1373                        .map_err(|registry_error| {
1374                            DispatchError::Registry(registry_error.to_string())
1375                        })?;
1376                        self.release_flow_control(&acquired_flow).await?;
1377                        decrement_in_flight(&self.state);
1378                        return Ok(DispatchOutcome {
1379                            trigger_id: binding.id.as_str().to_string(),
1380                            binding_key: binding.binding_key(),
1381                            event_id: event.id.0,
1382                            attempt_count: attempt,
1383                            status: DispatchStatus::Waiting,
1384                            handler_kind: route.kind().to_string(),
1385                            target_uri: route.target_uri(),
1386                            replay_of_event_id,
1387                            result: Some(serde_json::json!({
1388                                "waiting": true,
1389                                "message": message,
1390                            })),
1391                            error: None,
1392                        });
1393                    }
1394
1395                    self.append_lifecycle_event(
1396                        "DispatchFailed",
1397                        &event,
1398                        binding,
1399                        serde_json::json!({
1400                            "event_id": event.id.0,
1401                            "attempt": attempt,
1402                            "handler_kind": route.kind(),
1403                            "target_uri": route.target_uri(),
1404                            "error": error.to_string(),
1405                            "replay_of_event_id": replay_of_event_id,
1406                        }),
1407                        replay_of_event_id.as_ref(),
1408                    )
1409                    .await?;
1410                    self.append_topic_event(
1411                        TRIGGER_OUTBOX_TOPIC,
1412                        "dispatch_failed",
1413                        &event,
1414                        Some(binding),
1415                        Some(attempt),
1416                        serde_json::json!({
1417                            "event_id": event.id.0,
1418                            "attempt": attempt,
1419                            "trigger_id": binding.id.as_str(),
1420                            "binding_key": binding.binding_key(),
1421                            "handler_kind": route.kind(),
1422                            "target_uri": route.target_uri(),
1423                            "error": error.to_string(),
1424                            "replay_of_event_id": replay_of_event_id,
1425                        }),
1426                        replay_of_event_id.as_ref(),
1427                    )
1428                    .await?;
1429                    self.emit_action_graph(
1430                        &event,
1431                        vec![RunActionGraphNodeRecord {
1432                            id: attempt_node_id.clone(),
1433                            label: dispatch_node_label(&route),
1434                            kind: dispatch_node_kind(&route).to_string(),
1435                            status: if matches!(error, DispatchError::Cancelled(_)) {
1436                                "cancelled".to_string()
1437                            } else {
1438                                "failed".to_string()
1439                            },
1440                            outcome: dispatch_error_label(&error).to_string(),
1441                            trace_id: Some(event.trace_id.0.clone()),
1442                            stage_id: None,
1443                            node_id: None,
1444                            worker_id: None,
1445                            run_id: None,
1446                            run_path: None,
1447                            metadata: dispatch_error_metadata(
1448                                &route, binding, &event, attempt, &error,
1449                            ),
1450                        }],
1451                        Vec::new(),
1452                        serde_json::json!({
1453                            "source": "dispatcher",
1454                            "trigger_id": binding.id.as_str(),
1455                            "binding_key": binding.binding_key(),
1456                            "event_id": event.id.0,
1457                            "attempt": attempt,
1458                            "handler_kind": route.kind(),
1459                            "target_uri": route.target_uri(),
1460                            "error": error.to_string(),
1461                            "replay_of_event_id": replay_of_event_id,
1462                        }),
1463                    )
1464                    .await?;
1465
1466                    if !error.retryable() {
1467                        finish_in_flight(
1468                            binding.id.as_str(),
1469                            binding.version,
1470                            TriggerDispatchOutcome::Failed,
1471                        )
1472                        .await
1473                        .map_err(|registry_error| {
1474                            DispatchError::Registry(registry_error.to_string())
1475                        })?;
1476                        self.release_flow_control(&acquired_flow).await?;
1477                        decrement_in_flight(&self.state);
1478                        let trust_outcome = match error {
1479                            DispatchError::Denied(_) => TrustOutcome::Denied,
1480                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
1481                            _ => TrustOutcome::Failure,
1482                        };
1483                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1484                            "cancelled"
1485                        } else {
1486                            "failed"
1487                        };
1488                        self.append_dispatch_trust_record(
1489                            binding,
1490                            &route,
1491                            &event,
1492                            replay_of_event_id.as_ref(),
1493                            autonomy_tier,
1494                            trust_outcome,
1495                            terminal_status,
1496                            attempt,
1497                            Some(error.to_string()),
1498                        )
1499                        .await?;
1500                        return Ok(DispatchOutcome {
1501                            trigger_id: binding.id.as_str().to_string(),
1502                            binding_key: binding.binding_key(),
1503                            event_id: event.id.0,
1504                            attempt_count: attempt,
1505                            status: if matches!(error, DispatchError::Cancelled(_)) {
1506                                DispatchStatus::Cancelled
1507                            } else {
1508                                DispatchStatus::Failed
1509                            },
1510                            handler_kind: route.kind().to_string(),
1511                            target_uri: route.target_uri(),
1512                            replay_of_event_id,
1513                            result: None,
1514                            error: Some(error.to_string()),
1515                        });
1516                    }
1517
1518                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1519                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1520                        previous_retry_node = Some(retry_node_id.clone());
1521                        self.emit_action_graph(
1522                            &event,
1523                            vec![RunActionGraphNodeRecord {
1524                                id: retry_node_id.clone(),
1525                                label: format!("retry in {}ms", delay.as_millis()),
1526                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1527                                status: "scheduled".to_string(),
1528                                outcome: format!("attempt_{}", attempt + 1),
1529                                trace_id: Some(event.trace_id.0.clone()),
1530                                stage_id: None,
1531                                node_id: None,
1532                                worker_id: None,
1533                                run_id: None,
1534                                run_path: None,
1535                                metadata: retry_node_metadata(
1536                                    binding,
1537                                    &event,
1538                                    attempt + 1,
1539                                    delay,
1540                                    &error,
1541                                ),
1542                            }],
1543                            vec![RunActionGraphEdgeRecord {
1544                                from_id: attempt_node_id,
1545                                to_id: retry_node_id.clone(),
1546                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1547                                label: Some(format!("attempt {}", attempt + 1)),
1548                            }],
1549                            serde_json::json!({
1550                                "source": "dispatcher",
1551                                "trigger_id": binding.id.as_str(),
1552                                "binding_key": binding.binding_key(),
1553                                "event_id": event.id.0,
1554                                "attempt": attempt + 1,
1555                                "delay_ms": delay.as_millis(),
1556                                "replay_of_event_id": replay_of_event_id,
1557                            }),
1558                        )
1559                        .await?;
1560                        self.append_lifecycle_event(
1561                            "RetryScheduled",
1562                            &event,
1563                            binding,
1564                            serde_json::json!({
1565                                "event_id": event.id.0,
1566                                "attempt": attempt + 1,
1567                                "delay_ms": delay.as_millis(),
1568                                "error": error.to_string(),
1569                                "replay_of_event_id": replay_of_event_id,
1570                            }),
1571                            replay_of_event_id.as_ref(),
1572                        )
1573                        .await?;
1574                        self.append_topic_event(
1575                            TRIGGER_ATTEMPTS_TOPIC,
1576                            "retry_scheduled",
1577                            &event,
1578                            Some(binding),
1579                            Some(attempt + 1),
1580                            serde_json::json!({
1581                                "event_id": event.id.0,
1582                                "attempt": attempt + 1,
1583                                "trigger_id": binding.id.as_str(),
1584                                "binding_key": binding.binding_key(),
1585                                "delay_ms": delay.as_millis(),
1586                                "error": error.to_string(),
1587                                "replay_of_event_id": replay_of_event_id,
1588                            }),
1589                            replay_of_event_id.as_ref(),
1590                        )
1591                        .await?;
1592                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1593                        let sleep_result = sleep_or_cancel_or_request(
1594                            &self.event_log,
1595                            delay,
1596                            &binding_key,
1597                            &event.id.0,
1598                            replay_of_event_id.as_ref(),
1599                            &mut self.cancel_tx.subscribe(),
1600                        )
1601                        .await;
1602                        decrement_retry_queue_depth(&self.state);
1603                        if sleep_result.is_err() {
1604                            finish_in_flight(
1605                                binding.id.as_str(),
1606                                binding.version,
1607                                TriggerDispatchOutcome::Failed,
1608                            )
1609                            .await
1610                            .map_err(|registry_error| {
1611                                DispatchError::Registry(registry_error.to_string())
1612                            })?;
1613                            self.release_flow_control(&acquired_flow).await?;
1614                            decrement_in_flight(&self.state);
1615                            self.append_dispatch_trust_record(
1616                                binding,
1617                                &route,
1618                                &event,
1619                                replay_of_event_id.as_ref(),
1620                                autonomy_tier,
1621                                TrustOutcome::Failure,
1622                                "cancelled",
1623                                attempt,
1624                                Some("dispatcher shutdown cancelled retry wait".to_string()),
1625                            )
1626                            .await?;
1627                            return Ok(DispatchOutcome {
1628                                trigger_id: binding.id.as_str().to_string(),
1629                                binding_key: binding.binding_key(),
1630                                event_id: event.id.0,
1631                                attempt_count: attempt,
1632                                status: DispatchStatus::Cancelled,
1633                                handler_kind: route.kind().to_string(),
1634                                target_uri: route.target_uri(),
1635                                replay_of_event_id,
1636                                result: None,
1637                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1638                            });
1639                        }
1640                        continue;
1641                    }
1642
1643                    let final_error = error.to_string();
1644                    let dlq_entry = DlqEntry {
1645                        trigger_id: binding.id.as_str().to_string(),
1646                        binding_key: binding.binding_key(),
1647                        event: event.clone(),
1648                        attempt_count: attempt,
1649                        final_error: final_error.clone(),
1650                        attempts: attempts.clone(),
1651                    };
1652                    self.state
1653                        .dlq
1654                        .lock()
1655                        .expect("dispatcher dlq poisoned")
1656                        .push(dlq_entry.clone());
1657                    self.emit_action_graph(
1658                        &event,
1659                        vec![RunActionGraphNodeRecord {
1660                            id: format!("dlq:{binding_key}:{}", event.id.0),
1661                            label: binding.id.as_str().to_string(),
1662                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1663                            status: "queued".to_string(),
1664                            outcome: "retry_exhausted".to_string(),
1665                            trace_id: Some(event.trace_id.0.clone()),
1666                            stage_id: None,
1667                            node_id: None,
1668                            worker_id: None,
1669                            run_id: None,
1670                            run_path: None,
1671                            metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
1672                        }],
1673                        vec![RunActionGraphEdgeRecord {
1674                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1675                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
1676                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1677                            label: Some(format!("{attempt} attempts")),
1678                        }],
1679                        serde_json::json!({
1680                            "source": "dispatcher",
1681                            "trigger_id": binding.id.as_str(),
1682                            "binding_key": binding.binding_key(),
1683                            "event_id": event.id.0,
1684                            "attempt_count": attempt,
1685                            "final_error": final_error,
1686                            "replay_of_event_id": replay_of_event_id,
1687                        }),
1688                    )
1689                    .await?;
1690                    self.append_lifecycle_event(
1691                        "DlqMoved",
1692                        &event,
1693                        binding,
1694                        serde_json::json!({
1695                            "event_id": event.id.0,
1696                            "attempt_count": attempt,
1697                            "final_error": dlq_entry.final_error,
1698                            "replay_of_event_id": replay_of_event_id,
1699                        }),
1700                        replay_of_event_id.as_ref(),
1701                    )
1702                    .await?;
1703                    self.append_topic_event(
1704                        TRIGGER_DLQ_TOPIC,
1705                        "dlq_moved",
1706                        &event,
1707                        Some(binding),
1708                        Some(attempt),
1709                        serde_json::to_value(&dlq_entry)
1710                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1711                        replay_of_event_id.as_ref(),
1712                    )
1713                    .await?;
1714                    finish_in_flight(
1715                        binding.id.as_str(),
1716                        binding.version,
1717                        TriggerDispatchOutcome::Dlq,
1718                    )
1719                    .await
1720                    .map_err(|registry_error| {
1721                        DispatchError::Registry(registry_error.to_string())
1722                    })?;
1723                    self.release_flow_control(&acquired_flow).await?;
1724                    decrement_in_flight(&self.state);
1725                    self.append_dispatch_trust_record(
1726                        binding,
1727                        &route,
1728                        &event,
1729                        replay_of_event_id.as_ref(),
1730                        autonomy_tier,
1731                        TrustOutcome::Failure,
1732                        "dlq",
1733                        attempt,
1734                        Some(error.to_string()),
1735                    )
1736                    .await?;
1737                    return Ok(DispatchOutcome {
1738                        trigger_id: binding.id.as_str().to_string(),
1739                        binding_key: binding.binding_key(),
1740                        event_id: event.id.0,
1741                        attempt_count: attempt,
1742                        status: DispatchStatus::Dlq,
1743                        handler_kind: route.kind().to_string(),
1744                        target_uri: route.target_uri(),
1745                        replay_of_event_id,
1746                        result: None,
1747                        error: Some(error.to_string()),
1748                    });
1749                }
1750            }
1751        }
1752
1753        finish_in_flight(
1754            binding.id.as_str(),
1755            binding.version,
1756            TriggerDispatchOutcome::Failed,
1757        )
1758        .await
1759        .map_err(|error| DispatchError::Registry(error.to_string()))?;
1760        self.release_flow_control(&acquired_flow).await?;
1761        decrement_in_flight(&self.state);
1762        self.append_dispatch_trust_record(
1763            binding,
1764            &route,
1765            &event,
1766            replay_of_event_id.as_ref(),
1767            autonomy_tier,
1768            TrustOutcome::Failure,
1769            "failed",
1770            max_attempts,
1771            Some("dispatch exhausted without terminal outcome".to_string()),
1772        )
1773        .await?;
1774        Ok(DispatchOutcome {
1775            trigger_id: binding.id.as_str().to_string(),
1776            binding_key: binding.binding_key(),
1777            event_id: event.id.0,
1778            attempt_count: max_attempts,
1779            status: DispatchStatus::Failed,
1780            handler_kind: route.kind().to_string(),
1781            target_uri: route.target_uri(),
1782            replay_of_event_id,
1783            result: None,
1784            error: Some("dispatch exhausted without terminal outcome".to_string()),
1785        })
1786    }
1787
1788    async fn dispatch_once(
1789        &self,
1790        binding: &TriggerBinding,
1791        route: &DispatchUri,
1792        event: &TriggerEvent,
1793        autonomy_tier: AutonomyTier,
1794        wait_lease: Option<DispatchWaitLease>,
1795        cancel_rx: &mut broadcast::Receiver<()>,
1796    ) -> Result<serde_json::Value, DispatchError> {
1797        match route {
1798            DispatchUri::Local { .. } => {
1799                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
1800                    return Err(DispatchError::Local(format!(
1801                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
1802                        binding.id.as_str()
1803                    )));
1804                };
1805                let value = self
1806                    .invoke_vm_callable(
1807                        closure,
1808                        &binding.binding_key(),
1809                        event,
1810                        None,
1811                        binding.id.as_str(),
1812                        &format!("{}.{}", event.provider.as_str(), event.kind),
1813                        autonomy_tier,
1814                        wait_lease,
1815                        cancel_rx,
1816                    )
1817                    .await?;
1818                Ok(vm_value_to_json(&value))
1819            }
1820            DispatchUri::A2a {
1821                target,
1822                allow_cleartext,
1823            } => {
1824                if self.state.shutting_down.load(Ordering::SeqCst) {
1825                    return Err(DispatchError::Cancelled(
1826                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
1827                    ));
1828                }
1829                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
1830                    target,
1831                    *allow_cleartext,
1832                    binding.id.as_str(),
1833                    &binding.binding_key(),
1834                    event,
1835                    cancel_rx,
1836                )
1837                .await
1838                .map_err(|error| match error {
1839                    crate::a2a::A2aClientError::Cancelled(message) => {
1840                        DispatchError::Cancelled(message)
1841                    }
1842                    other => DispatchError::A2a(other.to_string()),
1843                })?;
1844                match ack {
1845                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
1846                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
1847                }
1848            }
1849            DispatchUri::Worker { queue } => {
1850                let receipt = crate::WorkerQueue::new(self.event_log.clone())
1851                    .enqueue(&crate::WorkerQueueJob {
1852                        queue: queue.clone(),
1853                        trigger_id: binding.id.as_str().to_string(),
1854                        binding_key: binding.binding_key(),
1855                        binding_version: binding.version,
1856                        event: event.clone(),
1857                        replay_of_event_id: current_dispatch_context()
1858                            .and_then(|context| context.replay_of_event_id),
1859                        priority: worker_queue_priority(binding, event),
1860                    })
1861                    .await
1862                    .map_err(DispatchError::from)?;
1863                Ok(serde_json::to_value(receipt)
1864                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
1865            }
1866        }
1867    }
1868
1869    async fn apply_flow_control(
1870        &self,
1871        binding: &TriggerBinding,
1872        event: &TriggerEvent,
1873        replay_of_event_id: Option<&String>,
1874    ) -> Result<FlowControlOutcome, DispatchError> {
1875        let flow = &binding.flow_control;
1876        let mut managed_event = event.clone();
1877
1878        if let Some(batch) = &flow.batch {
1879            let gate = self
1880                .resolve_flow_gate(
1881                    &binding.binding_key(),
1882                    batch.key.as_ref(),
1883                    &managed_event,
1884                    replay_of_event_id,
1885                )
1886                .await?;
1887            match self
1888                .state
1889                .flow_control
1890                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
1891                .await
1892                .map_err(DispatchError::from)?
1893            {
1894                BatchDecision::Dispatch(events) => {
1895                    managed_event = build_batched_event(events)?;
1896                }
1897                BatchDecision::Merged => {
1898                    return Ok(FlowControlOutcome::Skip {
1899                        reason: "batch_merged".to_string(),
1900                    })
1901                }
1902            }
1903        }
1904
1905        if let Some(debounce) = &flow.debounce {
1906            let gate = self
1907                .resolve_flow_gate(
1908                    &binding.binding_key(),
1909                    Some(&debounce.key),
1910                    &managed_event,
1911                    replay_of_event_id,
1912                )
1913                .await?;
1914            let latest = self
1915                .state
1916                .flow_control
1917                .debounce(&gate, debounce.period)
1918                .await
1919                .map_err(DispatchError::from)?;
1920            if !latest {
1921                return Ok(FlowControlOutcome::Skip {
1922                    reason: "debounced".to_string(),
1923                });
1924            }
1925        }
1926
1927        if let Some(rate_limit) = &flow.rate_limit {
1928            let gate = self
1929                .resolve_flow_gate(
1930                    &binding.binding_key(),
1931                    rate_limit.key.as_ref(),
1932                    &managed_event,
1933                    replay_of_event_id,
1934                )
1935                .await?;
1936            let allowed = self
1937                .state
1938                .flow_control
1939                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
1940                .await
1941                .map_err(DispatchError::from)?;
1942            if !allowed {
1943                return Ok(FlowControlOutcome::Skip {
1944                    reason: "rate_limited".to_string(),
1945                });
1946            }
1947        }
1948
1949        if let Some(throttle) = &flow.throttle {
1950            let gate = self
1951                .resolve_flow_gate(
1952                    &binding.binding_key(),
1953                    throttle.key.as_ref(),
1954                    &managed_event,
1955                    replay_of_event_id,
1956                )
1957                .await?;
1958            self.state
1959                .flow_control
1960                .wait_for_throttle(&gate, throttle.period, throttle.max)
1961                .await
1962                .map_err(DispatchError::from)?;
1963        }
1964
1965        let mut acquired = AcquiredFlowControl::default();
1966        if let Some(singleton) = &flow.singleton {
1967            let gate = self
1968                .resolve_flow_gate(
1969                    &binding.binding_key(),
1970                    singleton.key.as_ref(),
1971                    &managed_event,
1972                    replay_of_event_id,
1973                )
1974                .await?;
1975            let acquired_singleton = self
1976                .state
1977                .flow_control
1978                .try_acquire_singleton(&gate)
1979                .await
1980                .map_err(DispatchError::from)?;
1981            if !acquired_singleton {
1982                return Ok(FlowControlOutcome::Skip {
1983                    reason: "singleton_active".to_string(),
1984                });
1985            }
1986            acquired.singleton = Some(SingletonLease { gate, held: true });
1987        }
1988
1989        if let Some(concurrency) = &flow.concurrency {
1990            let gate = self
1991                .resolve_flow_gate(
1992                    &binding.binding_key(),
1993                    concurrency.key.as_ref(),
1994                    &managed_event,
1995                    replay_of_event_id,
1996                )
1997                .await?;
1998            let priority_rank = self
1999                .resolve_priority_rank(
2000                    &binding.binding_key(),
2001                    flow.priority.as_ref(),
2002                    &managed_event,
2003                    replay_of_event_id,
2004                )
2005                .await?;
2006            let permit = self
2007                .state
2008                .flow_control
2009                .acquire_concurrency(&gate, concurrency.max, priority_rank)
2010                .await
2011                .map_err(DispatchError::from)?;
2012            acquired.concurrency = Some(ConcurrencyLease {
2013                gate,
2014                max: concurrency.max,
2015                priority_rank,
2016                permit: Some(permit),
2017            });
2018        }
2019
2020        Ok(FlowControlOutcome::Dispatch {
2021            event: Box::new(managed_event),
2022            acquired,
2023        })
2024    }
2025
2026    async fn release_flow_control(
2027        &self,
2028        acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2029    ) -> Result<(), DispatchError> {
2030        let (singleton_gate, concurrency_permit) = {
2031            let mut acquired = acquired.lock().await;
2032            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2033                if lease.held {
2034                    lease.held = false;
2035                    Some(lease.gate.clone())
2036                } else {
2037                    None
2038                }
2039            });
2040            let concurrency_permit = acquired
2041                .concurrency
2042                .as_mut()
2043                .and_then(|lease| lease.permit.take());
2044            (singleton_gate, concurrency_permit)
2045        };
2046        if let Some(gate) = singleton_gate {
2047            self.state
2048                .flow_control
2049                .release_singleton(&gate)
2050                .await
2051                .map_err(DispatchError::from)?;
2052        }
2053        if let Some(permit) = concurrency_permit {
2054            self.state
2055                .flow_control
2056                .release_concurrency(permit)
2057                .await
2058                .map_err(DispatchError::from)?;
2059        }
2060        Ok(())
2061    }
2062
2063    async fn resolve_flow_gate(
2064        &self,
2065        binding_key: &str,
2066        expr: Option<&crate::triggers::TriggerExpressionSpec>,
2067        event: &TriggerEvent,
2068        replay_of_event_id: Option<&String>,
2069    ) -> Result<String, DispatchError> {
2070        let key = match expr {
2071            Some(expr) => {
2072                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2073                    .await?
2074            }
2075            None => "_global".to_string(),
2076        };
2077        Ok(format!("{binding_key}:{key}"))
2078    }
2079
2080    async fn resolve_priority_rank(
2081        &self,
2082        binding_key: &str,
2083        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2084        event: &TriggerEvent,
2085        replay_of_event_id: Option<&String>,
2086    ) -> Result<usize, DispatchError> {
2087        let Some(priority) = priority else {
2088            return Ok(0);
2089        };
2090        let value = self
2091            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2092            .await?;
2093        Ok(priority
2094            .order
2095            .iter()
2096            .position(|candidate| candidate == &value)
2097            .unwrap_or(priority.order.len()))
2098    }
2099
2100    async fn evaluate_flow_expression(
2101        &self,
2102        binding_key: &str,
2103        expr: &crate::triggers::TriggerExpressionSpec,
2104        event: &TriggerEvent,
2105        replay_of_event_id: Option<&String>,
2106    ) -> Result<String, DispatchError> {
2107        let value = self
2108            .invoke_vm_callable(
2109                &expr.closure,
2110                binding_key,
2111                event,
2112                replay_of_event_id,
2113                "",
2114                "flow_control",
2115                AutonomyTier::Suggest,
2116                None,
2117                &mut self.cancel_tx.subscribe(),
2118            )
2119            .await?;
2120        Ok(json_value_to_gate(&vm_value_to_json(&value)))
2121    }
2122
2123    #[allow(clippy::too_many_arguments)]
2124    async fn invoke_vm_callable(
2125        &self,
2126        closure: &crate::value::VmClosure,
2127        binding_key: &str,
2128        event: &TriggerEvent,
2129        replay_of_event_id: Option<&String>,
2130        agent_id: &str,
2131        action: &str,
2132        autonomy_tier: AutonomyTier,
2133        wait_lease: Option<DispatchWaitLease>,
2134        cancel_rx: &mut broadcast::Receiver<()>,
2135    ) -> Result<VmValue, DispatchError> {
2136        let mut vm = self.base_vm.child_vm();
2137        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2138        if self.state.shutting_down.load(Ordering::SeqCst) {
2139            cancel_token.store(true, Ordering::SeqCst);
2140        }
2141        self.state
2142            .cancel_tokens
2143            .lock()
2144            .expect("dispatcher cancel tokens poisoned")
2145            .push(cancel_token.clone());
2146        vm.install_cancel_token(cancel_token.clone());
2147        let arg = event_to_handler_value(event)?;
2148        let args = [arg];
2149        let future = vm.call_closure_pub(closure, &args, &[]);
2150        pin_mut!(future);
2151        let (binding_id, binding_version) = split_binding_key(binding_key);
2152        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2153            slot.borrow_mut().replace(DispatchContext {
2154                trigger_event: event.clone(),
2155                replay_of_event_id: replay_of_event_id.cloned(),
2156                binding_id,
2157                binding_version,
2158                agent_id: agent_id.to_string(),
2159                action: action.to_string(),
2160                autonomy_tier,
2161            })
2162        });
2163        let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2164            .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2165        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2166        crate::stdlib::hitl::reset_hitl_state();
2167        let mut poll = tokio::time::interval(Duration::from_millis(100));
2168        let result = loop {
2169            tokio::select! {
2170                result = &mut future => break result,
2171                _ = recv_cancel(cancel_rx) => {
2172                    cancel_token.store(true, Ordering::SeqCst);
2173                }
2174                _ = poll.tick() => {
2175                    if dispatch_cancel_requested(
2176                        &self.event_log,
2177                        binding_key,
2178                        &event.id.0,
2179                        replay_of_event_id,
2180                    )
2181                    .await? {
2182                        cancel_token.store(true, Ordering::SeqCst);
2183                    }
2184                }
2185            }
2186        };
2187        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2188            *slot.borrow_mut() = prior_context;
2189        });
2190        ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2191            *slot.borrow_mut() = prior_wait_lease;
2192        });
2193        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2194        {
2195            let mut tokens = self
2196                .state
2197                .cancel_tokens
2198                .lock()
2199                .expect("dispatcher cancel tokens poisoned");
2200            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2201        }
2202
2203        if cancel_token.load(Ordering::SeqCst) {
2204            if dispatch_cancel_requested(
2205                &self.event_log,
2206                binding_key,
2207                &event.id.0,
2208                replay_of_event_id,
2209            )
2210            .await?
2211            {
2212                Err(DispatchError::Cancelled(
2213                    "trigger cancel request cancelled local handler".to_string(),
2214                ))
2215            } else {
2216                Err(DispatchError::Cancelled(
2217                    "dispatcher shutdown cancelled local handler".to_string(),
2218                ))
2219            }
2220        } else {
2221            result.map_err(dispatch_error_from_vm_error)
2222        }
2223    }
2224
2225    async fn evaluate_predicate(
2226        &self,
2227        binding: &TriggerBinding,
2228        predicate: &super::registry::TriggerPredicateSpec,
2229        event: &TriggerEvent,
2230        replay_of_event_id: Option<&String>,
2231        autonomy_tier: AutonomyTier,
2232    ) -> Result<PredicateEvaluationRecord, DispatchError> {
2233        let event_id = event.id.0.clone();
2234        let trigger_id = binding.id.as_str().to_string();
2235        let now_ms = now_unix_ms();
2236        let today = utc_day_key();
2237
2238        let breaker_open_until = {
2239            let mut state = binding
2240                .predicate_state
2241                .lock()
2242                .expect("trigger predicate state poisoned");
2243            if state.budget_day_utc != Some(today) {
2244                state.budget_day_utc = Some(today);
2245                binding
2246                    .metrics
2247                    .cost_today_usd_micros
2248                    .store(0, Ordering::Relaxed);
2249            }
2250            if state
2251                .breaker_open_until_ms
2252                .is_some_and(|until_ms| until_ms > now_ms)
2253            {
2254                state.breaker_open_until_ms
2255            } else {
2256                None
2257            }
2258        };
2259
2260        if breaker_open_until.is_some() {
2261            let mut metadata = BTreeMap::new();
2262            metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2263            metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2264            metadata.insert(
2265                "breaker_open_until_ms".to_string(),
2266                serde_json::json!(breaker_open_until),
2267            );
2268            crate::events::log_warn_meta(
2269                "trigger.predicate.circuit_breaker",
2270                "trigger predicate circuit breaker is open; short-circuiting to false",
2271                metadata,
2272            );
2273            let record = PredicateEvaluationRecord {
2274                result: false,
2275                reason: Some("circuit_open".to_string()),
2276                ..Default::default()
2277            };
2278            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2279                .await?;
2280            return Ok(record);
2281        }
2282
2283        if binding
2284            .daily_cost_usd
2285            .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2286        {
2287            self.append_lifecycle_event(
2288                "predicate.daily_budget_exceeded",
2289                event,
2290                binding,
2291                serde_json::json!({
2292                    "trigger_id": binding.id.as_str(),
2293                    "event_id": event.id.0,
2294                    "limit_usd": binding.daily_cost_usd,
2295                    "cost_today_usd": current_predicate_daily_cost(binding),
2296                    "replay_of_event_id": replay_of_event_id,
2297                }),
2298                replay_of_event_id,
2299            )
2300            .await?;
2301            let record = PredicateEvaluationRecord {
2302                result: false,
2303                reason: Some("daily_budget_exceeded".to_string()),
2304                ..Default::default()
2305            };
2306            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2307                .await?;
2308            return Ok(record);
2309        }
2310
2311        let replay_cache = self
2312            .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2313            .await?;
2314        let guard = start_predicate_evaluation(
2315            binding.when_budget.clone().unwrap_or_default(),
2316            replay_cache,
2317        );
2318        let started = std::time::Instant::now();
2319        let eval = self
2320            .invoke_vm_callable_with_timeout(
2321                &predicate.closure,
2322                &binding.binding_key(),
2323                event,
2324                replay_of_event_id,
2325                binding.id.as_str(),
2326                &format!("{}.{}", event.provider.as_str(), event.kind),
2327                autonomy_tier,
2328                &mut self.cancel_tx.subscribe(),
2329                binding
2330                    .when_budget
2331                    .as_ref()
2332                    .and_then(|budget| budget.timeout()),
2333            )
2334            .await;
2335        let capture = guard.finish();
2336        let latency_ms = started.elapsed().as_millis() as u64;
2337        if replay_of_event_id.is_none() && !capture.entries.is_empty() {
2338            self.append_predicate_cache_record(binding, event, &capture.entries)
2339                .await?;
2340        }
2341
2342        let mut record = PredicateEvaluationRecord {
2343            result: false,
2344            cost_usd: capture.total_cost_usd,
2345            tokens: capture.total_tokens,
2346            latency_ms,
2347            cached: capture.cached,
2348            reason: None,
2349        };
2350
2351        let mut count_failure = false;
2352        let mut opened_breaker = false;
2353
2354        match eval {
2355            Ok(value) => match predicate_value_as_bool(value) {
2356                Ok(result) => {
2357                    record.result = result;
2358                }
2359                Err(reason) => {
2360                    count_failure = true;
2361                    record.reason = Some(reason);
2362                }
2363            },
2364            Err(error) => {
2365                count_failure = true;
2366                record.reason = Some(error.to_string());
2367            }
2368        }
2369
2370        let cost_usd_micros = usd_to_micros(record.cost_usd);
2371        if cost_usd_micros > 0 {
2372            binding
2373                .metrics
2374                .cost_total_usd_micros
2375                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2376            binding
2377                .metrics
2378                .cost_today_usd_micros
2379                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2380        }
2381
2382        let timed_out = matches!(
2383            record.reason.as_deref(),
2384            Some("predicate evaluation timed out")
2385        );
2386        if capture.budget_exceeded || timed_out {
2387            record.result = false;
2388            record.reason = Some("budget_exceeded".to_string());
2389            self.append_lifecycle_event(
2390                "predicate.budget_exceeded",
2391                event,
2392                binding,
2393                serde_json::json!({
2394                    "trigger_id": binding.id.as_str(),
2395                    "event_id": event.id.0,
2396                    "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2397                    "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2398                    "cost_usd": record.cost_usd,
2399                    "tokens": record.tokens,
2400                    "replay_of_event_id": replay_of_event_id,
2401                }),
2402                replay_of_event_id,
2403            )
2404            .await?;
2405        }
2406
2407        if binding
2408            .daily_cost_usd
2409            .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2410        {
2411            record.result = false;
2412            record.reason = Some("daily_budget_exceeded".to_string());
2413            self.append_lifecycle_event(
2414                "predicate.daily_budget_exceeded",
2415                event,
2416                binding,
2417                serde_json::json!({
2418                    "trigger_id": binding.id.as_str(),
2419                    "event_id": event.id.0,
2420                    "limit_usd": binding.daily_cost_usd,
2421                    "cost_today_usd": current_predicate_daily_cost(binding),
2422                    "replay_of_event_id": replay_of_event_id,
2423                }),
2424                replay_of_event_id,
2425            )
2426            .await?;
2427        }
2428
2429        {
2430            let mut state = binding
2431                .predicate_state
2432                .lock()
2433                .expect("trigger predicate state poisoned");
2434            if state.budget_day_utc != Some(today) {
2435                state.budget_day_utc = Some(today);
2436                binding
2437                    .metrics
2438                    .cost_today_usd_micros
2439                    .store(cost_usd_micros, Ordering::Relaxed);
2440            }
2441            if count_failure {
2442                state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2443                if state.consecutive_failures >= 3 {
2444                    state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2445                    opened_breaker = true;
2446                }
2447            } else {
2448                state.consecutive_failures = 0;
2449                state.breaker_open_until_ms = None;
2450            }
2451        }
2452
2453        if opened_breaker {
2454            let mut metadata = BTreeMap::new();
2455            metadata.insert(
2456                "trigger_id".to_string(),
2457                serde_json::json!(binding.id.as_str()),
2458            );
2459            metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2460            metadata.insert("failure_count".to_string(), serde_json::json!(3));
2461            metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2462            crate::events::log_warn_meta(
2463                "trigger.predicate.circuit_breaker",
2464                "trigger predicate circuit breaker opened for 5 minutes",
2465                metadata,
2466            );
2467        }
2468
2469        self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2470            .await?;
2471        Ok(record)
2472    }
2473
2474    #[allow(clippy::too_many_arguments)]
2475    #[allow(clippy::too_many_arguments)]
2476    async fn invoke_vm_callable_with_timeout(
2477        &self,
2478        closure: &crate::value::VmClosure,
2479        binding_key: &str,
2480        event: &TriggerEvent,
2481        replay_of_event_id: Option<&String>,
2482        agent_id: &str,
2483        action: &str,
2484        autonomy_tier: AutonomyTier,
2485        cancel_rx: &mut broadcast::Receiver<()>,
2486        timeout: Option<Duration>,
2487    ) -> Result<VmValue, DispatchError> {
2488        let future = self.invoke_vm_callable(
2489            closure,
2490            binding_key,
2491            event,
2492            replay_of_event_id,
2493            agent_id,
2494            action,
2495            autonomy_tier,
2496            None,
2497            cancel_rx,
2498        );
2499        pin_mut!(future);
2500        if let Some(timeout) = timeout {
2501            match tokio::time::timeout(timeout, future).await {
2502                Ok(result) => result,
2503                Err(_) => Err(DispatchError::Local(
2504                    "predicate evaluation timed out".to_string(),
2505                )),
2506            }
2507        } else {
2508            future.await
2509        }
2510    }
2511
2512    async fn append_predicate_evaluated_event(
2513        &self,
2514        binding: &TriggerBinding,
2515        event: &TriggerEvent,
2516        record: &PredicateEvaluationRecord,
2517        replay_of_event_id: Option<&String>,
2518    ) -> Result<(), DispatchError> {
2519        self.append_lifecycle_event(
2520            "predicate.evaluated",
2521            event,
2522            binding,
2523            serde_json::json!({
2524                "trigger_id": binding.id.as_str(),
2525                "event_id": event.id.0,
2526                "result": record.result,
2527                "cost_usd": record.cost_usd,
2528                "tokens": record.tokens,
2529                "latency_ms": record.latency_ms,
2530                "cached": record.cached,
2531                "reason": record.reason,
2532                "replay_of_event_id": replay_of_event_id,
2533            }),
2534            replay_of_event_id,
2535        )
2536        .await
2537    }
2538
2539    async fn append_predicate_cache_record(
2540        &self,
2541        binding: &TriggerBinding,
2542        event: &TriggerEvent,
2543        entries: &[PredicateCacheEntry],
2544    ) -> Result<(), DispatchError> {
2545        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2546            .expect("static trigger inbox legacy topic name is valid");
2547        let payload = serde_json::to_value(PredicateCacheRecord {
2548            trigger_id: binding.id.as_str().to_string(),
2549            event_id: event.id.0.clone(),
2550            entries: entries.to_vec(),
2551        })
2552        .map_err(|error| DispatchError::Serde(error.to_string()))?;
2553        self.event_log
2554            .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2555            .await
2556            .map_err(DispatchError::from)
2557            .map(|_| ())
2558    }
2559
2560    async fn read_predicate_cache_record(
2561        &self,
2562        event_id: &str,
2563    ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2564        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2565            .expect("static trigger inbox legacy topic name is valid");
2566        let records = self
2567            .event_log
2568            .read_range(&topic, None, usize::MAX)
2569            .await
2570            .map_err(DispatchError::from)?;
2571        Ok(records
2572            .into_iter()
2573            .filter(|(_, event)| event.kind == "predicate_llm_cache")
2574            .filter_map(|(_, event)| {
2575                serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2576            })
2577            .filter(|record| record.event_id == event_id)
2578            .flat_map(|record| record.entries)
2579            .collect())
2580    }
2581
2582    #[allow(clippy::too_many_arguments)]
2583    async fn append_dispatch_trust_record(
2584        &self,
2585        binding: &TriggerBinding,
2586        route: &DispatchUri,
2587        event: &TriggerEvent,
2588        replay_of_event_id: Option<&String>,
2589        autonomy_tier: AutonomyTier,
2590        outcome: TrustOutcome,
2591        terminal_status: &str,
2592        attempt_count: u32,
2593        error: Option<String>,
2594    ) -> Result<(), DispatchError> {
2595        let mut record = TrustRecord::new(
2596            binding.id.as_str().to_string(),
2597            format!("{}.{}", event.provider.as_str(), event.kind),
2598            None,
2599            outcome,
2600            event.trace_id.0.clone(),
2601            autonomy_tier,
2602        );
2603        record.metadata.insert(
2604            "binding_key".to_string(),
2605            serde_json::json!(binding.binding_key()),
2606        );
2607        record.metadata.insert(
2608            "binding_version".to_string(),
2609            serde_json::json!(binding.version),
2610        );
2611        record.metadata.insert(
2612            "provider".to_string(),
2613            serde_json::json!(event.provider.as_str()),
2614        );
2615        record
2616            .metadata
2617            .insert("event_kind".to_string(), serde_json::json!(event.kind));
2618        record
2619            .metadata
2620            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2621        record.metadata.insert(
2622            "target_uri".to_string(),
2623            serde_json::json!(route.target_uri()),
2624        );
2625        record.metadata.insert(
2626            "terminal_status".to_string(),
2627            serde_json::json!(terminal_status),
2628        );
2629        record.metadata.insert(
2630            "attempt_count".to_string(),
2631            serde_json::json!(attempt_count),
2632        );
2633        if let Some(replay_of_event_id) = replay_of_event_id {
2634            record.metadata.insert(
2635                "replay_of_event_id".to_string(),
2636                serde_json::json!(replay_of_event_id),
2637            );
2638        }
2639        if let Some(error) = error {
2640            record
2641                .metadata
2642                .insert("error".to_string(), serde_json::json!(error));
2643        }
2644        append_trust_record(&self.event_log, &record)
2645            .await
2646            .map_err(DispatchError::from)
2647    }
2648
2649    async fn append_attempt_record(
2650        &self,
2651        event: &TriggerEvent,
2652        binding: &TriggerBinding,
2653        attempt: &DispatchAttemptRecord,
2654        replay_of_event_id: Option<&String>,
2655    ) -> Result<(), DispatchError> {
2656        self.append_topic_event(
2657            TRIGGER_ATTEMPTS_TOPIC,
2658            "attempt_recorded",
2659            event,
2660            Some(binding),
2661            Some(attempt.attempt),
2662            serde_json::to_value(attempt)
2663                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2664            replay_of_event_id,
2665        )
2666        .await
2667    }
2668
2669    async fn append_lifecycle_event(
2670        &self,
2671        kind: &str,
2672        event: &TriggerEvent,
2673        binding: &TriggerBinding,
2674        payload: serde_json::Value,
2675        replay_of_event_id: Option<&String>,
2676    ) -> Result<(), DispatchError> {
2677        self.append_topic_event(
2678            TRIGGERS_LIFECYCLE_TOPIC,
2679            kind,
2680            event,
2681            Some(binding),
2682            None,
2683            payload,
2684            replay_of_event_id,
2685        )
2686        .await
2687    }
2688
2689    async fn append_skipped_outbox_event(
2690        &self,
2691        binding: &TriggerBinding,
2692        route: &DispatchUri,
2693        event: &TriggerEvent,
2694        replay_of_event_id: Option<&String>,
2695        stage: DispatchSkipStage,
2696        detail: serde_json::Value,
2697    ) -> Result<(), DispatchError> {
2698        self.append_topic_event(
2699            TRIGGER_OUTBOX_TOPIC,
2700            "dispatch_skipped",
2701            event,
2702            Some(binding),
2703            None,
2704            serde_json::json!({
2705                "event_id": event.id.0,
2706                "trigger_id": binding.id.as_str(),
2707                "binding_key": binding.binding_key(),
2708                "handler_kind": route.kind(),
2709                "target_uri": route.target_uri(),
2710                "skip_stage": stage.as_str(),
2711                "detail": detail,
2712                "replay_of_event_id": replay_of_event_id,
2713            }),
2714            replay_of_event_id,
2715        )
2716        .await
2717    }
2718
2719    async fn append_topic_event(
2720        &self,
2721        topic_name: &str,
2722        kind: &str,
2723        event: &TriggerEvent,
2724        binding: Option<&TriggerBinding>,
2725        attempt: Option<u32>,
2726        payload: serde_json::Value,
2727        replay_of_event_id: Option<&String>,
2728    ) -> Result<(), DispatchError> {
2729        let topic = Topic::new(topic_name)
2730            .expect("static trigger dispatcher topic names should always be valid");
2731        let headers = event_headers(event, binding, attempt, replay_of_event_id);
2732        self.event_log
2733            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
2734            .await
2735            .map_err(DispatchError::from)
2736            .map(|_| ())
2737    }
2738
2739    async fn emit_action_graph(
2740        &self,
2741        event: &TriggerEvent,
2742        nodes: Vec<RunActionGraphNodeRecord>,
2743        edges: Vec<RunActionGraphEdgeRecord>,
2744        extra: serde_json::Value,
2745    ) -> Result<(), DispatchError> {
2746        let mut headers = BTreeMap::new();
2747        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2748        headers.insert("event_id".to_string(), event.id.0.clone());
2749        let observability = RunObservabilityRecord {
2750            schema_version: 1,
2751            action_graph_nodes: nodes,
2752            action_graph_edges: edges,
2753            ..Default::default()
2754        };
2755        append_action_graph_update(
2756            headers,
2757            serde_json::json!({
2758                "source": "dispatcher",
2759                "trace_id": event.trace_id.0,
2760                "event_id": event.id.0,
2761                "observability": observability,
2762                "context": extra,
2763            }),
2764        )
2765        .await
2766        .map_err(DispatchError::from)
2767    }
2768}
2769
2770async fn dispatch_cancel_requested(
2771    event_log: &Arc<AnyEventLog>,
2772    binding_key: &str,
2773    event_id: &str,
2774    replay_of_event_id: Option<&String>,
2775) -> Result<bool, DispatchError> {
2776    if replay_of_event_id.is_some() {
2777        return Ok(false);
2778    }
2779    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
2780        .expect("static trigger cancel topic should always be valid");
2781    let events = event_log.read_range(&topic, None, usize::MAX).await?;
2782    let requested = events
2783        .into_iter()
2784        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
2785        .filter_map(|(_, event)| {
2786            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
2787        })
2788        .collect::<BTreeSet<_>>();
2789    Ok(requested
2790        .iter()
2791        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
2792}
2793
2794async fn sleep_or_cancel_or_request(
2795    event_log: &Arc<AnyEventLog>,
2796    delay: Duration,
2797    binding_key: &str,
2798    event_id: &str,
2799    replay_of_event_id: Option<&String>,
2800    cancel_rx: &mut broadcast::Receiver<()>,
2801) -> Result<(), DispatchError> {
2802    let sleep = tokio::time::sleep(delay);
2803    pin_mut!(sleep);
2804    let mut poll = tokio::time::interval(Duration::from_millis(100));
2805    loop {
2806        tokio::select! {
2807            _ = &mut sleep => return Ok(()),
2808            _ = recv_cancel(cancel_rx) => {
2809                return Err(DispatchError::Cancelled(
2810                    "dispatcher shutdown cancelled retry wait".to_string(),
2811                ));
2812            }
2813            _ = poll.tick() => {
2814                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
2815                    return Err(DispatchError::Cancelled(
2816                        "trigger cancel request cancelled retry wait".to_string(),
2817                    ));
2818                }
2819            }
2820        }
2821    }
2822}
2823
2824fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
2825    let mut iter = events.into_iter();
2826    let Some(mut root) = iter.next() else {
2827        return Err(DispatchError::Registry(
2828            "batch dispatch produced an empty event list".to_string(),
2829        ));
2830    };
2831    let mut batch = Vec::new();
2832    batch.push(
2833        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
2834    );
2835    for event in iter {
2836        batch.push(
2837            serde_json::to_value(&event)
2838                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2839        );
2840    }
2841    root.batch = Some(batch);
2842    Ok(root)
2843}
2844
2845fn json_value_to_gate(value: &serde_json::Value) -> String {
2846    match value {
2847        serde_json::Value::Null => "null".to_string(),
2848        serde_json::Value::String(text) => text.clone(),
2849        serde_json::Value::Bool(value) => value.to_string(),
2850        serde_json::Value::Number(value) => value.to_string(),
2851        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
2852    }
2853}
2854
2855fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
2856    let json =
2857        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2858    let value = json_to_vm_value(&json);
2859    match (&event.raw_body, value) {
2860        (Some(raw_body), VmValue::Dict(dict)) => {
2861            let mut map = (*dict).clone();
2862            map.insert(
2863                "raw_body".to_string(),
2864                VmValue::Bytes(Rc::new(raw_body.clone())),
2865            );
2866            Ok(VmValue::Dict(Rc::new(map)))
2867        }
2868        (_, other) => Ok(other),
2869    }
2870}
2871
2872fn decrement_in_flight(state: &DispatcherRuntimeState) {
2873    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
2874    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
2875        state.idle_notify.notify_waiters();
2876    }
2877}
2878
2879fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
2880    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
2881    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
2882        state.idle_notify.notify_waiters();
2883    }
2884}
2885
/// Test-only: stores `tx` in `TEST_INBOX_DEQUEUED_SIGNAL`, replacing any sender
/// that was stored before.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(tx);
    });
}
2892
/// No-op outside of test builds.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test-only: takes the sender stored in `TEST_INBOX_DEQUEUED_SIGNAL`, if any, and
/// fires it once. A failed send is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let pending = slot.borrow_mut().take();
        if let Some(tx) = pending {
            let _ = tx.send(());
        }
    });
}
2904
2905pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
2906    event_log: &L,
2907    event: &TriggerEvent,
2908) -> Result<u64, DispatchError> {
2909    let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
2910        .expect("static trigger.inbox.envelopes topic is valid");
2911    let headers = event_headers(event, None, None, None);
2912    let payload =
2913        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2914    event_log
2915        .append(
2916            &topic,
2917            LogEvent::new("event_ingested", payload).with_headers(headers),
2918        )
2919        .await
2920        .map_err(DispatchError::from)
2921}
2922
2923pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
2924    ACTIVE_DISPATCHER_STATE.with(|slot| {
2925        slot.borrow()
2926            .as_ref()
2927            .map(|state| DispatcherStatsSnapshot {
2928                in_flight: state.in_flight.load(Ordering::Relaxed),
2929                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
2930                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
2931            })
2932            .unwrap_or_default()
2933    })
2934}
2935
2936pub fn clear_dispatcher_state() {
2937    ACTIVE_DISPATCHER_STATE.with(|slot| {
2938        *slot.borrow_mut() = None;
2939    });
2940    ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2941        *slot.borrow_mut() = None;
2942    });
2943}
2944
2945fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
2946    if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
2947        return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
2948    }
2949    if is_cancelled_vm_error(&error) {
2950        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
2951    }
2952    if let VmError::Thrown(VmValue::String(message)) = &error {
2953        return DispatchError::Local(message.to_string());
2954    }
2955    match error_to_category(&error) {
2956        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
2957        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
2958        ErrorCategory::Cancelled => {
2959            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
2960        }
2961        _ => DispatchError::Local(error.to_string()),
2962    }
2963}
2964
2965fn dispatch_error_label(error: &DispatchError) -> &'static str {
2966    match error {
2967        DispatchError::Denied(_) => "denied",
2968        DispatchError::Timeout(_) => "timeout",
2969        DispatchError::Waiting(_) => "waiting",
2970        DispatchError::Cancelled(_) => "cancelled",
2971        _ => "failed",
2972    }
2973}
2974
2975fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
2976    match route {
2977        DispatchUri::Worker { .. } => "enqueued",
2978        DispatchUri::A2a { .. }
2979            if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
2980        {
2981            "pending"
2982        }
2983        DispatchUri::A2a { .. } => "completed",
2984        DispatchUri::Local { .. } => "success",
2985    }
2986}
2987
2988fn dispatch_node_id(
2989    route: &DispatchUri,
2990    binding_key: &str,
2991    event_id: &str,
2992    attempt: u32,
2993) -> String {
2994    let prefix = match route {
2995        DispatchUri::A2a { .. } => "a2a",
2996        _ => "dispatch",
2997    };
2998    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
2999}
3000
3001fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3002    match route {
3003        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3004        DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3005        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3006    }
3007}
3008
3009fn dispatch_node_label(route: &DispatchUri) -> String {
3010    match route {
3011        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3012        _ => route.target_uri(),
3013    }
3014}
3015
3016fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3017    match route {
3018        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3019        _ => None,
3020    }
3021}
3022
3023fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3024    match route {
3025        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3026        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3027        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3028    }
3029}
3030
3031fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3032    match status {
3033        crate::triggers::SignatureStatus::Verified => "verified",
3034        crate::triggers::SignatureStatus::Unsigned => "unsigned",
3035        crate::triggers::SignatureStatus::Failed { .. } => "failed",
3036    }
3037}
3038
3039fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3040    let mut metadata = BTreeMap::new();
3041    metadata.insert(
3042        "provider".to_string(),
3043        serde_json::json!(event.provider.as_str()),
3044    );
3045    metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3046    metadata.insert(
3047        "dedupe_key".to_string(),
3048        serde_json::json!(event.dedupe_key),
3049    );
3050    metadata.insert(
3051        "signature_status".to_string(),
3052        serde_json::json!(signature_status_label(&event.signature_status)),
3053    );
3054    metadata
3055}
3056
3057fn predicate_node_metadata(
3058    binding: &TriggerBinding,
3059    predicate: &super::registry::TriggerPredicateSpec,
3060    event: &TriggerEvent,
3061    evaluation: &PredicateEvaluationRecord,
3062) -> BTreeMap<String, serde_json::Value> {
3063    let mut metadata = BTreeMap::new();
3064    metadata.insert(
3065        "trigger_id".to_string(),
3066        serde_json::json!(binding.id.as_str()),
3067    );
3068    metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
3069    metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
3070    metadata.insert(
3071        "cost_usd".to_string(),
3072        serde_json::json!(evaluation.cost_usd),
3073    );
3074    metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
3075    metadata.insert(
3076        "latency_ms".to_string(),
3077        serde_json::json!(evaluation.latency_ms),
3078    );
3079    metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
3080    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3081    if let Some(reason) = evaluation.reason.as_ref() {
3082        metadata.insert("reason".to_string(), serde_json::json!(reason));
3083    }
3084    metadata
3085}
3086
3087fn dispatch_node_metadata(
3088    route: &DispatchUri,
3089    binding: &TriggerBinding,
3090    event: &TriggerEvent,
3091    attempt: u32,
3092) -> BTreeMap<String, serde_json::Value> {
3093    let mut metadata = BTreeMap::new();
3094    metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3095    metadata.insert(
3096        "target_uri".to_string(),
3097        serde_json::json!(route.target_uri()),
3098    );
3099    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3100    metadata.insert(
3101        "trigger_id".to_string(),
3102        serde_json::json!(binding.id.as_str()),
3103    );
3104    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3105    if let Some(target_agent) = dispatch_target_agent(route) {
3106        metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3107    }
3108    if let DispatchUri::Worker { queue } = route {
3109        metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3110    }
3111    metadata
3112}
3113
3114fn dispatch_success_metadata(
3115    route: &DispatchUri,
3116    binding: &TriggerBinding,
3117    event: &TriggerEvent,
3118    attempt: u32,
3119    result: &serde_json::Value,
3120) -> BTreeMap<String, serde_json::Value> {
3121    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3122    match route {
3123        DispatchUri::A2a { .. } => {
3124            if let Some(task_id) = result
3125                .get("task_id")
3126                .or_else(|| result.get("id"))
3127                .and_then(|value| value.as_str())
3128            {
3129                metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3130            }
3131            if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3132                metadata.insert("state".to_string(), serde_json::json!(state));
3133            }
3134        }
3135        DispatchUri::Worker { .. } => {
3136            if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3137            {
3138                metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3139            }
3140            if let Some(response_topic) = result
3141                .get("response_topic")
3142                .and_then(|value| value.as_str())
3143            {
3144                metadata.insert(
3145                    "response_topic".to_string(),
3146                    serde_json::json!(response_topic),
3147                );
3148            }
3149        }
3150        DispatchUri::Local { .. } => {}
3151    }
3152    metadata
3153}
3154
3155fn dispatch_error_metadata(
3156    route: &DispatchUri,
3157    binding: &TriggerBinding,
3158    event: &TriggerEvent,
3159    attempt: u32,
3160    error: &DispatchError,
3161) -> BTreeMap<String, serde_json::Value> {
3162    let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3163    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3164    metadata
3165}
3166
3167fn retry_node_metadata(
3168    binding: &TriggerBinding,
3169    event: &TriggerEvent,
3170    attempt: u32,
3171    delay: Duration,
3172    error: &DispatchError,
3173) -> BTreeMap<String, serde_json::Value> {
3174    let mut metadata = BTreeMap::new();
3175    metadata.insert(
3176        "trigger_id".to_string(),
3177        serde_json::json!(binding.id.as_str()),
3178    );
3179    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3180    metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3181    metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3182    metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3183    metadata
3184}
3185
3186fn dlq_node_metadata(
3187    binding: &TriggerBinding,
3188    event: &TriggerEvent,
3189    attempt_count: u32,
3190    final_error: &str,
3191) -> BTreeMap<String, serde_json::Value> {
3192    let mut metadata = BTreeMap::new();
3193    metadata.insert(
3194        "trigger_id".to_string(),
3195        serde_json::json!(binding.id.as_str()),
3196    );
3197    metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3198    metadata.insert(
3199        "attempt_count".to_string(),
3200        serde_json::json!(attempt_count),
3201    );
3202    metadata.insert("final_error".to_string(), serde_json::json!(final_error));
3203    metadata
3204}
3205
3206fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
3207    match value {
3208        VmValue::Bool(result) => Ok(result),
3209        VmValue::EnumVariant {
3210            enum_name,
3211            variant,
3212            mut fields,
3213        } if enum_name == "Result" && variant == "Ok" => match fields.pop() {
3214            Some(VmValue::Bool(result)) => Ok(result),
3215            Some(other) => Err(format!(
3216                "predicate Result.Ok payload must be bool, got {}",
3217                other.type_name()
3218            )),
3219            None => Err("predicate Result.Ok payload is missing".to_string()),
3220        },
3221        VmValue::EnumVariant {
3222            enum_name,
3223            variant,
3224            fields,
3225        } if enum_name == "Result" && variant == "Err" => Err(fields
3226            .first()
3227            .map(VmValue::display)
3228            .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
3229        other => Err(format!(
3230            "predicate must return bool or Result<bool, _>, got {}",
3231            other.type_name()
3232        )),
3233    }
3234}
3235
/// Converts a dollar amount to a whole number of millionths, rounded to the
/// nearest integer.
///
/// Non-finite inputs (NaN, ±infinity) and amounts that are zero or negative
/// all map to `0`. The final `as u64` cast saturates, so very large amounts
/// yield `u64::MAX`.
fn usd_to_micros(value: f64) -> u64 {
    let is_positive_amount = value.is_finite() && value > 0.0;
    if is_positive_amount {
        (value * 1_000_000.0).round() as u64
    } else {
        0
    }
}
3242
3243fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
3244    binding
3245        .metrics
3246        .cost_today_usd_micros
3247        .load(Ordering::Relaxed) as f64
3248        / 1_000_000.0
3249}
3250
/// Splits a binding key of the form `<id>@v<version>` into its id and version.
///
/// The split happens at the last occurrence of `"@v"`. A key without `"@v"`
/// is returned whole with version `0`; a suffix that does not parse as `u32`
/// also yields version `0` (the id is still the part before the last `"@v"`).
fn split_binding_key(binding_key: &str) -> (String, u32) {
    match binding_key.rsplit_once("@v") {
        Some((id, raw_version)) => (id.to_owned(), raw_version.parse::<u32>().unwrap_or(0)),
        None => (binding_key.to_owned(), 0),
    }
}
3258
3259fn is_cancelled_vm_error(error: &VmError) -> bool {
3260    matches!(
3261        error,
3262        VmError::Thrown(VmValue::String(message))
3263            if message.starts_with("kind:cancelled:")
3264    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3265}
3266
3267fn event_headers(
3268    event: &TriggerEvent,
3269    binding: Option<&TriggerBinding>,
3270    attempt: Option<u32>,
3271    replay_of_event_id: Option<&String>,
3272) -> BTreeMap<String, String> {
3273    let mut headers = BTreeMap::new();
3274    headers.insert("event_id".to_string(), event.id.0.clone());
3275    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3276    headers.insert("provider".to_string(), event.provider.as_str().to_string());
3277    headers.insert("kind".to_string(), event.kind.clone());
3278    if let Some(replay_of_event_id) = replay_of_event_id {
3279        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3280    }
3281    if let Some(binding) = binding {
3282        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3283        headers.insert("binding_key".to_string(), binding.binding_key());
3284        headers.insert(
3285            "handler_kind".to_string(),
3286            DispatchUri::from(&binding.handler).kind().to_string(),
3287        );
3288    }
3289    if let Some(attempt) = attempt {
3290        headers.insert("attempt".to_string(), attempt.to_string());
3291    }
3292    headers
3293}
3294
3295fn worker_queue_priority(
3296    binding: &super::registry::TriggerBinding,
3297    event: &TriggerEvent,
3298) -> crate::WorkerQueuePriority {
3299    match event
3300        .headers
3301        .get("priority")
3302        .map(|value| value.trim().to_ascii_lowercase())
3303        .as_deref()
3304    {
3305        Some("high") => crate::WorkerQueuePriority::High,
3306        Some("low") => crate::WorkerQueuePriority::Low,
3307        _ => binding.dispatch_priority,
3308    }
3309}
3310
/// Name of the environment variable checked by `maybe_fail_before_outbox`.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Terminates the process with exit code 86 when the
/// `HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX` environment variable is set to
/// any value (including an empty one); otherwise does nothing.
///
/// NOTE(review): judging by the variable name this is a fault-injection hook
/// for tests that simulate a crash before the outbox write — confirm against
/// the call site.
fn maybe_fail_before_outbox() {
    let crash_requested = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if crash_requested {
        std::process::exit(86);
    }
}
3318
3319fn now_rfc3339() -> String {
3320    time::OffsetDateTime::now_utc()
3321        .format(&time::format_description::well_known::Rfc3339)
3322        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3323}
3324
3325fn now_unix_ms() -> i64 {
3326    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3327}
3328
3329fn utc_day_key() -> i32 {
3330    time::OffsetDateTime::now_utc().date().to_julian_day()
3331}
3332
3333fn cancelled_dispatch_outcome(
3334    binding: &TriggerBinding,
3335    route: &DispatchUri,
3336    event: &TriggerEvent,
3337    replay_of_event_id: Option<String>,
3338    attempt_count: u32,
3339    error: String,
3340) -> DispatchOutcome {
3341    DispatchOutcome {
3342        trigger_id: binding.id.as_str().to_string(),
3343        binding_key: binding.binding_key(),
3344        event_id: event.id.0.clone(),
3345        attempt_count,
3346        status: DispatchStatus::Cancelled,
3347        handler_kind: route.kind().to_string(),
3348        target_uri: route.target_uri(),
3349        replay_of_event_id,
3350        result: None,
3351        error: Some(error),
3352    }
3353}
3354
3355async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3356    let _ = cancel_rx.recv().await;
3357}
3358
3359#[cfg(test)]
3360mod tests;