Skip to main content

harn_vm/triggers/dispatcher/mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::Notify;
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25    ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26    ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{append_trust_record, AutonomyTier, TrustOutcome, TrustRecord};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::matching_bindings;
35use super::registry::{TriggerBinding, TriggerHandlerSpec};
36use super::{
37    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
38    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
39    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
40    TRIGGER_OUTBOX_TOPIC,
41};
42use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
43
44mod flow_control;
45pub mod retry;
46pub mod uri;
47
48pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
49
50thread_local! {
51    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
52    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
53    #[cfg(test)]
54    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
55}
56
57tokio::task_local! {
58    static ACTIVE_DISPATCH_IS_REPLAY: bool;
59}
60
61#[derive(Clone, Debug)]
62pub(crate) struct DispatchContext {
63    pub trigger_event: TriggerEvent,
64    pub replay_of_event_id: Option<String>,
65    pub agent_id: String,
66    pub action: String,
67    pub autonomy_tier: AutonomyTier,
68}
69
70#[derive(Clone, Debug, Serialize, Deserialize)]
71struct PredicateCacheRecord {
72    trigger_id: String,
73    event_id: String,
74    entries: Vec<PredicateCacheEntry>,
75}
76
77#[derive(Clone, Debug, Default)]
78struct PredicateEvaluationRecord {
79    result: bool,
80    cost_usd: f64,
81    tokens: u64,
82    latency_ms: u64,
83    cached: bool,
84    reason: Option<String>,
85}
86
87pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
88    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
89}
90
91pub(crate) fn current_dispatch_is_replay() -> bool {
92    ACTIVE_DISPATCH_IS_REPLAY
93        .try_with(|is_replay| *is_replay)
94        .unwrap_or(false)
95}
96
97#[derive(Clone)]
98pub struct Dispatcher {
99    base_vm: Rc<Vm>,
100    event_log: Arc<AnyEventLog>,
101    cancel_tx: broadcast::Sender<()>,
102    state: Arc<DispatcherRuntimeState>,
103    metrics: Option<Arc<crate::MetricsRegistry>>,
104}
105
106#[derive(Debug)]
107struct DispatcherRuntimeState {
108    in_flight: AtomicU64,
109    retry_queue_depth: AtomicU64,
110    dlq: Mutex<Vec<DlqEntry>>,
111    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
112    shutting_down: std::sync::atomic::AtomicBool,
113    idle_notify: Notify,
114    flow_control: FlowControlManager,
115}
116
117impl DispatcherRuntimeState {
118    fn new(event_log: Arc<AnyEventLog>) -> Self {
119        Self {
120            in_flight: AtomicU64::new(0),
121            retry_queue_depth: AtomicU64::new(0),
122            dlq: Mutex::new(Vec::new()),
123            cancel_tokens: Mutex::new(Vec::new()),
124            shutting_down: std::sync::atomic::AtomicBool::new(false),
125            idle_notify: Notify::new(),
126            flow_control: FlowControlManager::new(event_log),
127        }
128    }
129}
130
131#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
132#[serde(default)]
133pub struct DispatcherStatsSnapshot {
134    pub in_flight: u64,
135    pub retry_queue_depth: u64,
136    pub dlq_depth: u64,
137}
138
139#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
140#[serde(rename_all = "snake_case")]
141pub enum DispatchStatus {
142    Succeeded,
143    Failed,
144    Dlq,
145    Skipped,
146    Cancelled,
147}
148
149impl DispatchStatus {
150    pub fn as_str(&self) -> &'static str {
151        match self {
152            Self::Succeeded => "succeeded",
153            Self::Failed => "failed",
154            Self::Dlq => "dlq",
155            Self::Skipped => "skipped",
156            Self::Cancelled => "cancelled",
157        }
158    }
159}
160
161#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
162#[serde(default)]
163pub struct DispatchOutcome {
164    pub trigger_id: String,
165    pub binding_key: String,
166    pub event_id: String,
167    pub attempt_count: u32,
168    pub status: DispatchStatus,
169    pub handler_kind: String,
170    pub target_uri: String,
171    pub replay_of_event_id: Option<String>,
172    pub result: Option<serde_json::Value>,
173    pub error: Option<String>,
174}
175
176#[derive(Clone, Debug, Serialize, Deserialize)]
177pub struct InboxEnvelope {
178    pub trigger_id: Option<String>,
179    pub binding_version: Option<u32>,
180    pub event: TriggerEvent,
181}
182
183#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
184#[serde(default)]
185pub struct DispatcherDrainReport {
186    pub drained: bool,
187    pub in_flight: u64,
188    pub retry_queue_depth: u64,
189    pub dlq_depth: u64,
190}
191
192impl Default for DispatchOutcome {
193    fn default() -> Self {
194        Self {
195            trigger_id: String::new(),
196            binding_key: String::new(),
197            event_id: String::new(),
198            attempt_count: 0,
199            status: DispatchStatus::Failed,
200            handler_kind: String::new(),
201            target_uri: String::new(),
202            replay_of_event_id: None,
203            result: None,
204            error: None,
205        }
206    }
207}
208
209#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
210#[serde(default)]
211pub struct DispatchAttemptRecord {
212    pub trigger_id: String,
213    pub binding_key: String,
214    pub event_id: String,
215    pub attempt: u32,
216    pub handler_kind: String,
217    pub started_at: String,
218    pub completed_at: String,
219    pub outcome: String,
220    pub error_msg: Option<String>,
221}
222
223#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
224pub struct DispatchCancelRequest {
225    pub binding_key: String,
226    pub event_id: String,
227    #[serde(with = "time::serde::rfc3339")]
228    pub requested_at: time::OffsetDateTime,
229    #[serde(default)]
230    pub requested_by: Option<String>,
231    #[serde(default)]
232    pub audit_id: Option<String>,
233}
234
235#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
236pub struct DlqEntry {
237    pub trigger_id: String,
238    pub binding_key: String,
239    pub event: TriggerEvent,
240    pub attempt_count: u32,
241    pub final_error: String,
242    pub attempts: Vec<DispatchAttemptRecord>,
243}
244
245#[derive(Default)]
246struct AcquiredFlowControl {
247    singleton_gate: Option<String>,
248    concurrency: Option<ConcurrencyPermit>,
249}
250
251enum FlowControlOutcome {
252    Dispatch {
253        event: Box<TriggerEvent>,
254        acquired: AcquiredFlowControl,
255    },
256    Skip {
257        reason: String,
258    },
259}
260
261#[derive(Debug)]
262pub enum DispatchError {
263    EventLog(String),
264    Registry(String),
265    Serde(String),
266    Local(String),
267    A2a(String),
268    Denied(String),
269    Timeout(String),
270    Cancelled(String),
271    NotImplemented(String),
272}
273
274impl std::fmt::Display for DispatchError {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        match self {
277            Self::EventLog(message)
278            | Self::Registry(message)
279            | Self::Serde(message)
280            | Self::Local(message)
281            | Self::A2a(message)
282            | Self::Denied(message)
283            | Self::Timeout(message)
284            | Self::Cancelled(message)
285            | Self::NotImplemented(message) => f.write_str(message),
286        }
287    }
288}
289
290impl std::error::Error for DispatchError {}
291
292impl DispatchError {
293    fn retryable(&self) -> bool {
294        !matches!(
295            self,
296            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_)
297        )
298    }
299}
300
301impl From<LogError> for DispatchError {
302    fn from(value: LogError) -> Self {
303        Self::EventLog(value.to_string())
304    }
305}
306
307pub async fn append_dispatch_cancel_request(
308    event_log: &Arc<AnyEventLog>,
309    request: &DispatchCancelRequest,
310) -> Result<u64, DispatchError> {
311    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
312        .expect("static trigger cancel topic should always be valid");
313    event_log
314        .append(
315            &topic,
316            LogEvent::new(
317                "dispatch_cancel_requested",
318                serde_json::to_value(request)
319                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
320            ),
321        )
322        .await
323        .map_err(DispatchError::from)
324}
325
326impl Dispatcher {
327    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
328        let event_log = active_event_log().ok_or_else(|| {
329            DispatchError::EventLog("dispatcher requires an active event log".to_string())
330        })?;
331        Ok(Self::with_event_log(base_vm, event_log))
332    }
333
334    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
335        Self::with_event_log_and_metrics(base_vm, event_log, None)
336    }
337
338    pub fn with_event_log_and_metrics(
339        base_vm: Vm,
340        event_log: Arc<AnyEventLog>,
341        metrics: Option<Arc<crate::MetricsRegistry>>,
342    ) -> Self {
343        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
344        ACTIVE_DISPATCHER_STATE.with(|slot| {
345            *slot.borrow_mut() = Some(state.clone());
346        });
347        let (cancel_tx, _) = broadcast::channel(32);
348        Self {
349            base_vm: Rc::new(base_vm),
350            event_log,
351            cancel_tx,
352            state,
353            metrics,
354        }
355    }
356
357    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
358        DispatcherStatsSnapshot {
359            in_flight: self.state.in_flight.load(Ordering::Relaxed),
360            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
361            dlq_depth: self
362                .state
363                .dlq
364                .lock()
365                .expect("dispatcher dlq poisoned")
366                .len() as u64,
367        }
368    }
369
370    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
371        self.state
372            .dlq
373            .lock()
374            .expect("dispatcher dlq poisoned")
375            .clone()
376    }
377
378    pub fn shutdown(&self) {
379        self.state.shutting_down.store(true, Ordering::SeqCst);
380        for token in self
381            .state
382            .cancel_tokens
383            .lock()
384            .expect("dispatcher cancel tokens poisoned")
385            .iter()
386        {
387            token.store(true, Ordering::SeqCst);
388        }
389        let _ = self.cancel_tx.send(());
390    }
391
392    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
393        self.enqueue_targeted(None, None, event).await
394    }
395
396    pub async fn enqueue_targeted(
397        &self,
398        trigger_id: Option<String>,
399        binding_version: Option<u32>,
400        event: TriggerEvent,
401    ) -> Result<u64, DispatchError> {
402        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
403            .expect("static trigger inbox envelopes topic is valid");
404        let headers = event_headers(&event, None, None, None);
405        let payload = serde_json::to_value(InboxEnvelope {
406            trigger_id,
407            binding_version,
408            event,
409        })
410        .map_err(|error| DispatchError::Serde(error.to_string()))?;
411        self.event_log
412            .append(
413                &topic,
414                LogEvent::new("event_ingested", payload).with_headers(headers),
415            )
416            .await
417            .map_err(DispatchError::from)
418    }
419
420    pub async fn run(&self) -> Result<(), DispatchError> {
421        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
422            .expect("static trigger inbox envelopes topic is valid");
423        let start_from = self.event_log.latest(&topic).await?;
424        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
425        pin_mut!(stream);
426        let mut cancel_rx = self.cancel_tx.subscribe();
427
428        loop {
429            tokio::select! {
430                received = stream.next() => {
431                    let Some(received) = received else {
432                        break;
433                    };
434                    let (_, event) = received.map_err(DispatchError::from)?;
435                    if event.kind != "event_ingested" {
436                        continue;
437                    }
438                    let parent_headers = event.headers.clone();
439                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
440                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
441                    notify_test_inbox_dequeued();
442                    let _ = self
443                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
444                        .await;
445                }
446                _ = recv_cancel(&mut cancel_rx) => break,
447            }
448        }
449
450        Ok(())
451    }
452
453    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
454        let deadline = tokio::time::Instant::now() + timeout;
455        loop {
456            let snapshot = self.snapshot();
457            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
458                return Ok(DispatcherDrainReport {
459                    drained: true,
460                    in_flight: snapshot.in_flight,
461                    retry_queue_depth: snapshot.retry_queue_depth,
462                    dlq_depth: snapshot.dlq_depth,
463                });
464            }
465
466            let notified = self.state.idle_notify.notified();
467            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
468            if remaining.is_zero() {
469                return Ok(DispatcherDrainReport {
470                    drained: false,
471                    in_flight: snapshot.in_flight,
472                    retry_queue_depth: snapshot.retry_queue_depth,
473                    dlq_depth: snapshot.dlq_depth,
474                });
475            }
476            if tokio::time::timeout(remaining, notified).await.is_err() {
477                let snapshot = self.snapshot();
478                return Ok(DispatcherDrainReport {
479                    drained: false,
480                    in_flight: snapshot.in_flight,
481                    retry_queue_depth: snapshot.retry_queue_depth,
482                    dlq_depth: snapshot.dlq_depth,
483                });
484            }
485        }
486    }
487
488    pub async fn dispatch_inbox_envelope(
489        &self,
490        envelope: InboxEnvelope,
491    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
492        self.dispatch_inbox_envelope_with_headers(envelope, None)
493            .await
494    }
495
496    async fn dispatch_inbox_envelope_with_headers(
497        &self,
498        envelope: InboxEnvelope,
499        parent_headers: Option<&BTreeMap<String, String>>,
500    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
501        if let Some(trigger_id) = envelope.trigger_id {
502            let binding = super::registry::resolve_live_trigger_binding(
503                &trigger_id,
504                envelope.binding_version,
505            )
506            .map_err(|error| DispatchError::Registry(error.to_string()))?;
507            return Ok(vec![
508                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
509                    .await?,
510            ]);
511        }
512
513        let cron_target = match &envelope.event.provider_payload {
514            crate::triggers::ProviderPayload::Known(
515                crate::triggers::event::KnownProviderPayload::Cron(payload),
516            ) => payload.cron_id.clone(),
517            _ => None,
518        };
519        if let Some(trigger_id) = cron_target {
520            let binding = super::registry::resolve_live_trigger_binding(
521                &trigger_id,
522                envelope.binding_version,
523            )
524            .map_err(|error| DispatchError::Registry(error.to_string()))?;
525            return Ok(vec![
526                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
527                    .await?,
528            ]);
529        }
530
531        self.dispatch_event(envelope.event).await
532    }
533
534    pub async fn dispatch_event(
535        &self,
536        event: TriggerEvent,
537    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
538        let bindings = matching_bindings(&event);
539        let mut outcomes = Vec::new();
540        for binding in bindings {
541            outcomes.push(self.dispatch(&binding, event.clone()).await?);
542        }
543        Ok(outcomes)
544    }
545
546    pub async fn dispatch(
547        &self,
548        binding: &TriggerBinding,
549        event: TriggerEvent,
550    ) -> Result<DispatchOutcome, DispatchError> {
551        self.dispatch_with_replay(binding, event, None, None, None)
552            .await
553    }
554
555    pub async fn dispatch_replay(
556        &self,
557        binding: &TriggerBinding,
558        event: TriggerEvent,
559        replay_of_event_id: String,
560    ) -> Result<DispatchOutcome, DispatchError> {
561        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
562            .await
563    }
564
565    pub async fn dispatch_with_parent_span_id(
566        &self,
567        binding: &TriggerBinding,
568        event: TriggerEvent,
569        parent_span_id: Option<String>,
570    ) -> Result<DispatchOutcome, DispatchError> {
571        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
572            .await
573    }
574
575    async fn dispatch_with_replay(
576        &self,
577        binding: &TriggerBinding,
578        event: TriggerEvent,
579        replay_of_event_id: Option<String>,
580        parent_span_id: Option<String>,
581        parent_headers: Option<&BTreeMap<String, String>>,
582    ) -> Result<DispatchOutcome, DispatchError> {
583        let span = tracing::info_span!(
584            "dispatch",
585            trigger_id = %binding.id.as_str(),
586            binding_version = binding.version,
587            trace_id = %event.trace_id.0
588        );
589        #[cfg(feature = "otel")]
590        let span_for_otel = span.clone();
591        let _ = if let Some(headers) = parent_headers {
592            crate::observability::otel::set_span_parent_from_headers(
593                &span,
594                headers,
595                &event.trace_id,
596                parent_span_id.as_deref(),
597            )
598        } else {
599            crate::observability::otel::set_span_parent(
600                &span,
601                &event.trace_id,
602                parent_span_id.as_deref(),
603            )
604        };
605        #[cfg(feature = "otel")]
606        let started_at = Instant::now();
607        let metrics = self.metrics.clone();
608        let outcome = ACTIVE_DISPATCH_IS_REPLAY
609            .scope(
610                replay_of_event_id.is_some(),
611                self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
612                    .instrument(span),
613            )
614            .await;
615        if let Some(metrics) = metrics.as_ref() {
616            match &outcome {
617                Ok(dispatch_outcome) => match dispatch_outcome.status {
618                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
619                        metrics.record_dispatch_succeeded();
620                    }
621                    _ => metrics.record_dispatch_failed(),
622                },
623                Err(_) => metrics.record_dispatch_failed(),
624            }
625        }
626        #[cfg(feature = "otel")]
627        {
628            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
629
630            let duration_ms = started_at.elapsed().as_millis() as i64;
631            let status = match &outcome {
632                Ok(dispatch_outcome) => match dispatch_outcome.status {
633                    DispatchStatus::Succeeded => "succeeded",
634                    DispatchStatus::Skipped => "skipped",
635                    DispatchStatus::Cancelled => "cancelled",
636                    DispatchStatus::Failed => "failed",
637                    DispatchStatus::Dlq => "dlq",
638                },
639                Err(DispatchError::Cancelled(_)) => "cancelled",
640                Err(_) => "failed",
641            };
642            span_for_otel.set_attribute("result.status", status);
643            span_for_otel.set_attribute("result.duration_ms", duration_ms);
644        }
645        outcome
646    }
647
648    async fn dispatch_with_replay_inner(
649        &self,
650        binding: &TriggerBinding,
651        event: TriggerEvent,
652        replay_of_event_id: Option<String>,
653    ) -> Result<DispatchOutcome, DispatchError> {
654        let autonomy_tier = crate::resolve_agent_autonomy_tier(
655            &self.event_log,
656            binding.id.as_str(),
657            binding.autonomy_tier,
658        )
659        .await
660        .unwrap_or(binding.autonomy_tier);
661        let binding_key = binding.binding_key();
662        let route = DispatchUri::from(&binding.handler);
663        let trigger_id = binding.id.as_str().to_string();
664        let event_id = event.id.0.clone();
665        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
666        let begin = if replay_of_event_id.is_some() {
667            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
668        } else {
669            begin_in_flight(binding.id.as_str(), binding.version)
670        };
671        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
672
673        let mut attempts = Vec::new();
674        let mut source_node_id = format!("trigger:{}", event.id.0);
675        let mut initial_nodes = Vec::new();
676        let mut initial_edges = Vec::new();
677        if let Some(original_event_id) = replay_of_event_id.as_ref() {
678            let original_node_id = format!("trigger:{original_event_id}");
679            initial_nodes.push(RunActionGraphNodeRecord {
680                id: original_node_id.clone(),
681                label: format!(
682                    "{}:{} (original {})",
683                    event.provider.as_str(),
684                    event.kind,
685                    original_event_id
686                ),
687                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
688                status: "historical".to_string(),
689                outcome: "replayed_from".to_string(),
690                trace_id: Some(event.trace_id.0.clone()),
691                stage_id: None,
692                node_id: None,
693                worker_id: None,
694                run_id: None,
695                run_path: None,
696            });
697            initial_edges.push(RunActionGraphEdgeRecord {
698                from_id: original_node_id,
699                to_id: source_node_id.clone(),
700                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
701                label: Some("replay chain".to_string()),
702            });
703        }
704        initial_nodes.push(RunActionGraphNodeRecord {
705            id: source_node_id.clone(),
706            label: format!("{}:{}", event.provider.as_str(), event.kind),
707            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
708            status: "received".to_string(),
709            outcome: "received".to_string(),
710            trace_id: Some(event.trace_id.0.clone()),
711            stage_id: None,
712            node_id: None,
713            worker_id: None,
714            run_id: None,
715            run_path: None,
716        });
717        self.emit_action_graph(
718            &event,
719            initial_nodes,
720            initial_edges,
721            serde_json::json!({
722                "source": "dispatcher",
723                "trigger_id": trigger_id,
724                "binding_key": binding_key,
725                "event_id": event_id,
726                "replay_of_event_id": replay_of_event_id,
727            }),
728        )
729        .await?;
730
731        if dispatch_cancel_requested(
732            &self.event_log,
733            &binding_key,
734            &event.id.0,
735            replay_of_event_id.as_ref(),
736        )
737        .await?
738        {
739            finish_in_flight(
740                binding.id.as_str(),
741                binding.version,
742                TriggerDispatchOutcome::Failed,
743            )
744            .await
745            .map_err(|error| DispatchError::Registry(error.to_string()))?;
746            decrement_in_flight(&self.state);
747            return Ok(cancelled_dispatch_outcome(
748                binding,
749                &route,
750                &event,
751                replay_of_event_id,
752                0,
753                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
754            ));
755        }
756
757        if let Some(predicate) = binding.when.as_ref() {
758            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
759            let evaluation = self
760                .evaluate_predicate(
761                    binding,
762                    predicate,
763                    &event,
764                    replay_of_event_id.as_ref(),
765                    autonomy_tier,
766                )
767                .await?;
768            let passed = evaluation.result;
769            self.emit_action_graph(
770                &event,
771                vec![RunActionGraphNodeRecord {
772                    id: predicate_node_id.clone(),
773                    label: predicate.raw.clone(),
774                    kind: ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE.to_string(),
775                    status: "completed".to_string(),
776                    outcome: passed.to_string(),
777                    trace_id: Some(event.trace_id.0.clone()),
778                    stage_id: None,
779                    node_id: None,
780                    worker_id: None,
781                    run_id: None,
782                    run_path: None,
783                }],
784                vec![RunActionGraphEdgeRecord {
785                    from_id: source_node_id.clone(),
786                    to_id: predicate_node_id.clone(),
787                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
788                    label: None,
789                }],
790                serde_json::json!({
791                    "source": "dispatcher",
792                    "trigger_id": binding.id.as_str(),
793                    "binding_key": binding.binding_key(),
794                    "event_id": event.id.0,
795                    "predicate": predicate.raw,
796                    "reason": evaluation.reason,
797                    "cached": evaluation.cached,
798                    "cost_usd": evaluation.cost_usd,
799                    "tokens": evaluation.tokens,
800                    "latency_ms": evaluation.latency_ms,
801                    "replay_of_event_id": replay_of_event_id,
802                }),
803            )
804            .await?;
805
806            if !passed {
807                finish_in_flight(
808                    binding.id.as_str(),
809                    binding.version,
810                    TriggerDispatchOutcome::Dispatched,
811                )
812                .await
813                .map_err(|error| DispatchError::Registry(error.to_string()))?;
814                decrement_in_flight(&self.state);
815                self.append_dispatch_trust_record(
816                    binding,
817                    &route,
818                    &event,
819                    replay_of_event_id.as_ref(),
820                    autonomy_tier,
821                    TrustOutcome::Denied,
822                    "skipped",
823                    0,
824                    None,
825                )
826                .await?;
827                return Ok(DispatchOutcome {
828                    trigger_id: binding.id.as_str().to_string(),
829                    binding_key: binding.binding_key(),
830                    event_id: event.id.0,
831                    attempt_count: 0,
832                    status: DispatchStatus::Skipped,
833                    handler_kind: route.kind().to_string(),
834                    target_uri: route.target_uri(),
835                    replay_of_event_id,
836                    result: Some(serde_json::json!({
837                        "skipped": true,
838                        "predicate": predicate.raw,
839                        "reason": evaluation.reason,
840                    })),
841                    error: None,
842                });
843            }
844
845            source_node_id = predicate_node_id;
846        }
847
848        let (event, mut acquired_flow) = match self
849            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
850            .await?
851        {
852            FlowControlOutcome::Dispatch { event, acquired } => (*event, acquired),
853            FlowControlOutcome::Skip { reason } => {
854                finish_in_flight(
855                    binding.id.as_str(),
856                    binding.version,
857                    TriggerDispatchOutcome::Dispatched,
858                )
859                .await
860                .map_err(|error| DispatchError::Registry(error.to_string()))?;
861                decrement_in_flight(&self.state);
862                return Ok(DispatchOutcome {
863                    trigger_id: binding.id.as_str().to_string(),
864                    binding_key: binding.binding_key(),
865                    event_id: event.id.0,
866                    attempt_count: 0,
867                    status: DispatchStatus::Skipped,
868                    handler_kind: route.kind().to_string(),
869                    target_uri: route.target_uri(),
870                    replay_of_event_id,
871                    result: Some(serde_json::json!({
872                        "skipped": true,
873                        "flow_control": reason,
874                    })),
875                    error: None,
876                });
877            }
878        };
879
880        let mut previous_retry_node = None;
881        let max_attempts = binding.retry.max_attempts();
882        for attempt in 1..=max_attempts {
883            if dispatch_cancel_requested(
884                &self.event_log,
885                &binding_key,
886                &event.id.0,
887                replay_of_event_id.as_ref(),
888            )
889            .await?
890            {
891                finish_in_flight(
892                    binding.id.as_str(),
893                    binding.version,
894                    TriggerDispatchOutcome::Failed,
895                )
896                .await
897                .map_err(|error| DispatchError::Registry(error.to_string()))?;
898                decrement_in_flight(&self.state);
899                return Ok(cancelled_dispatch_outcome(
900                    binding,
901                    &route,
902                    &event,
903                    replay_of_event_id,
904                    attempt.saturating_sub(1),
905                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
906                ));
907            }
908            maybe_fail_before_outbox();
909            let started_at = now_rfc3339();
910            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
911            self.append_lifecycle_event(
912                "DispatchStarted",
913                &event,
914                binding,
915                serde_json::json!({
916                    "event_id": event.id.0,
917                    "attempt": attempt,
918                    "handler_kind": route.kind(),
919                    "target_uri": route.target_uri(),
920                    "replay_of_event_id": replay_of_event_id,
921                }),
922                replay_of_event_id.as_ref(),
923            )
924            .await?;
925            self.append_topic_event(
926                TRIGGER_OUTBOX_TOPIC,
927                "dispatch_started",
928                &event,
929                Some(binding),
930                Some(attempt),
931                serde_json::json!({
932                    "event_id": event.id.0,
933                    "attempt": attempt,
934                    "trigger_id": binding.id.as_str(),
935                    "binding_key": binding.binding_key(),
936                    "handler_kind": route.kind(),
937                    "target_uri": route.target_uri(),
938                    "replay_of_event_id": replay_of_event_id,
939                }),
940                replay_of_event_id.as_ref(),
941            )
942            .await?;
943
944            let mut dispatch_edges = Vec::new();
945            if attempt == 1 {
946                dispatch_edges.push(RunActionGraphEdgeRecord {
947                    from_id: source_node_id.clone(),
948                    to_id: attempt_node_id.clone(),
949                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
950                    label: binding.when.as_ref().map(|_| "true".to_string()),
951                });
952            } else if let Some(retry_node_id) = previous_retry_node.take() {
953                dispatch_edges.push(RunActionGraphEdgeRecord {
954                    from_id: retry_node_id,
955                    to_id: attempt_node_id.clone(),
956                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
957                    label: Some(format!("attempt {attempt}")),
958                });
959            }
960
961            self.emit_action_graph(
962                &event,
963                vec![RunActionGraphNodeRecord {
964                    id: attempt_node_id.clone(),
965                    label: dispatch_node_label(&route),
966                    kind: dispatch_node_kind(&route).to_string(),
967                    status: "running".to_string(),
968                    outcome: format!("attempt_{attempt}"),
969                    trace_id: Some(event.trace_id.0.clone()),
970                    stage_id: None,
971                    node_id: None,
972                    worker_id: None,
973                    run_id: None,
974                    run_path: None,
975                }],
976                dispatch_edges,
977                serde_json::json!({
978                    "source": "dispatcher",
979                    "trigger_id": binding.id.as_str(),
980                    "binding_key": binding.binding_key(),
981                    "event_id": event.id.0,
982                    "attempt": attempt,
983                    "handler_kind": route.kind(),
984                    "target_uri": route.target_uri(),
985                    "target_agent": dispatch_target_agent(&route),
986                    "replay_of_event_id": replay_of_event_id,
987                }),
988            )
989            .await?;
990
991            let result = self
992                .dispatch_once(
993                    binding,
994                    &route,
995                    &event,
996                    autonomy_tier,
997                    &mut self.cancel_tx.subscribe(),
998                )
999                .await;
1000            let completed_at = now_rfc3339();
1001
1002            match result {
1003                Ok(result) => {
1004                    let attempt_record = DispatchAttemptRecord {
1005                        trigger_id: binding.id.as_str().to_string(),
1006                        binding_key: binding.binding_key(),
1007                        event_id: event.id.0.clone(),
1008                        attempt,
1009                        handler_kind: route.kind().to_string(),
1010                        started_at,
1011                        completed_at,
1012                        outcome: "success".to_string(),
1013                        error_msg: None,
1014                    };
1015                    attempts.push(attempt_record.clone());
1016                    self.append_attempt_record(
1017                        &event,
1018                        binding,
1019                        &attempt_record,
1020                        replay_of_event_id.as_ref(),
1021                    )
1022                    .await?;
1023                    self.append_lifecycle_event(
1024                        "DispatchSucceeded",
1025                        &event,
1026                        binding,
1027                        serde_json::json!({
1028                            "event_id": event.id.0,
1029                            "attempt": attempt,
1030                            "handler_kind": route.kind(),
1031                            "target_uri": route.target_uri(),
1032                            "result": result,
1033                            "replay_of_event_id": replay_of_event_id,
1034                        }),
1035                        replay_of_event_id.as_ref(),
1036                    )
1037                    .await?;
1038                    self.append_topic_event(
1039                        TRIGGER_OUTBOX_TOPIC,
1040                        "dispatch_succeeded",
1041                        &event,
1042                        Some(binding),
1043                        Some(attempt),
1044                        serde_json::json!({
1045                            "event_id": event.id.0,
1046                            "attempt": attempt,
1047                            "trigger_id": binding.id.as_str(),
1048                            "binding_key": binding.binding_key(),
1049                            "handler_kind": route.kind(),
1050                            "target_uri": route.target_uri(),
1051                            "result": result,
1052                            "replay_of_event_id": replay_of_event_id,
1053                        }),
1054                        replay_of_event_id.as_ref(),
1055                    )
1056                    .await?;
1057                    finish_in_flight(
1058                        binding.id.as_str(),
1059                        binding.version,
1060                        TriggerDispatchOutcome::Dispatched,
1061                    )
1062                    .await
1063                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1064                    self.release_flow_control(&mut acquired_flow).await?;
1065                    decrement_in_flight(&self.state);
1066                    self.append_dispatch_trust_record(
1067                        binding,
1068                        &route,
1069                        &event,
1070                        replay_of_event_id.as_ref(),
1071                        autonomy_tier,
1072                        TrustOutcome::Success,
1073                        "succeeded",
1074                        attempt,
1075                        None,
1076                    )
1077                    .await?;
1078                    return Ok(DispatchOutcome {
1079                        trigger_id: binding.id.as_str().to_string(),
1080                        binding_key: binding.binding_key(),
1081                        event_id: event.id.0,
1082                        attempt_count: attempt,
1083                        status: DispatchStatus::Succeeded,
1084                        handler_kind: route.kind().to_string(),
1085                        target_uri: route.target_uri(),
1086                        replay_of_event_id,
1087                        result: Some(result),
1088                        error: None,
1089                    });
1090                }
1091                Err(error) => {
1092                    let attempt_record = DispatchAttemptRecord {
1093                        trigger_id: binding.id.as_str().to_string(),
1094                        binding_key: binding.binding_key(),
1095                        event_id: event.id.0.clone(),
1096                        attempt,
1097                        handler_kind: route.kind().to_string(),
1098                        started_at,
1099                        completed_at,
1100                        outcome: dispatch_error_label(&error).to_string(),
1101                        error_msg: Some(error.to_string()),
1102                    };
1103                    attempts.push(attempt_record.clone());
1104                    self.append_attempt_record(
1105                        &event,
1106                        binding,
1107                        &attempt_record,
1108                        replay_of_event_id.as_ref(),
1109                    )
1110                    .await?;
1111                    self.append_lifecycle_event(
1112                        "DispatchFailed",
1113                        &event,
1114                        binding,
1115                        serde_json::json!({
1116                            "event_id": event.id.0,
1117                            "attempt": attempt,
1118                            "handler_kind": route.kind(),
1119                            "target_uri": route.target_uri(),
1120                            "error": error.to_string(),
1121                            "replay_of_event_id": replay_of_event_id,
1122                        }),
1123                        replay_of_event_id.as_ref(),
1124                    )
1125                    .await?;
1126                    self.append_topic_event(
1127                        TRIGGER_OUTBOX_TOPIC,
1128                        "dispatch_failed",
1129                        &event,
1130                        Some(binding),
1131                        Some(attempt),
1132                        serde_json::json!({
1133                            "event_id": event.id.0,
1134                            "attempt": attempt,
1135                            "trigger_id": binding.id.as_str(),
1136                            "binding_key": binding.binding_key(),
1137                            "handler_kind": route.kind(),
1138                            "target_uri": route.target_uri(),
1139                            "error": error.to_string(),
1140                            "replay_of_event_id": replay_of_event_id,
1141                        }),
1142                        replay_of_event_id.as_ref(),
1143                    )
1144                    .await?;
1145
1146                    if !error.retryable() {
1147                        finish_in_flight(
1148                            binding.id.as_str(),
1149                            binding.version,
1150                            TriggerDispatchOutcome::Failed,
1151                        )
1152                        .await
1153                        .map_err(|registry_error| {
1154                            DispatchError::Registry(registry_error.to_string())
1155                        })?;
1156                        self.release_flow_control(&mut acquired_flow).await?;
1157                        decrement_in_flight(&self.state);
1158                        let trust_outcome = match error {
1159                            DispatchError::Denied(_) => TrustOutcome::Denied,
1160                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
1161                            _ => TrustOutcome::Failure,
1162                        };
1163                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1164                            "cancelled"
1165                        } else {
1166                            "failed"
1167                        };
1168                        self.append_dispatch_trust_record(
1169                            binding,
1170                            &route,
1171                            &event,
1172                            replay_of_event_id.as_ref(),
1173                            autonomy_tier,
1174                            trust_outcome,
1175                            terminal_status,
1176                            attempt,
1177                            Some(error.to_string()),
1178                        )
1179                        .await?;
1180                        return Ok(DispatchOutcome {
1181                            trigger_id: binding.id.as_str().to_string(),
1182                            binding_key: binding.binding_key(),
1183                            event_id: event.id.0,
1184                            attempt_count: attempt,
1185                            status: if matches!(error, DispatchError::Cancelled(_)) {
1186                                DispatchStatus::Cancelled
1187                            } else {
1188                                DispatchStatus::Failed
1189                            },
1190                            handler_kind: route.kind().to_string(),
1191                            target_uri: route.target_uri(),
1192                            replay_of_event_id,
1193                            result: None,
1194                            error: Some(error.to_string()),
1195                        });
1196                    }
1197
1198                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1199                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1200                        previous_retry_node = Some(retry_node_id.clone());
1201                        self.emit_action_graph(
1202                            &event,
1203                            vec![RunActionGraphNodeRecord {
1204                                id: retry_node_id.clone(),
1205                                label: format!("retry in {}ms", delay.as_millis()),
1206                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1207                                status: "scheduled".to_string(),
1208                                outcome: format!("attempt_{}", attempt + 1),
1209                                trace_id: Some(event.trace_id.0.clone()),
1210                                stage_id: None,
1211                                node_id: None,
1212                                worker_id: None,
1213                                run_id: None,
1214                                run_path: None,
1215                            }],
1216                            vec![RunActionGraphEdgeRecord {
1217                                from_id: attempt_node_id,
1218                                to_id: retry_node_id.clone(),
1219                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1220                                label: Some(format!("attempt {}", attempt + 1)),
1221                            }],
1222                            serde_json::json!({
1223                                "source": "dispatcher",
1224                                "trigger_id": binding.id.as_str(),
1225                                "binding_key": binding.binding_key(),
1226                                "event_id": event.id.0,
1227                                "attempt": attempt + 1,
1228                                "delay_ms": delay.as_millis(),
1229                                "replay_of_event_id": replay_of_event_id,
1230                            }),
1231                        )
1232                        .await?;
1233                        self.append_lifecycle_event(
1234                            "RetryScheduled",
1235                            &event,
1236                            binding,
1237                            serde_json::json!({
1238                                "event_id": event.id.0,
1239                                "attempt": attempt + 1,
1240                                "delay_ms": delay.as_millis(),
1241                                "error": error.to_string(),
1242                                "replay_of_event_id": replay_of_event_id,
1243                            }),
1244                            replay_of_event_id.as_ref(),
1245                        )
1246                        .await?;
1247                        self.append_topic_event(
1248                            TRIGGER_ATTEMPTS_TOPIC,
1249                            "retry_scheduled",
1250                            &event,
1251                            Some(binding),
1252                            Some(attempt + 1),
1253                            serde_json::json!({
1254                                "event_id": event.id.0,
1255                                "attempt": attempt + 1,
1256                                "trigger_id": binding.id.as_str(),
1257                                "binding_key": binding.binding_key(),
1258                                "delay_ms": delay.as_millis(),
1259                                "error": error.to_string(),
1260                                "replay_of_event_id": replay_of_event_id,
1261                            }),
1262                            replay_of_event_id.as_ref(),
1263                        )
1264                        .await?;
1265                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1266                        let sleep_result = sleep_or_cancel_or_request(
1267                            &self.event_log,
1268                            delay,
1269                            &binding_key,
1270                            &event.id.0,
1271                            replay_of_event_id.as_ref(),
1272                            &mut self.cancel_tx.subscribe(),
1273                        )
1274                        .await;
1275                        decrement_retry_queue_depth(&self.state);
1276                        if sleep_result.is_err() {
1277                            finish_in_flight(
1278                                binding.id.as_str(),
1279                                binding.version,
1280                                TriggerDispatchOutcome::Failed,
1281                            )
1282                            .await
1283                            .map_err(|registry_error| {
1284                                DispatchError::Registry(registry_error.to_string())
1285                            })?;
1286                            self.release_flow_control(&mut acquired_flow).await?;
1287                            decrement_in_flight(&self.state);
1288                            self.append_dispatch_trust_record(
1289                                binding,
1290                                &route,
1291                                &event,
1292                                replay_of_event_id.as_ref(),
1293                                autonomy_tier,
1294                                TrustOutcome::Failure,
1295                                "cancelled",
1296                                attempt,
1297                                Some("dispatcher shutdown cancelled retry wait".to_string()),
1298                            )
1299                            .await?;
1300                            return Ok(DispatchOutcome {
1301                                trigger_id: binding.id.as_str().to_string(),
1302                                binding_key: binding.binding_key(),
1303                                event_id: event.id.0,
1304                                attempt_count: attempt,
1305                                status: DispatchStatus::Cancelled,
1306                                handler_kind: route.kind().to_string(),
1307                                target_uri: route.target_uri(),
1308                                replay_of_event_id,
1309                                result: None,
1310                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1311                            });
1312                        }
1313                        continue;
1314                    }
1315
1316                    let final_error = error.to_string();
1317                    let dlq_entry = DlqEntry {
1318                        trigger_id: binding.id.as_str().to_string(),
1319                        binding_key: binding.binding_key(),
1320                        event: event.clone(),
1321                        attempt_count: attempt,
1322                        final_error: final_error.clone(),
1323                        attempts: attempts.clone(),
1324                    };
1325                    self.state
1326                        .dlq
1327                        .lock()
1328                        .expect("dispatcher dlq poisoned")
1329                        .push(dlq_entry.clone());
1330                    self.emit_action_graph(
1331                        &event,
1332                        vec![RunActionGraphNodeRecord {
1333                            id: format!("dlq:{binding_key}:{}", event.id.0),
1334                            label: binding.id.as_str().to_string(),
1335                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1336                            status: "queued".to_string(),
1337                            outcome: "retry_exhausted".to_string(),
1338                            trace_id: Some(event.trace_id.0.clone()),
1339                            stage_id: None,
1340                            node_id: None,
1341                            worker_id: None,
1342                            run_id: None,
1343                            run_path: None,
1344                        }],
1345                        vec![RunActionGraphEdgeRecord {
1346                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1347                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
1348                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1349                            label: Some(format!("{attempt} attempts")),
1350                        }],
1351                        serde_json::json!({
1352                            "source": "dispatcher",
1353                            "trigger_id": binding.id.as_str(),
1354                            "binding_key": binding.binding_key(),
1355                            "event_id": event.id.0,
1356                            "attempt_count": attempt,
1357                            "final_error": final_error,
1358                            "replay_of_event_id": replay_of_event_id,
1359                        }),
1360                    )
1361                    .await?;
1362                    self.append_lifecycle_event(
1363                        "DlqMoved",
1364                        &event,
1365                        binding,
1366                        serde_json::json!({
1367                            "event_id": event.id.0,
1368                            "attempt_count": attempt,
1369                            "final_error": dlq_entry.final_error,
1370                            "replay_of_event_id": replay_of_event_id,
1371                        }),
1372                        replay_of_event_id.as_ref(),
1373                    )
1374                    .await?;
1375                    self.append_topic_event(
1376                        TRIGGER_DLQ_TOPIC,
1377                        "dlq_moved",
1378                        &event,
1379                        Some(binding),
1380                        Some(attempt),
1381                        serde_json::to_value(&dlq_entry)
1382                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1383                        replay_of_event_id.as_ref(),
1384                    )
1385                    .await?;
1386                    finish_in_flight(
1387                        binding.id.as_str(),
1388                        binding.version,
1389                        TriggerDispatchOutcome::Dlq,
1390                    )
1391                    .await
1392                    .map_err(|registry_error| {
1393                        DispatchError::Registry(registry_error.to_string())
1394                    })?;
1395                    self.release_flow_control(&mut acquired_flow).await?;
1396                    decrement_in_flight(&self.state);
1397                    self.append_dispatch_trust_record(
1398                        binding,
1399                        &route,
1400                        &event,
1401                        replay_of_event_id.as_ref(),
1402                        autonomy_tier,
1403                        TrustOutcome::Failure,
1404                        "dlq",
1405                        attempt,
1406                        Some(error.to_string()),
1407                    )
1408                    .await?;
1409                    return Ok(DispatchOutcome {
1410                        trigger_id: binding.id.as_str().to_string(),
1411                        binding_key: binding.binding_key(),
1412                        event_id: event.id.0,
1413                        attempt_count: attempt,
1414                        status: DispatchStatus::Dlq,
1415                        handler_kind: route.kind().to_string(),
1416                        target_uri: route.target_uri(),
1417                        replay_of_event_id,
1418                        result: None,
1419                        error: Some(error.to_string()),
1420                    });
1421                }
1422            }
1423        }
1424
1425        finish_in_flight(
1426            binding.id.as_str(),
1427            binding.version,
1428            TriggerDispatchOutcome::Failed,
1429        )
1430        .await
1431        .map_err(|error| DispatchError::Registry(error.to_string()))?;
1432        self.release_flow_control(&mut acquired_flow).await?;
1433        decrement_in_flight(&self.state);
1434        self.append_dispatch_trust_record(
1435            binding,
1436            &route,
1437            &event,
1438            replay_of_event_id.as_ref(),
1439            autonomy_tier,
1440            TrustOutcome::Failure,
1441            "failed",
1442            max_attempts,
1443            Some("dispatch exhausted without terminal outcome".to_string()),
1444        )
1445        .await?;
1446        Ok(DispatchOutcome {
1447            trigger_id: binding.id.as_str().to_string(),
1448            binding_key: binding.binding_key(),
1449            event_id: event.id.0,
1450            attempt_count: max_attempts,
1451            status: DispatchStatus::Failed,
1452            handler_kind: route.kind().to_string(),
1453            target_uri: route.target_uri(),
1454            replay_of_event_id,
1455            result: None,
1456            error: Some("dispatch exhausted without terminal outcome".to_string()),
1457        })
1458    }
1459
1460    async fn dispatch_once(
1461        &self,
1462        binding: &TriggerBinding,
1463        route: &DispatchUri,
1464        event: &TriggerEvent,
1465        autonomy_tier: AutonomyTier,
1466        cancel_rx: &mut broadcast::Receiver<()>,
1467    ) -> Result<serde_json::Value, DispatchError> {
1468        match route {
1469            DispatchUri::Local { .. } => {
1470                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
1471                    return Err(DispatchError::Local(format!(
1472                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
1473                        binding.id.as_str()
1474                    )));
1475                };
1476                let value = self
1477                    .invoke_vm_callable(
1478                        closure,
1479                        &binding.binding_key(),
1480                        event,
1481                        None,
1482                        binding.id.as_str(),
1483                        &format!("{}.{}", event.provider.as_str(), event.kind),
1484                        autonomy_tier,
1485                        cancel_rx,
1486                    )
1487                    .await?;
1488                Ok(vm_value_to_json(&value))
1489            }
1490            DispatchUri::A2a {
1491                target,
1492                allow_cleartext,
1493            } => {
1494                if self.state.shutting_down.load(Ordering::SeqCst) {
1495                    return Err(DispatchError::Cancelled(
1496                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
1497                    ));
1498                }
1499                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
1500                    target,
1501                    *allow_cleartext,
1502                    binding.id.as_str(),
1503                    &binding.binding_key(),
1504                    event,
1505                    cancel_rx,
1506                )
1507                .await
1508                .map_err(|error| match error {
1509                    crate::a2a::A2aClientError::Cancelled(message) => {
1510                        DispatchError::Cancelled(message)
1511                    }
1512                    other => DispatchError::A2a(other.to_string()),
1513                })?;
1514                match ack {
1515                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
1516                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
1517                }
1518            }
1519            DispatchUri::Worker { queue } => Err(DispatchError::NotImplemented(format!(
1520                "worker:// dispatch to '{queue}' is not implemented yet; see O-05 #182"
1521            ))),
1522        }
1523    }
1524
1525    async fn apply_flow_control(
1526        &self,
1527        binding: &TriggerBinding,
1528        event: &TriggerEvent,
1529        replay_of_event_id: Option<&String>,
1530    ) -> Result<FlowControlOutcome, DispatchError> {
1531        let flow = &binding.flow_control;
1532        let mut managed_event = event.clone();
1533
1534        if let Some(batch) = &flow.batch {
1535            let gate = self
1536                .resolve_flow_gate(
1537                    &binding.binding_key(),
1538                    batch.key.as_ref(),
1539                    &managed_event,
1540                    replay_of_event_id,
1541                )
1542                .await?;
1543            match self
1544                .state
1545                .flow_control
1546                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
1547                .await
1548                .map_err(DispatchError::from)?
1549            {
1550                BatchDecision::Dispatch(events) => {
1551                    managed_event = build_batched_event(events)?;
1552                }
1553                BatchDecision::Merged => {
1554                    return Ok(FlowControlOutcome::Skip {
1555                        reason: "batch_merged".to_string(),
1556                    })
1557                }
1558            }
1559        }
1560
1561        if let Some(debounce) = &flow.debounce {
1562            let gate = self
1563                .resolve_flow_gate(
1564                    &binding.binding_key(),
1565                    Some(&debounce.key),
1566                    &managed_event,
1567                    replay_of_event_id,
1568                )
1569                .await?;
1570            let latest = self
1571                .state
1572                .flow_control
1573                .debounce(&gate, debounce.period)
1574                .await
1575                .map_err(DispatchError::from)?;
1576            if !latest {
1577                return Ok(FlowControlOutcome::Skip {
1578                    reason: "debounced".to_string(),
1579                });
1580            }
1581        }
1582
1583        if let Some(rate_limit) = &flow.rate_limit {
1584            let gate = self
1585                .resolve_flow_gate(
1586                    &binding.binding_key(),
1587                    rate_limit.key.as_ref(),
1588                    &managed_event,
1589                    replay_of_event_id,
1590                )
1591                .await?;
1592            let allowed = self
1593                .state
1594                .flow_control
1595                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
1596                .await
1597                .map_err(DispatchError::from)?;
1598            if !allowed {
1599                return Ok(FlowControlOutcome::Skip {
1600                    reason: "rate_limited".to_string(),
1601                });
1602            }
1603        }
1604
1605        if let Some(throttle) = &flow.throttle {
1606            let gate = self
1607                .resolve_flow_gate(
1608                    &binding.binding_key(),
1609                    throttle.key.as_ref(),
1610                    &managed_event,
1611                    replay_of_event_id,
1612                )
1613                .await?;
1614            self.state
1615                .flow_control
1616                .wait_for_throttle(&gate, throttle.period, throttle.max)
1617                .await
1618                .map_err(DispatchError::from)?;
1619        }
1620
1621        let mut acquired = AcquiredFlowControl::default();
1622        if let Some(singleton) = &flow.singleton {
1623            let gate = self
1624                .resolve_flow_gate(
1625                    &binding.binding_key(),
1626                    singleton.key.as_ref(),
1627                    &managed_event,
1628                    replay_of_event_id,
1629                )
1630                .await?;
1631            let acquired_singleton = self
1632                .state
1633                .flow_control
1634                .try_acquire_singleton(&gate)
1635                .await
1636                .map_err(DispatchError::from)?;
1637            if !acquired_singleton {
1638                return Ok(FlowControlOutcome::Skip {
1639                    reason: "singleton_active".to_string(),
1640                });
1641            }
1642            acquired.singleton_gate = Some(gate);
1643        }
1644
1645        if let Some(concurrency) = &flow.concurrency {
1646            let gate = self
1647                .resolve_flow_gate(
1648                    &binding.binding_key(),
1649                    concurrency.key.as_ref(),
1650                    &managed_event,
1651                    replay_of_event_id,
1652                )
1653                .await?;
1654            let priority_rank = self
1655                .resolve_priority_rank(
1656                    &binding.binding_key(),
1657                    flow.priority.as_ref(),
1658                    &managed_event,
1659                    replay_of_event_id,
1660                )
1661                .await?;
1662            acquired.concurrency = Some(
1663                self.state
1664                    .flow_control
1665                    .acquire_concurrency(&gate, concurrency.max, priority_rank)
1666                    .await
1667                    .map_err(DispatchError::from)?,
1668            );
1669        }
1670
1671        Ok(FlowControlOutcome::Dispatch {
1672            event: Box::new(managed_event),
1673            acquired,
1674        })
1675    }
1676
1677    async fn release_flow_control(
1678        &self,
1679        acquired: &mut AcquiredFlowControl,
1680    ) -> Result<(), DispatchError> {
1681        if let Some(gate) = acquired.singleton_gate.take() {
1682            self.state
1683                .flow_control
1684                .release_singleton(&gate)
1685                .await
1686                .map_err(DispatchError::from)?;
1687        }
1688        if let Some(permit) = acquired.concurrency.take() {
1689            self.state
1690                .flow_control
1691                .release_concurrency(permit)
1692                .await
1693                .map_err(DispatchError::from)?;
1694        }
1695        Ok(())
1696    }
1697
1698    async fn resolve_flow_gate(
1699        &self,
1700        binding_key: &str,
1701        expr: Option<&crate::triggers::TriggerExpressionSpec>,
1702        event: &TriggerEvent,
1703        replay_of_event_id: Option<&String>,
1704    ) -> Result<String, DispatchError> {
1705        let key = match expr {
1706            Some(expr) => {
1707                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
1708                    .await?
1709            }
1710            None => "_global".to_string(),
1711        };
1712        Ok(format!("{binding_key}:{key}"))
1713    }
1714
1715    async fn resolve_priority_rank(
1716        &self,
1717        binding_key: &str,
1718        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
1719        event: &TriggerEvent,
1720        replay_of_event_id: Option<&String>,
1721    ) -> Result<usize, DispatchError> {
1722        let Some(priority) = priority else {
1723            return Ok(0);
1724        };
1725        let value = self
1726            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
1727            .await?;
1728        Ok(priority
1729            .order
1730            .iter()
1731            .position(|candidate| candidate == &value)
1732            .unwrap_or(priority.order.len()))
1733    }
1734
1735    async fn evaluate_flow_expression(
1736        &self,
1737        binding_key: &str,
1738        expr: &crate::triggers::TriggerExpressionSpec,
1739        event: &TriggerEvent,
1740        replay_of_event_id: Option<&String>,
1741    ) -> Result<String, DispatchError> {
1742        let value = self
1743            .invoke_vm_callable(
1744                &expr.closure,
1745                binding_key,
1746                event,
1747                replay_of_event_id,
1748                "",
1749                "flow_control",
1750                AutonomyTier::Suggest,
1751                &mut self.cancel_tx.subscribe(),
1752            )
1753            .await?;
1754        Ok(json_value_to_gate(&vm_value_to_json(&value)))
1755    }
1756
1757    #[allow(clippy::too_many_arguments)]
1758    async fn invoke_vm_callable(
1759        &self,
1760        closure: &crate::value::VmClosure,
1761        binding_key: &str,
1762        event: &TriggerEvent,
1763        replay_of_event_id: Option<&String>,
1764        agent_id: &str,
1765        action: &str,
1766        autonomy_tier: AutonomyTier,
1767        cancel_rx: &mut broadcast::Receiver<()>,
1768    ) -> Result<VmValue, DispatchError> {
1769        let mut vm = self.base_vm.child_vm();
1770        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
1771        if self.state.shutting_down.load(Ordering::SeqCst) {
1772            cancel_token.store(true, Ordering::SeqCst);
1773        }
1774        self.state
1775            .cancel_tokens
1776            .lock()
1777            .expect("dispatcher cancel tokens poisoned")
1778            .push(cancel_token.clone());
1779        vm.install_cancel_token(cancel_token.clone());
1780        let arg = json_to_vm_value(
1781            &serde_json::to_value(event)
1782                .map_err(|error| DispatchError::Serde(error.to_string()))?,
1783        );
1784        let args = [arg];
1785        let future = vm.call_closure_pub(closure, &args, &[]);
1786        pin_mut!(future);
1787        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1788            slot.borrow_mut().replace(DispatchContext {
1789                trigger_event: event.clone(),
1790                replay_of_event_id: replay_of_event_id.cloned(),
1791                agent_id: agent_id.to_string(),
1792                action: action.to_string(),
1793                autonomy_tier,
1794            })
1795        });
1796        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
1797        crate::stdlib::hitl::reset_hitl_state();
1798        let mut poll = tokio::time::interval(Duration::from_millis(100));
1799        let result = loop {
1800            tokio::select! {
1801                result = &mut future => break result,
1802                _ = recv_cancel(cancel_rx) => {
1803                    cancel_token.store(true, Ordering::SeqCst);
1804                }
1805                _ = poll.tick() => {
1806                    if dispatch_cancel_requested(
1807                        &self.event_log,
1808                        binding_key,
1809                        &event.id.0,
1810                        replay_of_event_id,
1811                    )
1812                    .await? {
1813                        cancel_token.store(true, Ordering::SeqCst);
1814                    }
1815                }
1816            }
1817        };
1818        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1819            *slot.borrow_mut() = prior_context;
1820        });
1821        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
1822        {
1823            let mut tokens = self
1824                .state
1825                .cancel_tokens
1826                .lock()
1827                .expect("dispatcher cancel tokens poisoned");
1828            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
1829        }
1830
1831        if cancel_token.load(Ordering::SeqCst) {
1832            if dispatch_cancel_requested(
1833                &self.event_log,
1834                binding_key,
1835                &event.id.0,
1836                replay_of_event_id,
1837            )
1838            .await?
1839            {
1840                Err(DispatchError::Cancelled(
1841                    "trigger cancel request cancelled local handler".to_string(),
1842                ))
1843            } else {
1844                Err(DispatchError::Cancelled(
1845                    "dispatcher shutdown cancelled local handler".to_string(),
1846                ))
1847            }
1848        } else {
1849            result.map_err(dispatch_error_from_vm_error)
1850        }
1851    }
1852
1853    async fn evaluate_predicate(
1854        &self,
1855        binding: &TriggerBinding,
1856        predicate: &super::registry::TriggerPredicateSpec,
1857        event: &TriggerEvent,
1858        replay_of_event_id: Option<&String>,
1859        autonomy_tier: AutonomyTier,
1860    ) -> Result<PredicateEvaluationRecord, DispatchError> {
1861        let event_id = event.id.0.clone();
1862        let trigger_id = binding.id.as_str().to_string();
1863        let now_ms = now_unix_ms();
1864        let today = utc_day_key();
1865
1866        let breaker_open_until = {
1867            let mut state = binding
1868                .predicate_state
1869                .lock()
1870                .expect("trigger predicate state poisoned");
1871            if state.budget_day_utc != Some(today) {
1872                state.budget_day_utc = Some(today);
1873                binding
1874                    .metrics
1875                    .cost_today_usd_micros
1876                    .store(0, Ordering::Relaxed);
1877            }
1878            if state
1879                .breaker_open_until_ms
1880                .is_some_and(|until_ms| until_ms > now_ms)
1881            {
1882                state.breaker_open_until_ms
1883            } else {
1884                None
1885            }
1886        };
1887
1888        if breaker_open_until.is_some() {
1889            let mut metadata = BTreeMap::new();
1890            metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
1891            metadata.insert("event_id".to_string(), serde_json::json!(event_id));
1892            metadata.insert(
1893                "breaker_open_until_ms".to_string(),
1894                serde_json::json!(breaker_open_until),
1895            );
1896            crate::events::log_warn_meta(
1897                "trigger.predicate.circuit_breaker",
1898                "trigger predicate circuit breaker is open; short-circuiting to false",
1899                metadata,
1900            );
1901            let record = PredicateEvaluationRecord {
1902                result: false,
1903                reason: Some("circuit_open".to_string()),
1904                ..Default::default()
1905            };
1906            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1907                .await?;
1908            return Ok(record);
1909        }
1910
1911        if binding
1912            .daily_cost_usd
1913            .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
1914        {
1915            self.append_lifecycle_event(
1916                "predicate.daily_budget_exceeded",
1917                event,
1918                binding,
1919                serde_json::json!({
1920                    "trigger_id": binding.id.as_str(),
1921                    "event_id": event.id.0,
1922                    "limit_usd": binding.daily_cost_usd,
1923                    "cost_today_usd": current_predicate_daily_cost(binding),
1924                    "replay_of_event_id": replay_of_event_id,
1925                }),
1926                replay_of_event_id,
1927            )
1928            .await?;
1929            let record = PredicateEvaluationRecord {
1930                result: false,
1931                reason: Some("daily_budget_exceeded".to_string()),
1932                ..Default::default()
1933            };
1934            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1935                .await?;
1936            return Ok(record);
1937        }
1938
1939        let replay_cache = self
1940            .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
1941            .await?;
1942        let guard = start_predicate_evaluation(
1943            binding.when_budget.clone().unwrap_or_default(),
1944            replay_cache,
1945        );
1946        let started = std::time::Instant::now();
1947        let eval = self
1948            .invoke_vm_callable_with_timeout(
1949                &predicate.closure,
1950                &binding.binding_key(),
1951                event,
1952                replay_of_event_id,
1953                binding.id.as_str(),
1954                &format!("{}.{}", event.provider.as_str(), event.kind),
1955                autonomy_tier,
1956                &mut self.cancel_tx.subscribe(),
1957                binding
1958                    .when_budget
1959                    .as_ref()
1960                    .and_then(|budget| budget.timeout()),
1961            )
1962            .await;
1963        let capture = guard.finish();
1964        let latency_ms = started.elapsed().as_millis() as u64;
1965        if replay_of_event_id.is_none() && !capture.entries.is_empty() {
1966            self.append_predicate_cache_record(binding, event, &capture.entries)
1967                .await?;
1968        }
1969
1970        let mut record = PredicateEvaluationRecord {
1971            result: false,
1972            cost_usd: capture.total_cost_usd,
1973            tokens: capture.total_tokens,
1974            latency_ms,
1975            cached: capture.cached,
1976            reason: None,
1977        };
1978
1979        let mut count_failure = false;
1980        let mut opened_breaker = false;
1981
1982        match eval {
1983            Ok(value) => match predicate_value_as_bool(value) {
1984                Ok(result) => {
1985                    record.result = result;
1986                }
1987                Err(reason) => {
1988                    count_failure = true;
1989                    record.reason = Some(reason);
1990                }
1991            },
1992            Err(error) => {
1993                count_failure = true;
1994                record.reason = Some(error.to_string());
1995            }
1996        }
1997
1998        let cost_usd_micros = usd_to_micros(record.cost_usd);
1999        if cost_usd_micros > 0 {
2000            binding
2001                .metrics
2002                .cost_total_usd_micros
2003                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2004            binding
2005                .metrics
2006                .cost_today_usd_micros
2007                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2008        }
2009
2010        let timed_out = matches!(
2011            record.reason.as_deref(),
2012            Some("predicate evaluation timed out")
2013        );
2014        if capture.budget_exceeded || timed_out {
2015            record.result = false;
2016            record.reason = Some("budget_exceeded".to_string());
2017            self.append_lifecycle_event(
2018                "predicate.budget_exceeded",
2019                event,
2020                binding,
2021                serde_json::json!({
2022                    "trigger_id": binding.id.as_str(),
2023                    "event_id": event.id.0,
2024                    "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2025                    "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2026                    "cost_usd": record.cost_usd,
2027                    "tokens": record.tokens,
2028                    "replay_of_event_id": replay_of_event_id,
2029                }),
2030                replay_of_event_id,
2031            )
2032            .await?;
2033        }
2034
2035        if binding
2036            .daily_cost_usd
2037            .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2038        {
2039            record.result = false;
2040            record.reason = Some("daily_budget_exceeded".to_string());
2041            self.append_lifecycle_event(
2042                "predicate.daily_budget_exceeded",
2043                event,
2044                binding,
2045                serde_json::json!({
2046                    "trigger_id": binding.id.as_str(),
2047                    "event_id": event.id.0,
2048                    "limit_usd": binding.daily_cost_usd,
2049                    "cost_today_usd": current_predicate_daily_cost(binding),
2050                    "replay_of_event_id": replay_of_event_id,
2051                }),
2052                replay_of_event_id,
2053            )
2054            .await?;
2055        }
2056
2057        {
2058            let mut state = binding
2059                .predicate_state
2060                .lock()
2061                .expect("trigger predicate state poisoned");
2062            if state.budget_day_utc != Some(today) {
2063                state.budget_day_utc = Some(today);
2064                binding
2065                    .metrics
2066                    .cost_today_usd_micros
2067                    .store(cost_usd_micros, Ordering::Relaxed);
2068            }
2069            if count_failure {
2070                state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2071                if state.consecutive_failures >= 3 {
2072                    state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2073                    opened_breaker = true;
2074                }
2075            } else {
2076                state.consecutive_failures = 0;
2077                state.breaker_open_until_ms = None;
2078            }
2079        }
2080
2081        if opened_breaker {
2082            let mut metadata = BTreeMap::new();
2083            metadata.insert(
2084                "trigger_id".to_string(),
2085                serde_json::json!(binding.id.as_str()),
2086            );
2087            metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2088            metadata.insert("failure_count".to_string(), serde_json::json!(3));
2089            metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2090            crate::events::log_warn_meta(
2091                "trigger.predicate.circuit_breaker",
2092                "trigger predicate circuit breaker opened for 5 minutes",
2093                metadata,
2094            );
2095        }
2096
2097        self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2098            .await?;
2099        Ok(record)
2100    }
2101
2102    #[allow(clippy::too_many_arguments)]
2103    #[allow(clippy::too_many_arguments)]
2104    async fn invoke_vm_callable_with_timeout(
2105        &self,
2106        closure: &crate::value::VmClosure,
2107        binding_key: &str,
2108        event: &TriggerEvent,
2109        replay_of_event_id: Option<&String>,
2110        agent_id: &str,
2111        action: &str,
2112        autonomy_tier: AutonomyTier,
2113        cancel_rx: &mut broadcast::Receiver<()>,
2114        timeout: Option<Duration>,
2115    ) -> Result<VmValue, DispatchError> {
2116        let future = self.invoke_vm_callable(
2117            closure,
2118            binding_key,
2119            event,
2120            replay_of_event_id,
2121            agent_id,
2122            action,
2123            autonomy_tier,
2124            cancel_rx,
2125        );
2126        pin_mut!(future);
2127        if let Some(timeout) = timeout {
2128            match tokio::time::timeout(timeout, future).await {
2129                Ok(result) => result,
2130                Err(_) => Err(DispatchError::Local(
2131                    "predicate evaluation timed out".to_string(),
2132                )),
2133            }
2134        } else {
2135            future.await
2136        }
2137    }
2138
2139    async fn append_predicate_evaluated_event(
2140        &self,
2141        binding: &TriggerBinding,
2142        event: &TriggerEvent,
2143        record: &PredicateEvaluationRecord,
2144        replay_of_event_id: Option<&String>,
2145    ) -> Result<(), DispatchError> {
2146        self.append_lifecycle_event(
2147            "predicate.evaluated",
2148            event,
2149            binding,
2150            serde_json::json!({
2151                "trigger_id": binding.id.as_str(),
2152                "event_id": event.id.0,
2153                "result": record.result,
2154                "cost_usd": record.cost_usd,
2155                "tokens": record.tokens,
2156                "latency_ms": record.latency_ms,
2157                "cached": record.cached,
2158                "reason": record.reason,
2159                "replay_of_event_id": replay_of_event_id,
2160            }),
2161            replay_of_event_id,
2162        )
2163        .await
2164    }
2165
2166    async fn append_predicate_cache_record(
2167        &self,
2168        binding: &TriggerBinding,
2169        event: &TriggerEvent,
2170        entries: &[PredicateCacheEntry],
2171    ) -> Result<(), DispatchError> {
2172        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2173            .expect("static trigger inbox legacy topic name is valid");
2174        let payload = serde_json::to_value(PredicateCacheRecord {
2175            trigger_id: binding.id.as_str().to_string(),
2176            event_id: event.id.0.clone(),
2177            entries: entries.to_vec(),
2178        })
2179        .map_err(|error| DispatchError::Serde(error.to_string()))?;
2180        self.event_log
2181            .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2182            .await
2183            .map_err(DispatchError::from)
2184            .map(|_| ())
2185    }
2186
2187    async fn read_predicate_cache_record(
2188        &self,
2189        event_id: &str,
2190    ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2191        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2192            .expect("static trigger inbox legacy topic name is valid");
2193        let records = self
2194            .event_log
2195            .read_range(&topic, None, usize::MAX)
2196            .await
2197            .map_err(DispatchError::from)?;
2198        Ok(records
2199            .into_iter()
2200            .filter(|(_, event)| event.kind == "predicate_llm_cache")
2201            .filter_map(|(_, event)| {
2202                serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2203            })
2204            .filter(|record| record.event_id == event_id)
2205            .flat_map(|record| record.entries)
2206            .collect())
2207    }
2208
2209    #[allow(clippy::too_many_arguments)]
2210    async fn append_dispatch_trust_record(
2211        &self,
2212        binding: &TriggerBinding,
2213        route: &DispatchUri,
2214        event: &TriggerEvent,
2215        replay_of_event_id: Option<&String>,
2216        autonomy_tier: AutonomyTier,
2217        outcome: TrustOutcome,
2218        terminal_status: &str,
2219        attempt_count: u32,
2220        error: Option<String>,
2221    ) -> Result<(), DispatchError> {
2222        let mut record = TrustRecord::new(
2223            binding.id.as_str().to_string(),
2224            format!("{}.{}", event.provider.as_str(), event.kind),
2225            None,
2226            outcome,
2227            event.trace_id.0.clone(),
2228            autonomy_tier,
2229        );
2230        record.metadata.insert(
2231            "binding_key".to_string(),
2232            serde_json::json!(binding.binding_key()),
2233        );
2234        record.metadata.insert(
2235            "binding_version".to_string(),
2236            serde_json::json!(binding.version),
2237        );
2238        record.metadata.insert(
2239            "provider".to_string(),
2240            serde_json::json!(event.provider.as_str()),
2241        );
2242        record
2243            .metadata
2244            .insert("event_kind".to_string(), serde_json::json!(event.kind));
2245        record
2246            .metadata
2247            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2248        record.metadata.insert(
2249            "target_uri".to_string(),
2250            serde_json::json!(route.target_uri()),
2251        );
2252        record.metadata.insert(
2253            "terminal_status".to_string(),
2254            serde_json::json!(terminal_status),
2255        );
2256        record.metadata.insert(
2257            "attempt_count".to_string(),
2258            serde_json::json!(attempt_count),
2259        );
2260        if let Some(replay_of_event_id) = replay_of_event_id {
2261            record.metadata.insert(
2262                "replay_of_event_id".to_string(),
2263                serde_json::json!(replay_of_event_id),
2264            );
2265        }
2266        if let Some(error) = error {
2267            record
2268                .metadata
2269                .insert("error".to_string(), serde_json::json!(error));
2270        }
2271        append_trust_record(&self.event_log, &record)
2272            .await
2273            .map_err(DispatchError::from)
2274    }
2275
2276    async fn append_attempt_record(
2277        &self,
2278        event: &TriggerEvent,
2279        binding: &TriggerBinding,
2280        attempt: &DispatchAttemptRecord,
2281        replay_of_event_id: Option<&String>,
2282    ) -> Result<(), DispatchError> {
2283        self.append_topic_event(
2284            TRIGGER_ATTEMPTS_TOPIC,
2285            "attempt_recorded",
2286            event,
2287            Some(binding),
2288            Some(attempt.attempt),
2289            serde_json::to_value(attempt)
2290                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2291            replay_of_event_id,
2292        )
2293        .await
2294    }
2295
2296    async fn append_lifecycle_event(
2297        &self,
2298        kind: &str,
2299        event: &TriggerEvent,
2300        binding: &TriggerBinding,
2301        payload: serde_json::Value,
2302        replay_of_event_id: Option<&String>,
2303    ) -> Result<(), DispatchError> {
2304        self.append_topic_event(
2305            TRIGGERS_LIFECYCLE_TOPIC,
2306            kind,
2307            event,
2308            Some(binding),
2309            None,
2310            payload,
2311            replay_of_event_id,
2312        )
2313        .await
2314    }
2315
2316    async fn append_topic_event(
2317        &self,
2318        topic_name: &str,
2319        kind: &str,
2320        event: &TriggerEvent,
2321        binding: Option<&TriggerBinding>,
2322        attempt: Option<u32>,
2323        payload: serde_json::Value,
2324        replay_of_event_id: Option<&String>,
2325    ) -> Result<(), DispatchError> {
2326        let topic = Topic::new(topic_name)
2327            .expect("static trigger dispatcher topic names should always be valid");
2328        let headers = event_headers(event, binding, attempt, replay_of_event_id);
2329        self.event_log
2330            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
2331            .await
2332            .map_err(DispatchError::from)
2333            .map(|_| ())
2334    }
2335
2336    async fn emit_action_graph(
2337        &self,
2338        event: &TriggerEvent,
2339        nodes: Vec<RunActionGraphNodeRecord>,
2340        edges: Vec<RunActionGraphEdgeRecord>,
2341        extra: serde_json::Value,
2342    ) -> Result<(), DispatchError> {
2343        let mut headers = BTreeMap::new();
2344        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2345        headers.insert("event_id".to_string(), event.id.0.clone());
2346        let observability = RunObservabilityRecord {
2347            schema_version: 1,
2348            action_graph_nodes: nodes,
2349            action_graph_edges: edges,
2350            ..Default::default()
2351        };
2352        append_action_graph_update(
2353            headers,
2354            serde_json::json!({
2355                "source": "dispatcher",
2356                "trace_id": event.trace_id.0,
2357                "event_id": event.id.0,
2358                "observability": observability,
2359                "context": extra,
2360            }),
2361        )
2362        .await
2363        .map_err(DispatchError::from)
2364    }
2365}
2366
2367async fn dispatch_cancel_requested(
2368    event_log: &Arc<AnyEventLog>,
2369    binding_key: &str,
2370    event_id: &str,
2371    replay_of_event_id: Option<&String>,
2372) -> Result<bool, DispatchError> {
2373    if replay_of_event_id.is_some() {
2374        return Ok(false);
2375    }
2376    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
2377        .expect("static trigger cancel topic should always be valid");
2378    let events = event_log.read_range(&topic, None, usize::MAX).await?;
2379    let requested = events
2380        .into_iter()
2381        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
2382        .filter_map(|(_, event)| {
2383            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
2384        })
2385        .collect::<BTreeSet<_>>();
2386    Ok(requested
2387        .iter()
2388        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
2389}
2390
2391async fn sleep_or_cancel_or_request(
2392    event_log: &Arc<AnyEventLog>,
2393    delay: Duration,
2394    binding_key: &str,
2395    event_id: &str,
2396    replay_of_event_id: Option<&String>,
2397    cancel_rx: &mut broadcast::Receiver<()>,
2398) -> Result<(), DispatchError> {
2399    let sleep = tokio::time::sleep(delay);
2400    pin_mut!(sleep);
2401    let mut poll = tokio::time::interval(Duration::from_millis(100));
2402    loop {
2403        tokio::select! {
2404            _ = &mut sleep => return Ok(()),
2405            _ = recv_cancel(cancel_rx) => {
2406                return Err(DispatchError::Cancelled(
2407                    "dispatcher shutdown cancelled retry wait".to_string(),
2408                ));
2409            }
2410            _ = poll.tick() => {
2411                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
2412                    return Err(DispatchError::Cancelled(
2413                        "trigger cancel request cancelled retry wait".to_string(),
2414                    ));
2415                }
2416            }
2417        }
2418    }
2419}
2420
2421fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
2422    let mut iter = events.into_iter();
2423    let Some(mut root) = iter.next() else {
2424        return Err(DispatchError::Registry(
2425            "batch dispatch produced an empty event list".to_string(),
2426        ));
2427    };
2428    let mut batch = Vec::new();
2429    batch.push(
2430        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
2431    );
2432    for event in iter {
2433        batch.push(
2434            serde_json::to_value(&event)
2435                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2436        );
2437    }
2438    root.batch = Some(batch);
2439    Ok(root)
2440}
2441
2442fn json_value_to_gate(value: &serde_json::Value) -> String {
2443    match value {
2444        serde_json::Value::Null => "null".to_string(),
2445        serde_json::Value::String(text) => text.clone(),
2446        serde_json::Value::Bool(value) => value.to_string(),
2447        serde_json::Value::Number(value) => value.to_string(),
2448        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
2449    }
2450}
2451
2452fn decrement_in_flight(state: &DispatcherRuntimeState) {
2453    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
2454    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
2455        state.idle_notify.notify_waiters();
2456    }
2457}
2458
2459fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
2460    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
2461    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
2462        state.idle_notify.notify_waiters();
2463    }
2464}
2465
/// Test hook: arms `TEST_INBOX_DEQUEUED_SIGNAL` with `tx`, so that the next
/// `notify_test_inbox_dequeued` call fires it. Any previously armed sender is
/// replaced and dropped.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut armed = slot.borrow_mut();
        *armed = Some(tx);
    });
}
2472
/// No-op in non-test builds. The `#[cfg(test)]` variant of this function fires
/// the one-shot signal installed by `install_test_inbox_dequeued_signal`.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}
2475
/// Test hook: takes the sender armed by `install_test_inbox_dequeued_signal`,
/// if there is one, and fires it. The slot is left empty, so each armed signal
/// fires at most once. A receiver that has already been dropped is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let armed = slot.borrow_mut().take();
        if let Some(tx) = armed {
            let _ = tx.send(());
        }
    });
}
2484
2485pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
2486    event_log: &L,
2487    event: &TriggerEvent,
2488) -> Result<u64, DispatchError> {
2489    let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
2490        .expect("static trigger.inbox.envelopes topic is valid");
2491    let headers = event_headers(event, None, None, None);
2492    let payload =
2493        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2494    event_log
2495        .append(
2496            &topic,
2497            LogEvent::new("event_ingested", payload).with_headers(headers),
2498        )
2499        .await
2500        .map_err(DispatchError::from)
2501}
2502
2503pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
2504    ACTIVE_DISPATCHER_STATE.with(|slot| {
2505        slot.borrow()
2506            .as_ref()
2507            .map(|state| DispatcherStatsSnapshot {
2508                in_flight: state.in_flight.load(Ordering::Relaxed),
2509                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
2510                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
2511            })
2512            .unwrap_or_default()
2513    })
2514}
2515
2516pub fn clear_dispatcher_state() {
2517    ACTIVE_DISPATCHER_STATE.with(|slot| {
2518        *slot.borrow_mut() = None;
2519    });
2520}
2521
2522fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
2523    if is_cancelled_vm_error(&error) {
2524        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
2525    }
2526    if let VmError::Thrown(VmValue::String(message)) = &error {
2527        return DispatchError::Local(message.to_string());
2528    }
2529    match error_to_category(&error) {
2530        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
2531        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
2532        ErrorCategory::Cancelled => {
2533            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
2534        }
2535        _ => DispatchError::Local(error.to_string()),
2536    }
2537}
2538
2539fn dispatch_error_label(error: &DispatchError) -> &'static str {
2540    match error {
2541        DispatchError::Denied(_) => "denied",
2542        DispatchError::Timeout(_) => "timeout",
2543        DispatchError::Cancelled(_) => "cancelled",
2544        _ => "failed",
2545    }
2546}
2547
2548fn dispatch_node_id(
2549    route: &DispatchUri,
2550    binding_key: &str,
2551    event_id: &str,
2552    attempt: u32,
2553) -> String {
2554    let prefix = match route {
2555        DispatchUri::A2a { .. } => "a2a",
2556        _ => "dispatch",
2557    };
2558    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
2559}
2560
2561fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
2562    match route {
2563        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
2564        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
2565    }
2566}
2567
2568fn dispatch_node_label(route: &DispatchUri) -> String {
2569    match route {
2570        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
2571        _ => route.target_uri(),
2572    }
2573}
2574
2575fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
2576    match route {
2577        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
2578        _ => None,
2579    }
2580}
2581
2582fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
2583    match route {
2584        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
2585        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
2586        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
2587    }
2588}
2589
2590fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
2591    match value {
2592        VmValue::Bool(result) => Ok(result),
2593        VmValue::EnumVariant {
2594            enum_name,
2595            variant,
2596            mut fields,
2597        } if enum_name == "Result" && variant == "Ok" => match fields.pop() {
2598            Some(VmValue::Bool(result)) => Ok(result),
2599            Some(other) => Err(format!(
2600                "predicate Result.Ok payload must be bool, got {}",
2601                other.type_name()
2602            )),
2603            None => Err("predicate Result.Ok payload is missing".to_string()),
2604        },
2605        VmValue::EnumVariant {
2606            enum_name,
2607            variant,
2608            fields,
2609        } if enum_name == "Result" && variant == "Err" => Err(fields
2610            .first()
2611            .map(VmValue::display)
2612            .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
2613        other => Err(format!(
2614            "predicate must return bool or Result<bool, _>, got {}",
2615            other.type_name()
2616        )),
2617    }
2618}
2619
/// Converts a USD amount into whole micro-dollars, rounding to the nearest
/// micro-dollar.
///
/// NaN, infinities, zero and negative amounts all map to `0`. Finite amounts
/// too large for `u64` saturate, following the semantics of a float `as` cast.
fn usd_to_micros(value: f64) -> u64 {
    if value.is_finite() && value > 0.0 {
        (value * 1_000_000.0).round() as u64
    } else {
        0
    }
}
2626
2627fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
2628    binding
2629        .metrics
2630        .cost_today_usd_micros
2631        .load(Ordering::Relaxed) as f64
2632        / 1_000_000.0
2633}
2634
2635fn is_cancelled_vm_error(error: &VmError) -> bool {
2636    matches!(
2637        error,
2638        VmError::Thrown(VmValue::String(message))
2639            if message.starts_with("kind:cancelled:")
2640    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
2641}
2642
2643fn event_headers(
2644    event: &TriggerEvent,
2645    binding: Option<&TriggerBinding>,
2646    attempt: Option<u32>,
2647    replay_of_event_id: Option<&String>,
2648) -> BTreeMap<String, String> {
2649    let mut headers = BTreeMap::new();
2650    headers.insert("event_id".to_string(), event.id.0.clone());
2651    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2652    headers.insert("provider".to_string(), event.provider.as_str().to_string());
2653    headers.insert("kind".to_string(), event.kind.clone());
2654    if let Some(replay_of_event_id) = replay_of_event_id {
2655        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
2656    }
2657    if let Some(binding) = binding {
2658        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
2659        headers.insert("binding_key".to_string(), binding.binding_key());
2660        headers.insert(
2661            "handler_kind".to_string(),
2662            DispatchUri::from(&binding.handler).kind().to_string(),
2663        );
2664    }
2665    if let Some(attempt) = attempt {
2666        headers.insert("attempt".to_string(), attempt.to_string());
2667    }
2668    headers
2669}
2670
/// Environment variable whose presence, with any value, makes
/// `maybe_fail_before_outbox` terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Crash-injection hook. Exits the whole process with status 86 when
/// `TEST_FAIL_BEFORE_OUTBOX_ENV` is set; otherwise does nothing.
///
/// NOTE(review): this is compiled into non-test builds as well. Presumably
/// that lets an out-of-process test simulate a crash before the outbox write;
/// confirm against the callers.
fn maybe_fail_before_outbox() {
    let armed = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if !armed {
        return;
    }
    std::process::exit(86);
}
2678
2679fn now_rfc3339() -> String {
2680    time::OffsetDateTime::now_utc()
2681        .format(&time::format_description::well_known::Rfc3339)
2682        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2683}
2684
2685fn now_unix_ms() -> i64 {
2686    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
2687}
2688
2689fn utc_day_key() -> i32 {
2690    time::OffsetDateTime::now_utc().date().to_julian_day()
2691}
2692
2693fn cancelled_dispatch_outcome(
2694    binding: &TriggerBinding,
2695    route: &DispatchUri,
2696    event: &TriggerEvent,
2697    replay_of_event_id: Option<String>,
2698    attempt_count: u32,
2699    error: String,
2700) -> DispatchOutcome {
2701    DispatchOutcome {
2702        trigger_id: binding.id.as_str().to_string(),
2703        binding_key: binding.binding_key(),
2704        event_id: event.id.0.clone(),
2705        attempt_count,
2706        status: DispatchStatus::Cancelled,
2707        handler_kind: route.kind().to_string(),
2708        target_uri: route.target_uri(),
2709        replay_of_event_id,
2710        result: None,
2711        error: Some(error),
2712    }
2713}
2714
2715async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
2716    let _ = cancel_rx.recv().await;
2717}
2718
2719#[cfg(test)]
2720mod tests;