Skip to main content

harn_vm/triggers/dispatcher/
mod.rs

1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::Notify;
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20    append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21    RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22    ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23    ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24    ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25    ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26    ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{append_trust_record, AutonomyTier, TrustOutcome, TrustRecord};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::matching_bindings;
35use super::registry::{TriggerBinding, TriggerHandlerSpec};
36use super::{
37    begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
38    TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
39    TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
40    TRIGGER_OUTBOX_TOPIC,
41};
42use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
43
44mod flow_control;
45pub mod retry;
46pub mod uri;
47
48pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
49
thread_local! {
    /// Runtime state of the dispatcher most recently constructed on this thread.
    /// `Dispatcher::with_event_log_and_metrics` overwrites it on every construction.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    /// Context of the dispatch running on this thread, read by [`current_dispatch_context`].
    // NOTE(review): the writer of this slot is outside this view — confirm where it is set/cleared.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    /// Test-only hook; presumably fired by `notify_test_inbox_dequeued` when `run` dequeues
    /// an envelope (helper not visible here — confirm).
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}

tokio::task_local! {
    /// `true` while the current task runs a replay dispatch. It is scoped around the inner
    /// dispatch future by `Dispatcher::dispatch_with_replay` and read through
    /// [`current_dispatch_is_replay`].
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
60
/// Metadata for an in-progress dispatch.
///
/// It is stored in the `ACTIVE_DISPATCH_CONTEXT` thread-local and read back with
/// [`current_dispatch_context`].
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay; `None` otherwise.
    pub replay_of_event_id: Option<String>,
    // NOTE(review): `agent_id` / `action` are populated outside this view — presumably the
    // trigger id and handler action used for trust records; confirm at the construction site.
    pub agent_id: String,
    pub action: String,
    /// Autonomy tier in effect for this dispatch.
    pub autonomy_tier: AutonomyTier,
}
69
/// Serializable bundle of cached predicate-evaluation entries for one trigger/event pair.
// NOTE(review): where this record is persisted and read back is outside this view — confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    trigger_id: String,
    event_id: String,
    entries: Vec<PredicateCacheEntry>,
}
76
/// Outcome and accounting for evaluating a binding's `when` predicate against one event.
///
/// Every field is reported in the predicate node's action-graph metadata.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed; `false` makes the dispatch return `Skipped`.
    result: bool,
    cost_usd: f64,
    tokens: u64,
    latency_ms: u64,
    /// Presumably `true` when the result was served from the predicate cache — confirm.
    cached: bool,
    /// Optional human-readable explanation, echoed in the skipped outcome's result.
    reason: Option<String>,
}
86
87pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
88    ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
89}
90
91pub(crate) fn current_dispatch_is_replay() -> bool {
92    ACTIVE_DISPATCH_IS_REPLAY
93        .try_with(|is_replay| *is_replay)
94        .unwrap_or(false)
95}
96
/// Routes trigger events to the handlers of their matching trigger bindings.
///
/// Clones share the same base VM, event log, cancel channel and runtime state. `base_vm`
/// is held in an `Rc`, so a `Dispatcher` is neither `Send` nor `Sync`.
#[derive(Clone)]
pub struct Dispatcher {
    // NOTE(review): used by handler execution outside this view — presumably the VM
    // handlers are run against; confirm.
    base_vm: Rc<Vm>,
    /// Event log backing the inbox, cancel-request and other dispatcher topics.
    event_log: Arc<AnyEventLog>,
    /// Broadcast channel signalled by `shutdown`; the `run` loop exits when it fires.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ and flow control shared by every clone.
    state: Arc<DispatcherRuntimeState>,
    /// Optional registry that receives per-dispatch success/failure counts.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
105
/// Mutable runtime state shared between a dispatcher and its clones.
#[derive(Debug)]
struct DispatcherRuntimeState {
    /// Dispatches currently executing; incremented when a dispatch begins.
    in_flight: AtomicU64,
    /// Dispatches waiting to be retried (maintained outside this view).
    retry_queue_depth: AtomicU64,
    /// In-memory dead-letter entries.
    dlq: Mutex<Vec<DlqEntry>>,
    /// Cancellation flags that `Dispatcher::shutdown` sets to `true`.
    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
    /// Set once `Dispatcher::shutdown` has been called.
    shutting_down: std::sync::atomic::AtomicBool,
    /// Awaited by `Dispatcher::drain` so it can re-check whether work has finished.
    idle_notify: Notify,
    /// Flow-control bookkeeping (concurrency permits, singleton gates, batching).
    flow_control: FlowControlManager,
}
116
117impl DispatcherRuntimeState {
118    fn new(event_log: Arc<AnyEventLog>) -> Self {
119        Self {
120            in_flight: AtomicU64::new(0),
121            retry_queue_depth: AtomicU64::new(0),
122            dlq: Mutex::new(Vec::new()),
123            cancel_tokens: Mutex::new(Vec::new()),
124            shutting_down: std::sync::atomic::AtomicBool::new(false),
125            idle_notify: Notify::new(),
126            flow_control: FlowControlManager::new(event_log),
127        }
128    }
129}
130
/// Point-in-time view of dispatcher load, produced by `Dispatcher::snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches currently executing.
    pub in_flight: u64,
    /// Dispatches waiting to be retried.
    pub retry_queue_depth: u64,
    /// Number of entries in the in-memory dead-letter queue.
    pub dlq_depth: u64,
}
138
/// Terminal status of a single binding dispatch.
///
/// It serializes as snake_case, which matches `DispatchStatus::as_str`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch failed.
    Failed,
    /// The dispatch was moved to the dead-letter queue.
    Dlq,
    /// No attempt was made, e.g. because the `when` predicate failed or flow control skipped it.
    Skipped,
    /// A cancel request stopped the dispatch.
    Cancelled,
}
148
149impl DispatchStatus {
150    pub fn as_str(&self) -> &'static str {
151        match self {
152            Self::Succeeded => "succeeded",
153            Self::Failed => "failed",
154            Self::Dlq => "dlq",
155            Self::Skipped => "skipped",
156            Self::Cancelled => "cancelled",
157        }
158    }
159}
160
/// Result of dispatching one event to one trigger binding.
///
/// Missing fields deserialize from the manual `Default` impl, whose status is `Failed`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// Handler attempts made; `0` when the dispatch was skipped or cancelled before attempt 1.
    pub attempt_count: u32,
    pub status: DispatchStatus,
    /// Kind of the handler route (from `DispatchUri::kind`).
    pub handler_kind: String,
    /// Target URI of the handler route (from `DispatchUri::target_uri`).
    pub target_uri: String,
    /// Original event id when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result, or a `{"skipped": true, ...}` object for skipped dispatches.
    pub result: Option<serde_json::Value>,
    pub error: Option<String>,
}
175
/// Payload of an `event_ingested` record on the trigger inbox-envelopes topic.
///
/// When `trigger_id` is `None` and the event is not a cron event carrying a `cron_id`, the
/// event is fanned out to every matching binding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// Pins the envelope to one trigger binding when set.
    pub trigger_id: Option<String>,
    /// Binding version passed to `resolve_live_trigger_binding` for targeted dispatch.
    pub binding_version: Option<u32>,
    pub event: TriggerEvent,
}
182
/// Result of `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when no dispatches were in flight or queued for retry before the deadline.
    pub drained: bool,
    pub in_flight: u64,
    pub retry_queue_depth: u64,
    pub dlq_depth: u64,
}
191
192impl Default for DispatchOutcome {
193    fn default() -> Self {
194        Self {
195            trigger_id: String::new(),
196            binding_key: String::new(),
197            event_id: String::new(),
198            attempt_count: 0,
199            status: DispatchStatus::Failed,
200            handler_kind: String::new(),
201            target_uri: String::new(),
202            replay_of_event_id: None,
203            result: None,
204            error: None,
205        }
206    }
207}
208
/// Record of one handler attempt for a binding/event pair.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// 1-based attempt number.
    pub attempt: u32,
    pub handler_kind: String,
    // Timestamps are strings; presumably RFC 3339 as produced by `now_rfc3339` — confirm.
    pub started_at: String,
    pub completed_at: String,
    pub outcome: String,
    pub error_msg: Option<String>,
}
222
/// Request to cancel the dispatch of `event_id` on `binding_key`.
///
/// Requests are appended by [`append_dispatch_cancel_request`]. The derived ordering
/// compares fields in declaration order.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    pub binding_key: String,
    pub event_id: String,
    /// When the cancellation was requested; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Optional identity of the requester.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit correlation id.
    #[serde(default)]
    pub audit_id: Option<String>,
}
234
/// A dispatch moved to the dead-letter queue, together with its attempt history.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    pub trigger_id: String,
    pub binding_key: String,
    pub event: TriggerEvent,
    pub attempt_count: u32,
    /// Error message from the last failed attempt.
    pub final_error: String,
    pub attempts: Vec<DispatchAttemptRecord>,
}
244
/// Flow-control resources held by an in-progress dispatch.
///
/// The `Default` value holds nothing.
#[derive(Default)]
struct AcquiredFlowControl {
    // Presumably the key of a held singleton gate — confirm in `flow_control`.
    singleton_gate: Option<String>,
    /// Concurrency permit, if one was acquired.
    concurrency: Option<ConcurrencyPermit>,
}
250
/// Decision returned by `apply_flow_control` for a single dispatch.
enum FlowControlOutcome {
    /// Proceed and dispatch `event`, which replaces the input event and may differ from it,
    /// while holding `acquired`.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch. `reason` is reported under the `"flow_control"` key of the skipped outcome.
    Skip {
        reason: String,
    },
}
260
/// Errors produced while dispatching trigger events.
///
/// `Cancelled`, `Denied` and `NotImplemented` are not retryable; all other variants are.
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log failure, including a missing active event log.
    EventLog(String),
    /// Trigger registry failure, such as binding resolution or in-flight bookkeeping.
    Registry(String),
    /// JSON (de)serialization failure.
    Serde(String),
    // Presumably local / A2A handler failures respectively — confirm at the handler sites.
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Cancelled(String),
    NotImplemented(String),
}
273
274impl std::fmt::Display for DispatchError {
275    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276        match self {
277            Self::EventLog(message)
278            | Self::Registry(message)
279            | Self::Serde(message)
280            | Self::Local(message)
281            | Self::A2a(message)
282            | Self::Denied(message)
283            | Self::Timeout(message)
284            | Self::Cancelled(message)
285            | Self::NotImplemented(message) => f.write_str(message),
286        }
287    }
288}
289
290impl std::error::Error for DispatchError {}
291
292impl DispatchError {
293    fn retryable(&self) -> bool {
294        !matches!(
295            self,
296            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_)
297        )
298    }
299}
300
301impl From<LogError> for DispatchError {
302    fn from(value: LogError) -> Self {
303        Self::EventLog(value.to_string())
304    }
305}
306
307pub async fn append_dispatch_cancel_request(
308    event_log: &Arc<AnyEventLog>,
309    request: &DispatchCancelRequest,
310) -> Result<u64, DispatchError> {
311    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
312        .expect("static trigger cancel topic should always be valid");
313    event_log
314        .append(
315            &topic,
316            LogEvent::new(
317                "dispatch_cancel_requested",
318                serde_json::to_value(request)
319                    .map_err(|error| DispatchError::Serde(error.to_string()))?,
320            ),
321        )
322        .await
323        .map_err(DispatchError::from)
324}
325
326impl Dispatcher {
327    pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
328        let event_log = active_event_log().ok_or_else(|| {
329            DispatchError::EventLog("dispatcher requires an active event log".to_string())
330        })?;
331        Ok(Self::with_event_log(base_vm, event_log))
332    }
333
334    pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
335        Self::with_event_log_and_metrics(base_vm, event_log, None)
336    }
337
338    pub fn with_event_log_and_metrics(
339        base_vm: Vm,
340        event_log: Arc<AnyEventLog>,
341        metrics: Option<Arc<crate::MetricsRegistry>>,
342    ) -> Self {
343        let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
344        ACTIVE_DISPATCHER_STATE.with(|slot| {
345            *slot.borrow_mut() = Some(state.clone());
346        });
347        let (cancel_tx, _) = broadcast::channel(32);
348        Self {
349            base_vm: Rc::new(base_vm),
350            event_log,
351            cancel_tx,
352            state,
353            metrics,
354        }
355    }
356
357    pub fn snapshot(&self) -> DispatcherStatsSnapshot {
358        DispatcherStatsSnapshot {
359            in_flight: self.state.in_flight.load(Ordering::Relaxed),
360            retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
361            dlq_depth: self
362                .state
363                .dlq
364                .lock()
365                .expect("dispatcher dlq poisoned")
366                .len() as u64,
367        }
368    }
369
370    pub fn dlq_entries(&self) -> Vec<DlqEntry> {
371        self.state
372            .dlq
373            .lock()
374            .expect("dispatcher dlq poisoned")
375            .clone()
376    }
377
378    pub fn shutdown(&self) {
379        self.state.shutting_down.store(true, Ordering::SeqCst);
380        for token in self
381            .state
382            .cancel_tokens
383            .lock()
384            .expect("dispatcher cancel tokens poisoned")
385            .iter()
386        {
387            token.store(true, Ordering::SeqCst);
388        }
389        let _ = self.cancel_tx.send(());
390    }
391
392    pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
393        self.enqueue_targeted(None, None, event).await
394    }
395
396    pub async fn enqueue_targeted(
397        &self,
398        trigger_id: Option<String>,
399        binding_version: Option<u32>,
400        event: TriggerEvent,
401    ) -> Result<u64, DispatchError> {
402        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
403            .expect("static trigger inbox envelopes topic is valid");
404        let headers = event_headers(&event, None, None, None);
405        let payload = serde_json::to_value(InboxEnvelope {
406            trigger_id,
407            binding_version,
408            event,
409        })
410        .map_err(|error| DispatchError::Serde(error.to_string()))?;
411        self.event_log
412            .append(
413                &topic,
414                LogEvent::new("event_ingested", payload).with_headers(headers),
415            )
416            .await
417            .map_err(DispatchError::from)
418    }
419
420    pub async fn run(&self) -> Result<(), DispatchError> {
421        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
422            .expect("static trigger inbox envelopes topic is valid");
423        let start_from = self.event_log.latest(&topic).await?;
424        let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
425        pin_mut!(stream);
426        let mut cancel_rx = self.cancel_tx.subscribe();
427
428        loop {
429            tokio::select! {
430                received = stream.next() => {
431                    let Some(received) = received else {
432                        break;
433                    };
434                    let (_, event) = received.map_err(DispatchError::from)?;
435                    if event.kind != "event_ingested" {
436                        continue;
437                    }
438                    let parent_headers = event.headers.clone();
439                    let envelope: InboxEnvelope = serde_json::from_value(event.payload)
440                        .map_err(|error| DispatchError::Serde(error.to_string()))?;
441                    notify_test_inbox_dequeued();
442                    let _ = self
443                        .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
444                        .await;
445                }
446                _ = recv_cancel(&mut cancel_rx) => break,
447            }
448        }
449
450        Ok(())
451    }
452
453    pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
454        let deadline = tokio::time::Instant::now() + timeout;
455        loop {
456            let snapshot = self.snapshot();
457            if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
458                return Ok(DispatcherDrainReport {
459                    drained: true,
460                    in_flight: snapshot.in_flight,
461                    retry_queue_depth: snapshot.retry_queue_depth,
462                    dlq_depth: snapshot.dlq_depth,
463                });
464            }
465
466            let notified = self.state.idle_notify.notified();
467            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
468            if remaining.is_zero() {
469                return Ok(DispatcherDrainReport {
470                    drained: false,
471                    in_flight: snapshot.in_flight,
472                    retry_queue_depth: snapshot.retry_queue_depth,
473                    dlq_depth: snapshot.dlq_depth,
474                });
475            }
476            if tokio::time::timeout(remaining, notified).await.is_err() {
477                let snapshot = self.snapshot();
478                return Ok(DispatcherDrainReport {
479                    drained: false,
480                    in_flight: snapshot.in_flight,
481                    retry_queue_depth: snapshot.retry_queue_depth,
482                    dlq_depth: snapshot.dlq_depth,
483                });
484            }
485        }
486    }
487
488    pub async fn dispatch_inbox_envelope(
489        &self,
490        envelope: InboxEnvelope,
491    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
492        self.dispatch_inbox_envelope_with_headers(envelope, None)
493            .await
494    }
495
496    async fn dispatch_inbox_envelope_with_headers(
497        &self,
498        envelope: InboxEnvelope,
499        parent_headers: Option<&BTreeMap<String, String>>,
500    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
501        if let Some(trigger_id) = envelope.trigger_id {
502            let binding = super::registry::resolve_live_trigger_binding(
503                &trigger_id,
504                envelope.binding_version,
505            )
506            .map_err(|error| DispatchError::Registry(error.to_string()))?;
507            return Ok(vec![
508                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
509                    .await?,
510            ]);
511        }
512
513        let cron_target = match &envelope.event.provider_payload {
514            crate::triggers::ProviderPayload::Known(
515                crate::triggers::event::KnownProviderPayload::Cron(payload),
516            ) => payload.cron_id.clone(),
517            _ => None,
518        };
519        if let Some(trigger_id) = cron_target {
520            let binding = super::registry::resolve_live_trigger_binding(
521                &trigger_id,
522                envelope.binding_version,
523            )
524            .map_err(|error| DispatchError::Registry(error.to_string()))?;
525            return Ok(vec![
526                self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
527                    .await?,
528            ]);
529        }
530
531        self.dispatch_event(envelope.event).await
532    }
533
534    pub async fn dispatch_event(
535        &self,
536        event: TriggerEvent,
537    ) -> Result<Vec<DispatchOutcome>, DispatchError> {
538        let bindings = matching_bindings(&event);
539        let mut outcomes = Vec::new();
540        for binding in bindings {
541            outcomes.push(self.dispatch(&binding, event.clone()).await?);
542        }
543        Ok(outcomes)
544    }
545
546    pub async fn dispatch(
547        &self,
548        binding: &TriggerBinding,
549        event: TriggerEvent,
550    ) -> Result<DispatchOutcome, DispatchError> {
551        self.dispatch_with_replay(binding, event, None, None, None)
552            .await
553    }
554
555    pub async fn dispatch_replay(
556        &self,
557        binding: &TriggerBinding,
558        event: TriggerEvent,
559        replay_of_event_id: String,
560    ) -> Result<DispatchOutcome, DispatchError> {
561        self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
562            .await
563    }
564
565    pub async fn dispatch_with_parent_span_id(
566        &self,
567        binding: &TriggerBinding,
568        event: TriggerEvent,
569        parent_span_id: Option<String>,
570    ) -> Result<DispatchOutcome, DispatchError> {
571        self.dispatch_with_replay(binding, event, None, parent_span_id, None)
572            .await
573    }
574
575    async fn dispatch_with_replay(
576        &self,
577        binding: &TriggerBinding,
578        event: TriggerEvent,
579        replay_of_event_id: Option<String>,
580        parent_span_id: Option<String>,
581        parent_headers: Option<&BTreeMap<String, String>>,
582    ) -> Result<DispatchOutcome, DispatchError> {
583        let span = tracing::info_span!(
584            "dispatch",
585            trigger_id = %binding.id.as_str(),
586            binding_version = binding.version,
587            trace_id = %event.trace_id.0
588        );
589        #[cfg(feature = "otel")]
590        let span_for_otel = span.clone();
591        let _ = if let Some(headers) = parent_headers {
592            crate::observability::otel::set_span_parent_from_headers(
593                &span,
594                headers,
595                &event.trace_id,
596                parent_span_id.as_deref(),
597            )
598        } else {
599            crate::observability::otel::set_span_parent(
600                &span,
601                &event.trace_id,
602                parent_span_id.as_deref(),
603            )
604        };
605        #[cfg(feature = "otel")]
606        let started_at = Instant::now();
607        let metrics = self.metrics.clone();
608        let outcome = ACTIVE_DISPATCH_IS_REPLAY
609            .scope(
610                replay_of_event_id.is_some(),
611                self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
612                    .instrument(span),
613            )
614            .await;
615        if let Some(metrics) = metrics.as_ref() {
616            match &outcome {
617                Ok(dispatch_outcome) => match dispatch_outcome.status {
618                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
619                        metrics.record_dispatch_succeeded();
620                    }
621                    _ => metrics.record_dispatch_failed(),
622                },
623                Err(_) => metrics.record_dispatch_failed(),
624            }
625        }
626        #[cfg(feature = "otel")]
627        {
628            use tracing_opentelemetry::OpenTelemetrySpanExt as _;
629
630            let duration_ms = started_at.elapsed().as_millis() as i64;
631            let status = match &outcome {
632                Ok(dispatch_outcome) => match dispatch_outcome.status {
633                    DispatchStatus::Succeeded => "succeeded",
634                    DispatchStatus::Skipped => "skipped",
635                    DispatchStatus::Cancelled => "cancelled",
636                    DispatchStatus::Failed => "failed",
637                    DispatchStatus::Dlq => "dlq",
638                },
639                Err(DispatchError::Cancelled(_)) => "cancelled",
640                Err(_) => "failed",
641            };
642            span_for_otel.set_attribute("result.status", status);
643            span_for_otel.set_attribute("result.duration_ms", duration_ms);
644        }
645        outcome
646    }
647
648    async fn dispatch_with_replay_inner(
649        &self,
650        binding: &TriggerBinding,
651        event: TriggerEvent,
652        replay_of_event_id: Option<String>,
653    ) -> Result<DispatchOutcome, DispatchError> {
654        let autonomy_tier = crate::resolve_agent_autonomy_tier(
655            &self.event_log,
656            binding.id.as_str(),
657            binding.autonomy_tier,
658        )
659        .await
660        .unwrap_or(binding.autonomy_tier);
661        let binding_key = binding.binding_key();
662        let route = DispatchUri::from(&binding.handler);
663        let trigger_id = binding.id.as_str().to_string();
664        let event_id = event.id.0.clone();
665        self.state.in_flight.fetch_add(1, Ordering::Relaxed);
666        let begin = if replay_of_event_id.is_some() {
667            super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
668        } else {
669            begin_in_flight(binding.id.as_str(), binding.version)
670        };
671        begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
672
673        let mut attempts = Vec::new();
674        let mut source_node_id = format!("trigger:{}", event.id.0);
675        let mut initial_nodes = Vec::new();
676        let mut initial_edges = Vec::new();
677        if let Some(original_event_id) = replay_of_event_id.as_ref() {
678            let original_node_id = format!("trigger:{original_event_id}");
679            initial_nodes.push(RunActionGraphNodeRecord {
680                id: original_node_id.clone(),
681                label: format!(
682                    "{}:{} (original {})",
683                    event.provider.as_str(),
684                    event.kind,
685                    original_event_id
686                ),
687                kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
688                status: "historical".to_string(),
689                outcome: "replayed_from".to_string(),
690                trace_id: Some(event.trace_id.0.clone()),
691                stage_id: None,
692                node_id: None,
693                worker_id: None,
694                run_id: None,
695                run_path: None,
696            });
697            initial_edges.push(RunActionGraphEdgeRecord {
698                from_id: original_node_id,
699                to_id: source_node_id.clone(),
700                kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
701                label: Some("replay chain".to_string()),
702            });
703        }
704        initial_nodes.push(RunActionGraphNodeRecord {
705            id: source_node_id.clone(),
706            label: format!("{}:{}", event.provider.as_str(), event.kind),
707            kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
708            status: "received".to_string(),
709            outcome: "received".to_string(),
710            trace_id: Some(event.trace_id.0.clone()),
711            stage_id: None,
712            node_id: None,
713            worker_id: None,
714            run_id: None,
715            run_path: None,
716        });
717        self.emit_action_graph(
718            &event,
719            initial_nodes,
720            initial_edges,
721            serde_json::json!({
722                "source": "dispatcher",
723                "trigger_id": trigger_id,
724                "binding_key": binding_key,
725                "event_id": event_id,
726                "replay_of_event_id": replay_of_event_id,
727            }),
728        )
729        .await?;
730
731        if dispatch_cancel_requested(
732            &self.event_log,
733            &binding_key,
734            &event.id.0,
735            replay_of_event_id.as_ref(),
736        )
737        .await?
738        {
739            finish_in_flight(
740                binding.id.as_str(),
741                binding.version,
742                TriggerDispatchOutcome::Failed,
743            )
744            .await
745            .map_err(|error| DispatchError::Registry(error.to_string()))?;
746            decrement_in_flight(&self.state);
747            return Ok(cancelled_dispatch_outcome(
748                binding,
749                &route,
750                &event,
751                replay_of_event_id,
752                0,
753                "trigger cancel request cancelled dispatch before attempt 1".to_string(),
754            ));
755        }
756
757        if let Some(predicate) = binding.when.as_ref() {
758            let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
759            let evaluation = self
760                .evaluate_predicate(
761                    binding,
762                    predicate,
763                    &event,
764                    replay_of_event_id.as_ref(),
765                    autonomy_tier,
766                )
767                .await?;
768            let passed = evaluation.result;
769            self.emit_action_graph(
770                &event,
771                vec![RunActionGraphNodeRecord {
772                    id: predicate_node_id.clone(),
773                    label: predicate.raw.clone(),
774                    kind: ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE.to_string(),
775                    status: "completed".to_string(),
776                    outcome: passed.to_string(),
777                    trace_id: Some(event.trace_id.0.clone()),
778                    stage_id: None,
779                    node_id: None,
780                    worker_id: None,
781                    run_id: None,
782                    run_path: None,
783                }],
784                vec![RunActionGraphEdgeRecord {
785                    from_id: source_node_id.clone(),
786                    to_id: predicate_node_id.clone(),
787                    kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
788                    label: None,
789                }],
790                serde_json::json!({
791                    "source": "dispatcher",
792                    "trigger_id": binding.id.as_str(),
793                    "binding_key": binding.binding_key(),
794                    "event_id": event.id.0,
795                    "predicate": predicate.raw,
796                    "reason": evaluation.reason,
797                    "cached": evaluation.cached,
798                    "cost_usd": evaluation.cost_usd,
799                    "tokens": evaluation.tokens,
800                    "latency_ms": evaluation.latency_ms,
801                    "replay_of_event_id": replay_of_event_id,
802                }),
803            )
804            .await?;
805
806            if !passed {
807                finish_in_flight(
808                    binding.id.as_str(),
809                    binding.version,
810                    TriggerDispatchOutcome::Dispatched,
811                )
812                .await
813                .map_err(|error| DispatchError::Registry(error.to_string()))?;
814                decrement_in_flight(&self.state);
815                self.append_dispatch_trust_record(
816                    binding,
817                    &route,
818                    &event,
819                    replay_of_event_id.as_ref(),
820                    autonomy_tier,
821                    TrustOutcome::Denied,
822                    "skipped",
823                    0,
824                    None,
825                )
826                .await?;
827                return Ok(DispatchOutcome {
828                    trigger_id: binding.id.as_str().to_string(),
829                    binding_key: binding.binding_key(),
830                    event_id: event.id.0,
831                    attempt_count: 0,
832                    status: DispatchStatus::Skipped,
833                    handler_kind: route.kind().to_string(),
834                    target_uri: route.target_uri(),
835                    replay_of_event_id,
836                    result: Some(serde_json::json!({
837                        "skipped": true,
838                        "predicate": predicate.raw,
839                        "reason": evaluation.reason,
840                    })),
841                    error: None,
842                });
843            }
844
845            source_node_id = predicate_node_id;
846        }
847
848        let (event, mut acquired_flow) = match self
849            .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
850            .await?
851        {
852            FlowControlOutcome::Dispatch { event, acquired } => (*event, acquired),
853            FlowControlOutcome::Skip { reason } => {
854                finish_in_flight(
855                    binding.id.as_str(),
856                    binding.version,
857                    TriggerDispatchOutcome::Dispatched,
858                )
859                .await
860                .map_err(|error| DispatchError::Registry(error.to_string()))?;
861                decrement_in_flight(&self.state);
862                return Ok(DispatchOutcome {
863                    trigger_id: binding.id.as_str().to_string(),
864                    binding_key: binding.binding_key(),
865                    event_id: event.id.0,
866                    attempt_count: 0,
867                    status: DispatchStatus::Skipped,
868                    handler_kind: route.kind().to_string(),
869                    target_uri: route.target_uri(),
870                    replay_of_event_id,
871                    result: Some(serde_json::json!({
872                        "skipped": true,
873                        "flow_control": reason,
874                    })),
875                    error: None,
876                });
877            }
878        };
879
880        let mut previous_retry_node = None;
881        let max_attempts = binding.retry.max_attempts();
882        for attempt in 1..=max_attempts {
883            if dispatch_cancel_requested(
884                &self.event_log,
885                &binding_key,
886                &event.id.0,
887                replay_of_event_id.as_ref(),
888            )
889            .await?
890            {
891                finish_in_flight(
892                    binding.id.as_str(),
893                    binding.version,
894                    TriggerDispatchOutcome::Failed,
895                )
896                .await
897                .map_err(|error| DispatchError::Registry(error.to_string()))?;
898                decrement_in_flight(&self.state);
899                return Ok(cancelled_dispatch_outcome(
900                    binding,
901                    &route,
902                    &event,
903                    replay_of_event_id,
904                    attempt.saturating_sub(1),
905                    format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
906                ));
907            }
908            maybe_fail_before_outbox();
909            let started_at = now_rfc3339();
910            let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
911            self.append_lifecycle_event(
912                "DispatchStarted",
913                &event,
914                binding,
915                serde_json::json!({
916                    "event_id": event.id.0,
917                    "attempt": attempt,
918                    "handler_kind": route.kind(),
919                    "target_uri": route.target_uri(),
920                    "replay_of_event_id": replay_of_event_id,
921                }),
922                replay_of_event_id.as_ref(),
923            )
924            .await?;
925            self.append_topic_event(
926                TRIGGER_OUTBOX_TOPIC,
927                "dispatch_started",
928                &event,
929                Some(binding),
930                Some(attempt),
931                serde_json::json!({
932                    "event_id": event.id.0,
933                    "attempt": attempt,
934                    "trigger_id": binding.id.as_str(),
935                    "binding_key": binding.binding_key(),
936                    "handler_kind": route.kind(),
937                    "target_uri": route.target_uri(),
938                    "replay_of_event_id": replay_of_event_id,
939                }),
940                replay_of_event_id.as_ref(),
941            )
942            .await?;
943
944            let mut dispatch_edges = Vec::new();
945            if attempt == 1 {
946                dispatch_edges.push(RunActionGraphEdgeRecord {
947                    from_id: source_node_id.clone(),
948                    to_id: attempt_node_id.clone(),
949                    kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
950                    label: binding.when.as_ref().map(|_| "true".to_string()),
951                });
952            } else if let Some(retry_node_id) = previous_retry_node.take() {
953                dispatch_edges.push(RunActionGraphEdgeRecord {
954                    from_id: retry_node_id,
955                    to_id: attempt_node_id.clone(),
956                    kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
957                    label: Some(format!("attempt {attempt}")),
958                });
959            }
960
961            self.emit_action_graph(
962                &event,
963                vec![RunActionGraphNodeRecord {
964                    id: attempt_node_id.clone(),
965                    label: dispatch_node_label(&route),
966                    kind: dispatch_node_kind(&route).to_string(),
967                    status: "running".to_string(),
968                    outcome: format!("attempt_{attempt}"),
969                    trace_id: Some(event.trace_id.0.clone()),
970                    stage_id: None,
971                    node_id: None,
972                    worker_id: None,
973                    run_id: None,
974                    run_path: None,
975                }],
976                dispatch_edges,
977                serde_json::json!({
978                    "source": "dispatcher",
979                    "trigger_id": binding.id.as_str(),
980                    "binding_key": binding.binding_key(),
981                    "event_id": event.id.0,
982                    "attempt": attempt,
983                    "handler_kind": route.kind(),
984                    "target_uri": route.target_uri(),
985                    "target_agent": dispatch_target_agent(&route),
986                    "replay_of_event_id": replay_of_event_id,
987                }),
988            )
989            .await?;
990
991            let result = self
992                .dispatch_once(
993                    binding,
994                    &route,
995                    &event,
996                    autonomy_tier,
997                    &mut self.cancel_tx.subscribe(),
998                )
999                .await;
1000            let completed_at = now_rfc3339();
1001
1002            match result {
1003                Ok(result) => {
1004                    let attempt_record = DispatchAttemptRecord {
1005                        trigger_id: binding.id.as_str().to_string(),
1006                        binding_key: binding.binding_key(),
1007                        event_id: event.id.0.clone(),
1008                        attempt,
1009                        handler_kind: route.kind().to_string(),
1010                        started_at,
1011                        completed_at,
1012                        outcome: "success".to_string(),
1013                        error_msg: None,
1014                    };
1015                    attempts.push(attempt_record.clone());
1016                    self.append_attempt_record(
1017                        &event,
1018                        binding,
1019                        &attempt_record,
1020                        replay_of_event_id.as_ref(),
1021                    )
1022                    .await?;
1023                    self.append_lifecycle_event(
1024                        "DispatchSucceeded",
1025                        &event,
1026                        binding,
1027                        serde_json::json!({
1028                            "event_id": event.id.0,
1029                            "attempt": attempt,
1030                            "handler_kind": route.kind(),
1031                            "target_uri": route.target_uri(),
1032                            "result": result,
1033                            "replay_of_event_id": replay_of_event_id,
1034                        }),
1035                        replay_of_event_id.as_ref(),
1036                    )
1037                    .await?;
1038                    self.append_topic_event(
1039                        TRIGGER_OUTBOX_TOPIC,
1040                        "dispatch_succeeded",
1041                        &event,
1042                        Some(binding),
1043                        Some(attempt),
1044                        serde_json::json!({
1045                            "event_id": event.id.0,
1046                            "attempt": attempt,
1047                            "trigger_id": binding.id.as_str(),
1048                            "binding_key": binding.binding_key(),
1049                            "handler_kind": route.kind(),
1050                            "target_uri": route.target_uri(),
1051                            "result": result,
1052                            "replay_of_event_id": replay_of_event_id,
1053                        }),
1054                        replay_of_event_id.as_ref(),
1055                    )
1056                    .await?;
1057                    finish_in_flight(
1058                        binding.id.as_str(),
1059                        binding.version,
1060                        TriggerDispatchOutcome::Dispatched,
1061                    )
1062                    .await
1063                    .map_err(|error| DispatchError::Registry(error.to_string()))?;
1064                    self.release_flow_control(&mut acquired_flow).await?;
1065                    decrement_in_flight(&self.state);
1066                    self.append_dispatch_trust_record(
1067                        binding,
1068                        &route,
1069                        &event,
1070                        replay_of_event_id.as_ref(),
1071                        autonomy_tier,
1072                        TrustOutcome::Success,
1073                        "succeeded",
1074                        attempt,
1075                        None,
1076                    )
1077                    .await?;
1078                    return Ok(DispatchOutcome {
1079                        trigger_id: binding.id.as_str().to_string(),
1080                        binding_key: binding.binding_key(),
1081                        event_id: event.id.0,
1082                        attempt_count: attempt,
1083                        status: DispatchStatus::Succeeded,
1084                        handler_kind: route.kind().to_string(),
1085                        target_uri: route.target_uri(),
1086                        replay_of_event_id,
1087                        result: Some(result),
1088                        error: None,
1089                    });
1090                }
1091                Err(error) => {
1092                    let attempt_record = DispatchAttemptRecord {
1093                        trigger_id: binding.id.as_str().to_string(),
1094                        binding_key: binding.binding_key(),
1095                        event_id: event.id.0.clone(),
1096                        attempt,
1097                        handler_kind: route.kind().to_string(),
1098                        started_at,
1099                        completed_at,
1100                        outcome: dispatch_error_label(&error).to_string(),
1101                        error_msg: Some(error.to_string()),
1102                    };
1103                    attempts.push(attempt_record.clone());
1104                    self.append_attempt_record(
1105                        &event,
1106                        binding,
1107                        &attempt_record,
1108                        replay_of_event_id.as_ref(),
1109                    )
1110                    .await?;
1111                    self.append_lifecycle_event(
1112                        "DispatchFailed",
1113                        &event,
1114                        binding,
1115                        serde_json::json!({
1116                            "event_id": event.id.0,
1117                            "attempt": attempt,
1118                            "handler_kind": route.kind(),
1119                            "target_uri": route.target_uri(),
1120                            "error": error.to_string(),
1121                            "replay_of_event_id": replay_of_event_id,
1122                        }),
1123                        replay_of_event_id.as_ref(),
1124                    )
1125                    .await?;
1126                    self.append_topic_event(
1127                        TRIGGER_OUTBOX_TOPIC,
1128                        "dispatch_failed",
1129                        &event,
1130                        Some(binding),
1131                        Some(attempt),
1132                        serde_json::json!({
1133                            "event_id": event.id.0,
1134                            "attempt": attempt,
1135                            "trigger_id": binding.id.as_str(),
1136                            "binding_key": binding.binding_key(),
1137                            "handler_kind": route.kind(),
1138                            "target_uri": route.target_uri(),
1139                            "error": error.to_string(),
1140                            "replay_of_event_id": replay_of_event_id,
1141                        }),
1142                        replay_of_event_id.as_ref(),
1143                    )
1144                    .await?;
1145
1146                    if !error.retryable() {
1147                        finish_in_flight(
1148                            binding.id.as_str(),
1149                            binding.version,
1150                            TriggerDispatchOutcome::Failed,
1151                        )
1152                        .await
1153                        .map_err(|registry_error| {
1154                            DispatchError::Registry(registry_error.to_string())
1155                        })?;
1156                        self.release_flow_control(&mut acquired_flow).await?;
1157                        decrement_in_flight(&self.state);
1158                        let trust_outcome = match error {
1159                            DispatchError::Denied(_) => TrustOutcome::Denied,
1160                            DispatchError::Timeout(_) => TrustOutcome::Timeout,
1161                            _ => TrustOutcome::Failure,
1162                        };
1163                        let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1164                            "cancelled"
1165                        } else {
1166                            "failed"
1167                        };
1168                        self.append_dispatch_trust_record(
1169                            binding,
1170                            &route,
1171                            &event,
1172                            replay_of_event_id.as_ref(),
1173                            autonomy_tier,
1174                            trust_outcome,
1175                            terminal_status,
1176                            attempt,
1177                            Some(error.to_string()),
1178                        )
1179                        .await?;
1180                        return Ok(DispatchOutcome {
1181                            trigger_id: binding.id.as_str().to_string(),
1182                            binding_key: binding.binding_key(),
1183                            event_id: event.id.0,
1184                            attempt_count: attempt,
1185                            status: if matches!(error, DispatchError::Cancelled(_)) {
1186                                DispatchStatus::Cancelled
1187                            } else {
1188                                DispatchStatus::Failed
1189                            },
1190                            handler_kind: route.kind().to_string(),
1191                            target_uri: route.target_uri(),
1192                            replay_of_event_id,
1193                            result: None,
1194                            error: Some(error.to_string()),
1195                        });
1196                    }
1197
1198                    if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1199                        let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1200                        previous_retry_node = Some(retry_node_id.clone());
1201                        self.emit_action_graph(
1202                            &event,
1203                            vec![RunActionGraphNodeRecord {
1204                                id: retry_node_id.clone(),
1205                                label: format!("retry in {}ms", delay.as_millis()),
1206                                kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1207                                status: "scheduled".to_string(),
1208                                outcome: format!("attempt_{}", attempt + 1),
1209                                trace_id: Some(event.trace_id.0.clone()),
1210                                stage_id: None,
1211                                node_id: None,
1212                                worker_id: None,
1213                                run_id: None,
1214                                run_path: None,
1215                            }],
1216                            vec![RunActionGraphEdgeRecord {
1217                                from_id: attempt_node_id,
1218                                to_id: retry_node_id.clone(),
1219                                kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1220                                label: Some(format!("attempt {}", attempt + 1)),
1221                            }],
1222                            serde_json::json!({
1223                                "source": "dispatcher",
1224                                "trigger_id": binding.id.as_str(),
1225                                "binding_key": binding.binding_key(),
1226                                "event_id": event.id.0,
1227                                "attempt": attempt + 1,
1228                                "delay_ms": delay.as_millis(),
1229                                "replay_of_event_id": replay_of_event_id,
1230                            }),
1231                        )
1232                        .await?;
1233                        self.append_lifecycle_event(
1234                            "RetryScheduled",
1235                            &event,
1236                            binding,
1237                            serde_json::json!({
1238                                "event_id": event.id.0,
1239                                "attempt": attempt + 1,
1240                                "delay_ms": delay.as_millis(),
1241                                "error": error.to_string(),
1242                                "replay_of_event_id": replay_of_event_id,
1243                            }),
1244                            replay_of_event_id.as_ref(),
1245                        )
1246                        .await?;
1247                        self.append_topic_event(
1248                            TRIGGER_ATTEMPTS_TOPIC,
1249                            "retry_scheduled",
1250                            &event,
1251                            Some(binding),
1252                            Some(attempt + 1),
1253                            serde_json::json!({
1254                                "event_id": event.id.0,
1255                                "attempt": attempt + 1,
1256                                "trigger_id": binding.id.as_str(),
1257                                "binding_key": binding.binding_key(),
1258                                "delay_ms": delay.as_millis(),
1259                                "error": error.to_string(),
1260                                "replay_of_event_id": replay_of_event_id,
1261                            }),
1262                            replay_of_event_id.as_ref(),
1263                        )
1264                        .await?;
1265                        self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1266                        let sleep_result = sleep_or_cancel_or_request(
1267                            &self.event_log,
1268                            delay,
1269                            &binding_key,
1270                            &event.id.0,
1271                            replay_of_event_id.as_ref(),
1272                            &mut self.cancel_tx.subscribe(),
1273                        )
1274                        .await;
1275                        decrement_retry_queue_depth(&self.state);
1276                        if sleep_result.is_err() {
1277                            finish_in_flight(
1278                                binding.id.as_str(),
1279                                binding.version,
1280                                TriggerDispatchOutcome::Failed,
1281                            )
1282                            .await
1283                            .map_err(|registry_error| {
1284                                DispatchError::Registry(registry_error.to_string())
1285                            })?;
1286                            self.release_flow_control(&mut acquired_flow).await?;
1287                            decrement_in_flight(&self.state);
1288                            self.append_dispatch_trust_record(
1289                                binding,
1290                                &route,
1291                                &event,
1292                                replay_of_event_id.as_ref(),
1293                                autonomy_tier,
1294                                TrustOutcome::Failure,
1295                                "cancelled",
1296                                attempt,
1297                                Some("dispatcher shutdown cancelled retry wait".to_string()),
1298                            )
1299                            .await?;
1300                            return Ok(DispatchOutcome {
1301                                trigger_id: binding.id.as_str().to_string(),
1302                                binding_key: binding.binding_key(),
1303                                event_id: event.id.0,
1304                                attempt_count: attempt,
1305                                status: DispatchStatus::Cancelled,
1306                                handler_kind: route.kind().to_string(),
1307                                target_uri: route.target_uri(),
1308                                replay_of_event_id,
1309                                result: None,
1310                                error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1311                            });
1312                        }
1313                        continue;
1314                    }
1315
1316                    let final_error = error.to_string();
1317                    let dlq_entry = DlqEntry {
1318                        trigger_id: binding.id.as_str().to_string(),
1319                        binding_key: binding.binding_key(),
1320                        event: event.clone(),
1321                        attempt_count: attempt,
1322                        final_error: final_error.clone(),
1323                        attempts: attempts.clone(),
1324                    };
1325                    self.state
1326                        .dlq
1327                        .lock()
1328                        .expect("dispatcher dlq poisoned")
1329                        .push(dlq_entry.clone());
1330                    self.emit_action_graph(
1331                        &event,
1332                        vec![RunActionGraphNodeRecord {
1333                            id: format!("dlq:{binding_key}:{}", event.id.0),
1334                            label: binding.id.as_str().to_string(),
1335                            kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1336                            status: "queued".to_string(),
1337                            outcome: "retry_exhausted".to_string(),
1338                            trace_id: Some(event.trace_id.0.clone()),
1339                            stage_id: None,
1340                            node_id: None,
1341                            worker_id: None,
1342                            run_id: None,
1343                            run_path: None,
1344                        }],
1345                        vec![RunActionGraphEdgeRecord {
1346                            from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1347                            to_id: format!("dlq:{binding_key}:{}", event.id.0),
1348                            kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1349                            label: Some(format!("{attempt} attempts")),
1350                        }],
1351                        serde_json::json!({
1352                            "source": "dispatcher",
1353                            "trigger_id": binding.id.as_str(),
1354                            "binding_key": binding.binding_key(),
1355                            "event_id": event.id.0,
1356                            "attempt_count": attempt,
1357                            "final_error": final_error,
1358                            "replay_of_event_id": replay_of_event_id,
1359                        }),
1360                    )
1361                    .await?;
1362                    self.append_lifecycle_event(
1363                        "DlqMoved",
1364                        &event,
1365                        binding,
1366                        serde_json::json!({
1367                            "event_id": event.id.0,
1368                            "attempt_count": attempt,
1369                            "final_error": dlq_entry.final_error,
1370                            "replay_of_event_id": replay_of_event_id,
1371                        }),
1372                        replay_of_event_id.as_ref(),
1373                    )
1374                    .await?;
1375                    self.append_topic_event(
1376                        TRIGGER_DLQ_TOPIC,
1377                        "dlq_moved",
1378                        &event,
1379                        Some(binding),
1380                        Some(attempt),
1381                        serde_json::to_value(&dlq_entry)
1382                            .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1383                        replay_of_event_id.as_ref(),
1384                    )
1385                    .await?;
1386                    finish_in_flight(
1387                        binding.id.as_str(),
1388                        binding.version,
1389                        TriggerDispatchOutcome::Dlq,
1390                    )
1391                    .await
1392                    .map_err(|registry_error| {
1393                        DispatchError::Registry(registry_error.to_string())
1394                    })?;
1395                    self.release_flow_control(&mut acquired_flow).await?;
1396                    decrement_in_flight(&self.state);
1397                    self.append_dispatch_trust_record(
1398                        binding,
1399                        &route,
1400                        &event,
1401                        replay_of_event_id.as_ref(),
1402                        autonomy_tier,
1403                        TrustOutcome::Failure,
1404                        "dlq",
1405                        attempt,
1406                        Some(error.to_string()),
1407                    )
1408                    .await?;
1409                    return Ok(DispatchOutcome {
1410                        trigger_id: binding.id.as_str().to_string(),
1411                        binding_key: binding.binding_key(),
1412                        event_id: event.id.0,
1413                        attempt_count: attempt,
1414                        status: DispatchStatus::Dlq,
1415                        handler_kind: route.kind().to_string(),
1416                        target_uri: route.target_uri(),
1417                        replay_of_event_id,
1418                        result: None,
1419                        error: Some(error.to_string()),
1420                    });
1421                }
1422            }
1423        }
1424
1425        finish_in_flight(
1426            binding.id.as_str(),
1427            binding.version,
1428            TriggerDispatchOutcome::Failed,
1429        )
1430        .await
1431        .map_err(|error| DispatchError::Registry(error.to_string()))?;
1432        self.release_flow_control(&mut acquired_flow).await?;
1433        decrement_in_flight(&self.state);
1434        self.append_dispatch_trust_record(
1435            binding,
1436            &route,
1437            &event,
1438            replay_of_event_id.as_ref(),
1439            autonomy_tier,
1440            TrustOutcome::Failure,
1441            "failed",
1442            max_attempts,
1443            Some("dispatch exhausted without terminal outcome".to_string()),
1444        )
1445        .await?;
1446        Ok(DispatchOutcome {
1447            trigger_id: binding.id.as_str().to_string(),
1448            binding_key: binding.binding_key(),
1449            event_id: event.id.0,
1450            attempt_count: max_attempts,
1451            status: DispatchStatus::Failed,
1452            handler_kind: route.kind().to_string(),
1453            target_uri: route.target_uri(),
1454            replay_of_event_id,
1455            result: None,
1456            error: Some("dispatch exhausted without terminal outcome".to_string()),
1457        })
1458    }
1459
1460    async fn dispatch_once(
1461        &self,
1462        binding: &TriggerBinding,
1463        route: &DispatchUri,
1464        event: &TriggerEvent,
1465        autonomy_tier: AutonomyTier,
1466        cancel_rx: &mut broadcast::Receiver<()>,
1467    ) -> Result<serde_json::Value, DispatchError> {
1468        match route {
1469            DispatchUri::Local { .. } => {
1470                let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
1471                    return Err(DispatchError::Local(format!(
1472                        "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
1473                        binding.id.as_str()
1474                    )));
1475                };
1476                let value = self
1477                    .invoke_vm_callable(
1478                        closure,
1479                        &binding.binding_key(),
1480                        event,
1481                        None,
1482                        binding.id.as_str(),
1483                        &format!("{}.{}", event.provider.as_str(), event.kind),
1484                        autonomy_tier,
1485                        cancel_rx,
1486                    )
1487                    .await?;
1488                Ok(vm_value_to_json(&value))
1489            }
1490            DispatchUri::A2a {
1491                target,
1492                allow_cleartext,
1493            } => {
1494                if self.state.shutting_down.load(Ordering::SeqCst) {
1495                    return Err(DispatchError::Cancelled(
1496                        "dispatcher shutdown cancelled A2A dispatch".to_string(),
1497                    ));
1498                }
1499                let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
1500                    target,
1501                    *allow_cleartext,
1502                    binding.id.as_str(),
1503                    &binding.binding_key(),
1504                    event,
1505                    cancel_rx,
1506                )
1507                .await
1508                .map_err(|error| match error {
1509                    crate::a2a::A2aClientError::Cancelled(message) => {
1510                        DispatchError::Cancelled(message)
1511                    }
1512                    other => DispatchError::A2a(other.to_string()),
1513                })?;
1514                match ack {
1515                    crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
1516                    crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
1517                }
1518            }
1519            DispatchUri::Worker { queue } => {
1520                let receipt = crate::WorkerQueue::new(self.event_log.clone())
1521                    .enqueue(&crate::WorkerQueueJob {
1522                        queue: queue.clone(),
1523                        trigger_id: binding.id.as_str().to_string(),
1524                        binding_key: binding.binding_key(),
1525                        binding_version: binding.version,
1526                        event: event.clone(),
1527                        replay_of_event_id: current_dispatch_context()
1528                            .and_then(|context| context.replay_of_event_id),
1529                        priority: worker_queue_priority(binding, event),
1530                    })
1531                    .await
1532                    .map_err(DispatchError::from)?;
1533                Ok(serde_json::to_value(receipt)
1534                    .map_err(|error| DispatchError::Serde(error.to_string()))?)
1535            }
1536        }
1537    }
1538
1539    async fn apply_flow_control(
1540        &self,
1541        binding: &TriggerBinding,
1542        event: &TriggerEvent,
1543        replay_of_event_id: Option<&String>,
1544    ) -> Result<FlowControlOutcome, DispatchError> {
1545        let flow = &binding.flow_control;
1546        let mut managed_event = event.clone();
1547
1548        if let Some(batch) = &flow.batch {
1549            let gate = self
1550                .resolve_flow_gate(
1551                    &binding.binding_key(),
1552                    batch.key.as_ref(),
1553                    &managed_event,
1554                    replay_of_event_id,
1555                )
1556                .await?;
1557            match self
1558                .state
1559                .flow_control
1560                .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
1561                .await
1562                .map_err(DispatchError::from)?
1563            {
1564                BatchDecision::Dispatch(events) => {
1565                    managed_event = build_batched_event(events)?;
1566                }
1567                BatchDecision::Merged => {
1568                    return Ok(FlowControlOutcome::Skip {
1569                        reason: "batch_merged".to_string(),
1570                    })
1571                }
1572            }
1573        }
1574
1575        if let Some(debounce) = &flow.debounce {
1576            let gate = self
1577                .resolve_flow_gate(
1578                    &binding.binding_key(),
1579                    Some(&debounce.key),
1580                    &managed_event,
1581                    replay_of_event_id,
1582                )
1583                .await?;
1584            let latest = self
1585                .state
1586                .flow_control
1587                .debounce(&gate, debounce.period)
1588                .await
1589                .map_err(DispatchError::from)?;
1590            if !latest {
1591                return Ok(FlowControlOutcome::Skip {
1592                    reason: "debounced".to_string(),
1593                });
1594            }
1595        }
1596
1597        if let Some(rate_limit) = &flow.rate_limit {
1598            let gate = self
1599                .resolve_flow_gate(
1600                    &binding.binding_key(),
1601                    rate_limit.key.as_ref(),
1602                    &managed_event,
1603                    replay_of_event_id,
1604                )
1605                .await?;
1606            let allowed = self
1607                .state
1608                .flow_control
1609                .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
1610                .await
1611                .map_err(DispatchError::from)?;
1612            if !allowed {
1613                return Ok(FlowControlOutcome::Skip {
1614                    reason: "rate_limited".to_string(),
1615                });
1616            }
1617        }
1618
1619        if let Some(throttle) = &flow.throttle {
1620            let gate = self
1621                .resolve_flow_gate(
1622                    &binding.binding_key(),
1623                    throttle.key.as_ref(),
1624                    &managed_event,
1625                    replay_of_event_id,
1626                )
1627                .await?;
1628            self.state
1629                .flow_control
1630                .wait_for_throttle(&gate, throttle.period, throttle.max)
1631                .await
1632                .map_err(DispatchError::from)?;
1633        }
1634
1635        let mut acquired = AcquiredFlowControl::default();
1636        if let Some(singleton) = &flow.singleton {
1637            let gate = self
1638                .resolve_flow_gate(
1639                    &binding.binding_key(),
1640                    singleton.key.as_ref(),
1641                    &managed_event,
1642                    replay_of_event_id,
1643                )
1644                .await?;
1645            let acquired_singleton = self
1646                .state
1647                .flow_control
1648                .try_acquire_singleton(&gate)
1649                .await
1650                .map_err(DispatchError::from)?;
1651            if !acquired_singleton {
1652                return Ok(FlowControlOutcome::Skip {
1653                    reason: "singleton_active".to_string(),
1654                });
1655            }
1656            acquired.singleton_gate = Some(gate);
1657        }
1658
1659        if let Some(concurrency) = &flow.concurrency {
1660            let gate = self
1661                .resolve_flow_gate(
1662                    &binding.binding_key(),
1663                    concurrency.key.as_ref(),
1664                    &managed_event,
1665                    replay_of_event_id,
1666                )
1667                .await?;
1668            let priority_rank = self
1669                .resolve_priority_rank(
1670                    &binding.binding_key(),
1671                    flow.priority.as_ref(),
1672                    &managed_event,
1673                    replay_of_event_id,
1674                )
1675                .await?;
1676            acquired.concurrency = Some(
1677                self.state
1678                    .flow_control
1679                    .acquire_concurrency(&gate, concurrency.max, priority_rank)
1680                    .await
1681                    .map_err(DispatchError::from)?,
1682            );
1683        }
1684
1685        Ok(FlowControlOutcome::Dispatch {
1686            event: Box::new(managed_event),
1687            acquired,
1688        })
1689    }
1690
1691    async fn release_flow_control(
1692        &self,
1693        acquired: &mut AcquiredFlowControl,
1694    ) -> Result<(), DispatchError> {
1695        if let Some(gate) = acquired.singleton_gate.take() {
1696            self.state
1697                .flow_control
1698                .release_singleton(&gate)
1699                .await
1700                .map_err(DispatchError::from)?;
1701        }
1702        if let Some(permit) = acquired.concurrency.take() {
1703            self.state
1704                .flow_control
1705                .release_concurrency(permit)
1706                .await
1707                .map_err(DispatchError::from)?;
1708        }
1709        Ok(())
1710    }
1711
1712    async fn resolve_flow_gate(
1713        &self,
1714        binding_key: &str,
1715        expr: Option<&crate::triggers::TriggerExpressionSpec>,
1716        event: &TriggerEvent,
1717        replay_of_event_id: Option<&String>,
1718    ) -> Result<String, DispatchError> {
1719        let key = match expr {
1720            Some(expr) => {
1721                self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
1722                    .await?
1723            }
1724            None => "_global".to_string(),
1725        };
1726        Ok(format!("{binding_key}:{key}"))
1727    }
1728
1729    async fn resolve_priority_rank(
1730        &self,
1731        binding_key: &str,
1732        priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
1733        event: &TriggerEvent,
1734        replay_of_event_id: Option<&String>,
1735    ) -> Result<usize, DispatchError> {
1736        let Some(priority) = priority else {
1737            return Ok(0);
1738        };
1739        let value = self
1740            .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
1741            .await?;
1742        Ok(priority
1743            .order
1744            .iter()
1745            .position(|candidate| candidate == &value)
1746            .unwrap_or(priority.order.len()))
1747    }
1748
1749    async fn evaluate_flow_expression(
1750        &self,
1751        binding_key: &str,
1752        expr: &crate::triggers::TriggerExpressionSpec,
1753        event: &TriggerEvent,
1754        replay_of_event_id: Option<&String>,
1755    ) -> Result<String, DispatchError> {
1756        let value = self
1757            .invoke_vm_callable(
1758                &expr.closure,
1759                binding_key,
1760                event,
1761                replay_of_event_id,
1762                "",
1763                "flow_control",
1764                AutonomyTier::Suggest,
1765                &mut self.cancel_tx.subscribe(),
1766            )
1767            .await?;
1768        Ok(json_value_to_gate(&vm_value_to_json(&value)))
1769    }
1770
1771    #[allow(clippy::too_many_arguments)]
1772    async fn invoke_vm_callable(
1773        &self,
1774        closure: &crate::value::VmClosure,
1775        binding_key: &str,
1776        event: &TriggerEvent,
1777        replay_of_event_id: Option<&String>,
1778        agent_id: &str,
1779        action: &str,
1780        autonomy_tier: AutonomyTier,
1781        cancel_rx: &mut broadcast::Receiver<()>,
1782    ) -> Result<VmValue, DispatchError> {
1783        let mut vm = self.base_vm.child_vm();
1784        let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
1785        if self.state.shutting_down.load(Ordering::SeqCst) {
1786            cancel_token.store(true, Ordering::SeqCst);
1787        }
1788        self.state
1789            .cancel_tokens
1790            .lock()
1791            .expect("dispatcher cancel tokens poisoned")
1792            .push(cancel_token.clone());
1793        vm.install_cancel_token(cancel_token.clone());
1794        let arg = event_to_handler_value(event)?;
1795        let args = [arg];
1796        let future = vm.call_closure_pub(closure, &args, &[]);
1797        pin_mut!(future);
1798        let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1799            slot.borrow_mut().replace(DispatchContext {
1800                trigger_event: event.clone(),
1801                replay_of_event_id: replay_of_event_id.cloned(),
1802                agent_id: agent_id.to_string(),
1803                action: action.to_string(),
1804                autonomy_tier,
1805            })
1806        });
1807        let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
1808        crate::stdlib::hitl::reset_hitl_state();
1809        let mut poll = tokio::time::interval(Duration::from_millis(100));
1810        let result = loop {
1811            tokio::select! {
1812                result = &mut future => break result,
1813                _ = recv_cancel(cancel_rx) => {
1814                    cancel_token.store(true, Ordering::SeqCst);
1815                }
1816                _ = poll.tick() => {
1817                    if dispatch_cancel_requested(
1818                        &self.event_log,
1819                        binding_key,
1820                        &event.id.0,
1821                        replay_of_event_id,
1822                    )
1823                    .await? {
1824                        cancel_token.store(true, Ordering::SeqCst);
1825                    }
1826                }
1827            }
1828        };
1829        ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1830            *slot.borrow_mut() = prior_context;
1831        });
1832        crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
1833        {
1834            let mut tokens = self
1835                .state
1836                .cancel_tokens
1837                .lock()
1838                .expect("dispatcher cancel tokens poisoned");
1839            tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
1840        }
1841
1842        if cancel_token.load(Ordering::SeqCst) {
1843            if dispatch_cancel_requested(
1844                &self.event_log,
1845                binding_key,
1846                &event.id.0,
1847                replay_of_event_id,
1848            )
1849            .await?
1850            {
1851                Err(DispatchError::Cancelled(
1852                    "trigger cancel request cancelled local handler".to_string(),
1853                ))
1854            } else {
1855                Err(DispatchError::Cancelled(
1856                    "dispatcher shutdown cancelled local handler".to_string(),
1857                ))
1858            }
1859        } else {
1860            result.map_err(dispatch_error_from_vm_error)
1861        }
1862    }
1863
1864    async fn evaluate_predicate(
1865        &self,
1866        binding: &TriggerBinding,
1867        predicate: &super::registry::TriggerPredicateSpec,
1868        event: &TriggerEvent,
1869        replay_of_event_id: Option<&String>,
1870        autonomy_tier: AutonomyTier,
1871    ) -> Result<PredicateEvaluationRecord, DispatchError> {
1872        let event_id = event.id.0.clone();
1873        let trigger_id = binding.id.as_str().to_string();
1874        let now_ms = now_unix_ms();
1875        let today = utc_day_key();
1876
1877        let breaker_open_until = {
1878            let mut state = binding
1879                .predicate_state
1880                .lock()
1881                .expect("trigger predicate state poisoned");
1882            if state.budget_day_utc != Some(today) {
1883                state.budget_day_utc = Some(today);
1884                binding
1885                    .metrics
1886                    .cost_today_usd_micros
1887                    .store(0, Ordering::Relaxed);
1888            }
1889            if state
1890                .breaker_open_until_ms
1891                .is_some_and(|until_ms| until_ms > now_ms)
1892            {
1893                state.breaker_open_until_ms
1894            } else {
1895                None
1896            }
1897        };
1898
1899        if breaker_open_until.is_some() {
1900            let mut metadata = BTreeMap::new();
1901            metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
1902            metadata.insert("event_id".to_string(), serde_json::json!(event_id));
1903            metadata.insert(
1904                "breaker_open_until_ms".to_string(),
1905                serde_json::json!(breaker_open_until),
1906            );
1907            crate::events::log_warn_meta(
1908                "trigger.predicate.circuit_breaker",
1909                "trigger predicate circuit breaker is open; short-circuiting to false",
1910                metadata,
1911            );
1912            let record = PredicateEvaluationRecord {
1913                result: false,
1914                reason: Some("circuit_open".to_string()),
1915                ..Default::default()
1916            };
1917            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1918                .await?;
1919            return Ok(record);
1920        }
1921
1922        if binding
1923            .daily_cost_usd
1924            .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
1925        {
1926            self.append_lifecycle_event(
1927                "predicate.daily_budget_exceeded",
1928                event,
1929                binding,
1930                serde_json::json!({
1931                    "trigger_id": binding.id.as_str(),
1932                    "event_id": event.id.0,
1933                    "limit_usd": binding.daily_cost_usd,
1934                    "cost_today_usd": current_predicate_daily_cost(binding),
1935                    "replay_of_event_id": replay_of_event_id,
1936                }),
1937                replay_of_event_id,
1938            )
1939            .await?;
1940            let record = PredicateEvaluationRecord {
1941                result: false,
1942                reason: Some("daily_budget_exceeded".to_string()),
1943                ..Default::default()
1944            };
1945            self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1946                .await?;
1947            return Ok(record);
1948        }
1949
1950        let replay_cache = self
1951            .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
1952            .await?;
1953        let guard = start_predicate_evaluation(
1954            binding.when_budget.clone().unwrap_or_default(),
1955            replay_cache,
1956        );
1957        let started = std::time::Instant::now();
1958        let eval = self
1959            .invoke_vm_callable_with_timeout(
1960                &predicate.closure,
1961                &binding.binding_key(),
1962                event,
1963                replay_of_event_id,
1964                binding.id.as_str(),
1965                &format!("{}.{}", event.provider.as_str(), event.kind),
1966                autonomy_tier,
1967                &mut self.cancel_tx.subscribe(),
1968                binding
1969                    .when_budget
1970                    .as_ref()
1971                    .and_then(|budget| budget.timeout()),
1972            )
1973            .await;
1974        let capture = guard.finish();
1975        let latency_ms = started.elapsed().as_millis() as u64;
1976        if replay_of_event_id.is_none() && !capture.entries.is_empty() {
1977            self.append_predicate_cache_record(binding, event, &capture.entries)
1978                .await?;
1979        }
1980
1981        let mut record = PredicateEvaluationRecord {
1982            result: false,
1983            cost_usd: capture.total_cost_usd,
1984            tokens: capture.total_tokens,
1985            latency_ms,
1986            cached: capture.cached,
1987            reason: None,
1988        };
1989
1990        let mut count_failure = false;
1991        let mut opened_breaker = false;
1992
1993        match eval {
1994            Ok(value) => match predicate_value_as_bool(value) {
1995                Ok(result) => {
1996                    record.result = result;
1997                }
1998                Err(reason) => {
1999                    count_failure = true;
2000                    record.reason = Some(reason);
2001                }
2002            },
2003            Err(error) => {
2004                count_failure = true;
2005                record.reason = Some(error.to_string());
2006            }
2007        }
2008
2009        let cost_usd_micros = usd_to_micros(record.cost_usd);
2010        if cost_usd_micros > 0 {
2011            binding
2012                .metrics
2013                .cost_total_usd_micros
2014                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2015            binding
2016                .metrics
2017                .cost_today_usd_micros
2018                .fetch_add(cost_usd_micros, Ordering::Relaxed);
2019        }
2020
2021        let timed_out = matches!(
2022            record.reason.as_deref(),
2023            Some("predicate evaluation timed out")
2024        );
2025        if capture.budget_exceeded || timed_out {
2026            record.result = false;
2027            record.reason = Some("budget_exceeded".to_string());
2028            self.append_lifecycle_event(
2029                "predicate.budget_exceeded",
2030                event,
2031                binding,
2032                serde_json::json!({
2033                    "trigger_id": binding.id.as_str(),
2034                    "event_id": event.id.0,
2035                    "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2036                    "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2037                    "cost_usd": record.cost_usd,
2038                    "tokens": record.tokens,
2039                    "replay_of_event_id": replay_of_event_id,
2040                }),
2041                replay_of_event_id,
2042            )
2043            .await?;
2044        }
2045
2046        if binding
2047            .daily_cost_usd
2048            .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2049        {
2050            record.result = false;
2051            record.reason = Some("daily_budget_exceeded".to_string());
2052            self.append_lifecycle_event(
2053                "predicate.daily_budget_exceeded",
2054                event,
2055                binding,
2056                serde_json::json!({
2057                    "trigger_id": binding.id.as_str(),
2058                    "event_id": event.id.0,
2059                    "limit_usd": binding.daily_cost_usd,
2060                    "cost_today_usd": current_predicate_daily_cost(binding),
2061                    "replay_of_event_id": replay_of_event_id,
2062                }),
2063                replay_of_event_id,
2064            )
2065            .await?;
2066        }
2067
2068        {
2069            let mut state = binding
2070                .predicate_state
2071                .lock()
2072                .expect("trigger predicate state poisoned");
2073            if state.budget_day_utc != Some(today) {
2074                state.budget_day_utc = Some(today);
2075                binding
2076                    .metrics
2077                    .cost_today_usd_micros
2078                    .store(cost_usd_micros, Ordering::Relaxed);
2079            }
2080            if count_failure {
2081                state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2082                if state.consecutive_failures >= 3 {
2083                    state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2084                    opened_breaker = true;
2085                }
2086            } else {
2087                state.consecutive_failures = 0;
2088                state.breaker_open_until_ms = None;
2089            }
2090        }
2091
2092        if opened_breaker {
2093            let mut metadata = BTreeMap::new();
2094            metadata.insert(
2095                "trigger_id".to_string(),
2096                serde_json::json!(binding.id.as_str()),
2097            );
2098            metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2099            metadata.insert("failure_count".to_string(), serde_json::json!(3));
2100            metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2101            crate::events::log_warn_meta(
2102                "trigger.predicate.circuit_breaker",
2103                "trigger predicate circuit breaker opened for 5 minutes",
2104                metadata,
2105            );
2106        }
2107
2108        self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2109            .await?;
2110        Ok(record)
2111    }
2112
2113    #[allow(clippy::too_many_arguments)]
2114    #[allow(clippy::too_many_arguments)]
2115    async fn invoke_vm_callable_with_timeout(
2116        &self,
2117        closure: &crate::value::VmClosure,
2118        binding_key: &str,
2119        event: &TriggerEvent,
2120        replay_of_event_id: Option<&String>,
2121        agent_id: &str,
2122        action: &str,
2123        autonomy_tier: AutonomyTier,
2124        cancel_rx: &mut broadcast::Receiver<()>,
2125        timeout: Option<Duration>,
2126    ) -> Result<VmValue, DispatchError> {
2127        let future = self.invoke_vm_callable(
2128            closure,
2129            binding_key,
2130            event,
2131            replay_of_event_id,
2132            agent_id,
2133            action,
2134            autonomy_tier,
2135            cancel_rx,
2136        );
2137        pin_mut!(future);
2138        if let Some(timeout) = timeout {
2139            match tokio::time::timeout(timeout, future).await {
2140                Ok(result) => result,
2141                Err(_) => Err(DispatchError::Local(
2142                    "predicate evaluation timed out".to_string(),
2143                )),
2144            }
2145        } else {
2146            future.await
2147        }
2148    }
2149
2150    async fn append_predicate_evaluated_event(
2151        &self,
2152        binding: &TriggerBinding,
2153        event: &TriggerEvent,
2154        record: &PredicateEvaluationRecord,
2155        replay_of_event_id: Option<&String>,
2156    ) -> Result<(), DispatchError> {
2157        self.append_lifecycle_event(
2158            "predicate.evaluated",
2159            event,
2160            binding,
2161            serde_json::json!({
2162                "trigger_id": binding.id.as_str(),
2163                "event_id": event.id.0,
2164                "result": record.result,
2165                "cost_usd": record.cost_usd,
2166                "tokens": record.tokens,
2167                "latency_ms": record.latency_ms,
2168                "cached": record.cached,
2169                "reason": record.reason,
2170                "replay_of_event_id": replay_of_event_id,
2171            }),
2172            replay_of_event_id,
2173        )
2174        .await
2175    }
2176
2177    async fn append_predicate_cache_record(
2178        &self,
2179        binding: &TriggerBinding,
2180        event: &TriggerEvent,
2181        entries: &[PredicateCacheEntry],
2182    ) -> Result<(), DispatchError> {
2183        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2184            .expect("static trigger inbox legacy topic name is valid");
2185        let payload = serde_json::to_value(PredicateCacheRecord {
2186            trigger_id: binding.id.as_str().to_string(),
2187            event_id: event.id.0.clone(),
2188            entries: entries.to_vec(),
2189        })
2190        .map_err(|error| DispatchError::Serde(error.to_string()))?;
2191        self.event_log
2192            .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2193            .await
2194            .map_err(DispatchError::from)
2195            .map(|_| ())
2196    }
2197
2198    async fn read_predicate_cache_record(
2199        &self,
2200        event_id: &str,
2201    ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2202        let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2203            .expect("static trigger inbox legacy topic name is valid");
2204        let records = self
2205            .event_log
2206            .read_range(&topic, None, usize::MAX)
2207            .await
2208            .map_err(DispatchError::from)?;
2209        Ok(records
2210            .into_iter()
2211            .filter(|(_, event)| event.kind == "predicate_llm_cache")
2212            .filter_map(|(_, event)| {
2213                serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2214            })
2215            .filter(|record| record.event_id == event_id)
2216            .flat_map(|record| record.entries)
2217            .collect())
2218    }
2219
2220    #[allow(clippy::too_many_arguments)]
2221    async fn append_dispatch_trust_record(
2222        &self,
2223        binding: &TriggerBinding,
2224        route: &DispatchUri,
2225        event: &TriggerEvent,
2226        replay_of_event_id: Option<&String>,
2227        autonomy_tier: AutonomyTier,
2228        outcome: TrustOutcome,
2229        terminal_status: &str,
2230        attempt_count: u32,
2231        error: Option<String>,
2232    ) -> Result<(), DispatchError> {
2233        let mut record = TrustRecord::new(
2234            binding.id.as_str().to_string(),
2235            format!("{}.{}", event.provider.as_str(), event.kind),
2236            None,
2237            outcome,
2238            event.trace_id.0.clone(),
2239            autonomy_tier,
2240        );
2241        record.metadata.insert(
2242            "binding_key".to_string(),
2243            serde_json::json!(binding.binding_key()),
2244        );
2245        record.metadata.insert(
2246            "binding_version".to_string(),
2247            serde_json::json!(binding.version),
2248        );
2249        record.metadata.insert(
2250            "provider".to_string(),
2251            serde_json::json!(event.provider.as_str()),
2252        );
2253        record
2254            .metadata
2255            .insert("event_kind".to_string(), serde_json::json!(event.kind));
2256        record
2257            .metadata
2258            .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2259        record.metadata.insert(
2260            "target_uri".to_string(),
2261            serde_json::json!(route.target_uri()),
2262        );
2263        record.metadata.insert(
2264            "terminal_status".to_string(),
2265            serde_json::json!(terminal_status),
2266        );
2267        record.metadata.insert(
2268            "attempt_count".to_string(),
2269            serde_json::json!(attempt_count),
2270        );
2271        if let Some(replay_of_event_id) = replay_of_event_id {
2272            record.metadata.insert(
2273                "replay_of_event_id".to_string(),
2274                serde_json::json!(replay_of_event_id),
2275            );
2276        }
2277        if let Some(error) = error {
2278            record
2279                .metadata
2280                .insert("error".to_string(), serde_json::json!(error));
2281        }
2282        append_trust_record(&self.event_log, &record)
2283            .await
2284            .map_err(DispatchError::from)
2285    }
2286
2287    async fn append_attempt_record(
2288        &self,
2289        event: &TriggerEvent,
2290        binding: &TriggerBinding,
2291        attempt: &DispatchAttemptRecord,
2292        replay_of_event_id: Option<&String>,
2293    ) -> Result<(), DispatchError> {
2294        self.append_topic_event(
2295            TRIGGER_ATTEMPTS_TOPIC,
2296            "attempt_recorded",
2297            event,
2298            Some(binding),
2299            Some(attempt.attempt),
2300            serde_json::to_value(attempt)
2301                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2302            replay_of_event_id,
2303        )
2304        .await
2305    }
2306
2307    async fn append_lifecycle_event(
2308        &self,
2309        kind: &str,
2310        event: &TriggerEvent,
2311        binding: &TriggerBinding,
2312        payload: serde_json::Value,
2313        replay_of_event_id: Option<&String>,
2314    ) -> Result<(), DispatchError> {
2315        self.append_topic_event(
2316            TRIGGERS_LIFECYCLE_TOPIC,
2317            kind,
2318            event,
2319            Some(binding),
2320            None,
2321            payload,
2322            replay_of_event_id,
2323        )
2324        .await
2325    }
2326
2327    async fn append_topic_event(
2328        &self,
2329        topic_name: &str,
2330        kind: &str,
2331        event: &TriggerEvent,
2332        binding: Option<&TriggerBinding>,
2333        attempt: Option<u32>,
2334        payload: serde_json::Value,
2335        replay_of_event_id: Option<&String>,
2336    ) -> Result<(), DispatchError> {
2337        let topic = Topic::new(topic_name)
2338            .expect("static trigger dispatcher topic names should always be valid");
2339        let headers = event_headers(event, binding, attempt, replay_of_event_id);
2340        self.event_log
2341            .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
2342            .await
2343            .map_err(DispatchError::from)
2344            .map(|_| ())
2345    }
2346
2347    async fn emit_action_graph(
2348        &self,
2349        event: &TriggerEvent,
2350        nodes: Vec<RunActionGraphNodeRecord>,
2351        edges: Vec<RunActionGraphEdgeRecord>,
2352        extra: serde_json::Value,
2353    ) -> Result<(), DispatchError> {
2354        let mut headers = BTreeMap::new();
2355        headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2356        headers.insert("event_id".to_string(), event.id.0.clone());
2357        let observability = RunObservabilityRecord {
2358            schema_version: 1,
2359            action_graph_nodes: nodes,
2360            action_graph_edges: edges,
2361            ..Default::default()
2362        };
2363        append_action_graph_update(
2364            headers,
2365            serde_json::json!({
2366                "source": "dispatcher",
2367                "trace_id": event.trace_id.0,
2368                "event_id": event.id.0,
2369                "observability": observability,
2370                "context": extra,
2371            }),
2372        )
2373        .await
2374        .map_err(DispatchError::from)
2375    }
2376}
2377
2378async fn dispatch_cancel_requested(
2379    event_log: &Arc<AnyEventLog>,
2380    binding_key: &str,
2381    event_id: &str,
2382    replay_of_event_id: Option<&String>,
2383) -> Result<bool, DispatchError> {
2384    if replay_of_event_id.is_some() {
2385        return Ok(false);
2386    }
2387    let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
2388        .expect("static trigger cancel topic should always be valid");
2389    let events = event_log.read_range(&topic, None, usize::MAX).await?;
2390    let requested = events
2391        .into_iter()
2392        .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
2393        .filter_map(|(_, event)| {
2394            serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
2395        })
2396        .collect::<BTreeSet<_>>();
2397    Ok(requested
2398        .iter()
2399        .any(|request| request.binding_key == binding_key && request.event_id == event_id))
2400}
2401
2402async fn sleep_or_cancel_or_request(
2403    event_log: &Arc<AnyEventLog>,
2404    delay: Duration,
2405    binding_key: &str,
2406    event_id: &str,
2407    replay_of_event_id: Option<&String>,
2408    cancel_rx: &mut broadcast::Receiver<()>,
2409) -> Result<(), DispatchError> {
2410    let sleep = tokio::time::sleep(delay);
2411    pin_mut!(sleep);
2412    let mut poll = tokio::time::interval(Duration::from_millis(100));
2413    loop {
2414        tokio::select! {
2415            _ = &mut sleep => return Ok(()),
2416            _ = recv_cancel(cancel_rx) => {
2417                return Err(DispatchError::Cancelled(
2418                    "dispatcher shutdown cancelled retry wait".to_string(),
2419                ));
2420            }
2421            _ = poll.tick() => {
2422                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
2423                    return Err(DispatchError::Cancelled(
2424                        "trigger cancel request cancelled retry wait".to_string(),
2425                    ));
2426                }
2427            }
2428        }
2429    }
2430}
2431
2432fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
2433    let mut iter = events.into_iter();
2434    let Some(mut root) = iter.next() else {
2435        return Err(DispatchError::Registry(
2436            "batch dispatch produced an empty event list".to_string(),
2437        ));
2438    };
2439    let mut batch = Vec::new();
2440    batch.push(
2441        serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
2442    );
2443    for event in iter {
2444        batch.push(
2445            serde_json::to_value(&event)
2446                .map_err(|error| DispatchError::Serde(error.to_string()))?,
2447        );
2448    }
2449    root.batch = Some(batch);
2450    Ok(root)
2451}
2452
2453fn json_value_to_gate(value: &serde_json::Value) -> String {
2454    match value {
2455        serde_json::Value::Null => "null".to_string(),
2456        serde_json::Value::String(text) => text.clone(),
2457        serde_json::Value::Bool(value) => value.to_string(),
2458        serde_json::Value::Number(value) => value.to_string(),
2459        other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
2460    }
2461}
2462
2463fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
2464    let json =
2465        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2466    let value = json_to_vm_value(&json);
2467    match (&event.raw_body, value) {
2468        (Some(raw_body), VmValue::Dict(dict)) => {
2469            let mut map = (*dict).clone();
2470            map.insert(
2471                "raw_body".to_string(),
2472                VmValue::Bytes(Rc::new(raw_body.clone())),
2473            );
2474            Ok(VmValue::Dict(Rc::new(map)))
2475        }
2476        (_, other) => Ok(other),
2477    }
2478}
2479
2480fn decrement_in_flight(state: &DispatcherRuntimeState) {
2481    let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
2482    if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
2483        state.idle_notify.notify_waiters();
2484    }
2485}
2486
2487fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
2488    let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
2489    if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
2490        state.idle_notify.notify_waiters();
2491    }
2492}
2493
/// Test-only: stores `tx` as the one-shot signal fired by
/// `notify_test_inbox_dequeued`, dropping any previously installed sender.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    let _previous = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.replace(Some(tx)));
}
2500
/// Non-test build: intentionally a no-op. The `cfg(test)` variant fires the
/// signal installed by `install_test_inbox_dequeued_signal`.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}
2503
/// Test-only: fires the installed inbox-dequeued signal, if any.
///
/// The sender is removed from the slot, so only the first call after
/// installation sends. A receiver that has already gone away is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(sender) = pending {
        let _ = sender.send(());
    }
}
2512
2513pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
2514    event_log: &L,
2515    event: &TriggerEvent,
2516) -> Result<u64, DispatchError> {
2517    let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
2518        .expect("static trigger.inbox.envelopes topic is valid");
2519    let headers = event_headers(event, None, None, None);
2520    let payload =
2521        serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2522    event_log
2523        .append(
2524            &topic,
2525            LogEvent::new("event_ingested", payload).with_headers(headers),
2526        )
2527        .await
2528        .map_err(DispatchError::from)
2529}
2530
2531pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
2532    ACTIVE_DISPATCHER_STATE.with(|slot| {
2533        slot.borrow()
2534            .as_ref()
2535            .map(|state| DispatcherStatsSnapshot {
2536                in_flight: state.in_flight.load(Ordering::Relaxed),
2537                retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
2538                dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
2539            })
2540            .unwrap_or_default()
2541    })
2542}
2543
2544pub fn clear_dispatcher_state() {
2545    ACTIVE_DISPATCHER_STATE.with(|slot| {
2546        *slot.borrow_mut() = None;
2547    });
2548}
2549
2550fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
2551    if is_cancelled_vm_error(&error) {
2552        return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
2553    }
2554    if let VmError::Thrown(VmValue::String(message)) = &error {
2555        return DispatchError::Local(message.to_string());
2556    }
2557    match error_to_category(&error) {
2558        ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
2559        ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
2560        ErrorCategory::Cancelled => {
2561            DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
2562        }
2563        _ => DispatchError::Local(error.to_string()),
2564    }
2565}
2566
2567fn dispatch_error_label(error: &DispatchError) -> &'static str {
2568    match error {
2569        DispatchError::Denied(_) => "denied",
2570        DispatchError::Timeout(_) => "timeout",
2571        DispatchError::Cancelled(_) => "cancelled",
2572        _ => "failed",
2573    }
2574}
2575
2576fn dispatch_node_id(
2577    route: &DispatchUri,
2578    binding_key: &str,
2579    event_id: &str,
2580    attempt: u32,
2581) -> String {
2582    let prefix = match route {
2583        DispatchUri::A2a { .. } => "a2a",
2584        _ => "dispatch",
2585    };
2586    format!("{prefix}:{binding_key}:{event_id}:{attempt}")
2587}
2588
2589fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
2590    match route {
2591        DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
2592        _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
2593    }
2594}
2595
2596fn dispatch_node_label(route: &DispatchUri) -> String {
2597    match route {
2598        DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
2599        _ => route.target_uri(),
2600    }
2601}
2602
2603fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
2604    match route {
2605        DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
2606        _ => None,
2607    }
2608}
2609
2610fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
2611    match route {
2612        DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
2613        _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
2614        _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
2615    }
2616}
2617
2618fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
2619    match value {
2620        VmValue::Bool(result) => Ok(result),
2621        VmValue::EnumVariant {
2622            enum_name,
2623            variant,
2624            mut fields,
2625        } if enum_name == "Result" && variant == "Ok" => match fields.pop() {
2626            Some(VmValue::Bool(result)) => Ok(result),
2627            Some(other) => Err(format!(
2628                "predicate Result.Ok payload must be bool, got {}",
2629                other.type_name()
2630            )),
2631            None => Err("predicate Result.Ok payload is missing".to_string()),
2632        },
2633        VmValue::EnumVariant {
2634            enum_name,
2635            variant,
2636            fields,
2637        } if enum_name == "Result" && variant == "Err" => Err(fields
2638            .first()
2639            .map(VmValue::display)
2640            .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
2641        other => Err(format!(
2642            "predicate must return bool or Result<bool, _>, got {}",
2643            other.type_name()
2644        )),
2645    }
2646}
2647
/// Converts a USD amount to whole micro-dollars, rounding to nearest.
///
/// Non-finite (NaN/infinite), zero and negative inputs map to 0.
fn usd_to_micros(value: f64) -> u64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    if value.is_finite() && value > 0.0 {
        (value * MICROS_PER_USD).round() as u64
    } else {
        0
    }
}
2654
2655fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
2656    binding
2657        .metrics
2658        .cost_today_usd_micros
2659        .load(Ordering::Relaxed) as f64
2660        / 1_000_000.0
2661}
2662
2663fn is_cancelled_vm_error(error: &VmError) -> bool {
2664    matches!(
2665        error,
2666        VmError::Thrown(VmValue::String(message))
2667            if message.starts_with("kind:cancelled:")
2668    ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
2669}
2670
2671fn event_headers(
2672    event: &TriggerEvent,
2673    binding: Option<&TriggerBinding>,
2674    attempt: Option<u32>,
2675    replay_of_event_id: Option<&String>,
2676) -> BTreeMap<String, String> {
2677    let mut headers = BTreeMap::new();
2678    headers.insert("event_id".to_string(), event.id.0.clone());
2679    headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2680    headers.insert("provider".to_string(), event.provider.as_str().to_string());
2681    headers.insert("kind".to_string(), event.kind.clone());
2682    if let Some(replay_of_event_id) = replay_of_event_id {
2683        headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
2684    }
2685    if let Some(binding) = binding {
2686        headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
2687        headers.insert("binding_key".to_string(), binding.binding_key());
2688        headers.insert(
2689            "handler_kind".to_string(),
2690            DispatchUri::from(&binding.handler).kind().to_string(),
2691        );
2692    }
2693    if let Some(attempt) = attempt {
2694        headers.insert("attempt".to_string(), attempt.to_string());
2695    }
2696    headers
2697}
2698
2699fn worker_queue_priority(
2700    binding: &super::registry::TriggerBinding,
2701    event: &TriggerEvent,
2702) -> crate::WorkerQueuePriority {
2703    match event
2704        .headers
2705        .get("priority")
2706        .map(|value| value.trim().to_ascii_lowercase())
2707        .as_deref()
2708    {
2709        Some("high") => crate::WorkerQueuePriority::High,
2710        Some("low") => crate::WorkerQueuePriority::Low,
2711        _ => binding.dispatch_priority,
2712    }
2713}
2714
/// Environment variable that, when present with any value, makes
/// [`maybe_fail_before_outbox`] terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Test hook: exits the process with status 86 when
/// [`TEST_FAIL_BEFORE_OUTBOX_ENV`] is set, and does nothing otherwise.
///
/// NOTE(review): this is compiled into non-test builds as well, presumably so
/// that crash-recovery tests can kill a real dispatcher process just before an
/// outbox write. Confirm with the callers.
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_none() {
        return;
    }
    std::process::exit(86);
}
2722
2723fn now_rfc3339() -> String {
2724    time::OffsetDateTime::now_utc()
2725        .format(&time::format_description::well_known::Rfc3339)
2726        .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2727}
2728
2729fn now_unix_ms() -> i64 {
2730    (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
2731}
2732
2733fn utc_day_key() -> i32 {
2734    time::OffsetDateTime::now_utc().date().to_julian_day()
2735}
2736
2737fn cancelled_dispatch_outcome(
2738    binding: &TriggerBinding,
2739    route: &DispatchUri,
2740    event: &TriggerEvent,
2741    replay_of_event_id: Option<String>,
2742    attempt_count: u32,
2743    error: String,
2744) -> DispatchOutcome {
2745    DispatchOutcome {
2746        trigger_id: binding.id.as_str().to_string(),
2747        binding_key: binding.binding_key(),
2748        event_id: event.id.0.clone(),
2749        attempt_count,
2750        status: DispatchStatus::Cancelled,
2751        handler_kind: route.kind().to_string(),
2752        target_uri: route.target_uri(),
2753        replay_of_event_id,
2754        result: None,
2755        error: Some(error),
2756    }
2757}
2758
2759async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
2760    let _ = cancel_rx.recv().await;
2761}
2762
// Unit tests for the dispatcher live in a separate `tests` module file.
#[cfg(test)]
mod tests;