1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::{Mutex as AsyncMutex, Notify};
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25 ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26 ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE, ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{append_trust_record, AutonomyTier, TrustOutcome, TrustRecord};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::matching_bindings;
35use super::registry::{TriggerBinding, TriggerHandlerSpec};
36use super::{
37 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
38 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
39 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
40 TRIGGER_OUTBOX_TOPIC,
41};
42use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
43
44mod flow_control;
45pub mod retry;
46pub mod uri;
47
48pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
49
thread_local! {
    // Runtime state of the most recently constructed `Dispatcher` on this thread.
    // Written by `Dispatcher::with_event_log_and_metrics`; readers are outside this section.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context of the dispatch currently executing on this thread, read by
    // `current_dispatch_context` (the writer is not visible in this section).
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Flow-control wait lease of the current dispatch, read by
    // `current_dispatch_wait_lease` (the writer is not visible in this section).
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only hook; presumably fired by `notify_test_inbox_dequeued` after `run`
    // dequeues an inbox envelope — confirm against that helper.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
57
tokio::task_local! {
    // Whether the current task is executing a replay dispatch. Scoped by
    // `Dispatcher::dispatch_with_replay` to `replay_of_event_id.is_some()` and read
    // through `current_dispatch_is_replay` (which defaults to `false` outside a scope).
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
61
/// Metadata describing the trigger dispatch currently executing on this thread.
///
/// Read back through `current_dispatch_context`; the code that installs it into
/// the thread-local slot is not part of this section of the file.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// Event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Presumably the original event id when this dispatch is a replay, `None`
    /// otherwise — confirm against the writer.
    pub replay_of_event_id: Option<String>,
    /// Id of the trigger binding being dispatched.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    /// Agent identifier for the dispatch — NOTE(review): derivation not visible here.
    pub agent_id: String,
    /// Action label for the dispatch — NOTE(review): semantics not visible here.
    pub action: String,
    /// Autonomy tier the dispatch runs under.
    pub autonomy_tier: AutonomyTier,
}
72
/// Serializable bundle of predicate cache entries for one trigger/event pair.
///
/// NOTE(review): presumably persisted so later evaluations (e.g. replays) can
/// reuse cached predicate results — confirm against its reader/writer.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    /// Trigger whose predicate produced the entries.
    trigger_id: String,
    /// Event the predicate was evaluated against.
    event_id: String,
    /// Cached evaluation entries.
    entries: Vec<PredicateCacheEntry>,
}
79
/// Result of evaluating a binding's `when` predicate for one event.
///
/// The dispatcher gates dispatch on `result` and copies the remaining fields
/// into the predicate node's action-graph metadata.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed; `false` causes a `Skipped` outcome.
    result: bool,
    /// Cost of the evaluation in US dollars.
    cost_usd: f64,
    /// Tokens consumed by the evaluation.
    tokens: u64,
    /// Evaluation latency in milliseconds.
    latency_ms: u64,
    /// Whether the result was served from cache.
    cached: bool,
    /// Optional explanation for the result.
    reason: Option<String>,
}
89
90pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
91 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
92}
93
94pub(crate) fn current_dispatch_is_replay() -> bool {
95 ACTIVE_DISPATCH_IS_REPLAY
96 .try_with(|is_replay| *is_replay)
97 .unwrap_or(false)
98}
99
100pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
101 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
102}
103
/// Routes trigger events to their bindings' handlers.
///
/// Clones share the same runtime state (`Arc`) and cancellation channel. The
/// base VM is held in an `Rc`, so a `Dispatcher` is not `Send`.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM shared by dispatches on this dispatcher.
    base_vm: Rc<Vm>,
    /// Event log used for the inbox, outbox, lifecycle and cancel topics.
    event_log: Arc<AnyEventLog>,
    /// Broadcast used by `shutdown` to stop `run` and in-progress dispatches.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ and flow-control state shared across clones.
    state: Arc<DispatcherRuntimeState>,
    /// Optional registry receiving dispatch success/failure counts.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
112
/// Mutable runtime state shared by every clone of a `Dispatcher`.
#[derive(Debug)]
struct DispatcherRuntimeState {
    /// Number of dispatches currently executing.
    in_flight: AtomicU64,
    /// Number of dispatches waiting for a retry.
    retry_queue_depth: AtomicU64,
    /// Dead-lettered dispatches, exposed via `Dispatcher::dlq_entries`.
    dlq: Mutex<Vec<DlqEntry>>,
    /// Cancellation flags that `Dispatcher::shutdown` sets to `true`.
    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
    /// Set once `Dispatcher::shutdown` has been called.
    shutting_down: std::sync::atomic::AtomicBool,
    /// Awaited by `Dispatcher::drain` while waiting for the counters to reach zero.
    idle_notify: Notify,
    /// Singleton / concurrency gates used by flow control and wait leases.
    flow_control: FlowControlManager,
}
123
124impl DispatcherRuntimeState {
125 fn new(event_log: Arc<AnyEventLog>) -> Self {
126 Self {
127 in_flight: AtomicU64::new(0),
128 retry_queue_depth: AtomicU64::new(0),
129 dlq: Mutex::new(Vec::new()),
130 cancel_tokens: Mutex::new(Vec::new()),
131 shutting_down: std::sync::atomic::AtomicBool::new(false),
132 idle_notify: Notify::new(),
133 flow_control: FlowControlManager::new(event_log),
134 }
135 }
136}
137
/// Point-in-time view of the dispatcher counters, produced by `Dispatcher::snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches currently executing.
    pub in_flight: u64,
    /// Dispatches waiting for a retry.
    pub retry_queue_depth: u64,
    /// Entries in the dead-letter queue.
    pub dlq_depth: u64,
}
145
/// Terminal (or waiting) state of a single dispatch.
///
/// Serialized in `snake_case`, which matches the labels returned by `as_str`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch failed.
    Failed,
    /// The dispatch was moved to the dead-letter queue.
    Dlq,
    /// The dispatch was skipped (predicate or flow control).
    Skipped,
    /// The dispatch is suspended waiting on something external.
    Waiting,
    /// The dispatch was cancelled.
    Cancelled,
}
156
157impl DispatchStatus {
158 pub fn as_str(&self) -> &'static str {
159 match self {
160 Self::Succeeded => "succeeded",
161 Self::Failed => "failed",
162 Self::Dlq => "dlq",
163 Self::Skipped => "skipped",
164 Self::Waiting => "waiting",
165 Self::Cancelled => "cancelled",
166 }
167 }
168}
169
/// Result of dispatching one event to one trigger binding.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the binding's trigger.
    pub trigger_id: String,
    /// Key identifying the binding (from `TriggerBinding::binding_key`).
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Handler attempts made; `0` when skipped or cancelled before the first attempt.
    pub attempt_count: u32,
    /// Final status of the dispatch.
    pub status: DispatchStatus,
    /// Kind of the handler route.
    pub handler_kind: String,
    /// Target URI of the handler route.
    pub target_uri: String,
    /// Original event id when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result or skip details, as JSON.
    pub result: Option<serde_json::Value>,
    /// Error message when the dispatch did not succeed.
    pub error: Option<String>,
}
184
/// Payload of `event_ingested` records on the inbox envelopes topic.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to this trigger's live binding;
    /// otherwise it fans out to all matching bindings (cron events excepted).
    pub trigger_id: Option<String>,
    /// Binding version passed to the live-binding lookup for targeted dispatch.
    pub binding_version: Option<u32>,
    /// The ingested event.
    pub event: TriggerEvent,
}
191
/// Result of `Dispatcher::drain`: whether the dispatcher went idle, plus the
/// counters observed when the wait ended.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when no dispatches were in flight and no retries were queued.
    pub drained: bool,
    /// Dispatches still executing.
    pub in_flight: u64,
    /// Retries still queued.
    pub retry_queue_depth: u64,
    /// Entries in the dead-letter queue (informational; does not block draining).
    pub dlq_depth: u64,
}
200
201impl Default for DispatchOutcome {
202 fn default() -> Self {
203 Self {
204 trigger_id: String::new(),
205 binding_key: String::new(),
206 event_id: String::new(),
207 attempt_count: 0,
208 status: DispatchStatus::Failed,
209 handler_kind: String::new(),
210 target_uri: String::new(),
211 replay_of_event_id: None,
212 result: None,
213 error: None,
214 }
215 }
216}
217
/// Record of a single handler attempt within a dispatch.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the binding's trigger.
    pub trigger_id: String,
    /// Key identifying the binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// 1-based attempt number.
    pub attempt: u32,
    /// Kind of the handler route.
    pub handler_kind: String,
    /// Attempt start timestamp (produced by `now_rfc3339` in the dispatch loop).
    pub started_at: String,
    /// Attempt completion timestamp (produced by `now_rfc3339` in the dispatch loop).
    pub completed_at: String,
    /// Outcome label for the attempt — NOTE(review): value set confirmed outside this view.
    pub outcome: String,
    /// Error message when the attempt failed.
    pub error_msg: Option<String>,
}
231
/// Request to cancel the dispatch of one event to one binding, appended to the
/// trigger cancel-requests topic by `append_dispatch_cancel_request`.
///
/// The derived `Ord` compares fields in declaration order, so field order here
/// is significant.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Binding whose dispatch should be cancelled.
    pub binding_key: String,
    /// Event whose dispatch should be cancelled.
    pub event_id: String,
    /// When the cancellation was requested; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who requested the cancellation, if known.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit-trail identifier.
    #[serde(default)]
    pub audit_id: Option<String>,
}
243
/// A dispatch that exhausted its attempts and was moved to the dead-letter queue.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the binding's trigger.
    pub trigger_id: String,
    /// Key identifying the binding.
    pub binding_key: String,
    /// The event that could not be dispatched.
    pub event: TriggerEvent,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Error from the final attempt.
    pub final_error: String,
    /// Per-attempt history.
    pub attempts: Vec<DispatchAttemptRecord>,
}
253
/// Singleton flow-control gate held (or temporarily released) by a dispatch.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Name of the singleton gate.
    gate: String,
    /// `false` while the lease is suspended and the gate has been released.
    held: bool,
}
259
/// Concurrency flow-control slot held (or temporarily released) by a dispatch.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    /// Name of the concurrency gate.
    gate: String,
    /// Maximum concurrent holders, passed back to `acquire_concurrency` on resume.
    max: u32,
    /// Priority rank passed back to `acquire_concurrency` on resume.
    priority_rank: usize,
    /// Current permit; `None` while the lease is suspended.
    permit: Option<ConcurrencyPermit>,
}
267
/// Flow-control slots acquired for one dispatch (either may be absent).
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    /// Singleton gate lease, if the binding uses one.
    singleton: Option<SingletonLease>,
    /// Concurrency slot lease, if the binding uses one.
    concurrency: Option<ConcurrencyLease>,
}
273
/// Handle that lets a waiting dispatch release its flow-control slots and
/// later reacquire them. Clones share the same slots and suspended flag.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state providing the flow-control manager.
    state: Arc<DispatcherRuntimeState>,
    /// Slots currently owned by the dispatch.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// `true` between `suspend` and `resume`.
    suspended: Arc<AtomicBool>,
}
280
281impl DispatchWaitLease {
282 fn new(
283 state: Arc<DispatcherRuntimeState>,
284 acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
285 ) -> Self {
286 Self {
287 state,
288 acquired,
289 suspended: Arc::new(AtomicBool::new(false)),
290 }
291 }
292
293 pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
294 if self.suspended.swap(true, Ordering::SeqCst) {
295 return Ok(());
296 }
297 let (singleton_gate, concurrency_permit) = {
298 let mut acquired = self.acquired.lock().await;
299 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
300 if lease.held {
301 lease.held = false;
302 Some(lease.gate.clone())
303 } else {
304 None
305 }
306 });
307 let concurrency_permit = acquired
308 .concurrency
309 .as_mut()
310 .and_then(|lease| lease.permit.take());
311 (singleton_gate, concurrency_permit)
312 };
313
314 if let Some(gate) = singleton_gate {
315 self.state
316 .flow_control
317 .release_singleton(&gate)
318 .await
319 .map_err(DispatchError::from)?;
320 }
321 if let Some(permit) = concurrency_permit {
322 self.state
323 .flow_control
324 .release_concurrency(permit)
325 .await
326 .map_err(DispatchError::from)?;
327 }
328 Ok(())
329 }
330
331 pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
332 if !self.suspended.swap(false, Ordering::SeqCst) {
333 return Ok(());
334 }
335
336 let singleton_gate = {
337 let acquired = self.acquired.lock().await;
338 acquired.singleton.as_ref().and_then(|lease| {
339 if lease.held {
340 None
341 } else {
342 Some(lease.gate.clone())
343 }
344 })
345 };
346 if let Some(gate) = singleton_gate {
347 self.state
348 .flow_control
349 .acquire_singleton(&gate)
350 .await
351 .map_err(DispatchError::from)?;
352 let mut acquired = self.acquired.lock().await;
353 if let Some(lease) = acquired.singleton.as_mut() {
354 lease.held = true;
355 }
356 }
357
358 let concurrency_spec = {
359 let acquired = self.acquired.lock().await;
360 acquired.concurrency.as_ref().and_then(|lease| {
361 if lease.permit.is_some() {
362 None
363 } else {
364 Some((lease.gate.clone(), lease.max, lease.priority_rank))
365 }
366 })
367 };
368 if let Some((gate, max, priority_rank)) = concurrency_spec {
369 let permit = self
370 .state
371 .flow_control
372 .acquire_concurrency(&gate, max, priority_rank)
373 .await
374 .map_err(DispatchError::from)?;
375 let mut acquired = self.acquired.lock().await;
376 if let Some(lease) = acquired.concurrency.as_mut() {
377 lease.permit = Some(permit);
378 }
379 }
380 Ok(())
381 }
382}
383
/// Decision returned by the dispatcher's flow-control step.
enum FlowControlOutcome {
    /// Proceed with dispatch.
    Dispatch {
        /// Event to dispatch as returned by flow control — NOTE(review): may
        /// differ from the input event (e.g. batching); confirm.
        event: Box<TriggerEvent>,
        /// Slots acquired for this dispatch.
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; `reason` is recorded in the skipped outcome.
    Skip {
        reason: String,
    },
}
393
/// Pipeline stage that caused a dispatch to be skipped.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    /// The binding's `when` predicate evaluated to `false`.
    Predicate,
    /// Flow control decided not to dispatch.
    FlowControl,
}
399
/// Errors produced while enqueueing or dispatching trigger events.
///
/// Every variant carries a human-readable message, which is exactly what
/// `Display` renders.
#[derive(Debug)]
pub enum DispatchError {
    /// Event log read/append failure.
    EventLog(String),
    /// Trigger registry lookup or bookkeeping failure.
    Registry(String),
    /// JSON (de)serialization failure.
    Serde(String),
    /// Local handler failure.
    Local(String),
    /// A2A handler failure.
    A2a(String),
    /// The dispatch was denied.
    Denied(String),
    /// The dispatch timed out.
    Timeout(String),
    /// The dispatch is waiting and cannot complete yet.
    Waiting(String),
    /// The dispatch was cancelled.
    Cancelled(String),
    /// The requested handler kind is not implemented.
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::EventLog(text)
            | Self::Registry(text)
            | Self::Serde(text)
            | Self::Local(text)
            | Self::A2a(text)
            | Self::Denied(text)
            | Self::Timeout(text)
            | Self::Waiting(text)
            | Self::Cancelled(text)
            | Self::NotImplemented(text) => text.as_str(),
        };
        f.write_str(message)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Whether another attempt could plausibly succeed.
    ///
    /// Cancellation, denial, unimplemented handlers and waiting states are
    /// final; every other failure may be retried.
    fn retryable(&self) -> bool {
        match self {
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
            Self::Denied(_) | Self::Waiting(_) | Self::Cancelled(_) | Self::NotImplemented(_) => {
                false
            }
        }
    }
}
441
442impl DispatchSkipStage {
443 fn as_str(&self) -> &'static str {
444 match self {
445 Self::Predicate => "predicate",
446 Self::FlowControl => "flow_control",
447 }
448 }
449}
450
451impl From<LogError> for DispatchError {
452 fn from(value: LogError) -> Self {
453 Self::EventLog(value.to_string())
454 }
455}
456
457pub async fn append_dispatch_cancel_request(
458 event_log: &Arc<AnyEventLog>,
459 request: &DispatchCancelRequest,
460) -> Result<u64, DispatchError> {
461 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
462 .expect("static trigger cancel topic should always be valid");
463 event_log
464 .append(
465 &topic,
466 LogEvent::new(
467 "dispatch_cancel_requested",
468 serde_json::to_value(request)
469 .map_err(|error| DispatchError::Serde(error.to_string()))?,
470 ),
471 )
472 .await
473 .map_err(DispatchError::from)
474}
475
476impl Dispatcher {
477 pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
478 self.event_log.clone()
479 }
480
481 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
482 let event_log = active_event_log().ok_or_else(|| {
483 DispatchError::EventLog("dispatcher requires an active event log".to_string())
484 })?;
485 Ok(Self::with_event_log(base_vm, event_log))
486 }
487
488 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
489 Self::with_event_log_and_metrics(base_vm, event_log, None)
490 }
491
492 pub fn with_event_log_and_metrics(
493 base_vm: Vm,
494 event_log: Arc<AnyEventLog>,
495 metrics: Option<Arc<crate::MetricsRegistry>>,
496 ) -> Self {
497 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
498 ACTIVE_DISPATCHER_STATE.with(|slot| {
499 *slot.borrow_mut() = Some(state.clone());
500 });
501 let (cancel_tx, _) = broadcast::channel(32);
502 Self {
503 base_vm: Rc::new(base_vm),
504 event_log,
505 cancel_tx,
506 state,
507 metrics,
508 }
509 }
510
511 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
512 DispatcherStatsSnapshot {
513 in_flight: self.state.in_flight.load(Ordering::Relaxed),
514 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
515 dlq_depth: self
516 .state
517 .dlq
518 .lock()
519 .expect("dispatcher dlq poisoned")
520 .len() as u64,
521 }
522 }
523
524 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
525 self.state
526 .dlq
527 .lock()
528 .expect("dispatcher dlq poisoned")
529 .clone()
530 }
531
532 pub fn shutdown(&self) {
533 self.state.shutting_down.store(true, Ordering::SeqCst);
534 for token in self
535 .state
536 .cancel_tokens
537 .lock()
538 .expect("dispatcher cancel tokens poisoned")
539 .iter()
540 {
541 token.store(true, Ordering::SeqCst);
542 }
543 let _ = self.cancel_tx.send(());
544 }
545
546 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
547 self.enqueue_targeted(None, None, event).await
548 }
549
550 pub async fn enqueue_targeted(
551 &self,
552 trigger_id: Option<String>,
553 binding_version: Option<u32>,
554 event: TriggerEvent,
555 ) -> Result<u64, DispatchError> {
556 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
557 .expect("static trigger inbox envelopes topic is valid");
558 let headers = event_headers(&event, None, None, None);
559 let payload = serde_json::to_value(InboxEnvelope {
560 trigger_id,
561 binding_version,
562 event,
563 })
564 .map_err(|error| DispatchError::Serde(error.to_string()))?;
565 self.event_log
566 .append(
567 &topic,
568 LogEvent::new("event_ingested", payload).with_headers(headers),
569 )
570 .await
571 .map_err(DispatchError::from)
572 }
573
574 pub async fn run(&self) -> Result<(), DispatchError> {
575 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
576 .expect("static trigger inbox envelopes topic is valid");
577 let start_from = self.event_log.latest(&topic).await?;
578 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
579 pin_mut!(stream);
580 let mut cancel_rx = self.cancel_tx.subscribe();
581
582 loop {
583 tokio::select! {
584 received = stream.next() => {
585 let Some(received) = received else {
586 break;
587 };
588 let (_, event) = received.map_err(DispatchError::from)?;
589 if event.kind != "event_ingested" {
590 continue;
591 }
592 let parent_headers = event.headers.clone();
593 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
594 .map_err(|error| DispatchError::Serde(error.to_string()))?;
595 notify_test_inbox_dequeued();
596 let _ = self
597 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
598 .await;
599 }
600 _ = recv_cancel(&mut cancel_rx) => break,
601 }
602 }
603
604 Ok(())
605 }
606
607 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
608 let deadline = tokio::time::Instant::now() + timeout;
609 loop {
610 let snapshot = self.snapshot();
611 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
612 return Ok(DispatcherDrainReport {
613 drained: true,
614 in_flight: snapshot.in_flight,
615 retry_queue_depth: snapshot.retry_queue_depth,
616 dlq_depth: snapshot.dlq_depth,
617 });
618 }
619
620 let notified = self.state.idle_notify.notified();
621 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
622 if remaining.is_zero() {
623 return Ok(DispatcherDrainReport {
624 drained: false,
625 in_flight: snapshot.in_flight,
626 retry_queue_depth: snapshot.retry_queue_depth,
627 dlq_depth: snapshot.dlq_depth,
628 });
629 }
630 if tokio::time::timeout(remaining, notified).await.is_err() {
631 let snapshot = self.snapshot();
632 return Ok(DispatcherDrainReport {
633 drained: false,
634 in_flight: snapshot.in_flight,
635 retry_queue_depth: snapshot.retry_queue_depth,
636 dlq_depth: snapshot.dlq_depth,
637 });
638 }
639 }
640 }
641
642 pub async fn dispatch_inbox_envelope(
643 &self,
644 envelope: InboxEnvelope,
645 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
646 self.dispatch_inbox_envelope_with_headers(envelope, None)
647 .await
648 }
649
650 async fn dispatch_inbox_envelope_with_headers(
651 &self,
652 envelope: InboxEnvelope,
653 parent_headers: Option<&BTreeMap<String, String>>,
654 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
655 if let Some(trigger_id) = envelope.trigger_id {
656 let binding = super::registry::resolve_live_trigger_binding(
657 &trigger_id,
658 envelope.binding_version,
659 )
660 .map_err(|error| DispatchError::Registry(error.to_string()))?;
661 return Ok(vec![
662 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
663 .await?,
664 ]);
665 }
666
667 let cron_target = match &envelope.event.provider_payload {
668 crate::triggers::ProviderPayload::Known(
669 crate::triggers::event::KnownProviderPayload::Cron(payload),
670 ) => payload.cron_id.clone(),
671 _ => None,
672 };
673 if let Some(trigger_id) = cron_target {
674 let binding = super::registry::resolve_live_trigger_binding(
675 &trigger_id,
676 envelope.binding_version,
677 )
678 .map_err(|error| DispatchError::Registry(error.to_string()))?;
679 return Ok(vec![
680 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
681 .await?,
682 ]);
683 }
684
685 self.dispatch_event(envelope.event).await
686 }
687
688 pub async fn dispatch_event(
689 &self,
690 event: TriggerEvent,
691 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
692 let bindings = matching_bindings(&event);
693 let mut outcomes = Vec::new();
694 for binding in bindings {
695 outcomes.push(self.dispatch(&binding, event.clone()).await?);
696 }
697 Ok(outcomes)
698 }
699
700 pub async fn dispatch(
701 &self,
702 binding: &TriggerBinding,
703 event: TriggerEvent,
704 ) -> Result<DispatchOutcome, DispatchError> {
705 self.dispatch_with_replay(binding, event, None, None, None)
706 .await
707 }
708
709 pub async fn dispatch_replay(
710 &self,
711 binding: &TriggerBinding,
712 event: TriggerEvent,
713 replay_of_event_id: String,
714 ) -> Result<DispatchOutcome, DispatchError> {
715 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
716 .await
717 }
718
719 pub async fn dispatch_with_parent_span_id(
720 &self,
721 binding: &TriggerBinding,
722 event: TriggerEvent,
723 parent_span_id: Option<String>,
724 ) -> Result<DispatchOutcome, DispatchError> {
725 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
726 .await
727 }
728
729 async fn dispatch_with_replay(
730 &self,
731 binding: &TriggerBinding,
732 event: TriggerEvent,
733 replay_of_event_id: Option<String>,
734 parent_span_id: Option<String>,
735 parent_headers: Option<&BTreeMap<String, String>>,
736 ) -> Result<DispatchOutcome, DispatchError> {
737 let span = tracing::info_span!(
738 "dispatch",
739 trigger_id = %binding.id.as_str(),
740 binding_version = binding.version,
741 trace_id = %event.trace_id.0
742 );
743 #[cfg(feature = "otel")]
744 let span_for_otel = span.clone();
745 let _ = if let Some(headers) = parent_headers {
746 crate::observability::otel::set_span_parent_from_headers(
747 &span,
748 headers,
749 &event.trace_id,
750 parent_span_id.as_deref(),
751 )
752 } else {
753 crate::observability::otel::set_span_parent(
754 &span,
755 &event.trace_id,
756 parent_span_id.as_deref(),
757 )
758 };
759 #[cfg(feature = "otel")]
760 let started_at = Instant::now();
761 let metrics = self.metrics.clone();
762 let outcome = ACTIVE_DISPATCH_IS_REPLAY
763 .scope(
764 replay_of_event_id.is_some(),
765 self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
766 .instrument(span),
767 )
768 .await;
769 if let Some(metrics) = metrics.as_ref() {
770 match &outcome {
771 Ok(dispatch_outcome) => match dispatch_outcome.status {
772 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
773 metrics.record_dispatch_succeeded();
774 }
775 DispatchStatus::Waiting => {}
776 _ => metrics.record_dispatch_failed(),
777 },
778 Err(_) => metrics.record_dispatch_failed(),
779 }
780 }
781 #[cfg(feature = "otel")]
782 {
783 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
784
785 let duration_ms = started_at.elapsed().as_millis() as i64;
786 let status = match &outcome {
787 Ok(dispatch_outcome) => match dispatch_outcome.status {
788 DispatchStatus::Succeeded => "succeeded",
789 DispatchStatus::Skipped => "skipped",
790 DispatchStatus::Waiting => "waiting",
791 DispatchStatus::Cancelled => "cancelled",
792 DispatchStatus::Failed => "failed",
793 DispatchStatus::Dlq => "dlq",
794 },
795 Err(DispatchError::Cancelled(_)) => "cancelled",
796 Err(_) => "failed",
797 };
798 span_for_otel.set_attribute("result.status", status);
799 span_for_otel.set_attribute("result.duration_ms", duration_ms);
800 }
801 outcome
802 }
803
804 async fn dispatch_with_replay_inner(
805 &self,
806 binding: &TriggerBinding,
807 event: TriggerEvent,
808 replay_of_event_id: Option<String>,
809 ) -> Result<DispatchOutcome, DispatchError> {
810 let autonomy_tier = crate::resolve_agent_autonomy_tier(
811 &self.event_log,
812 binding.id.as_str(),
813 binding.autonomy_tier,
814 )
815 .await
816 .unwrap_or(binding.autonomy_tier);
817 let binding_key = binding.binding_key();
818 let route = DispatchUri::from(&binding.handler);
819 let trigger_id = binding.id.as_str().to_string();
820 let event_id = event.id.0.clone();
821 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
822 let begin = if replay_of_event_id.is_some() {
823 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
824 } else {
825 begin_in_flight(binding.id.as_str(), binding.version)
826 };
827 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
828
829 let mut attempts = Vec::new();
830 let mut source_node_id = format!("trigger:{}", event.id.0);
831 let mut initial_nodes = Vec::new();
832 let mut initial_edges = Vec::new();
833 if let Some(original_event_id) = replay_of_event_id.as_ref() {
834 let original_node_id = format!("trigger:{original_event_id}");
835 initial_nodes.push(RunActionGraphNodeRecord {
836 id: original_node_id.clone(),
837 label: format!(
838 "{}:{} (original {})",
839 event.provider.as_str(),
840 event.kind,
841 original_event_id
842 ),
843 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
844 status: "historical".to_string(),
845 outcome: "replayed_from".to_string(),
846 trace_id: Some(event.trace_id.0.clone()),
847 stage_id: None,
848 node_id: None,
849 worker_id: None,
850 run_id: None,
851 run_path: None,
852 metadata: trigger_node_metadata(&event),
853 });
854 initial_edges.push(RunActionGraphEdgeRecord {
855 from_id: original_node_id,
856 to_id: source_node_id.clone(),
857 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
858 label: Some("replay chain".to_string()),
859 });
860 }
861 initial_nodes.push(RunActionGraphNodeRecord {
862 id: source_node_id.clone(),
863 label: format!("{}:{}", event.provider.as_str(), event.kind),
864 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
865 status: "received".to_string(),
866 outcome: "received".to_string(),
867 trace_id: Some(event.trace_id.0.clone()),
868 stage_id: None,
869 node_id: None,
870 worker_id: None,
871 run_id: None,
872 run_path: None,
873 metadata: trigger_node_metadata(&event),
874 });
875 self.emit_action_graph(
876 &event,
877 initial_nodes,
878 initial_edges,
879 serde_json::json!({
880 "source": "dispatcher",
881 "trigger_id": trigger_id,
882 "binding_key": binding_key,
883 "event_id": event_id,
884 "replay_of_event_id": replay_of_event_id,
885 }),
886 )
887 .await?;
888
889 if dispatch_cancel_requested(
890 &self.event_log,
891 &binding_key,
892 &event.id.0,
893 replay_of_event_id.as_ref(),
894 )
895 .await?
896 {
897 finish_in_flight(
898 binding.id.as_str(),
899 binding.version,
900 TriggerDispatchOutcome::Failed,
901 )
902 .await
903 .map_err(|error| DispatchError::Registry(error.to_string()))?;
904 decrement_in_flight(&self.state);
905 return Ok(cancelled_dispatch_outcome(
906 binding,
907 &route,
908 &event,
909 replay_of_event_id,
910 0,
911 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
912 ));
913 }
914
915 if let Some(predicate) = binding.when.as_ref() {
916 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
917 let evaluation = self
918 .evaluate_predicate(
919 binding,
920 predicate,
921 &event,
922 replay_of_event_id.as_ref(),
923 autonomy_tier,
924 )
925 .await?;
926 let passed = evaluation.result;
927 self.emit_action_graph(
928 &event,
929 vec![RunActionGraphNodeRecord {
930 id: predicate_node_id.clone(),
931 label: predicate.raw.clone(),
932 kind: ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE.to_string(),
933 status: "completed".to_string(),
934 outcome: passed.to_string(),
935 trace_id: Some(event.trace_id.0.clone()),
936 stage_id: None,
937 node_id: None,
938 worker_id: None,
939 run_id: None,
940 run_path: None,
941 metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
942 }],
943 vec![RunActionGraphEdgeRecord {
944 from_id: source_node_id.clone(),
945 to_id: predicate_node_id.clone(),
946 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
947 label: None,
948 }],
949 serde_json::json!({
950 "source": "dispatcher",
951 "trigger_id": binding.id.as_str(),
952 "binding_key": binding.binding_key(),
953 "event_id": event.id.0,
954 "predicate": predicate.raw,
955 "reason": evaluation.reason,
956 "cached": evaluation.cached,
957 "cost_usd": evaluation.cost_usd,
958 "tokens": evaluation.tokens,
959 "latency_ms": evaluation.latency_ms,
960 "replay_of_event_id": replay_of_event_id,
961 }),
962 )
963 .await?;
964
965 if !passed {
966 self.append_skipped_outbox_event(
967 binding,
968 &route,
969 &event,
970 replay_of_event_id.as_ref(),
971 DispatchSkipStage::Predicate,
972 serde_json::json!({
973 "predicate": predicate.raw,
974 "reason": evaluation.reason,
975 }),
976 )
977 .await?;
978 finish_in_flight(
979 binding.id.as_str(),
980 binding.version,
981 TriggerDispatchOutcome::Dispatched,
982 )
983 .await
984 .map_err(|error| DispatchError::Registry(error.to_string()))?;
985 decrement_in_flight(&self.state);
986 self.append_dispatch_trust_record(
987 binding,
988 &route,
989 &event,
990 replay_of_event_id.as_ref(),
991 autonomy_tier,
992 TrustOutcome::Denied,
993 "skipped",
994 0,
995 None,
996 )
997 .await?;
998 return Ok(DispatchOutcome {
999 trigger_id: binding.id.as_str().to_string(),
1000 binding_key: binding.binding_key(),
1001 event_id: event.id.0,
1002 attempt_count: 0,
1003 status: DispatchStatus::Skipped,
1004 handler_kind: route.kind().to_string(),
1005 target_uri: route.target_uri(),
1006 replay_of_event_id,
1007 result: Some(serde_json::json!({
1008 "skipped": true,
1009 "predicate": predicate.raw,
1010 "reason": evaluation.reason,
1011 })),
1012 error: None,
1013 });
1014 }
1015
1016 source_node_id = predicate_node_id;
1017 }
1018
1019 let (event, acquired_flow) = match self
1020 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1021 .await?
1022 {
1023 FlowControlOutcome::Dispatch { event, acquired } => {
1024 (*event, Arc::new(AsyncMutex::new(acquired)))
1025 }
1026 FlowControlOutcome::Skip { reason } => {
1027 self.append_skipped_outbox_event(
1028 binding,
1029 &route,
1030 &event,
1031 replay_of_event_id.as_ref(),
1032 DispatchSkipStage::FlowControl,
1033 serde_json::json!({
1034 "flow_control": reason,
1035 }),
1036 )
1037 .await?;
1038 finish_in_flight(
1039 binding.id.as_str(),
1040 binding.version,
1041 TriggerDispatchOutcome::Dispatched,
1042 )
1043 .await
1044 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1045 decrement_in_flight(&self.state);
1046 return Ok(DispatchOutcome {
1047 trigger_id: binding.id.as_str().to_string(),
1048 binding_key: binding.binding_key(),
1049 event_id: event.id.0,
1050 attempt_count: 0,
1051 status: DispatchStatus::Skipped,
1052 handler_kind: route.kind().to_string(),
1053 target_uri: route.target_uri(),
1054 replay_of_event_id,
1055 result: Some(serde_json::json!({
1056 "skipped": true,
1057 "flow_control": reason,
1058 })),
1059 error: None,
1060 });
1061 }
1062 };
1063
1064 let mut previous_retry_node = None;
1065 let max_attempts = binding.retry.max_attempts();
1066 for attempt in 1..=max_attempts {
1067 if dispatch_cancel_requested(
1068 &self.event_log,
1069 &binding_key,
1070 &event.id.0,
1071 replay_of_event_id.as_ref(),
1072 )
1073 .await?
1074 {
1075 finish_in_flight(
1076 binding.id.as_str(),
1077 binding.version,
1078 TriggerDispatchOutcome::Failed,
1079 )
1080 .await
1081 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1082 decrement_in_flight(&self.state);
1083 return Ok(cancelled_dispatch_outcome(
1084 binding,
1085 &route,
1086 &event,
1087 replay_of_event_id,
1088 attempt.saturating_sub(1),
1089 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1090 ));
1091 }
1092 maybe_fail_before_outbox();
1093 let started_at = now_rfc3339();
1094 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1095 self.append_lifecycle_event(
1096 "DispatchStarted",
1097 &event,
1098 binding,
1099 serde_json::json!({
1100 "event_id": event.id.0,
1101 "attempt": attempt,
1102 "handler_kind": route.kind(),
1103 "target_uri": route.target_uri(),
1104 "replay_of_event_id": replay_of_event_id,
1105 }),
1106 replay_of_event_id.as_ref(),
1107 )
1108 .await?;
1109 self.append_topic_event(
1110 TRIGGER_OUTBOX_TOPIC,
1111 "dispatch_started",
1112 &event,
1113 Some(binding),
1114 Some(attempt),
1115 serde_json::json!({
1116 "event_id": event.id.0,
1117 "attempt": attempt,
1118 "trigger_id": binding.id.as_str(),
1119 "binding_key": binding.binding_key(),
1120 "handler_kind": route.kind(),
1121 "target_uri": route.target_uri(),
1122 "replay_of_event_id": replay_of_event_id,
1123 }),
1124 replay_of_event_id.as_ref(),
1125 )
1126 .await?;
1127
1128 let mut dispatch_edges = Vec::new();
1129 if attempt == 1 {
1130 dispatch_edges.push(RunActionGraphEdgeRecord {
1131 from_id: source_node_id.clone(),
1132 to_id: attempt_node_id.clone(),
1133 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1134 label: binding.when.as_ref().map(|_| "true".to_string()),
1135 });
1136 } else if let Some(retry_node_id) = previous_retry_node.take() {
1137 dispatch_edges.push(RunActionGraphEdgeRecord {
1138 from_id: retry_node_id,
1139 to_id: attempt_node_id.clone(),
1140 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1141 label: Some(format!("attempt {attempt}")),
1142 });
1143 }
1144
1145 self.emit_action_graph(
1146 &event,
1147 vec![RunActionGraphNodeRecord {
1148 id: attempt_node_id.clone(),
1149 label: dispatch_node_label(&route),
1150 kind: dispatch_node_kind(&route).to_string(),
1151 status: "running".to_string(),
1152 outcome: format!("attempt_{attempt}"),
1153 trace_id: Some(event.trace_id.0.clone()),
1154 stage_id: None,
1155 node_id: None,
1156 worker_id: None,
1157 run_id: None,
1158 run_path: None,
1159 metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1160 }],
1161 dispatch_edges,
1162 serde_json::json!({
1163 "source": "dispatcher",
1164 "trigger_id": binding.id.as_str(),
1165 "binding_key": binding.binding_key(),
1166 "event_id": event.id.0,
1167 "attempt": attempt,
1168 "handler_kind": route.kind(),
1169 "target_uri": route.target_uri(),
1170 "target_agent": dispatch_target_agent(&route),
1171 "replay_of_event_id": replay_of_event_id,
1172 }),
1173 )
1174 .await?;
1175
1176 let result = self
1177 .dispatch_once(
1178 binding,
1179 &route,
1180 &event,
1181 autonomy_tier,
1182 Some(DispatchWaitLease::new(
1183 self.state.clone(),
1184 acquired_flow.clone(),
1185 )),
1186 &mut self.cancel_tx.subscribe(),
1187 )
1188 .await;
1189 let completed_at = now_rfc3339();
1190
1191 match result {
1192 Ok(result) => {
1193 let attempt_record = DispatchAttemptRecord {
1194 trigger_id: binding.id.as_str().to_string(),
1195 binding_key: binding.binding_key(),
1196 event_id: event.id.0.clone(),
1197 attempt,
1198 handler_kind: route.kind().to_string(),
1199 started_at,
1200 completed_at,
1201 outcome: "success".to_string(),
1202 error_msg: None,
1203 };
1204 attempts.push(attempt_record.clone());
1205 self.append_attempt_record(
1206 &event,
1207 binding,
1208 &attempt_record,
1209 replay_of_event_id.as_ref(),
1210 )
1211 .await?;
1212 self.append_lifecycle_event(
1213 "DispatchSucceeded",
1214 &event,
1215 binding,
1216 serde_json::json!({
1217 "event_id": event.id.0,
1218 "attempt": attempt,
1219 "handler_kind": route.kind(),
1220 "target_uri": route.target_uri(),
1221 "result": result,
1222 "replay_of_event_id": replay_of_event_id,
1223 }),
1224 replay_of_event_id.as_ref(),
1225 )
1226 .await?;
1227 self.append_topic_event(
1228 TRIGGER_OUTBOX_TOPIC,
1229 "dispatch_succeeded",
1230 &event,
1231 Some(binding),
1232 Some(attempt),
1233 serde_json::json!({
1234 "event_id": event.id.0,
1235 "attempt": attempt,
1236 "trigger_id": binding.id.as_str(),
1237 "binding_key": binding.binding_key(),
1238 "handler_kind": route.kind(),
1239 "target_uri": route.target_uri(),
1240 "result": result,
1241 "replay_of_event_id": replay_of_event_id,
1242 }),
1243 replay_of_event_id.as_ref(),
1244 )
1245 .await?;
1246 self.emit_action_graph(
1247 &event,
1248 vec![RunActionGraphNodeRecord {
1249 id: attempt_node_id.clone(),
1250 label: dispatch_node_label(&route),
1251 kind: dispatch_node_kind(&route).to_string(),
1252 status: "completed".to_string(),
1253 outcome: dispatch_success_outcome(&route, &result).to_string(),
1254 trace_id: Some(event.trace_id.0.clone()),
1255 stage_id: None,
1256 node_id: None,
1257 worker_id: None,
1258 run_id: None,
1259 run_path: None,
1260 metadata: dispatch_success_metadata(
1261 &route, binding, &event, attempt, &result,
1262 ),
1263 }],
1264 Vec::new(),
1265 serde_json::json!({
1266 "source": "dispatcher",
1267 "trigger_id": binding.id.as_str(),
1268 "binding_key": binding.binding_key(),
1269 "event_id": event.id.0,
1270 "attempt": attempt,
1271 "handler_kind": route.kind(),
1272 "target_uri": route.target_uri(),
1273 "result": result,
1274 "replay_of_event_id": replay_of_event_id,
1275 }),
1276 )
1277 .await?;
1278 finish_in_flight(
1279 binding.id.as_str(),
1280 binding.version,
1281 TriggerDispatchOutcome::Dispatched,
1282 )
1283 .await
1284 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1285 self.release_flow_control(&acquired_flow).await?;
1286 decrement_in_flight(&self.state);
1287 self.append_dispatch_trust_record(
1288 binding,
1289 &route,
1290 &event,
1291 replay_of_event_id.as_ref(),
1292 autonomy_tier,
1293 TrustOutcome::Success,
1294 "succeeded",
1295 attempt,
1296 None,
1297 )
1298 .await?;
1299 return Ok(DispatchOutcome {
1300 trigger_id: binding.id.as_str().to_string(),
1301 binding_key: binding.binding_key(),
1302 event_id: event.id.0,
1303 attempt_count: attempt,
1304 status: DispatchStatus::Succeeded,
1305 handler_kind: route.kind().to_string(),
1306 target_uri: route.target_uri(),
1307 replay_of_event_id,
1308 result: Some(result),
1309 error: None,
1310 });
1311 }
1312 Err(error) => {
1313 let attempt_record = DispatchAttemptRecord {
1314 trigger_id: binding.id.as_str().to_string(),
1315 binding_key: binding.binding_key(),
1316 event_id: event.id.0.clone(),
1317 attempt,
1318 handler_kind: route.kind().to_string(),
1319 started_at,
1320 completed_at,
1321 outcome: dispatch_error_label(&error).to_string(),
1322 error_msg: Some(error.to_string()),
1323 };
1324 attempts.push(attempt_record.clone());
1325 self.append_attempt_record(
1326 &event,
1327 binding,
1328 &attempt_record,
1329 replay_of_event_id.as_ref(),
1330 )
1331 .await?;
1332 if let DispatchError::Waiting(message) = &error {
1333 self.append_lifecycle_event(
1334 "DispatchWaiting",
1335 &event,
1336 binding,
1337 serde_json::json!({
1338 "event_id": event.id.0,
1339 "attempt": attempt,
1340 "handler_kind": route.kind(),
1341 "target_uri": route.target_uri(),
1342 "message": message,
1343 "replay_of_event_id": replay_of_event_id,
1344 }),
1345 replay_of_event_id.as_ref(),
1346 )
1347 .await?;
1348 self.append_topic_event(
1349 TRIGGER_OUTBOX_TOPIC,
1350 "dispatch_waiting",
1351 &event,
1352 Some(binding),
1353 Some(attempt),
1354 serde_json::json!({
1355 "event_id": event.id.0,
1356 "attempt": attempt,
1357 "trigger_id": binding.id.as_str(),
1358 "binding_key": binding.binding_key(),
1359 "handler_kind": route.kind(),
1360 "target_uri": route.target_uri(),
1361 "message": message,
1362 "replay_of_event_id": replay_of_event_id,
1363 }),
1364 replay_of_event_id.as_ref(),
1365 )
1366 .await?;
1367 finish_in_flight(
1368 binding.id.as_str(),
1369 binding.version,
1370 TriggerDispatchOutcome::Dispatched,
1371 )
1372 .await
1373 .map_err(|registry_error| {
1374 DispatchError::Registry(registry_error.to_string())
1375 })?;
1376 self.release_flow_control(&acquired_flow).await?;
1377 decrement_in_flight(&self.state);
1378 return Ok(DispatchOutcome {
1379 trigger_id: binding.id.as_str().to_string(),
1380 binding_key: binding.binding_key(),
1381 event_id: event.id.0,
1382 attempt_count: attempt,
1383 status: DispatchStatus::Waiting,
1384 handler_kind: route.kind().to_string(),
1385 target_uri: route.target_uri(),
1386 replay_of_event_id,
1387 result: Some(serde_json::json!({
1388 "waiting": true,
1389 "message": message,
1390 })),
1391 error: None,
1392 });
1393 }
1394
1395 self.append_lifecycle_event(
1396 "DispatchFailed",
1397 &event,
1398 binding,
1399 serde_json::json!({
1400 "event_id": event.id.0,
1401 "attempt": attempt,
1402 "handler_kind": route.kind(),
1403 "target_uri": route.target_uri(),
1404 "error": error.to_string(),
1405 "replay_of_event_id": replay_of_event_id,
1406 }),
1407 replay_of_event_id.as_ref(),
1408 )
1409 .await?;
1410 self.append_topic_event(
1411 TRIGGER_OUTBOX_TOPIC,
1412 "dispatch_failed",
1413 &event,
1414 Some(binding),
1415 Some(attempt),
1416 serde_json::json!({
1417 "event_id": event.id.0,
1418 "attempt": attempt,
1419 "trigger_id": binding.id.as_str(),
1420 "binding_key": binding.binding_key(),
1421 "handler_kind": route.kind(),
1422 "target_uri": route.target_uri(),
1423 "error": error.to_string(),
1424 "replay_of_event_id": replay_of_event_id,
1425 }),
1426 replay_of_event_id.as_ref(),
1427 )
1428 .await?;
1429 self.emit_action_graph(
1430 &event,
1431 vec![RunActionGraphNodeRecord {
1432 id: attempt_node_id.clone(),
1433 label: dispatch_node_label(&route),
1434 kind: dispatch_node_kind(&route).to_string(),
1435 status: if matches!(error, DispatchError::Cancelled(_)) {
1436 "cancelled".to_string()
1437 } else {
1438 "failed".to_string()
1439 },
1440 outcome: dispatch_error_label(&error).to_string(),
1441 trace_id: Some(event.trace_id.0.clone()),
1442 stage_id: None,
1443 node_id: None,
1444 worker_id: None,
1445 run_id: None,
1446 run_path: None,
1447 metadata: dispatch_error_metadata(
1448 &route, binding, &event, attempt, &error,
1449 ),
1450 }],
1451 Vec::new(),
1452 serde_json::json!({
1453 "source": "dispatcher",
1454 "trigger_id": binding.id.as_str(),
1455 "binding_key": binding.binding_key(),
1456 "event_id": event.id.0,
1457 "attempt": attempt,
1458 "handler_kind": route.kind(),
1459 "target_uri": route.target_uri(),
1460 "error": error.to_string(),
1461 "replay_of_event_id": replay_of_event_id,
1462 }),
1463 )
1464 .await?;
1465
1466 if !error.retryable() {
1467 finish_in_flight(
1468 binding.id.as_str(),
1469 binding.version,
1470 TriggerDispatchOutcome::Failed,
1471 )
1472 .await
1473 .map_err(|registry_error| {
1474 DispatchError::Registry(registry_error.to_string())
1475 })?;
1476 self.release_flow_control(&acquired_flow).await?;
1477 decrement_in_flight(&self.state);
1478 let trust_outcome = match error {
1479 DispatchError::Denied(_) => TrustOutcome::Denied,
1480 DispatchError::Timeout(_) => TrustOutcome::Timeout,
1481 _ => TrustOutcome::Failure,
1482 };
1483 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1484 "cancelled"
1485 } else {
1486 "failed"
1487 };
1488 self.append_dispatch_trust_record(
1489 binding,
1490 &route,
1491 &event,
1492 replay_of_event_id.as_ref(),
1493 autonomy_tier,
1494 trust_outcome,
1495 terminal_status,
1496 attempt,
1497 Some(error.to_string()),
1498 )
1499 .await?;
1500 return Ok(DispatchOutcome {
1501 trigger_id: binding.id.as_str().to_string(),
1502 binding_key: binding.binding_key(),
1503 event_id: event.id.0,
1504 attempt_count: attempt,
1505 status: if matches!(error, DispatchError::Cancelled(_)) {
1506 DispatchStatus::Cancelled
1507 } else {
1508 DispatchStatus::Failed
1509 },
1510 handler_kind: route.kind().to_string(),
1511 target_uri: route.target_uri(),
1512 replay_of_event_id,
1513 result: None,
1514 error: Some(error.to_string()),
1515 });
1516 }
1517
1518 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1519 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1520 previous_retry_node = Some(retry_node_id.clone());
1521 self.emit_action_graph(
1522 &event,
1523 vec![RunActionGraphNodeRecord {
1524 id: retry_node_id.clone(),
1525 label: format!("retry in {}ms", delay.as_millis()),
1526 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1527 status: "scheduled".to_string(),
1528 outcome: format!("attempt_{}", attempt + 1),
1529 trace_id: Some(event.trace_id.0.clone()),
1530 stage_id: None,
1531 node_id: None,
1532 worker_id: None,
1533 run_id: None,
1534 run_path: None,
1535 metadata: retry_node_metadata(
1536 binding,
1537 &event,
1538 attempt + 1,
1539 delay,
1540 &error,
1541 ),
1542 }],
1543 vec![RunActionGraphEdgeRecord {
1544 from_id: attempt_node_id,
1545 to_id: retry_node_id.clone(),
1546 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1547 label: Some(format!("attempt {}", attempt + 1)),
1548 }],
1549 serde_json::json!({
1550 "source": "dispatcher",
1551 "trigger_id": binding.id.as_str(),
1552 "binding_key": binding.binding_key(),
1553 "event_id": event.id.0,
1554 "attempt": attempt + 1,
1555 "delay_ms": delay.as_millis(),
1556 "replay_of_event_id": replay_of_event_id,
1557 }),
1558 )
1559 .await?;
1560 self.append_lifecycle_event(
1561 "RetryScheduled",
1562 &event,
1563 binding,
1564 serde_json::json!({
1565 "event_id": event.id.0,
1566 "attempt": attempt + 1,
1567 "delay_ms": delay.as_millis(),
1568 "error": error.to_string(),
1569 "replay_of_event_id": replay_of_event_id,
1570 }),
1571 replay_of_event_id.as_ref(),
1572 )
1573 .await?;
1574 self.append_topic_event(
1575 TRIGGER_ATTEMPTS_TOPIC,
1576 "retry_scheduled",
1577 &event,
1578 Some(binding),
1579 Some(attempt + 1),
1580 serde_json::json!({
1581 "event_id": event.id.0,
1582 "attempt": attempt + 1,
1583 "trigger_id": binding.id.as_str(),
1584 "binding_key": binding.binding_key(),
1585 "delay_ms": delay.as_millis(),
1586 "error": error.to_string(),
1587 "replay_of_event_id": replay_of_event_id,
1588 }),
1589 replay_of_event_id.as_ref(),
1590 )
1591 .await?;
1592 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1593 let sleep_result = sleep_or_cancel_or_request(
1594 &self.event_log,
1595 delay,
1596 &binding_key,
1597 &event.id.0,
1598 replay_of_event_id.as_ref(),
1599 &mut self.cancel_tx.subscribe(),
1600 )
1601 .await;
1602 decrement_retry_queue_depth(&self.state);
1603 if sleep_result.is_err() {
1604 finish_in_flight(
1605 binding.id.as_str(),
1606 binding.version,
1607 TriggerDispatchOutcome::Failed,
1608 )
1609 .await
1610 .map_err(|registry_error| {
1611 DispatchError::Registry(registry_error.to_string())
1612 })?;
1613 self.release_flow_control(&acquired_flow).await?;
1614 decrement_in_flight(&self.state);
1615 self.append_dispatch_trust_record(
1616 binding,
1617 &route,
1618 &event,
1619 replay_of_event_id.as_ref(),
1620 autonomy_tier,
1621 TrustOutcome::Failure,
1622 "cancelled",
1623 attempt,
1624 Some("dispatcher shutdown cancelled retry wait".to_string()),
1625 )
1626 .await?;
1627 return Ok(DispatchOutcome {
1628 trigger_id: binding.id.as_str().to_string(),
1629 binding_key: binding.binding_key(),
1630 event_id: event.id.0,
1631 attempt_count: attempt,
1632 status: DispatchStatus::Cancelled,
1633 handler_kind: route.kind().to_string(),
1634 target_uri: route.target_uri(),
1635 replay_of_event_id,
1636 result: None,
1637 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1638 });
1639 }
1640 continue;
1641 }
1642
1643 let final_error = error.to_string();
1644 let dlq_entry = DlqEntry {
1645 trigger_id: binding.id.as_str().to_string(),
1646 binding_key: binding.binding_key(),
1647 event: event.clone(),
1648 attempt_count: attempt,
1649 final_error: final_error.clone(),
1650 attempts: attempts.clone(),
1651 };
1652 self.state
1653 .dlq
1654 .lock()
1655 .expect("dispatcher dlq poisoned")
1656 .push(dlq_entry.clone());
1657 self.emit_action_graph(
1658 &event,
1659 vec![RunActionGraphNodeRecord {
1660 id: format!("dlq:{binding_key}:{}", event.id.0),
1661 label: binding.id.as_str().to_string(),
1662 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1663 status: "queued".to_string(),
1664 outcome: "retry_exhausted".to_string(),
1665 trace_id: Some(event.trace_id.0.clone()),
1666 stage_id: None,
1667 node_id: None,
1668 worker_id: None,
1669 run_id: None,
1670 run_path: None,
1671 metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
1672 }],
1673 vec![RunActionGraphEdgeRecord {
1674 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1675 to_id: format!("dlq:{binding_key}:{}", event.id.0),
1676 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1677 label: Some(format!("{attempt} attempts")),
1678 }],
1679 serde_json::json!({
1680 "source": "dispatcher",
1681 "trigger_id": binding.id.as_str(),
1682 "binding_key": binding.binding_key(),
1683 "event_id": event.id.0,
1684 "attempt_count": attempt,
1685 "final_error": final_error,
1686 "replay_of_event_id": replay_of_event_id,
1687 }),
1688 )
1689 .await?;
1690 self.append_lifecycle_event(
1691 "DlqMoved",
1692 &event,
1693 binding,
1694 serde_json::json!({
1695 "event_id": event.id.0,
1696 "attempt_count": attempt,
1697 "final_error": dlq_entry.final_error,
1698 "replay_of_event_id": replay_of_event_id,
1699 }),
1700 replay_of_event_id.as_ref(),
1701 )
1702 .await?;
1703 self.append_topic_event(
1704 TRIGGER_DLQ_TOPIC,
1705 "dlq_moved",
1706 &event,
1707 Some(binding),
1708 Some(attempt),
1709 serde_json::to_value(&dlq_entry)
1710 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1711 replay_of_event_id.as_ref(),
1712 )
1713 .await?;
1714 finish_in_flight(
1715 binding.id.as_str(),
1716 binding.version,
1717 TriggerDispatchOutcome::Dlq,
1718 )
1719 .await
1720 .map_err(|registry_error| {
1721 DispatchError::Registry(registry_error.to_string())
1722 })?;
1723 self.release_flow_control(&acquired_flow).await?;
1724 decrement_in_flight(&self.state);
1725 self.append_dispatch_trust_record(
1726 binding,
1727 &route,
1728 &event,
1729 replay_of_event_id.as_ref(),
1730 autonomy_tier,
1731 TrustOutcome::Failure,
1732 "dlq",
1733 attempt,
1734 Some(error.to_string()),
1735 )
1736 .await?;
1737 return Ok(DispatchOutcome {
1738 trigger_id: binding.id.as_str().to_string(),
1739 binding_key: binding.binding_key(),
1740 event_id: event.id.0,
1741 attempt_count: attempt,
1742 status: DispatchStatus::Dlq,
1743 handler_kind: route.kind().to_string(),
1744 target_uri: route.target_uri(),
1745 replay_of_event_id,
1746 result: None,
1747 error: Some(error.to_string()),
1748 });
1749 }
1750 }
1751 }
1752
1753 finish_in_flight(
1754 binding.id.as_str(),
1755 binding.version,
1756 TriggerDispatchOutcome::Failed,
1757 )
1758 .await
1759 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1760 self.release_flow_control(&acquired_flow).await?;
1761 decrement_in_flight(&self.state);
1762 self.append_dispatch_trust_record(
1763 binding,
1764 &route,
1765 &event,
1766 replay_of_event_id.as_ref(),
1767 autonomy_tier,
1768 TrustOutcome::Failure,
1769 "failed",
1770 max_attempts,
1771 Some("dispatch exhausted without terminal outcome".to_string()),
1772 )
1773 .await?;
1774 Ok(DispatchOutcome {
1775 trigger_id: binding.id.as_str().to_string(),
1776 binding_key: binding.binding_key(),
1777 event_id: event.id.0,
1778 attempt_count: max_attempts,
1779 status: DispatchStatus::Failed,
1780 handler_kind: route.kind().to_string(),
1781 target_uri: route.target_uri(),
1782 replay_of_event_id,
1783 result: None,
1784 error: Some("dispatch exhausted without terminal outcome".to_string()),
1785 })
1786 }
1787
1788 async fn dispatch_once(
1789 &self,
1790 binding: &TriggerBinding,
1791 route: &DispatchUri,
1792 event: &TriggerEvent,
1793 autonomy_tier: AutonomyTier,
1794 wait_lease: Option<DispatchWaitLease>,
1795 cancel_rx: &mut broadcast::Receiver<()>,
1796 ) -> Result<serde_json::Value, DispatchError> {
1797 match route {
1798 DispatchUri::Local { .. } => {
1799 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
1800 return Err(DispatchError::Local(format!(
1801 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
1802 binding.id.as_str()
1803 )));
1804 };
1805 let value = self
1806 .invoke_vm_callable(
1807 closure,
1808 &binding.binding_key(),
1809 event,
1810 None,
1811 binding.id.as_str(),
1812 &format!("{}.{}", event.provider.as_str(), event.kind),
1813 autonomy_tier,
1814 wait_lease,
1815 cancel_rx,
1816 )
1817 .await?;
1818 Ok(vm_value_to_json(&value))
1819 }
1820 DispatchUri::A2a {
1821 target,
1822 allow_cleartext,
1823 } => {
1824 if self.state.shutting_down.load(Ordering::SeqCst) {
1825 return Err(DispatchError::Cancelled(
1826 "dispatcher shutdown cancelled A2A dispatch".to_string(),
1827 ));
1828 }
1829 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
1830 target,
1831 *allow_cleartext,
1832 binding.id.as_str(),
1833 &binding.binding_key(),
1834 event,
1835 cancel_rx,
1836 )
1837 .await
1838 .map_err(|error| match error {
1839 crate::a2a::A2aClientError::Cancelled(message) => {
1840 DispatchError::Cancelled(message)
1841 }
1842 other => DispatchError::A2a(other.to_string()),
1843 })?;
1844 match ack {
1845 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
1846 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
1847 }
1848 }
1849 DispatchUri::Worker { queue } => {
1850 let receipt = crate::WorkerQueue::new(self.event_log.clone())
1851 .enqueue(&crate::WorkerQueueJob {
1852 queue: queue.clone(),
1853 trigger_id: binding.id.as_str().to_string(),
1854 binding_key: binding.binding_key(),
1855 binding_version: binding.version,
1856 event: event.clone(),
1857 replay_of_event_id: current_dispatch_context()
1858 .and_then(|context| context.replay_of_event_id),
1859 priority: worker_queue_priority(binding, event),
1860 })
1861 .await
1862 .map_err(DispatchError::from)?;
1863 Ok(serde_json::to_value(receipt)
1864 .map_err(|error| DispatchError::Serde(error.to_string()))?)
1865 }
1866 }
1867 }
1868
1869 async fn apply_flow_control(
1870 &self,
1871 binding: &TriggerBinding,
1872 event: &TriggerEvent,
1873 replay_of_event_id: Option<&String>,
1874 ) -> Result<FlowControlOutcome, DispatchError> {
1875 let flow = &binding.flow_control;
1876 let mut managed_event = event.clone();
1877
1878 if let Some(batch) = &flow.batch {
1879 let gate = self
1880 .resolve_flow_gate(
1881 &binding.binding_key(),
1882 batch.key.as_ref(),
1883 &managed_event,
1884 replay_of_event_id,
1885 )
1886 .await?;
1887 match self
1888 .state
1889 .flow_control
1890 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
1891 .await
1892 .map_err(DispatchError::from)?
1893 {
1894 BatchDecision::Dispatch(events) => {
1895 managed_event = build_batched_event(events)?;
1896 }
1897 BatchDecision::Merged => {
1898 return Ok(FlowControlOutcome::Skip {
1899 reason: "batch_merged".to_string(),
1900 })
1901 }
1902 }
1903 }
1904
1905 if let Some(debounce) = &flow.debounce {
1906 let gate = self
1907 .resolve_flow_gate(
1908 &binding.binding_key(),
1909 Some(&debounce.key),
1910 &managed_event,
1911 replay_of_event_id,
1912 )
1913 .await?;
1914 let latest = self
1915 .state
1916 .flow_control
1917 .debounce(&gate, debounce.period)
1918 .await
1919 .map_err(DispatchError::from)?;
1920 if !latest {
1921 return Ok(FlowControlOutcome::Skip {
1922 reason: "debounced".to_string(),
1923 });
1924 }
1925 }
1926
1927 if let Some(rate_limit) = &flow.rate_limit {
1928 let gate = self
1929 .resolve_flow_gate(
1930 &binding.binding_key(),
1931 rate_limit.key.as_ref(),
1932 &managed_event,
1933 replay_of_event_id,
1934 )
1935 .await?;
1936 let allowed = self
1937 .state
1938 .flow_control
1939 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
1940 .await
1941 .map_err(DispatchError::from)?;
1942 if !allowed {
1943 return Ok(FlowControlOutcome::Skip {
1944 reason: "rate_limited".to_string(),
1945 });
1946 }
1947 }
1948
1949 if let Some(throttle) = &flow.throttle {
1950 let gate = self
1951 .resolve_flow_gate(
1952 &binding.binding_key(),
1953 throttle.key.as_ref(),
1954 &managed_event,
1955 replay_of_event_id,
1956 )
1957 .await?;
1958 self.state
1959 .flow_control
1960 .wait_for_throttle(&gate, throttle.period, throttle.max)
1961 .await
1962 .map_err(DispatchError::from)?;
1963 }
1964
1965 let mut acquired = AcquiredFlowControl::default();
1966 if let Some(singleton) = &flow.singleton {
1967 let gate = self
1968 .resolve_flow_gate(
1969 &binding.binding_key(),
1970 singleton.key.as_ref(),
1971 &managed_event,
1972 replay_of_event_id,
1973 )
1974 .await?;
1975 let acquired_singleton = self
1976 .state
1977 .flow_control
1978 .try_acquire_singleton(&gate)
1979 .await
1980 .map_err(DispatchError::from)?;
1981 if !acquired_singleton {
1982 return Ok(FlowControlOutcome::Skip {
1983 reason: "singleton_active".to_string(),
1984 });
1985 }
1986 acquired.singleton = Some(SingletonLease { gate, held: true });
1987 }
1988
1989 if let Some(concurrency) = &flow.concurrency {
1990 let gate = self
1991 .resolve_flow_gate(
1992 &binding.binding_key(),
1993 concurrency.key.as_ref(),
1994 &managed_event,
1995 replay_of_event_id,
1996 )
1997 .await?;
1998 let priority_rank = self
1999 .resolve_priority_rank(
2000 &binding.binding_key(),
2001 flow.priority.as_ref(),
2002 &managed_event,
2003 replay_of_event_id,
2004 )
2005 .await?;
2006 let permit = self
2007 .state
2008 .flow_control
2009 .acquire_concurrency(&gate, concurrency.max, priority_rank)
2010 .await
2011 .map_err(DispatchError::from)?;
2012 acquired.concurrency = Some(ConcurrencyLease {
2013 gate,
2014 max: concurrency.max,
2015 priority_rank,
2016 permit: Some(permit),
2017 });
2018 }
2019
2020 Ok(FlowControlOutcome::Dispatch {
2021 event: Box::new(managed_event),
2022 acquired,
2023 })
2024 }
2025
2026 async fn release_flow_control(
2027 &self,
2028 acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2029 ) -> Result<(), DispatchError> {
2030 let (singleton_gate, concurrency_permit) = {
2031 let mut acquired = acquired.lock().await;
2032 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2033 if lease.held {
2034 lease.held = false;
2035 Some(lease.gate.clone())
2036 } else {
2037 None
2038 }
2039 });
2040 let concurrency_permit = acquired
2041 .concurrency
2042 .as_mut()
2043 .and_then(|lease| lease.permit.take());
2044 (singleton_gate, concurrency_permit)
2045 };
2046 if let Some(gate) = singleton_gate {
2047 self.state
2048 .flow_control
2049 .release_singleton(&gate)
2050 .await
2051 .map_err(DispatchError::from)?;
2052 }
2053 if let Some(permit) = concurrency_permit {
2054 self.state
2055 .flow_control
2056 .release_concurrency(permit)
2057 .await
2058 .map_err(DispatchError::from)?;
2059 }
2060 Ok(())
2061 }
2062
2063 async fn resolve_flow_gate(
2064 &self,
2065 binding_key: &str,
2066 expr: Option<&crate::triggers::TriggerExpressionSpec>,
2067 event: &TriggerEvent,
2068 replay_of_event_id: Option<&String>,
2069 ) -> Result<String, DispatchError> {
2070 let key = match expr {
2071 Some(expr) => {
2072 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2073 .await?
2074 }
2075 None => "_global".to_string(),
2076 };
2077 Ok(format!("{binding_key}:{key}"))
2078 }
2079
2080 async fn resolve_priority_rank(
2081 &self,
2082 binding_key: &str,
2083 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2084 event: &TriggerEvent,
2085 replay_of_event_id: Option<&String>,
2086 ) -> Result<usize, DispatchError> {
2087 let Some(priority) = priority else {
2088 return Ok(0);
2089 };
2090 let value = self
2091 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2092 .await?;
2093 Ok(priority
2094 .order
2095 .iter()
2096 .position(|candidate| candidate == &value)
2097 .unwrap_or(priority.order.len()))
2098 }
2099
2100 async fn evaluate_flow_expression(
2101 &self,
2102 binding_key: &str,
2103 expr: &crate::triggers::TriggerExpressionSpec,
2104 event: &TriggerEvent,
2105 replay_of_event_id: Option<&String>,
2106 ) -> Result<String, DispatchError> {
2107 let value = self
2108 .invoke_vm_callable(
2109 &expr.closure,
2110 binding_key,
2111 event,
2112 replay_of_event_id,
2113 "",
2114 "flow_control",
2115 AutonomyTier::Suggest,
2116 None,
2117 &mut self.cancel_tx.subscribe(),
2118 )
2119 .await?;
2120 Ok(json_value_to_gate(&vm_value_to_json(&value)))
2121 }
2122
2123 #[allow(clippy::too_many_arguments)]
2124 async fn invoke_vm_callable(
2125 &self,
2126 closure: &crate::value::VmClosure,
2127 binding_key: &str,
2128 event: &TriggerEvent,
2129 replay_of_event_id: Option<&String>,
2130 agent_id: &str,
2131 action: &str,
2132 autonomy_tier: AutonomyTier,
2133 wait_lease: Option<DispatchWaitLease>,
2134 cancel_rx: &mut broadcast::Receiver<()>,
2135 ) -> Result<VmValue, DispatchError> {
2136 let mut vm = self.base_vm.child_vm();
2137 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2138 if self.state.shutting_down.load(Ordering::SeqCst) {
2139 cancel_token.store(true, Ordering::SeqCst);
2140 }
2141 self.state
2142 .cancel_tokens
2143 .lock()
2144 .expect("dispatcher cancel tokens poisoned")
2145 .push(cancel_token.clone());
2146 vm.install_cancel_token(cancel_token.clone());
2147 let arg = event_to_handler_value(event)?;
2148 let args = [arg];
2149 let future = vm.call_closure_pub(closure, &args, &[]);
2150 pin_mut!(future);
2151 let (binding_id, binding_version) = split_binding_key(binding_key);
2152 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2153 slot.borrow_mut().replace(DispatchContext {
2154 trigger_event: event.clone(),
2155 replay_of_event_id: replay_of_event_id.cloned(),
2156 binding_id,
2157 binding_version,
2158 agent_id: agent_id.to_string(),
2159 action: action.to_string(),
2160 autonomy_tier,
2161 })
2162 });
2163 let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2164 .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2165 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2166 crate::stdlib::hitl::reset_hitl_state();
2167 let mut poll = tokio::time::interval(Duration::from_millis(100));
2168 let result = loop {
2169 tokio::select! {
2170 result = &mut future => break result,
2171 _ = recv_cancel(cancel_rx) => {
2172 cancel_token.store(true, Ordering::SeqCst);
2173 }
2174 _ = poll.tick() => {
2175 if dispatch_cancel_requested(
2176 &self.event_log,
2177 binding_key,
2178 &event.id.0,
2179 replay_of_event_id,
2180 )
2181 .await? {
2182 cancel_token.store(true, Ordering::SeqCst);
2183 }
2184 }
2185 }
2186 };
2187 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2188 *slot.borrow_mut() = prior_context;
2189 });
2190 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2191 *slot.borrow_mut() = prior_wait_lease;
2192 });
2193 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2194 {
2195 let mut tokens = self
2196 .state
2197 .cancel_tokens
2198 .lock()
2199 .expect("dispatcher cancel tokens poisoned");
2200 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2201 }
2202
2203 if cancel_token.load(Ordering::SeqCst) {
2204 if dispatch_cancel_requested(
2205 &self.event_log,
2206 binding_key,
2207 &event.id.0,
2208 replay_of_event_id,
2209 )
2210 .await?
2211 {
2212 Err(DispatchError::Cancelled(
2213 "trigger cancel request cancelled local handler".to_string(),
2214 ))
2215 } else {
2216 Err(DispatchError::Cancelled(
2217 "dispatcher shutdown cancelled local handler".to_string(),
2218 ))
2219 }
2220 } else {
2221 result.map_err(dispatch_error_from_vm_error)
2222 }
2223 }
2224
2225 async fn evaluate_predicate(
2226 &self,
2227 binding: &TriggerBinding,
2228 predicate: &super::registry::TriggerPredicateSpec,
2229 event: &TriggerEvent,
2230 replay_of_event_id: Option<&String>,
2231 autonomy_tier: AutonomyTier,
2232 ) -> Result<PredicateEvaluationRecord, DispatchError> {
2233 let event_id = event.id.0.clone();
2234 let trigger_id = binding.id.as_str().to_string();
2235 let now_ms = now_unix_ms();
2236 let today = utc_day_key();
2237
2238 let breaker_open_until = {
2239 let mut state = binding
2240 .predicate_state
2241 .lock()
2242 .expect("trigger predicate state poisoned");
2243 if state.budget_day_utc != Some(today) {
2244 state.budget_day_utc = Some(today);
2245 binding
2246 .metrics
2247 .cost_today_usd_micros
2248 .store(0, Ordering::Relaxed);
2249 }
2250 if state
2251 .breaker_open_until_ms
2252 .is_some_and(|until_ms| until_ms > now_ms)
2253 {
2254 state.breaker_open_until_ms
2255 } else {
2256 None
2257 }
2258 };
2259
2260 if breaker_open_until.is_some() {
2261 let mut metadata = BTreeMap::new();
2262 metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2263 metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2264 metadata.insert(
2265 "breaker_open_until_ms".to_string(),
2266 serde_json::json!(breaker_open_until),
2267 );
2268 crate::events::log_warn_meta(
2269 "trigger.predicate.circuit_breaker",
2270 "trigger predicate circuit breaker is open; short-circuiting to false",
2271 metadata,
2272 );
2273 let record = PredicateEvaluationRecord {
2274 result: false,
2275 reason: Some("circuit_open".to_string()),
2276 ..Default::default()
2277 };
2278 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2279 .await?;
2280 return Ok(record);
2281 }
2282
2283 if binding
2284 .daily_cost_usd
2285 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2286 {
2287 self.append_lifecycle_event(
2288 "predicate.daily_budget_exceeded",
2289 event,
2290 binding,
2291 serde_json::json!({
2292 "trigger_id": binding.id.as_str(),
2293 "event_id": event.id.0,
2294 "limit_usd": binding.daily_cost_usd,
2295 "cost_today_usd": current_predicate_daily_cost(binding),
2296 "replay_of_event_id": replay_of_event_id,
2297 }),
2298 replay_of_event_id,
2299 )
2300 .await?;
2301 let record = PredicateEvaluationRecord {
2302 result: false,
2303 reason: Some("daily_budget_exceeded".to_string()),
2304 ..Default::default()
2305 };
2306 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2307 .await?;
2308 return Ok(record);
2309 }
2310
2311 let replay_cache = self
2312 .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2313 .await?;
2314 let guard = start_predicate_evaluation(
2315 binding.when_budget.clone().unwrap_or_default(),
2316 replay_cache,
2317 );
2318 let started = std::time::Instant::now();
2319 let eval = self
2320 .invoke_vm_callable_with_timeout(
2321 &predicate.closure,
2322 &binding.binding_key(),
2323 event,
2324 replay_of_event_id,
2325 binding.id.as_str(),
2326 &format!("{}.{}", event.provider.as_str(), event.kind),
2327 autonomy_tier,
2328 &mut self.cancel_tx.subscribe(),
2329 binding
2330 .when_budget
2331 .as_ref()
2332 .and_then(|budget| budget.timeout()),
2333 )
2334 .await;
2335 let capture = guard.finish();
2336 let latency_ms = started.elapsed().as_millis() as u64;
2337 if replay_of_event_id.is_none() && !capture.entries.is_empty() {
2338 self.append_predicate_cache_record(binding, event, &capture.entries)
2339 .await?;
2340 }
2341
2342 let mut record = PredicateEvaluationRecord {
2343 result: false,
2344 cost_usd: capture.total_cost_usd,
2345 tokens: capture.total_tokens,
2346 latency_ms,
2347 cached: capture.cached,
2348 reason: None,
2349 };
2350
2351 let mut count_failure = false;
2352 let mut opened_breaker = false;
2353
2354 match eval {
2355 Ok(value) => match predicate_value_as_bool(value) {
2356 Ok(result) => {
2357 record.result = result;
2358 }
2359 Err(reason) => {
2360 count_failure = true;
2361 record.reason = Some(reason);
2362 }
2363 },
2364 Err(error) => {
2365 count_failure = true;
2366 record.reason = Some(error.to_string());
2367 }
2368 }
2369
2370 let cost_usd_micros = usd_to_micros(record.cost_usd);
2371 if cost_usd_micros > 0 {
2372 binding
2373 .metrics
2374 .cost_total_usd_micros
2375 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2376 binding
2377 .metrics
2378 .cost_today_usd_micros
2379 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2380 }
2381
2382 let timed_out = matches!(
2383 record.reason.as_deref(),
2384 Some("predicate evaluation timed out")
2385 );
2386 if capture.budget_exceeded || timed_out {
2387 record.result = false;
2388 record.reason = Some("budget_exceeded".to_string());
2389 self.append_lifecycle_event(
2390 "predicate.budget_exceeded",
2391 event,
2392 binding,
2393 serde_json::json!({
2394 "trigger_id": binding.id.as_str(),
2395 "event_id": event.id.0,
2396 "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2397 "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2398 "cost_usd": record.cost_usd,
2399 "tokens": record.tokens,
2400 "replay_of_event_id": replay_of_event_id,
2401 }),
2402 replay_of_event_id,
2403 )
2404 .await?;
2405 }
2406
2407 if binding
2408 .daily_cost_usd
2409 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2410 {
2411 record.result = false;
2412 record.reason = Some("daily_budget_exceeded".to_string());
2413 self.append_lifecycle_event(
2414 "predicate.daily_budget_exceeded",
2415 event,
2416 binding,
2417 serde_json::json!({
2418 "trigger_id": binding.id.as_str(),
2419 "event_id": event.id.0,
2420 "limit_usd": binding.daily_cost_usd,
2421 "cost_today_usd": current_predicate_daily_cost(binding),
2422 "replay_of_event_id": replay_of_event_id,
2423 }),
2424 replay_of_event_id,
2425 )
2426 .await?;
2427 }
2428
2429 {
2430 let mut state = binding
2431 .predicate_state
2432 .lock()
2433 .expect("trigger predicate state poisoned");
2434 if state.budget_day_utc != Some(today) {
2435 state.budget_day_utc = Some(today);
2436 binding
2437 .metrics
2438 .cost_today_usd_micros
2439 .store(cost_usd_micros, Ordering::Relaxed);
2440 }
2441 if count_failure {
2442 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2443 if state.consecutive_failures >= 3 {
2444 state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2445 opened_breaker = true;
2446 }
2447 } else {
2448 state.consecutive_failures = 0;
2449 state.breaker_open_until_ms = None;
2450 }
2451 }
2452
2453 if opened_breaker {
2454 let mut metadata = BTreeMap::new();
2455 metadata.insert(
2456 "trigger_id".to_string(),
2457 serde_json::json!(binding.id.as_str()),
2458 );
2459 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2460 metadata.insert("failure_count".to_string(), serde_json::json!(3));
2461 metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2462 crate::events::log_warn_meta(
2463 "trigger.predicate.circuit_breaker",
2464 "trigger predicate circuit breaker opened for 5 minutes",
2465 metadata,
2466 );
2467 }
2468
2469 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2470 .await?;
2471 Ok(record)
2472 }
2473
2474 #[allow(clippy::too_many_arguments)]
2475 #[allow(clippy::too_many_arguments)]
2476 async fn invoke_vm_callable_with_timeout(
2477 &self,
2478 closure: &crate::value::VmClosure,
2479 binding_key: &str,
2480 event: &TriggerEvent,
2481 replay_of_event_id: Option<&String>,
2482 agent_id: &str,
2483 action: &str,
2484 autonomy_tier: AutonomyTier,
2485 cancel_rx: &mut broadcast::Receiver<()>,
2486 timeout: Option<Duration>,
2487 ) -> Result<VmValue, DispatchError> {
2488 let future = self.invoke_vm_callable(
2489 closure,
2490 binding_key,
2491 event,
2492 replay_of_event_id,
2493 agent_id,
2494 action,
2495 autonomy_tier,
2496 None,
2497 cancel_rx,
2498 );
2499 pin_mut!(future);
2500 if let Some(timeout) = timeout {
2501 match tokio::time::timeout(timeout, future).await {
2502 Ok(result) => result,
2503 Err(_) => Err(DispatchError::Local(
2504 "predicate evaluation timed out".to_string(),
2505 )),
2506 }
2507 } else {
2508 future.await
2509 }
2510 }
2511
2512 async fn append_predicate_evaluated_event(
2513 &self,
2514 binding: &TriggerBinding,
2515 event: &TriggerEvent,
2516 record: &PredicateEvaluationRecord,
2517 replay_of_event_id: Option<&String>,
2518 ) -> Result<(), DispatchError> {
2519 self.append_lifecycle_event(
2520 "predicate.evaluated",
2521 event,
2522 binding,
2523 serde_json::json!({
2524 "trigger_id": binding.id.as_str(),
2525 "event_id": event.id.0,
2526 "result": record.result,
2527 "cost_usd": record.cost_usd,
2528 "tokens": record.tokens,
2529 "latency_ms": record.latency_ms,
2530 "cached": record.cached,
2531 "reason": record.reason,
2532 "replay_of_event_id": replay_of_event_id,
2533 }),
2534 replay_of_event_id,
2535 )
2536 .await
2537 }
2538
2539 async fn append_predicate_cache_record(
2540 &self,
2541 binding: &TriggerBinding,
2542 event: &TriggerEvent,
2543 entries: &[PredicateCacheEntry],
2544 ) -> Result<(), DispatchError> {
2545 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2546 .expect("static trigger inbox legacy topic name is valid");
2547 let payload = serde_json::to_value(PredicateCacheRecord {
2548 trigger_id: binding.id.as_str().to_string(),
2549 event_id: event.id.0.clone(),
2550 entries: entries.to_vec(),
2551 })
2552 .map_err(|error| DispatchError::Serde(error.to_string()))?;
2553 self.event_log
2554 .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2555 .await
2556 .map_err(DispatchError::from)
2557 .map(|_| ())
2558 }
2559
2560 async fn read_predicate_cache_record(
2561 &self,
2562 event_id: &str,
2563 ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2564 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2565 .expect("static trigger inbox legacy topic name is valid");
2566 let records = self
2567 .event_log
2568 .read_range(&topic, None, usize::MAX)
2569 .await
2570 .map_err(DispatchError::from)?;
2571 Ok(records
2572 .into_iter()
2573 .filter(|(_, event)| event.kind == "predicate_llm_cache")
2574 .filter_map(|(_, event)| {
2575 serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2576 })
2577 .filter(|record| record.event_id == event_id)
2578 .flat_map(|record| record.entries)
2579 .collect())
2580 }
2581
2582 #[allow(clippy::too_many_arguments)]
2583 async fn append_dispatch_trust_record(
2584 &self,
2585 binding: &TriggerBinding,
2586 route: &DispatchUri,
2587 event: &TriggerEvent,
2588 replay_of_event_id: Option<&String>,
2589 autonomy_tier: AutonomyTier,
2590 outcome: TrustOutcome,
2591 terminal_status: &str,
2592 attempt_count: u32,
2593 error: Option<String>,
2594 ) -> Result<(), DispatchError> {
2595 let mut record = TrustRecord::new(
2596 binding.id.as_str().to_string(),
2597 format!("{}.{}", event.provider.as_str(), event.kind),
2598 None,
2599 outcome,
2600 event.trace_id.0.clone(),
2601 autonomy_tier,
2602 );
2603 record.metadata.insert(
2604 "binding_key".to_string(),
2605 serde_json::json!(binding.binding_key()),
2606 );
2607 record.metadata.insert(
2608 "binding_version".to_string(),
2609 serde_json::json!(binding.version),
2610 );
2611 record.metadata.insert(
2612 "provider".to_string(),
2613 serde_json::json!(event.provider.as_str()),
2614 );
2615 record
2616 .metadata
2617 .insert("event_kind".to_string(), serde_json::json!(event.kind));
2618 record
2619 .metadata
2620 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2621 record.metadata.insert(
2622 "target_uri".to_string(),
2623 serde_json::json!(route.target_uri()),
2624 );
2625 record.metadata.insert(
2626 "terminal_status".to_string(),
2627 serde_json::json!(terminal_status),
2628 );
2629 record.metadata.insert(
2630 "attempt_count".to_string(),
2631 serde_json::json!(attempt_count),
2632 );
2633 if let Some(replay_of_event_id) = replay_of_event_id {
2634 record.metadata.insert(
2635 "replay_of_event_id".to_string(),
2636 serde_json::json!(replay_of_event_id),
2637 );
2638 }
2639 if let Some(error) = error {
2640 record
2641 .metadata
2642 .insert("error".to_string(), serde_json::json!(error));
2643 }
2644 append_trust_record(&self.event_log, &record)
2645 .await
2646 .map_err(DispatchError::from)
2647 }
2648
2649 async fn append_attempt_record(
2650 &self,
2651 event: &TriggerEvent,
2652 binding: &TriggerBinding,
2653 attempt: &DispatchAttemptRecord,
2654 replay_of_event_id: Option<&String>,
2655 ) -> Result<(), DispatchError> {
2656 self.append_topic_event(
2657 TRIGGER_ATTEMPTS_TOPIC,
2658 "attempt_recorded",
2659 event,
2660 Some(binding),
2661 Some(attempt.attempt),
2662 serde_json::to_value(attempt)
2663 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2664 replay_of_event_id,
2665 )
2666 .await
2667 }
2668
2669 async fn append_lifecycle_event(
2670 &self,
2671 kind: &str,
2672 event: &TriggerEvent,
2673 binding: &TriggerBinding,
2674 payload: serde_json::Value,
2675 replay_of_event_id: Option<&String>,
2676 ) -> Result<(), DispatchError> {
2677 self.append_topic_event(
2678 TRIGGERS_LIFECYCLE_TOPIC,
2679 kind,
2680 event,
2681 Some(binding),
2682 None,
2683 payload,
2684 replay_of_event_id,
2685 )
2686 .await
2687 }
2688
2689 async fn append_skipped_outbox_event(
2690 &self,
2691 binding: &TriggerBinding,
2692 route: &DispatchUri,
2693 event: &TriggerEvent,
2694 replay_of_event_id: Option<&String>,
2695 stage: DispatchSkipStage,
2696 detail: serde_json::Value,
2697 ) -> Result<(), DispatchError> {
2698 self.append_topic_event(
2699 TRIGGER_OUTBOX_TOPIC,
2700 "dispatch_skipped",
2701 event,
2702 Some(binding),
2703 None,
2704 serde_json::json!({
2705 "event_id": event.id.0,
2706 "trigger_id": binding.id.as_str(),
2707 "binding_key": binding.binding_key(),
2708 "handler_kind": route.kind(),
2709 "target_uri": route.target_uri(),
2710 "skip_stage": stage.as_str(),
2711 "detail": detail,
2712 "replay_of_event_id": replay_of_event_id,
2713 }),
2714 replay_of_event_id,
2715 )
2716 .await
2717 }
2718
2719 async fn append_topic_event(
2720 &self,
2721 topic_name: &str,
2722 kind: &str,
2723 event: &TriggerEvent,
2724 binding: Option<&TriggerBinding>,
2725 attempt: Option<u32>,
2726 payload: serde_json::Value,
2727 replay_of_event_id: Option<&String>,
2728 ) -> Result<(), DispatchError> {
2729 let topic = Topic::new(topic_name)
2730 .expect("static trigger dispatcher topic names should always be valid");
2731 let headers = event_headers(event, binding, attempt, replay_of_event_id);
2732 self.event_log
2733 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
2734 .await
2735 .map_err(DispatchError::from)
2736 .map(|_| ())
2737 }
2738
2739 async fn emit_action_graph(
2740 &self,
2741 event: &TriggerEvent,
2742 nodes: Vec<RunActionGraphNodeRecord>,
2743 edges: Vec<RunActionGraphEdgeRecord>,
2744 extra: serde_json::Value,
2745 ) -> Result<(), DispatchError> {
2746 let mut headers = BTreeMap::new();
2747 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2748 headers.insert("event_id".to_string(), event.id.0.clone());
2749 let observability = RunObservabilityRecord {
2750 schema_version: 1,
2751 action_graph_nodes: nodes,
2752 action_graph_edges: edges,
2753 ..Default::default()
2754 };
2755 append_action_graph_update(
2756 headers,
2757 serde_json::json!({
2758 "source": "dispatcher",
2759 "trace_id": event.trace_id.0,
2760 "event_id": event.id.0,
2761 "observability": observability,
2762 "context": extra,
2763 }),
2764 )
2765 .await
2766 .map_err(DispatchError::from)
2767 }
2768}
2769
2770async fn dispatch_cancel_requested(
2771 event_log: &Arc<AnyEventLog>,
2772 binding_key: &str,
2773 event_id: &str,
2774 replay_of_event_id: Option<&String>,
2775) -> Result<bool, DispatchError> {
2776 if replay_of_event_id.is_some() {
2777 return Ok(false);
2778 }
2779 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
2780 .expect("static trigger cancel topic should always be valid");
2781 let events = event_log.read_range(&topic, None, usize::MAX).await?;
2782 let requested = events
2783 .into_iter()
2784 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
2785 .filter_map(|(_, event)| {
2786 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
2787 })
2788 .collect::<BTreeSet<_>>();
2789 Ok(requested
2790 .iter()
2791 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
2792}
2793
2794async fn sleep_or_cancel_or_request(
2795 event_log: &Arc<AnyEventLog>,
2796 delay: Duration,
2797 binding_key: &str,
2798 event_id: &str,
2799 replay_of_event_id: Option<&String>,
2800 cancel_rx: &mut broadcast::Receiver<()>,
2801) -> Result<(), DispatchError> {
2802 let sleep = tokio::time::sleep(delay);
2803 pin_mut!(sleep);
2804 let mut poll = tokio::time::interval(Duration::from_millis(100));
2805 loop {
2806 tokio::select! {
2807 _ = &mut sleep => return Ok(()),
2808 _ = recv_cancel(cancel_rx) => {
2809 return Err(DispatchError::Cancelled(
2810 "dispatcher shutdown cancelled retry wait".to_string(),
2811 ));
2812 }
2813 _ = poll.tick() => {
2814 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
2815 return Err(DispatchError::Cancelled(
2816 "trigger cancel request cancelled retry wait".to_string(),
2817 ));
2818 }
2819 }
2820 }
2821 }
2822}
2823
2824fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
2825 let mut iter = events.into_iter();
2826 let Some(mut root) = iter.next() else {
2827 return Err(DispatchError::Registry(
2828 "batch dispatch produced an empty event list".to_string(),
2829 ));
2830 };
2831 let mut batch = Vec::new();
2832 batch.push(
2833 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
2834 );
2835 for event in iter {
2836 batch.push(
2837 serde_json::to_value(&event)
2838 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2839 );
2840 }
2841 root.batch = Some(batch);
2842 Ok(root)
2843}
2844
2845fn json_value_to_gate(value: &serde_json::Value) -> String {
2846 match value {
2847 serde_json::Value::Null => "null".to_string(),
2848 serde_json::Value::String(text) => text.clone(),
2849 serde_json::Value::Bool(value) => value.to_string(),
2850 serde_json::Value::Number(value) => value.to_string(),
2851 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
2852 }
2853}
2854
2855fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
2856 let json =
2857 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2858 let value = json_to_vm_value(&json);
2859 match (&event.raw_body, value) {
2860 (Some(raw_body), VmValue::Dict(dict)) => {
2861 let mut map = (*dict).clone();
2862 map.insert(
2863 "raw_body".to_string(),
2864 VmValue::Bytes(Rc::new(raw_body.clone())),
2865 );
2866 Ok(VmValue::Dict(Rc::new(map)))
2867 }
2868 (_, other) => Ok(other),
2869 }
2870}
2871
2872fn decrement_in_flight(state: &DispatcherRuntimeState) {
2873 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
2874 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
2875 state.idle_notify.notify_waiters();
2876 }
2877}
2878
2879fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
2880 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
2881 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
2882 state.idle_notify.notify_waiters();
2883 }
2884}
2885
/// Test-only: installs a one-shot sender that `notify_test_inbox_dequeued`
/// fires the next time it runs, replacing any previously installed sender.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|cell| {
        let mut current = cell.borrow_mut();
        *current = Some(tx);
    });
}
2892
/// No-op in non-test builds. The `#[cfg(test)]` variant fires the one-shot
/// signal installed by `install_test_inbox_dequeued_signal`.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}
2895
/// Test-only: takes the installed one-shot sender (if any) and fires it.
/// A dropped receiver is ignored. Later calls are no-ops until a new
/// sender is installed.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|cell| cell.borrow_mut().take());
    if let Some(sender) = pending {
        let _ = sender.send(());
    }
}
2904
2905pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
2906 event_log: &L,
2907 event: &TriggerEvent,
2908) -> Result<u64, DispatchError> {
2909 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
2910 .expect("static trigger.inbox.envelopes topic is valid");
2911 let headers = event_headers(event, None, None, None);
2912 let payload =
2913 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2914 event_log
2915 .append(
2916 &topic,
2917 LogEvent::new("event_ingested", payload).with_headers(headers),
2918 )
2919 .await
2920 .map_err(DispatchError::from)
2921}
2922
2923pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
2924 ACTIVE_DISPATCHER_STATE.with(|slot| {
2925 slot.borrow()
2926 .as_ref()
2927 .map(|state| DispatcherStatsSnapshot {
2928 in_flight: state.in_flight.load(Ordering::Relaxed),
2929 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
2930 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
2931 })
2932 .unwrap_or_default()
2933 })
2934}
2935
2936pub fn clear_dispatcher_state() {
2937 ACTIVE_DISPATCHER_STATE.with(|slot| {
2938 *slot.borrow_mut() = None;
2939 });
2940 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2941 *slot.borrow_mut() = None;
2942 });
2943}
2944
2945fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
2946 if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
2947 return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
2948 }
2949 if is_cancelled_vm_error(&error) {
2950 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
2951 }
2952 if let VmError::Thrown(VmValue::String(message)) = &error {
2953 return DispatchError::Local(message.to_string());
2954 }
2955 match error_to_category(&error) {
2956 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
2957 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
2958 ErrorCategory::Cancelled => {
2959 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
2960 }
2961 _ => DispatchError::Local(error.to_string()),
2962 }
2963}
2964
2965fn dispatch_error_label(error: &DispatchError) -> &'static str {
2966 match error {
2967 DispatchError::Denied(_) => "denied",
2968 DispatchError::Timeout(_) => "timeout",
2969 DispatchError::Waiting(_) => "waiting",
2970 DispatchError::Cancelled(_) => "cancelled",
2971 _ => "failed",
2972 }
2973}
2974
2975fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
2976 match route {
2977 DispatchUri::Worker { .. } => "enqueued",
2978 DispatchUri::A2a { .. }
2979 if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
2980 {
2981 "pending"
2982 }
2983 DispatchUri::A2a { .. } => "completed",
2984 DispatchUri::Local { .. } => "success",
2985 }
2986}
2987
2988fn dispatch_node_id(
2989 route: &DispatchUri,
2990 binding_key: &str,
2991 event_id: &str,
2992 attempt: u32,
2993) -> String {
2994 let prefix = match route {
2995 DispatchUri::A2a { .. } => "a2a",
2996 _ => "dispatch",
2997 };
2998 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
2999}
3000
3001fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3002 match route {
3003 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3004 DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3005 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3006 }
3007}
3008
3009fn dispatch_node_label(route: &DispatchUri) -> String {
3010 match route {
3011 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3012 _ => route.target_uri(),
3013 }
3014}
3015
3016fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3017 match route {
3018 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3019 _ => None,
3020 }
3021}
3022
3023fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3024 match route {
3025 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3026 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3027 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3028 }
3029}
3030
3031fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3032 match status {
3033 crate::triggers::SignatureStatus::Verified => "verified",
3034 crate::triggers::SignatureStatus::Unsigned => "unsigned",
3035 crate::triggers::SignatureStatus::Failed { .. } => "failed",
3036 }
3037}
3038
3039fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3040 let mut metadata = BTreeMap::new();
3041 metadata.insert(
3042 "provider".to_string(),
3043 serde_json::json!(event.provider.as_str()),
3044 );
3045 metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3046 metadata.insert(
3047 "dedupe_key".to_string(),
3048 serde_json::json!(event.dedupe_key),
3049 );
3050 metadata.insert(
3051 "signature_status".to_string(),
3052 serde_json::json!(signature_status_label(&event.signature_status)),
3053 );
3054 metadata
3055}
3056
3057fn predicate_node_metadata(
3058 binding: &TriggerBinding,
3059 predicate: &super::registry::TriggerPredicateSpec,
3060 event: &TriggerEvent,
3061 evaluation: &PredicateEvaluationRecord,
3062) -> BTreeMap<String, serde_json::Value> {
3063 let mut metadata = BTreeMap::new();
3064 metadata.insert(
3065 "trigger_id".to_string(),
3066 serde_json::json!(binding.id.as_str()),
3067 );
3068 metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
3069 metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
3070 metadata.insert(
3071 "cost_usd".to_string(),
3072 serde_json::json!(evaluation.cost_usd),
3073 );
3074 metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
3075 metadata.insert(
3076 "latency_ms".to_string(),
3077 serde_json::json!(evaluation.latency_ms),
3078 );
3079 metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
3080 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3081 if let Some(reason) = evaluation.reason.as_ref() {
3082 metadata.insert("reason".to_string(), serde_json::json!(reason));
3083 }
3084 metadata
3085}
3086
3087fn dispatch_node_metadata(
3088 route: &DispatchUri,
3089 binding: &TriggerBinding,
3090 event: &TriggerEvent,
3091 attempt: u32,
3092) -> BTreeMap<String, serde_json::Value> {
3093 let mut metadata = BTreeMap::new();
3094 metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3095 metadata.insert(
3096 "target_uri".to_string(),
3097 serde_json::json!(route.target_uri()),
3098 );
3099 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3100 metadata.insert(
3101 "trigger_id".to_string(),
3102 serde_json::json!(binding.id.as_str()),
3103 );
3104 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3105 if let Some(target_agent) = dispatch_target_agent(route) {
3106 metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3107 }
3108 if let DispatchUri::Worker { queue } = route {
3109 metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3110 }
3111 metadata
3112}
3113
3114fn dispatch_success_metadata(
3115 route: &DispatchUri,
3116 binding: &TriggerBinding,
3117 event: &TriggerEvent,
3118 attempt: u32,
3119 result: &serde_json::Value,
3120) -> BTreeMap<String, serde_json::Value> {
3121 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3122 match route {
3123 DispatchUri::A2a { .. } => {
3124 if let Some(task_id) = result
3125 .get("task_id")
3126 .or_else(|| result.get("id"))
3127 .and_then(|value| value.as_str())
3128 {
3129 metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3130 }
3131 if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3132 metadata.insert("state".to_string(), serde_json::json!(state));
3133 }
3134 }
3135 DispatchUri::Worker { .. } => {
3136 if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3137 {
3138 metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3139 }
3140 if let Some(response_topic) = result
3141 .get("response_topic")
3142 .and_then(|value| value.as_str())
3143 {
3144 metadata.insert(
3145 "response_topic".to_string(),
3146 serde_json::json!(response_topic),
3147 );
3148 }
3149 }
3150 DispatchUri::Local { .. } => {}
3151 }
3152 metadata
3153}
3154
3155fn dispatch_error_metadata(
3156 route: &DispatchUri,
3157 binding: &TriggerBinding,
3158 event: &TriggerEvent,
3159 attempt: u32,
3160 error: &DispatchError,
3161) -> BTreeMap<String, serde_json::Value> {
3162 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3163 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3164 metadata
3165}
3166
3167fn retry_node_metadata(
3168 binding: &TriggerBinding,
3169 event: &TriggerEvent,
3170 attempt: u32,
3171 delay: Duration,
3172 error: &DispatchError,
3173) -> BTreeMap<String, serde_json::Value> {
3174 let mut metadata = BTreeMap::new();
3175 metadata.insert(
3176 "trigger_id".to_string(),
3177 serde_json::json!(binding.id.as_str()),
3178 );
3179 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3180 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3181 metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3182 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3183 metadata
3184}
3185
3186fn dlq_node_metadata(
3187 binding: &TriggerBinding,
3188 event: &TriggerEvent,
3189 attempt_count: u32,
3190 final_error: &str,
3191) -> BTreeMap<String, serde_json::Value> {
3192 let mut metadata = BTreeMap::new();
3193 metadata.insert(
3194 "trigger_id".to_string(),
3195 serde_json::json!(binding.id.as_str()),
3196 );
3197 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3198 metadata.insert(
3199 "attempt_count".to_string(),
3200 serde_json::json!(attempt_count),
3201 );
3202 metadata.insert("final_error".to_string(), serde_json::json!(final_error));
3203 metadata
3204}
3205
3206fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
3207 match value {
3208 VmValue::Bool(result) => Ok(result),
3209 VmValue::EnumVariant {
3210 enum_name,
3211 variant,
3212 mut fields,
3213 } if enum_name == "Result" && variant == "Ok" => match fields.pop() {
3214 Some(VmValue::Bool(result)) => Ok(result),
3215 Some(other) => Err(format!(
3216 "predicate Result.Ok payload must be bool, got {}",
3217 other.type_name()
3218 )),
3219 None => Err("predicate Result.Ok payload is missing".to_string()),
3220 },
3221 VmValue::EnumVariant {
3222 enum_name,
3223 variant,
3224 fields,
3225 } if enum_name == "Result" && variant == "Err" => Err(fields
3226 .first()
3227 .map(VmValue::display)
3228 .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
3229 other => Err(format!(
3230 "predicate must return bool or Result<bool, _>, got {}",
3231 other.type_name()
3232 )),
3233 }
3234}
3235
/// Converts a USD amount into whole micro-dollars, rounding to nearest.
/// NaN, infinities, zero and negative amounts all map to 0. Amounts too
/// large for `u64` saturate.
fn usd_to_micros(value: f64) -> u64 {
    if value.is_finite() && value > 0.0 {
        (value * 1_000_000.0).round() as u64
    } else {
        0
    }
}
3242
3243fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
3244 binding
3245 .metrics
3246 .cost_today_usd_micros
3247 .load(Ordering::Relaxed) as f64
3248 / 1_000_000.0
3249}
3250
/// Splits a `<binding_id>@v<version>` key into its id and version.
///
/// Only the last `@v` separator is considered, so ids that themselves
/// contain `@v` (e.g. `notify@vercel@v3`) split correctly. If there is no
/// separator, or the text after the last one is not a valid `u32`, the
/// whole key is returned unchanged as the id with version 0. A non-numeric
/// suffix is never stripped, so a bare id like `notify@vercel` is not
/// mangled into `notify`.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    let versioned = binding_key
        .rsplit_once("@v")
        .and_then(|(binding_id, suffix)| {
            suffix
                .parse::<u32>()
                .ok()
                .map(|version| (binding_id, version))
        });
    match versioned {
        Some((binding_id, version)) => (binding_id.to_string(), version),
        None => (binding_key.to_string(), 0),
    }
}
3258
3259fn is_cancelled_vm_error(error: &VmError) -> bool {
3260 matches!(
3261 error,
3262 VmError::Thrown(VmValue::String(message))
3263 if message.starts_with("kind:cancelled:")
3264 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3265}
3266
3267fn event_headers(
3268 event: &TriggerEvent,
3269 binding: Option<&TriggerBinding>,
3270 attempt: Option<u32>,
3271 replay_of_event_id: Option<&String>,
3272) -> BTreeMap<String, String> {
3273 let mut headers = BTreeMap::new();
3274 headers.insert("event_id".to_string(), event.id.0.clone());
3275 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3276 headers.insert("provider".to_string(), event.provider.as_str().to_string());
3277 headers.insert("kind".to_string(), event.kind.clone());
3278 if let Some(replay_of_event_id) = replay_of_event_id {
3279 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3280 }
3281 if let Some(binding) = binding {
3282 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3283 headers.insert("binding_key".to_string(), binding.binding_key());
3284 headers.insert(
3285 "handler_kind".to_string(),
3286 DispatchUri::from(&binding.handler).kind().to_string(),
3287 );
3288 }
3289 if let Some(attempt) = attempt {
3290 headers.insert("attempt".to_string(), attempt.to_string());
3291 }
3292 headers
3293}
3294
3295fn worker_queue_priority(
3296 binding: &super::registry::TriggerBinding,
3297 event: &TriggerEvent,
3298) -> crate::WorkerQueuePriority {
3299 match event
3300 .headers
3301 .get("priority")
3302 .map(|value| value.trim().to_ascii_lowercase())
3303 .as_deref()
3304 {
3305 Some("high") => crate::WorkerQueuePriority::High,
3306 Some("low") => crate::WorkerQueuePriority::Low,
3307 _ => binding.dispatch_priority,
3308 }
3309}
3310
/// Environment variable whose mere presence makes `maybe_fail_before_outbox`
/// terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Exits the process with status 86 when `TEST_FAIL_BEFORE_OUTBOX_ENV` is
/// set (to any value).
///
/// NOTE(review): judging by the name this is a test hook that simulates a
/// crash just before the outbox write; confirm at the call sites.
fn maybe_fail_before_outbox() {
    let crash_requested = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if crash_requested {
        std::process::exit(86);
    }
}
3318
3319fn now_rfc3339() -> String {
3320 time::OffsetDateTime::now_utc()
3321 .format(&time::format_description::well_known::Rfc3339)
3322 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3323}
3324
3325fn now_unix_ms() -> i64 {
3326 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3327}
3328
3329fn utc_day_key() -> i32 {
3330 time::OffsetDateTime::now_utc().date().to_julian_day()
3331}
3332
3333fn cancelled_dispatch_outcome(
3334 binding: &TriggerBinding,
3335 route: &DispatchUri,
3336 event: &TriggerEvent,
3337 replay_of_event_id: Option<String>,
3338 attempt_count: u32,
3339 error: String,
3340) -> DispatchOutcome {
3341 DispatchOutcome {
3342 trigger_id: binding.id.as_str().to_string(),
3343 binding_key: binding.binding_key(),
3344 event_id: event.id.0.clone(),
3345 attempt_count,
3346 status: DispatchStatus::Cancelled,
3347 handler_kind: route.kind().to_string(),
3348 target_uri: route.target_uri(),
3349 replay_of_event_id,
3350 result: None,
3351 error: Some(error),
3352 }
3353}
3354
3355async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3356 let _ = cancel_rx.recv().await;
3357}
3358
// Unit tests are declared out-of-line: `mod tests;` loads them from a
// separate module file, compiled only under `cfg(test)`.
#[cfg(test)]
mod tests;