1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::vm_value_to_json;
16use crate::orchestration::{
17 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
18 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
19 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
20 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
21 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
22 ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
23 ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
24};
25use crate::stdlib::json_to_vm_value;
26use crate::trust_graph::{
27 append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
28};
29use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
30use crate::vm::Vm;
31
32use self::uri::DispatchUri;
33use super::registry::{
34 binding_autonomy_budget_would_exceed, matching_bindings, note_autonomous_decision,
35 TriggerBinding, TriggerBudgetExhaustionStrategy, TriggerHandlerSpec,
36};
37use super::{
38 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
39 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
40 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_OUTBOX_TOPIC,
41};
42use circuits::{
43 destination_circuit_key, dlq_node_metadata, DestinationCircuitProbe,
44 DestinationCircuitRegistry, DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
45};
46use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
47use predicate_eval::predicate_node_metadata;
48
49mod circuits;
50mod flow_control;
51mod predicate_eval;
52pub mod retry;
53pub mod uri;
54
55pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
56
/// Header carrying the time (unix milliseconds) at which a trigger event was accepted.
/// `Dispatcher::enqueue_targeted_with_headers` fills it from the event's `received_at`
/// only when no inherited value is already present.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Header for the time (unix milliseconds) at which the event was normalized.
/// NOTE(review): not written in this part of the file — confirm the producer.
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Header carrying the time (unix milliseconds) at which the envelope was appended to the
/// inbox queue. It defaults to the log event's `occurred_at_ms` when not already present.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
60
thread_local! {
    /// Runtime state of the most recently constructed dispatcher on this thread.
    /// It is overwritten by `Dispatcher::with_event_log_and_metrics`.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    /// Context of the dispatch currently running on this thread, read by
    /// `current_dispatch_context`. NOTE(review): the writer is outside this view — confirm.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    /// Flow-control wait lease of the current dispatch, read by `current_dispatch_wait_lease`.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    /// Test-only hook. Presumably fired by `notify_test_inbox_dequeued` when `run` dequeues
    /// an inbox envelope — confirm.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}

tokio::task_local! {
    /// Whether the dispatch running in the current task is a replay.
    /// `Dispatcher::dispatch_with_replay` sets it via `scope`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
72
/// Description of the dispatch currently executing on this thread, exposed through
/// `current_dispatch_context()`.
///
/// NOTE(review): the code that installs this into `ACTIVE_DISPATCH_CONTEXT` is not in
/// this part of the file — confirm when it is set and cleared.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The trigger event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay; `None` otherwise.
    pub replay_of_event_id: Option<String>,
    /// Id of the trigger binding handling the event.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    /// Agent identifier associated with the dispatch (semantics set by the writer — confirm).
    pub agent_id: String,
    /// Action name associated with the dispatch (semantics set by the writer — confirm).
    pub action: String,
    /// Autonomy tier in effect for this dispatch.
    pub autonomy_tier: AutonomyTier,
}
83
/// Guard that calls `crate::orchestration::pop_execution_policy()` when dropped.
/// The pop therefore happens on every exit path, including unwinding panics.
/// Presumably paired with a push made where the guard is created — confirm.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
91
/// Default reviewer identity for autonomy-budget decisions.
/// NOTE(review): used outside this view — presumably when no reviewer is configured; confirm.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
93
94pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
95 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
96}
97
98pub(crate) fn current_dispatch_is_replay() -> bool {
99 ACTIVE_DISPATCH_IS_REPLAY
100 .try_with(|is_replay| *is_replay)
101 .unwrap_or(false)
102}
103
104pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
105 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
106}
107
/// Routes trigger events to their bindings' handlers, with flow control, retries,
/// a dead-letter queue and metrics.
///
/// Holding the base VM in an `Rc` makes the dispatcher `!Send`, so a dispatcher
/// (and its clones) stays on the thread that created it.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM the dispatcher was built from (shared by clones).
    base_vm: Rc<Vm>,
    /// Event log used for the inbox, cancel requests and action-graph records.
    event_log: Arc<AnyEventLog>,
    /// Broadcast used by `shutdown` to stop `run` loops.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ, cancel tokens and flow-control state shared by clones.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink; `None` disables metric recording.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
116
117#[derive(Debug)]
118struct DispatcherRuntimeState {
119 in_flight: AtomicU64,
120 retry_queue_depth: AtomicU64,
121 dlq: Mutex<Vec<DlqEntry>>,
122 cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
123 shutting_down: std::sync::atomic::AtomicBool,
124 idle_notify: Notify,
125 flow_control: FlowControlManager,
126 destination_circuits: DestinationCircuitRegistry,
127}
128
129impl DispatcherRuntimeState {
130 fn new(event_log: Arc<AnyEventLog>) -> Self {
131 Self {
132 in_flight: AtomicU64::new(0),
133 retry_queue_depth: AtomicU64::new(0),
134 dlq: Mutex::new(Vec::new()),
135 cancel_tokens: Mutex::new(Vec::new()),
136 shutting_down: std::sync::atomic::AtomicBool::new(false),
137 idle_notify: Notify::new(),
138 flow_control: FlowControlManager::new(event_log),
139 destination_circuits: DestinationCircuitRegistry::default(),
140 }
141 }
142}
143
/// Point-in-time view of dispatcher counters, produced by `Dispatcher::snapshot`.
/// Because of `#[serde(default)]`, missing fields deserialize as `0`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches currently executing.
    pub in_flight: u64,
    /// Dispatches waiting for a retry.
    pub retry_queue_depth: u64,
    /// Number of entries in the dispatcher's dead-letter queue.
    pub dlq_depth: u64,
}
151
/// Final status of a single dispatch. It serializes as snake_case, matching `as_str`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch failed.
    Failed,
    /// The event was moved to the dead-letter queue.
    Dlq,
    /// The dispatch was skipped (presumably by predicate or flow control — confirm).
    Skipped,
    /// The dispatch is parked. Metrics count it as neither a success nor a failure.
    Waiting,
    /// The dispatch was cancelled.
    Cancelled,
}
162
163impl DispatchStatus {
164 pub fn as_str(&self) -> &'static str {
165 match self {
166 Self::Succeeded => "succeeded",
167 Self::Failed => "failed",
168 Self::Dlq => "dlq",
169 Self::Skipped => "skipped",
170 Self::Waiting => "waiting",
171 Self::Cancelled => "cancelled",
172 }
173 }
174}
175
/// Result of dispatching one event to one binding.
/// Missing fields deserialize from `DispatchOutcome::default()`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Binding key (trigger id plus version) the dispatch ran under.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Handler attempts made; `0` when the dispatch ended before the first attempt.
    pub attempt_count: u32,
    /// Final status.
    pub status: DispatchStatus,
    /// Kind of the handler route.
    pub handler_kind: String,
    /// Target URI of the handler route.
    pub target_uri: String,
    /// Original event id when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result, when one was produced.
    pub result: Option<serde_json::Value>,
    /// Error message for non-successful outcomes.
    pub error: Option<String>,
}
190
/// Payload of an `event_ingested` record on the trigger inbox topic.
///
/// When `trigger_id` is `None`, the dispatcher targets the cron binding named in the
/// event payload, if any. Otherwise it dispatches to every matching binding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// Binding to dispatch to directly, bypassing binding matching.
    pub trigger_id: Option<String>,
    /// Binding version to resolve; `None` lets the registry choose.
    pub binding_version: Option<u32>,
    /// The event itself.
    pub event: TriggerEvent,
}
197
/// Outcome of `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` if in-flight and retry-queue counts both reached zero before the timeout.
    pub drained: bool,
    /// In-flight count last observed.
    pub in_flight: u64,
    /// Retry-queue depth last observed.
    pub retry_queue_depth: u64,
    /// DLQ depth last observed. It does not block draining.
    pub dlq_depth: u64,
}
206
207impl Default for DispatchOutcome {
208 fn default() -> Self {
209 Self {
210 trigger_id: String::new(),
211 binding_key: String::new(),
212 event_id: String::new(),
213 attempt_count: 0,
214 status: DispatchStatus::Failed,
215 handler_kind: String::new(),
216 target_uri: String::new(),
217 replay_of_event_id: None,
218 result: None,
219 error: None,
220 }
221 }
222}
223
/// Record of a single handler attempt, kept in a `DlqEntry`'s attempt history.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Binding key the attempt ran under.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Attempt number (presumably 1-based — confirm).
    pub attempt: u32,
    /// Kind of the handler route.
    pub handler_kind: String,
    /// Attempt start time (string format set by the writer — presumably RFC 3339; confirm).
    pub started_at: String,
    /// Attempt completion time (same format as `started_at`).
    pub completed_at: String,
    /// Outcome label of the attempt.
    pub outcome: String,
    /// Error message when the attempt failed.
    pub error_msg: Option<String>,
}
237
/// Request to cancel the dispatch of one event under one binding.
/// It is appended to the cancel-requests topic by `append_dispatch_cancel_request`.
///
/// Field order matters: it defines the derived `Ord`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Binding key whose dispatch should be cancelled.
    pub binding_key: String,
    /// Event whose dispatch should be cancelled.
    pub event_id: String,
    /// When the cancellation was requested (serialized as RFC 3339).
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who requested the cancellation, if known.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Associated audit record id, if any.
    #[serde(default)]
    pub audit_id: Option<String>,
}
249
/// An event that was moved to the dead-letter queue, with its attempt history.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Binding key the dispatch ran under.
    pub binding_key: String,
    /// The event that could not be dispatched.
    pub event: TriggerEvent,
    /// Number of attempts made before dead-lettering.
    pub attempt_count: u32,
    /// Error message that caused the move to the DLQ.
    pub final_error: String,
    /// Error classification; deserializes as `"unknown"` when absent.
    #[serde(default = "default_dlq_error_class")]
    pub error_class: String,
    /// Per-attempt records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
261
/// Serde default for `DlqEntry::error_class` when the field is missing.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
265
/// Singleton gate acquired for a dispatch.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Gate name passed to the flow-control manager.
    gate: String,
    /// Whether the gate is currently held. `DispatchWaitLease::suspend` clears it and
    /// `resume` sets it again.
    held: bool,
}

/// Concurrency-gate slot acquired for a dispatch.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    /// Gate name passed to the flow-control manager.
    gate: String,
    /// Maximum concurrency of the gate.
    max: u32,
    /// Priority rank used when re-acquiring the gate.
    priority_rank: usize,
    /// The permit while held. `suspend` takes it (releases it) and `resume` re-acquires it.
    permit: Option<ConcurrencyPermit>,
}

/// Flow-control leases held by one dispatch; either may be absent.
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    singleton: Option<SingletonLease>,
    concurrency: Option<ConcurrencyLease>,
}
285
/// Handle that lets a running dispatch temporarily give back and later re-acquire its
/// flow-control leases. Clones share the same leases and suspension flag.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state whose flow-control manager owns the gates.
    state: Arc<DispatcherRuntimeState>,
    /// Leases held by the dispatch.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// `true` while the leases are given back.
    suspended: Arc<AtomicBool>,
}
292
293impl DispatchWaitLease {
294 fn new(
295 state: Arc<DispatcherRuntimeState>,
296 acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
297 ) -> Self {
298 Self {
299 state,
300 acquired,
301 suspended: Arc::new(AtomicBool::new(false)),
302 }
303 }
304
305 pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
306 if self.suspended.swap(true, Ordering::SeqCst) {
307 return Ok(());
308 }
309 let (singleton_gate, concurrency_permit) = {
310 let mut acquired = self.acquired.lock().await;
311 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
312 if lease.held {
313 lease.held = false;
314 Some(lease.gate.clone())
315 } else {
316 None
317 }
318 });
319 let concurrency_permit = acquired
320 .concurrency
321 .as_mut()
322 .and_then(|lease| lease.permit.take());
323 (singleton_gate, concurrency_permit)
324 };
325
326 if let Some(gate) = singleton_gate {
327 self.state
328 .flow_control
329 .release_singleton(&gate)
330 .await
331 .map_err(DispatchError::from)?;
332 }
333 if let Some(permit) = concurrency_permit {
334 self.state
335 .flow_control
336 .release_concurrency(permit)
337 .await
338 .map_err(DispatchError::from)?;
339 }
340 Ok(())
341 }
342
343 pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
344 if !self.suspended.swap(false, Ordering::SeqCst) {
345 return Ok(());
346 }
347
348 let singleton_gate = {
349 let acquired = self.acquired.lock().await;
350 acquired.singleton.as_ref().and_then(|lease| {
351 if lease.held {
352 None
353 } else {
354 Some(lease.gate.clone())
355 }
356 })
357 };
358 if let Some(gate) = singleton_gate {
359 self.state
360 .flow_control
361 .acquire_singleton(&gate)
362 .await
363 .map_err(DispatchError::from)?;
364 let mut acquired = self.acquired.lock().await;
365 if let Some(lease) = acquired.singleton.as_mut() {
366 lease.held = true;
367 }
368 }
369
370 let concurrency_spec = {
371 let acquired = self.acquired.lock().await;
372 acquired.concurrency.as_ref().and_then(|lease| {
373 if lease.permit.is_some() {
374 None
375 } else {
376 Some((lease.gate.clone(), lease.max, lease.priority_rank))
377 }
378 })
379 };
380 if let Some((gate, max, priority_rank)) = concurrency_spec {
381 let permit = self
382 .state
383 .flow_control
384 .acquire_concurrency(&gate, max, priority_rank)
385 .await
386 .map_err(DispatchError::from)?;
387 let mut acquired = self.acquired.lock().await;
388 if let Some(lease) = acquired.concurrency.as_mut() {
389 lease.permit = Some(permit);
390 }
391 }
392 Ok(())
393 }
394}
395
/// Result of applying a binding's flow control to an event.
/// NOTE(review): the producer is outside this view — confirm.
enum FlowControlOutcome {
    /// Proceed with `event` while holding the `acquired` leases.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch, for the given reason.
    Skip {
        reason: String,
    },
}

/// Stage that caused a dispatch to be skipped.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    /// The binding's `when` predicate did not pass.
    Predicate,
    /// Flow control decided to skip the event.
    FlowControl,
}
411
/// Errors surfaced by the dispatcher. Every variant carries a human-readable message,
/// which is exactly what `Display` prints.
#[derive(Debug)]
pub enum DispatchError {
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant has the same shape, so one irrefutable or-pattern extracts the text.
        let (Self::EventLog(text)
        | Self::Registry(text)
        | Self::Serde(text)
        | Self::Local(text)
        | Self::A2a(text)
        | Self::Denied(text)
        | Self::Timeout(text)
        | Self::Waiting(text)
        | Self::Cancelled(text)
        | Self::NotImplemented(text)) = self;
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Whether a failed attempt with this error may be retried.
    /// Cancellation, denial, unimplemented handlers and waiting are terminal.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}
453
454impl DispatchSkipStage {
455 fn as_str(&self) -> &'static str {
456 match self {
457 Self::Predicate => "predicate",
458 Self::FlowControl => "flow_control",
459 }
460 }
461}
462
463impl From<LogError> for DispatchError {
464 fn from(value: LogError) -> Self {
465 Self::EventLog(value.to_string())
466 }
467}
468
469pub async fn append_dispatch_cancel_request(
470 event_log: &Arc<AnyEventLog>,
471 request: &DispatchCancelRequest,
472) -> Result<u64, DispatchError> {
473 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
474 .expect("static trigger cancel topic should always be valid");
475 event_log
476 .append(
477 &topic,
478 LogEvent::new(
479 "dispatch_cancel_requested",
480 serde_json::to_value(request)
481 .map_err(|error| DispatchError::Serde(error.to_string()))?,
482 ),
483 )
484 .await
485 .map_err(DispatchError::from)
486}
487
488impl Dispatcher {
489 pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
490 self.event_log.clone()
491 }
492
493 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
494 let event_log = active_event_log().ok_or_else(|| {
495 DispatchError::EventLog("dispatcher requires an active event log".to_string())
496 })?;
497 Ok(Self::with_event_log(base_vm, event_log))
498 }
499
500 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
501 Self::with_event_log_and_metrics(base_vm, event_log, None)
502 }
503
504 pub fn with_event_log_and_metrics(
505 base_vm: Vm,
506 event_log: Arc<AnyEventLog>,
507 metrics: Option<Arc<crate::MetricsRegistry>>,
508 ) -> Self {
509 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
510 ACTIVE_DISPATCHER_STATE.with(|slot| {
511 *slot.borrow_mut() = Some(state.clone());
512 });
513 let (cancel_tx, _) = broadcast::channel(32);
514 Self {
515 base_vm: Rc::new(base_vm),
516 event_log,
517 cancel_tx,
518 state,
519 metrics,
520 }
521 }
522
523 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
524 DispatcherStatsSnapshot {
525 in_flight: self.state.in_flight.load(Ordering::Relaxed),
526 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
527 dlq_depth: self
528 .state
529 .dlq
530 .lock()
531 .expect("dispatcher dlq poisoned")
532 .len() as u64,
533 }
534 }
535
536 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
537 self.state
538 .dlq
539 .lock()
540 .expect("dispatcher dlq poisoned")
541 .clone()
542 }
543
544 pub fn shutdown(&self) {
545 self.state.shutting_down.store(true, Ordering::SeqCst);
546 for token in self
547 .state
548 .cancel_tokens
549 .lock()
550 .expect("dispatcher cancel tokens poisoned")
551 .iter()
552 {
553 token.store(true, Ordering::SeqCst);
554 }
555 let _ = self.cancel_tx.send(());
556 }
557
558 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
559 self.enqueue_targeted(None, None, event).await
560 }
561
562 pub async fn enqueue_targeted(
563 &self,
564 trigger_id: Option<String>,
565 binding_version: Option<u32>,
566 event: TriggerEvent,
567 ) -> Result<u64, DispatchError> {
568 self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
569 .await
570 }
571
572 pub async fn enqueue_targeted_with_headers(
573 &self,
574 trigger_id: Option<String>,
575 binding_version: Option<u32>,
576 event: TriggerEvent,
577 parent_headers: Option<&BTreeMap<String, String>>,
578 ) -> Result<u64, DispatchError> {
579 let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
580 let trigger_id_for_metrics = trigger_id.clone();
581 let mut headers = parent_headers.cloned().unwrap_or_default();
582 headers.extend(event_headers(&event, None, None, None));
583 if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
584 headers.insert("trigger_id".to_string(), trigger_id.clone());
585 headers.insert(
586 "binding_key".to_string(),
587 binding_key_from_parts(trigger_id, binding_version),
588 );
589 }
590 headers
591 .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
592 .or_insert_with(|| unix_ms(event.received_at).to_string());
593 let payload = serde_json::to_value(InboxEnvelope {
594 trigger_id,
595 binding_version,
596 event: event.clone(),
597 })
598 .map_err(|error| DispatchError::Serde(error.to_string()))?;
599 let mut log_event = LogEvent::new("event_ingested", payload);
600 let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
601 let queue_appended_at_ms = headers
602 .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
603 .and_then(|value| value.parse::<i64>().ok())
604 .unwrap_or(log_event.occurred_at_ms);
605 headers
606 .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
607 .or_insert_with(|| log_event.occurred_at_ms.to_string());
608 if let (Some(metrics), Some(trigger_id)) =
609 (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
610 {
611 let binding_key = binding_key_from_parts(trigger_id, binding_version);
612 let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
613 if !had_queue_appended_at {
614 metrics.record_trigger_accepted_to_queue_append(
615 trigger_id,
616 &binding_key,
617 event.provider.as_str(),
618 tenant_id(&event),
619 "queued",
620 duration_between_ms(queue_appended_at_ms, accepted_at_ms),
621 );
622 }
623 metrics.note_trigger_pending_event(
624 event.id.0.as_str(),
625 trigger_id,
626 &binding_key,
627 event.provider.as_str(),
628 tenant_id(&event),
629 accepted_at_ms,
630 queue_appended_at_ms,
631 );
632 }
633 log_event.headers = headers;
634 self.event_log
635 .append(&topic, log_event)
636 .await
637 .map_err(DispatchError::from)
638 }
639
640 pub async fn run(&self) -> Result<(), DispatchError> {
641 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
642 .expect("static trigger inbox envelopes topic is valid");
643 let start_from = self.event_log.latest(&topic).await?;
644 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
645 pin_mut!(stream);
646 let mut cancel_rx = self.cancel_tx.subscribe();
647
648 loop {
649 tokio::select! {
650 received = stream.next() => {
651 let Some(received) = received else {
652 break;
653 };
654 let (_, event) = received.map_err(DispatchError::from)?;
655 if event.kind != "event_ingested" {
656 continue;
657 }
658 let parent_headers = event.headers.clone();
659 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
660 .map_err(|error| DispatchError::Serde(error.to_string()))?;
661 notify_test_inbox_dequeued();
662 let _ = self
663 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
664 .await;
665 }
666 _ = recv_cancel(&mut cancel_rx) => break,
667 }
668 }
669
670 Ok(())
671 }
672
673 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
674 let deadline = tokio::time::Instant::now() + timeout;
675 loop {
676 let snapshot = self.snapshot();
677 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
678 return Ok(DispatcherDrainReport {
679 drained: true,
680 in_flight: snapshot.in_flight,
681 retry_queue_depth: snapshot.retry_queue_depth,
682 dlq_depth: snapshot.dlq_depth,
683 });
684 }
685
686 let notified = self.state.idle_notify.notified();
687 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
688 if remaining.is_zero() {
689 return Ok(DispatcherDrainReport {
690 drained: false,
691 in_flight: snapshot.in_flight,
692 retry_queue_depth: snapshot.retry_queue_depth,
693 dlq_depth: snapshot.dlq_depth,
694 });
695 }
696 if tokio::time::timeout(remaining, notified).await.is_err() {
697 let snapshot = self.snapshot();
698 return Ok(DispatcherDrainReport {
699 drained: false,
700 in_flight: snapshot.in_flight,
701 retry_queue_depth: snapshot.retry_queue_depth,
702 dlq_depth: snapshot.dlq_depth,
703 });
704 }
705 }
706 }
707
708 pub async fn dispatch_inbox_envelope(
709 &self,
710 envelope: InboxEnvelope,
711 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
712 self.dispatch_inbox_envelope_with_headers(envelope, None)
713 .await
714 }
715
716 pub async fn dispatch_inbox_envelope_with_parent_headers(
717 &self,
718 envelope: InboxEnvelope,
719 parent_headers: &BTreeMap<String, String>,
720 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
721 self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
722 .await
723 }
724
725 async fn dispatch_inbox_envelope_with_headers(
726 &self,
727 envelope: InboxEnvelope,
728 parent_headers: Option<&BTreeMap<String, String>>,
729 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
730 if let Some(trigger_id) = envelope.trigger_id {
731 let binding = super::registry::resolve_live_trigger_binding(
732 &trigger_id,
733 envelope.binding_version,
734 )
735 .map_err(|error| DispatchError::Registry(error.to_string()))?;
736 return Ok(vec![
737 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
738 .await?,
739 ]);
740 }
741
742 let cron_target = match &envelope.event.provider_payload {
743 crate::triggers::ProviderPayload::Known(
744 crate::triggers::event::KnownProviderPayload::Cron(payload),
745 ) => payload.cron_id.clone(),
746 _ => None,
747 };
748 if let Some(trigger_id) = cron_target {
749 let binding = super::registry::resolve_live_trigger_binding(
750 &trigger_id,
751 envelope.binding_version,
752 )
753 .map_err(|error| DispatchError::Registry(error.to_string()))?;
754 return Ok(vec![
755 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
756 .await?,
757 ]);
758 }
759
760 self.dispatch_event_with_headers(envelope.event, parent_headers)
761 .await
762 }
763
764 pub async fn dispatch_event(
765 &self,
766 event: TriggerEvent,
767 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
768 self.dispatch_event_with_headers(event, None).await
769 }
770
771 async fn dispatch_event_with_headers(
772 &self,
773 event: TriggerEvent,
774 parent_headers: Option<&BTreeMap<String, String>>,
775 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
776 let bindings = matching_bindings(&event);
777 let mut outcomes = Vec::new();
778 for binding in bindings {
779 outcomes.push(
780 self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
781 .await?,
782 );
783 }
784 Ok(outcomes)
785 }
786
787 pub async fn dispatch(
788 &self,
789 binding: &TriggerBinding,
790 event: TriggerEvent,
791 ) -> Result<DispatchOutcome, DispatchError> {
792 self.dispatch_with_replay(binding, event, None, None, None)
793 .await
794 }
795
796 pub async fn dispatch_replay(
797 &self,
798 binding: &TriggerBinding,
799 event: TriggerEvent,
800 replay_of_event_id: String,
801 ) -> Result<DispatchOutcome, DispatchError> {
802 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
803 .await
804 }
805
806 pub async fn dispatch_with_parent_span_id(
807 &self,
808 binding: &TriggerBinding,
809 event: TriggerEvent,
810 parent_span_id: Option<String>,
811 ) -> Result<DispatchOutcome, DispatchError> {
812 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
813 .await
814 }
815
816 async fn dispatch_with_replay(
817 &self,
818 binding: &TriggerBinding,
819 event: TriggerEvent,
820 replay_of_event_id: Option<String>,
821 parent_span_id: Option<String>,
822 parent_headers: Option<&BTreeMap<String, String>>,
823 ) -> Result<DispatchOutcome, DispatchError> {
824 let parent_headers_for_metrics = parent_headers.cloned();
825 let admitted_at_ms = current_unix_ms();
826 if let Some(metrics) = self.metrics.as_ref() {
827 let binding_key = binding.binding_key();
828 let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
829 metrics.record_trigger_queue_age_at_dispatch_admission(
830 binding.id.as_str(),
831 &binding_key,
832 event.provider.as_str(),
833 tenant_id(&event),
834 "admitted",
835 duration_between_ms(admitted_at_ms, queue_appended_at_ms),
836 );
837 metrics.clear_trigger_pending_event(
838 event.id.0.as_str(),
839 binding.id.as_str(),
840 &binding_key,
841 event.provider.as_str(),
842 tenant_id(&event),
843 admitted_at_ms,
844 );
845 }
846 let span = tracing::info_span!(
847 "dispatch",
848 trigger_id = %binding.id.as_str(),
849 binding_version = binding.version,
850 trace_id = %event.trace_id.0
851 );
852 #[cfg(feature = "otel")]
853 let span_for_otel = span.clone();
854 let _ = if let Some(headers) = parent_headers {
855 crate::observability::otel::set_span_parent_from_headers(
856 &span,
857 headers,
858 &event.trace_id,
859 parent_span_id.as_deref(),
860 )
861 } else {
862 crate::observability::otel::set_span_parent(
863 &span,
864 &event.trace_id,
865 parent_span_id.as_deref(),
866 )
867 };
868 #[cfg(feature = "otel")]
869 let started_at = Instant::now();
870 let metrics = self.metrics.clone();
871 let outcome = ACTIVE_DISPATCH_IS_REPLAY
872 .scope(
873 replay_of_event_id.is_some(),
874 self.dispatch_with_replay_inner(
875 binding,
876 event,
877 replay_of_event_id,
878 parent_headers_for_metrics,
879 )
880 .instrument(span),
881 )
882 .await;
883 if let Some(metrics) = metrics.as_ref() {
884 match &outcome {
885 Ok(dispatch_outcome) => match dispatch_outcome.status {
886 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
887 metrics.record_dispatch_succeeded();
888 }
889 DispatchStatus::Waiting => {}
890 _ => metrics.record_dispatch_failed(),
891 },
892 Err(_) => metrics.record_dispatch_failed(),
893 }
894 let outcome_label = match &outcome {
895 Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
896 Err(DispatchError::Cancelled(_)) => "cancelled",
897 Err(_) => "failed",
898 };
899 metrics.record_trigger_dispatched(
900 binding.id.as_str(),
901 binding.handler.kind(),
902 outcome_label,
903 );
904 metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
905 }
906 #[cfg(feature = "otel")]
907 {
908 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
909
910 let duration_ms = started_at.elapsed().as_millis() as i64;
911 let status = match &outcome {
912 Ok(dispatch_outcome) => match dispatch_outcome.status {
913 DispatchStatus::Succeeded => "succeeded",
914 DispatchStatus::Skipped => "skipped",
915 DispatchStatus::Waiting => "waiting",
916 DispatchStatus::Cancelled => "cancelled",
917 DispatchStatus::Failed => "failed",
918 DispatchStatus::Dlq => "dlq",
919 },
920 Err(DispatchError::Cancelled(_)) => "cancelled",
921 Err(_) => "failed",
922 };
923 span_for_otel.set_attribute("result.status", status);
924 span_for_otel.set_attribute("result.duration_ms", duration_ms);
925 }
926 outcome
927 }
928
929 async fn dispatch_with_replay_inner(
930 &self,
931 binding: &TriggerBinding,
932 event: TriggerEvent,
933 replay_of_event_id: Option<String>,
934 parent_headers: Option<BTreeMap<String, String>>,
935 ) -> Result<DispatchOutcome, DispatchError> {
936 let autonomy_tier = crate::resolve_agent_autonomy_tier(
937 &self.event_log,
938 binding.id.as_str(),
939 binding.autonomy_tier,
940 )
941 .await
942 .unwrap_or(binding.autonomy_tier);
943 let binding_key = binding.binding_key();
944 let route = DispatchUri::from(&binding.handler);
945 let trigger_id = binding.id.as_str().to_string();
946 let event_id = event.id.0.clone();
947 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
948 let begin = if replay_of_event_id.is_some() {
949 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
950 } else {
951 begin_in_flight(binding.id.as_str(), binding.version)
952 };
953 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
954
955 let mut attempts = Vec::new();
956 let mut source_node_id = format!("trigger:{}", event.id.0);
957 let mut initial_nodes = Vec::new();
958 let mut initial_edges = Vec::new();
959 if let Some(original_event_id) = replay_of_event_id.as_ref() {
960 let original_node_id = format!("trigger:{original_event_id}");
961 initial_nodes.push(RunActionGraphNodeRecord {
962 id: original_node_id.clone(),
963 label: format!(
964 "{}:{} (original {})",
965 event.provider.as_str(),
966 event.kind,
967 original_event_id
968 ),
969 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
970 status: "historical".to_string(),
971 outcome: "replayed_from".to_string(),
972 trace_id: Some(event.trace_id.0.clone()),
973 stage_id: None,
974 node_id: None,
975 worker_id: None,
976 run_id: None,
977 run_path: None,
978 metadata: trigger_node_metadata(&event),
979 });
980 initial_edges.push(RunActionGraphEdgeRecord {
981 from_id: original_node_id,
982 to_id: source_node_id.clone(),
983 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
984 label: Some("replay chain".to_string()),
985 });
986 }
987 initial_nodes.push(RunActionGraphNodeRecord {
988 id: source_node_id.clone(),
989 label: format!("{}:{}", event.provider.as_str(), event.kind),
990 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
991 status: "received".to_string(),
992 outcome: "received".to_string(),
993 trace_id: Some(event.trace_id.0.clone()),
994 stage_id: None,
995 node_id: None,
996 worker_id: None,
997 run_id: None,
998 run_path: None,
999 metadata: trigger_node_metadata(&event),
1000 });
1001 self.emit_action_graph(
1002 &event,
1003 initial_nodes,
1004 initial_edges,
1005 serde_json::json!({
1006 "source": "dispatcher",
1007 "trigger_id": trigger_id,
1008 "binding_key": binding_key,
1009 "event_id": event_id,
1010 "replay_of_event_id": replay_of_event_id,
1011 }),
1012 )
1013 .await?;
1014
1015 if dispatch_cancel_requested(
1016 &self.event_log,
1017 &binding_key,
1018 &event.id.0,
1019 replay_of_event_id.as_ref(),
1020 )
1021 .await?
1022 {
1023 finish_in_flight(
1024 binding.id.as_str(),
1025 binding.version,
1026 TriggerDispatchOutcome::Failed,
1027 )
1028 .await
1029 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1030 decrement_in_flight(&self.state);
1031 return Ok(cancelled_dispatch_outcome(
1032 binding,
1033 &route,
1034 &event,
1035 replay_of_event_id,
1036 0,
1037 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1038 ));
1039 }
1040
1041 if let Some(predicate) = binding.when.as_ref() {
1042 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1043 let evaluation = self
1044 .evaluate_predicate(
1045 binding,
1046 predicate,
1047 &event,
1048 replay_of_event_id.as_ref(),
1049 autonomy_tier,
1050 )
1051 .await?;
1052 let passed = evaluation.result;
1053 self.emit_action_graph(
1054 &event,
1055 vec![RunActionGraphNodeRecord {
1056 id: predicate_node_id.clone(),
1057 label: predicate.raw.clone(),
1058 kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1059 status: "completed".to_string(),
1060 outcome: passed.to_string(),
1061 trace_id: Some(event.trace_id.0.clone()),
1062 stage_id: None,
1063 node_id: None,
1064 worker_id: None,
1065 run_id: None,
1066 run_path: None,
1067 metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1068 }],
1069 vec![RunActionGraphEdgeRecord {
1070 from_id: source_node_id.clone(),
1071 to_id: predicate_node_id.clone(),
1072 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1073 label: None,
1074 }],
1075 serde_json::json!({
1076 "source": "dispatcher",
1077 "trigger_id": binding.id.as_str(),
1078 "binding_key": binding.binding_key(),
1079 "event_id": event.id.0,
1080 "predicate": predicate.raw,
1081 "reason": evaluation.reason,
1082 "cached": evaluation.cached,
1083 "cost_usd": evaluation.cost_usd,
1084 "tokens": evaluation.tokens,
1085 "latency_ms": evaluation.latency_ms,
1086 "replay_of_event_id": replay_of_event_id,
1087 }),
1088 )
1089 .await?;
1090
1091 if !passed {
1092 if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1093 let final_error = format!(
1094 "trigger budget exhausted: {}",
1095 evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1096 );
1097 self.move_budget_exhausted_to_dlq(
1098 binding,
1099 &route,
1100 &event,
1101 replay_of_event_id.as_ref(),
1102 &final_error,
1103 )
1104 .await?;
1105 finish_in_flight(
1106 binding.id.as_str(),
1107 binding.version,
1108 TriggerDispatchOutcome::Dlq,
1109 )
1110 .await
1111 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1112 decrement_in_flight(&self.state);
1113 self.append_dispatch_trust_record(
1114 binding,
1115 &route,
1116 &event,
1117 replay_of_event_id.as_ref(),
1118 autonomy_tier,
1119 TrustOutcome::Failure,
1120 "dlq",
1121 0,
1122 Some(final_error.clone()),
1123 )
1124 .await?;
1125 return Ok(DispatchOutcome {
1126 trigger_id: binding.id.as_str().to_string(),
1127 binding_key: binding.binding_key(),
1128 event_id: event.id.0,
1129 attempt_count: 0,
1130 status: DispatchStatus::Dlq,
1131 handler_kind: route.kind().to_string(),
1132 target_uri: route.target_uri(),
1133 replay_of_event_id,
1134 result: None,
1135 error: Some(final_error),
1136 });
1137 }
1138
1139 if evaluation.exhaustion_strategy
1140 == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1141 {
1142 self.append_budget_deferred_event(
1143 binding,
1144 &route,
1145 &event,
1146 replay_of_event_id.as_ref(),
1147 evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1148 )
1149 .await?;
1150 finish_in_flight(
1151 binding.id.as_str(),
1152 binding.version,
1153 TriggerDispatchOutcome::Dispatched,
1154 )
1155 .await
1156 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1157 decrement_in_flight(&self.state);
1158 self.append_dispatch_trust_record(
1159 binding,
1160 &route,
1161 &event,
1162 replay_of_event_id.as_ref(),
1163 autonomy_tier,
1164 TrustOutcome::Denied,
1165 "waiting",
1166 0,
1167 evaluation.reason.clone(),
1168 )
1169 .await?;
1170 return Ok(DispatchOutcome {
1171 trigger_id: binding.id.as_str().to_string(),
1172 binding_key: binding.binding_key(),
1173 event_id: event.id.0,
1174 attempt_count: 0,
1175 status: DispatchStatus::Waiting,
1176 handler_kind: route.kind().to_string(),
1177 target_uri: route.target_uri(),
1178 replay_of_event_id,
1179 result: Some(serde_json::json!({
1180 "deferred": true,
1181 "predicate": predicate.raw,
1182 "reason": evaluation.reason,
1183 })),
1184 error: None,
1185 });
1186 }
1187
1188 self.append_skipped_outbox_event(
1189 binding,
1190 &route,
1191 &event,
1192 replay_of_event_id.as_ref(),
1193 DispatchSkipStage::Predicate,
1194 serde_json::json!({
1195 "predicate": predicate.raw,
1196 "reason": evaluation.reason,
1197 }),
1198 )
1199 .await?;
1200 finish_in_flight(
1201 binding.id.as_str(),
1202 binding.version,
1203 TriggerDispatchOutcome::Dispatched,
1204 )
1205 .await
1206 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1207 decrement_in_flight(&self.state);
1208 self.append_dispatch_trust_record(
1209 binding,
1210 &route,
1211 &event,
1212 replay_of_event_id.as_ref(),
1213 autonomy_tier,
1214 TrustOutcome::Denied,
1215 "skipped",
1216 0,
1217 None,
1218 )
1219 .await?;
1220 return Ok(DispatchOutcome {
1221 trigger_id: binding.id.as_str().to_string(),
1222 binding_key: binding.binding_key(),
1223 event_id: event.id.0,
1224 attempt_count: 0,
1225 status: DispatchStatus::Skipped,
1226 handler_kind: route.kind().to_string(),
1227 target_uri: route.target_uri(),
1228 replay_of_event_id,
1229 result: Some(serde_json::json!({
1230 "skipped": true,
1231 "predicate": predicate.raw,
1232 "reason": evaluation.reason,
1233 })),
1234 error: None,
1235 });
1236 }
1237
1238 source_node_id = predicate_node_id;
1239 }
1240
1241 if autonomy_tier == AutonomyTier::ActAuto {
1242 if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1243 let request_id = self
1244 .append_autonomy_budget_approval_request(
1245 binding,
1246 &route,
1247 &event,
1248 replay_of_event_id.as_ref(),
1249 reason,
1250 )
1251 .await?;
1252 self.emit_autonomy_budget_approval_action_graph(
1253 binding,
1254 &route,
1255 &event,
1256 &source_node_id,
1257 replay_of_event_id.as_ref(),
1258 reason,
1259 &request_id,
1260 )
1261 .await?;
1262 finish_in_flight(
1263 binding.id.as_str(),
1264 binding.version,
1265 TriggerDispatchOutcome::Dispatched,
1266 )
1267 .await
1268 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1269 decrement_in_flight(&self.state);
1270 self.append_tier_transition_trust_record(
1271 binding,
1272 &event,
1273 replay_of_event_id.as_ref(),
1274 autonomy_tier,
1275 AutonomyTier::ActWithApproval,
1276 reason,
1277 &request_id,
1278 )
1279 .await?;
1280 self.append_dispatch_trust_record(
1281 binding,
1282 &route,
1283 &event,
1284 replay_of_event_id.as_ref(),
1285 autonomy_tier,
1286 TrustOutcome::Denied,
1287 "waiting",
1288 0,
1289 Some(reason.to_string()),
1290 )
1291 .await?;
1292 return Ok(DispatchOutcome {
1293 trigger_id: binding.id.as_str().to_string(),
1294 binding_key: binding.binding_key(),
1295 event_id: event.id.0,
1296 attempt_count: 0,
1297 status: DispatchStatus::Waiting,
1298 handler_kind: route.kind().to_string(),
1299 target_uri: route.target_uri(),
1300 replay_of_event_id,
1301 result: Some(serde_json::json!({
1302 "approval_required": true,
1303 "request_id": request_id,
1304 "reason": reason,
1305 "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1306 })),
1307 error: None,
1308 });
1309 }
1310 note_autonomous_decision(binding);
1311 }
1312
1313 let (event, acquired_flow) = match self
1314 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1315 .await?
1316 {
1317 FlowControlOutcome::Dispatch { event, acquired } => {
1318 (*event, Arc::new(AsyncMutex::new(acquired)))
1319 }
1320 FlowControlOutcome::Skip { reason } => {
1321 self.append_skipped_outbox_event(
1322 binding,
1323 &route,
1324 &event,
1325 replay_of_event_id.as_ref(),
1326 DispatchSkipStage::FlowControl,
1327 serde_json::json!({
1328 "flow_control": reason,
1329 }),
1330 )
1331 .await?;
1332 finish_in_flight(
1333 binding.id.as_str(),
1334 binding.version,
1335 TriggerDispatchOutcome::Dispatched,
1336 )
1337 .await
1338 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1339 decrement_in_flight(&self.state);
1340 return Ok(DispatchOutcome {
1341 trigger_id: binding.id.as_str().to_string(),
1342 binding_key: binding.binding_key(),
1343 event_id: event.id.0,
1344 attempt_count: 0,
1345 status: DispatchStatus::Skipped,
1346 handler_kind: route.kind().to_string(),
1347 target_uri: route.target_uri(),
1348 replay_of_event_id,
1349 result: Some(serde_json::json!({
1350 "skipped": true,
1351 "flow_control": reason,
1352 })),
1353 error: None,
1354 });
1355 }
1356 };
1357
1358 let destination_key = destination_circuit_key(&route);
1359 let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1360 DestinationCircuitProbe::Allow { half_open } => {
1361 if half_open {
1362 if let Some(metrics) = self.metrics.as_ref() {
1363 metrics.record_backpressure_event("circuit", "half_open_probe");
1364 }
1365 }
1366 half_open
1367 }
1368 DestinationCircuitProbe::Block { retry_after } => {
1369 if let Some(metrics) = self.metrics.as_ref() {
1370 metrics.record_backpressure_event("circuit", "fail_fast");
1371 }
1372 let final_error = format!(
1373 "destination circuit open for {}; retry after {}s",
1374 destination_key,
1375 retry_after.as_secs().max(1)
1376 );
1377 self.move_circuit_open_to_dlq(
1378 binding,
1379 &route,
1380 &event,
1381 replay_of_event_id.as_ref(),
1382 &final_error,
1383 &destination_key,
1384 )
1385 .await?;
1386 finish_in_flight(
1387 binding.id.as_str(),
1388 binding.version,
1389 TriggerDispatchOutcome::Dlq,
1390 )
1391 .await
1392 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1393 self.release_flow_control(&acquired_flow).await?;
1394 decrement_in_flight(&self.state);
1395 self.append_dispatch_trust_record(
1396 binding,
1397 &route,
1398 &event,
1399 replay_of_event_id.as_ref(),
1400 autonomy_tier,
1401 TrustOutcome::Failure,
1402 "dlq",
1403 0,
1404 Some(final_error.clone()),
1405 )
1406 .await?;
1407 return Ok(DispatchOutcome {
1408 trigger_id: binding.id.as_str().to_string(),
1409 binding_key: binding.binding_key(),
1410 event_id: event.id.0,
1411 attempt_count: 0,
1412 status: DispatchStatus::Dlq,
1413 handler_kind: route.kind().to_string(),
1414 target_uri: route.target_uri(),
1415 replay_of_event_id,
1416 result: None,
1417 error: Some(final_error),
1418 });
1419 }
1420 };
1421
1422 let mut previous_retry_node = None;
1423 let max_attempts = binding.retry.max_attempts();
1424 for attempt in 1..=max_attempts {
1425 if dispatch_cancel_requested(
1426 &self.event_log,
1427 &binding_key,
1428 &event.id.0,
1429 replay_of_event_id.as_ref(),
1430 )
1431 .await?
1432 {
1433 finish_in_flight(
1434 binding.id.as_str(),
1435 binding.version,
1436 TriggerDispatchOutcome::Failed,
1437 )
1438 .await
1439 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1440 decrement_in_flight(&self.state);
1441 return Ok(cancelled_dispatch_outcome(
1442 binding,
1443 &route,
1444 &event,
1445 replay_of_event_id,
1446 attempt.saturating_sub(1),
1447 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1448 ));
1449 }
1450 maybe_fail_before_outbox();
1451 let attempt_started_instant = Instant::now();
1452 let attempt_started_at_ms = current_unix_ms();
1453 let queue_age_at_start = duration_between_ms(
1454 attempt_started_at_ms,
1455 queue_appended_at_ms(parent_headers.as_ref(), &event),
1456 );
1457 if attempt == 1 {
1458 if let Some(metrics) = self.metrics.as_ref() {
1459 metrics.record_trigger_queue_age_at_dispatch_start(
1460 binding.id.as_str(),
1461 &binding_key,
1462 event.provider.as_str(),
1463 tenant_id(&event),
1464 "started",
1465 queue_age_at_start,
1466 );
1467 }
1468 }
1469 tracing::info!(
1470 component = "dispatcher",
1471 lifecycle = "dispatch_started",
1472 trigger_id = %binding.id.as_str(),
1473 binding_key = %binding_key,
1474 event_id = %event.id.0,
1475 attempt,
1476 queue_age_ms = queue_age_at_start.as_millis(),
1477 trace_id = %event.trace_id.0
1478 );
1479 let started_at = now_rfc3339();
1480 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1481 self.append_lifecycle_event(
1482 "DispatchStarted",
1483 &event,
1484 binding,
1485 serde_json::json!({
1486 "event_id": event.id.0,
1487 "attempt": attempt,
1488 "handler_kind": route.kind(),
1489 "target_uri": route.target_uri(),
1490 "replay_of_event_id": replay_of_event_id,
1491 }),
1492 replay_of_event_id.as_ref(),
1493 )
1494 .await?;
1495 self.append_topic_event(
1496 TRIGGER_OUTBOX_TOPIC,
1497 "dispatch_started",
1498 &event,
1499 Some(binding),
1500 Some(attempt),
1501 serde_json::json!({
1502 "event_id": event.id.0,
1503 "attempt": attempt,
1504 "trigger_id": binding.id.as_str(),
1505 "binding_key": binding.binding_key(),
1506 "handler_kind": route.kind(),
1507 "target_uri": route.target_uri(),
1508 "replay_of_event_id": replay_of_event_id,
1509 }),
1510 replay_of_event_id.as_ref(),
1511 )
1512 .await?;
1513
1514 let mut dispatch_edges = Vec::new();
1515 if attempt == 1 {
1516 dispatch_edges.push(RunActionGraphEdgeRecord {
1517 from_id: source_node_id.clone(),
1518 to_id: attempt_node_id.clone(),
1519 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1520 label: binding.when.as_ref().map(|_| "true".to_string()),
1521 });
1522 } else if let Some(retry_node_id) = previous_retry_node.take() {
1523 dispatch_edges.push(RunActionGraphEdgeRecord {
1524 from_id: retry_node_id,
1525 to_id: attempt_node_id.clone(),
1526 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1527 label: Some(format!("attempt {attempt}")),
1528 });
1529 }
1530
1531 self.emit_action_graph(
1532 &event,
1533 vec![RunActionGraphNodeRecord {
1534 id: attempt_node_id.clone(),
1535 label: dispatch_node_label(&route),
1536 kind: dispatch_node_kind(&route).to_string(),
1537 status: "running".to_string(),
1538 outcome: format!("attempt_{attempt}"),
1539 trace_id: Some(event.trace_id.0.clone()),
1540 stage_id: None,
1541 node_id: None,
1542 worker_id: None,
1543 run_id: None,
1544 run_path: None,
1545 metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1546 }],
1547 dispatch_edges,
1548 serde_json::json!({
1549 "source": "dispatcher",
1550 "trigger_id": binding.id.as_str(),
1551 "binding_key": binding.binding_key(),
1552 "event_id": event.id.0,
1553 "attempt": attempt,
1554 "handler_kind": route.kind(),
1555 "target_uri": route.target_uri(),
1556 "target_agent": dispatch_target_agent(&route),
1557 "replay_of_event_id": replay_of_event_id,
1558 }),
1559 )
1560 .await?;
1561
1562 let result = self
1563 .dispatch_once(
1564 binding,
1565 &route,
1566 &event,
1567 autonomy_tier,
1568 Some(DispatchWaitLease::new(
1569 self.state.clone(),
1570 acquired_flow.clone(),
1571 )),
1572 &mut self.cancel_tx.subscribe(),
1573 )
1574 .await;
1575 let attempt_runtime = attempt_started_instant.elapsed();
1576 let attempt_status = dispatch_result_status(&result);
1577 if let Some(metrics) = self.metrics.as_ref() {
1578 metrics.record_trigger_dispatch_runtime(
1579 binding.id.as_str(),
1580 &binding_key,
1581 event.provider.as_str(),
1582 tenant_id(&event),
1583 attempt_status,
1584 attempt_runtime,
1585 );
1586 }
1587 tracing::info!(
1588 component = "dispatcher",
1589 lifecycle = "handler_completed",
1590 trigger_id = %binding.id.as_str(),
1591 binding_key = %binding_key,
1592 event_id = %event.id.0,
1593 attempt,
1594 status = attempt_status,
1595 runtime_ms = attempt_runtime.as_millis(),
1596 trace_id = %event.trace_id.0
1597 );
1598 let completed_at = now_rfc3339();
1599
1600 match result {
1601 Ok(result) => {
1602 let attempt_record = DispatchAttemptRecord {
1603 trigger_id: binding.id.as_str().to_string(),
1604 binding_key: binding.binding_key(),
1605 event_id: event.id.0.clone(),
1606 attempt,
1607 handler_kind: route.kind().to_string(),
1608 started_at,
1609 completed_at,
1610 outcome: "success".to_string(),
1611 error_msg: None,
1612 };
1613 attempts.push(attempt_record.clone());
1614 self.append_attempt_record(
1615 &event,
1616 binding,
1617 &attempt_record,
1618 replay_of_event_id.as_ref(),
1619 )
1620 .await?;
1621 self.append_lifecycle_event(
1622 "DispatchSucceeded",
1623 &event,
1624 binding,
1625 serde_json::json!({
1626 "event_id": event.id.0,
1627 "attempt": attempt,
1628 "handler_kind": route.kind(),
1629 "target_uri": route.target_uri(),
1630 "result": result,
1631 "replay_of_event_id": replay_of_event_id,
1632 }),
1633 replay_of_event_id.as_ref(),
1634 )
1635 .await?;
1636 self.append_topic_event(
1637 TRIGGER_OUTBOX_TOPIC,
1638 "dispatch_succeeded",
1639 &event,
1640 Some(binding),
1641 Some(attempt),
1642 serde_json::json!({
1643 "event_id": event.id.0,
1644 "attempt": attempt,
1645 "trigger_id": binding.id.as_str(),
1646 "binding_key": binding.binding_key(),
1647 "handler_kind": route.kind(),
1648 "target_uri": route.target_uri(),
1649 "result": result,
1650 "replay_of_event_id": replay_of_event_id,
1651 }),
1652 replay_of_event_id.as_ref(),
1653 )
1654 .await?;
1655 self.emit_action_graph(
1656 &event,
1657 vec![RunActionGraphNodeRecord {
1658 id: attempt_node_id.clone(),
1659 label: dispatch_node_label(&route),
1660 kind: dispatch_node_kind(&route).to_string(),
1661 status: "completed".to_string(),
1662 outcome: dispatch_success_outcome(&route, &result).to_string(),
1663 trace_id: Some(event.trace_id.0.clone()),
1664 stage_id: None,
1665 node_id: None,
1666 worker_id: None,
1667 run_id: None,
1668 run_path: None,
1669 metadata: dispatch_success_metadata(
1670 &route, binding, &event, attempt, &result,
1671 ),
1672 }],
1673 Vec::new(),
1674 serde_json::json!({
1675 "source": "dispatcher",
1676 "trigger_id": binding.id.as_str(),
1677 "binding_key": binding.binding_key(),
1678 "event_id": event.id.0,
1679 "attempt": attempt,
1680 "handler_kind": route.kind(),
1681 "target_uri": route.target_uri(),
1682 "result": result,
1683 "replay_of_event_id": replay_of_event_id,
1684 }),
1685 )
1686 .await?;
1687 self.state
1688 .destination_circuits
1689 .record_success(&destination_key);
1690 if half_open_probe {
1691 if let Some(metrics) = self.metrics.as_ref() {
1692 metrics.record_backpressure_event("circuit", "closed");
1693 }
1694 }
1695 finish_in_flight(
1696 binding.id.as_str(),
1697 binding.version,
1698 TriggerDispatchOutcome::Dispatched,
1699 )
1700 .await
1701 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1702 self.release_flow_control(&acquired_flow).await?;
1703 decrement_in_flight(&self.state);
1704 self.append_dispatch_trust_record(
1705 binding,
1706 &route,
1707 &event,
1708 replay_of_event_id.as_ref(),
1709 autonomy_tier,
1710 TrustOutcome::Success,
1711 "succeeded",
1712 attempt,
1713 None,
1714 )
1715 .await?;
1716 return Ok(DispatchOutcome {
1717 trigger_id: binding.id.as_str().to_string(),
1718 binding_key: binding.binding_key(),
1719 event_id: event.id.0,
1720 attempt_count: attempt,
1721 status: DispatchStatus::Succeeded,
1722 handler_kind: route.kind().to_string(),
1723 target_uri: route.target_uri(),
1724 replay_of_event_id,
1725 result: Some(result),
1726 error: None,
1727 });
1728 }
1729 Err(error) => {
1730 let attempt_record = DispatchAttemptRecord {
1731 trigger_id: binding.id.as_str().to_string(),
1732 binding_key: binding.binding_key(),
1733 event_id: event.id.0.clone(),
1734 attempt,
1735 handler_kind: route.kind().to_string(),
1736 started_at,
1737 completed_at,
1738 outcome: dispatch_error_label(&error).to_string(),
1739 error_msg: Some(error.to_string()),
1740 };
1741 attempts.push(attempt_record.clone());
1742 self.append_attempt_record(
1743 &event,
1744 binding,
1745 &attempt_record,
1746 replay_of_event_id.as_ref(),
1747 )
1748 .await?;
1749 if let DispatchError::Waiting(message) = &error {
1750 self.append_lifecycle_event(
1751 "DispatchWaiting",
1752 &event,
1753 binding,
1754 serde_json::json!({
1755 "event_id": event.id.0,
1756 "attempt": attempt,
1757 "handler_kind": route.kind(),
1758 "target_uri": route.target_uri(),
1759 "message": message,
1760 "replay_of_event_id": replay_of_event_id,
1761 }),
1762 replay_of_event_id.as_ref(),
1763 )
1764 .await?;
1765 self.append_topic_event(
1766 TRIGGER_OUTBOX_TOPIC,
1767 "dispatch_waiting",
1768 &event,
1769 Some(binding),
1770 Some(attempt),
1771 serde_json::json!({
1772 "event_id": event.id.0,
1773 "attempt": attempt,
1774 "trigger_id": binding.id.as_str(),
1775 "binding_key": binding.binding_key(),
1776 "handler_kind": route.kind(),
1777 "target_uri": route.target_uri(),
1778 "message": message,
1779 "replay_of_event_id": replay_of_event_id,
1780 }),
1781 replay_of_event_id.as_ref(),
1782 )
1783 .await?;
1784 finish_in_flight(
1785 binding.id.as_str(),
1786 binding.version,
1787 TriggerDispatchOutcome::Dispatched,
1788 )
1789 .await
1790 .map_err(|registry_error| {
1791 DispatchError::Registry(registry_error.to_string())
1792 })?;
1793 self.release_flow_control(&acquired_flow).await?;
1794 decrement_in_flight(&self.state);
1795 return Ok(DispatchOutcome {
1796 trigger_id: binding.id.as_str().to_string(),
1797 binding_key: binding.binding_key(),
1798 event_id: event.id.0,
1799 attempt_count: attempt,
1800 status: DispatchStatus::Waiting,
1801 handler_kind: route.kind().to_string(),
1802 target_uri: route.target_uri(),
1803 replay_of_event_id,
1804 result: Some(serde_json::json!({
1805 "waiting": true,
1806 "message": message,
1807 })),
1808 error: None,
1809 });
1810 }
1811
1812 self.append_lifecycle_event(
1813 "DispatchFailed",
1814 &event,
1815 binding,
1816 serde_json::json!({
1817 "event_id": event.id.0,
1818 "attempt": attempt,
1819 "handler_kind": route.kind(),
1820 "target_uri": route.target_uri(),
1821 "error": error.to_string(),
1822 "replay_of_event_id": replay_of_event_id,
1823 }),
1824 replay_of_event_id.as_ref(),
1825 )
1826 .await?;
1827 self.append_topic_event(
1828 TRIGGER_OUTBOX_TOPIC,
1829 "dispatch_failed",
1830 &event,
1831 Some(binding),
1832 Some(attempt),
1833 serde_json::json!({
1834 "event_id": event.id.0,
1835 "attempt": attempt,
1836 "trigger_id": binding.id.as_str(),
1837 "binding_key": binding.binding_key(),
1838 "handler_kind": route.kind(),
1839 "target_uri": route.target_uri(),
1840 "error": error.to_string(),
1841 "replay_of_event_id": replay_of_event_id,
1842 }),
1843 replay_of_event_id.as_ref(),
1844 )
1845 .await?;
1846 self.emit_action_graph(
1847 &event,
1848 vec![RunActionGraphNodeRecord {
1849 id: attempt_node_id.clone(),
1850 label: dispatch_node_label(&route),
1851 kind: dispatch_node_kind(&route).to_string(),
1852 status: if matches!(error, DispatchError::Cancelled(_)) {
1853 "cancelled".to_string()
1854 } else {
1855 "failed".to_string()
1856 },
1857 outcome: dispatch_error_label(&error).to_string(),
1858 trace_id: Some(event.trace_id.0.clone()),
1859 stage_id: None,
1860 node_id: None,
1861 worker_id: None,
1862 run_id: None,
1863 run_path: None,
1864 metadata: dispatch_error_metadata(
1865 &route, binding, &event, attempt, &error,
1866 ),
1867 }],
1868 Vec::new(),
1869 serde_json::json!({
1870 "source": "dispatcher",
1871 "trigger_id": binding.id.as_str(),
1872 "binding_key": binding.binding_key(),
1873 "event_id": event.id.0,
1874 "attempt": attempt,
1875 "handler_kind": route.kind(),
1876 "target_uri": route.target_uri(),
1877 "error": error.to_string(),
1878 "replay_of_event_id": replay_of_event_id,
1879 }),
1880 )
1881 .await?;
1882
1883 let circuit_opened = if error.retryable() {
1884 self.state
1885 .destination_circuits
1886 .record_failure(&destination_key)
1887 } else {
1888 false
1889 };
1890 if circuit_opened {
1891 if let Some(metrics) = self.metrics.as_ref() {
1892 metrics.record_backpressure_event("circuit", "opened");
1893 metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1894 metrics.record_trigger_accepted_to_dlq(
1895 binding.id.as_str(),
1896 &binding_key,
1897 event.provider.as_str(),
1898 tenant_id(&event),
1899 "circuit_open",
1900 duration_between_ms(
1901 current_unix_ms(),
1902 accepted_at_ms(parent_headers.as_ref(), &event),
1903 ),
1904 );
1905 }
1906 let final_error = format!(
1907 "destination circuit opened for {} after {} consecutive failures: {}",
1908 destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
1909 );
1910 let dlq_entry = DlqEntry {
1911 trigger_id: binding.id.as_str().to_string(),
1912 binding_key: binding.binding_key(),
1913 event: event.clone(),
1914 attempt_count: attempt,
1915 final_error: final_error.clone(),
1916 error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
1917 .to_string(),
1918 attempts: attempts.clone(),
1919 };
1920 self.state
1921 .dlq
1922 .lock()
1923 .expect("dispatcher dlq poisoned")
1924 .push(dlq_entry.clone());
1925 self.append_lifecycle_event(
1926 "DlqMoved",
1927 &event,
1928 binding,
1929 serde_json::json!({
1930 "event_id": event.id.0,
1931 "attempt_count": attempt,
1932 "final_error": dlq_entry.final_error,
1933 "reason": "circuit_open",
1934 "destination": destination_key,
1935 "replay_of_event_id": replay_of_event_id,
1936 }),
1937 replay_of_event_id.as_ref(),
1938 )
1939 .await?;
1940 self.append_topic_event(
1941 TRIGGER_DLQ_TOPIC,
1942 "dlq_moved",
1943 &event,
1944 Some(binding),
1945 Some(attempt),
1946 serde_json::to_value(&dlq_entry).map_err(|serde_error| {
1947 DispatchError::Serde(serde_error.to_string())
1948 })?,
1949 replay_of_event_id.as_ref(),
1950 )
1951 .await?;
1952 finish_in_flight(
1953 binding.id.as_str(),
1954 binding.version,
1955 TriggerDispatchOutcome::Dlq,
1956 )
1957 .await
1958 .map_err(|registry_error| {
1959 DispatchError::Registry(registry_error.to_string())
1960 })?;
1961 self.release_flow_control(&acquired_flow).await?;
1962 decrement_in_flight(&self.state);
1963 self.append_dispatch_trust_record(
1964 binding,
1965 &route,
1966 &event,
1967 replay_of_event_id.as_ref(),
1968 autonomy_tier,
1969 TrustOutcome::Failure,
1970 "dlq",
1971 attempt,
1972 Some(final_error.clone()),
1973 )
1974 .await?;
1975 return Ok(DispatchOutcome {
1976 trigger_id: binding.id.as_str().to_string(),
1977 binding_key: binding.binding_key(),
1978 event_id: event.id.0,
1979 attempt_count: attempt,
1980 status: DispatchStatus::Dlq,
1981 handler_kind: route.kind().to_string(),
1982 target_uri: route.target_uri(),
1983 replay_of_event_id,
1984 result: None,
1985 error: Some(final_error),
1986 });
1987 }
1988
1989 if !error.retryable() {
1990 finish_in_flight(
1991 binding.id.as_str(),
1992 binding.version,
1993 TriggerDispatchOutcome::Failed,
1994 )
1995 .await
1996 .map_err(|registry_error| {
1997 DispatchError::Registry(registry_error.to_string())
1998 })?;
1999 self.release_flow_control(&acquired_flow).await?;
2000 decrement_in_flight(&self.state);
2001 let trust_outcome = match error {
2002 DispatchError::Denied(_) => TrustOutcome::Denied,
2003 DispatchError::Timeout(_) => TrustOutcome::Timeout,
2004 _ => TrustOutcome::Failure,
2005 };
2006 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2007 "cancelled"
2008 } else {
2009 "failed"
2010 };
2011 self.append_dispatch_trust_record(
2012 binding,
2013 &route,
2014 &event,
2015 replay_of_event_id.as_ref(),
2016 autonomy_tier,
2017 trust_outcome,
2018 terminal_status,
2019 attempt,
2020 Some(error.to_string()),
2021 )
2022 .await?;
2023 return Ok(DispatchOutcome {
2024 trigger_id: binding.id.as_str().to_string(),
2025 binding_key: binding.binding_key(),
2026 event_id: event.id.0,
2027 attempt_count: attempt,
2028 status: if matches!(error, DispatchError::Cancelled(_)) {
2029 DispatchStatus::Cancelled
2030 } else {
2031 DispatchStatus::Failed
2032 },
2033 handler_kind: route.kind().to_string(),
2034 target_uri: route.target_uri(),
2035 replay_of_event_id,
2036 result: None,
2037 error: Some(error.to_string()),
2038 });
2039 }
2040
2041 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2042 if let Some(metrics) = self.metrics.as_ref() {
2043 metrics.record_retry_scheduled();
2044 metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2045 metrics.record_trigger_retry_delay(
2046 binding.id.as_str(),
2047 &binding_key,
2048 event.provider.as_str(),
2049 tenant_id(&event),
2050 "scheduled",
2051 delay,
2052 );
2053 }
2054 tracing::info!(
2055 component = "dispatcher",
2056 lifecycle = "retry_scheduled",
2057 trigger_id = %binding.id.as_str(),
2058 binding_key = %binding_key,
2059 event_id = %event.id.0,
2060 attempt = attempt + 1,
2061 delay_ms = delay.as_millis(),
2062 trace_id = %event.trace_id.0
2063 );
2064 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2065 previous_retry_node = Some(retry_node_id.clone());
2066 self.emit_action_graph(
2067 &event,
2068 vec![RunActionGraphNodeRecord {
2069 id: retry_node_id.clone(),
2070 label: format!("retry in {}ms", delay.as_millis()),
2071 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2072 status: "scheduled".to_string(),
2073 outcome: format!("attempt_{}", attempt + 1),
2074 trace_id: Some(event.trace_id.0.clone()),
2075 stage_id: None,
2076 node_id: None,
2077 worker_id: None,
2078 run_id: None,
2079 run_path: None,
2080 metadata: retry_node_metadata(
2081 binding,
2082 &event,
2083 attempt + 1,
2084 delay,
2085 &error,
2086 ),
2087 }],
2088 vec![RunActionGraphEdgeRecord {
2089 from_id: attempt_node_id,
2090 to_id: retry_node_id.clone(),
2091 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2092 label: Some(format!("attempt {}", attempt + 1)),
2093 }],
2094 serde_json::json!({
2095 "source": "dispatcher",
2096 "trigger_id": binding.id.as_str(),
2097 "binding_key": binding.binding_key(),
2098 "event_id": event.id.0,
2099 "attempt": attempt + 1,
2100 "delay_ms": delay.as_millis(),
2101 "replay_of_event_id": replay_of_event_id,
2102 }),
2103 )
2104 .await?;
2105 self.append_lifecycle_event(
2106 "RetryScheduled",
2107 &event,
2108 binding,
2109 serde_json::json!({
2110 "event_id": event.id.0,
2111 "attempt": attempt + 1,
2112 "delay_ms": delay.as_millis(),
2113 "error": error.to_string(),
2114 "replay_of_event_id": replay_of_event_id,
2115 }),
2116 replay_of_event_id.as_ref(),
2117 )
2118 .await?;
2119 self.append_topic_event(
2120 TRIGGER_ATTEMPTS_TOPIC,
2121 "retry_scheduled",
2122 &event,
2123 Some(binding),
2124 Some(attempt + 1),
2125 serde_json::json!({
2126 "event_id": event.id.0,
2127 "attempt": attempt + 1,
2128 "trigger_id": binding.id.as_str(),
2129 "binding_key": binding.binding_key(),
2130 "delay_ms": delay.as_millis(),
2131 "error": error.to_string(),
2132 "replay_of_event_id": replay_of_event_id,
2133 }),
2134 replay_of_event_id.as_ref(),
2135 )
2136 .await?;
2137 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2138 let sleep_result = sleep_or_cancel_or_request(
2139 &self.event_log,
2140 delay,
2141 &binding_key,
2142 &event.id.0,
2143 replay_of_event_id.as_ref(),
2144 &mut self.cancel_tx.subscribe(),
2145 )
2146 .await;
2147 decrement_retry_queue_depth(&self.state);
2148 if sleep_result.is_err() {
2149 finish_in_flight(
2150 binding.id.as_str(),
2151 binding.version,
2152 TriggerDispatchOutcome::Failed,
2153 )
2154 .await
2155 .map_err(|registry_error| {
2156 DispatchError::Registry(registry_error.to_string())
2157 })?;
2158 self.release_flow_control(&acquired_flow).await?;
2159 decrement_in_flight(&self.state);
2160 self.append_dispatch_trust_record(
2161 binding,
2162 &route,
2163 &event,
2164 replay_of_event_id.as_ref(),
2165 autonomy_tier,
2166 TrustOutcome::Failure,
2167 "cancelled",
2168 attempt,
2169 Some("dispatcher shutdown cancelled retry wait".to_string()),
2170 )
2171 .await?;
2172 return Ok(DispatchOutcome {
2173 trigger_id: binding.id.as_str().to_string(),
2174 binding_key: binding.binding_key(),
2175 event_id: event.id.0,
2176 attempt_count: attempt,
2177 status: DispatchStatus::Cancelled,
2178 handler_kind: route.kind().to_string(),
2179 target_uri: route.target_uri(),
2180 replay_of_event_id,
2181 result: None,
2182 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2183 });
2184 }
2185 continue;
2186 }
2187
2188 let final_error = error.to_string();
2189 let dlq_entry = DlqEntry {
2190 trigger_id: binding.id.as_str().to_string(),
2191 binding_key: binding.binding_key(),
2192 event: event.clone(),
2193 attempt_count: attempt,
2194 final_error: final_error.clone(),
2195 error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2196 .to_string(),
2197 attempts: attempts.clone(),
2198 };
2199 self.state
2200 .dlq
2201 .lock()
2202 .expect("dispatcher dlq poisoned")
2203 .push(dlq_entry.clone());
2204 if let Some(metrics) = self.metrics.as_ref() {
2205 metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2206 metrics.record_trigger_accepted_to_dlq(
2207 binding.id.as_str(),
2208 &binding_key,
2209 event.provider.as_str(),
2210 tenant_id(&event),
2211 "retry_exhausted",
2212 duration_between_ms(
2213 current_unix_ms(),
2214 accepted_at_ms(parent_headers.as_ref(), &event),
2215 ),
2216 );
2217 }
2218 tracing::info!(
2219 component = "dispatcher",
2220 lifecycle = "dlq_moved",
2221 trigger_id = %binding.id.as_str(),
2222 binding_key = %binding_key,
2223 event_id = %event.id.0,
2224 attempt_count = attempt,
2225 reason = "retry_exhausted",
2226 trace_id = %event.trace_id.0
2227 );
2228 self.emit_action_graph(
2229 &event,
2230 vec![RunActionGraphNodeRecord {
2231 id: format!("dlq:{binding_key}:{}", event.id.0),
2232 label: binding.id.as_str().to_string(),
2233 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2234 status: "queued".to_string(),
2235 outcome: "retry_exhausted".to_string(),
2236 trace_id: Some(event.trace_id.0.clone()),
2237 stage_id: None,
2238 node_id: None,
2239 worker_id: None,
2240 run_id: None,
2241 run_path: None,
2242 metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2243 }],
2244 vec![RunActionGraphEdgeRecord {
2245 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2246 to_id: format!("dlq:{binding_key}:{}", event.id.0),
2247 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2248 label: Some(format!("{attempt} attempts")),
2249 }],
2250 serde_json::json!({
2251 "source": "dispatcher",
2252 "trigger_id": binding.id.as_str(),
2253 "binding_key": binding.binding_key(),
2254 "event_id": event.id.0,
2255 "attempt_count": attempt,
2256 "final_error": final_error,
2257 "replay_of_event_id": replay_of_event_id,
2258 }),
2259 )
2260 .await?;
2261 self.append_lifecycle_event(
2262 "DlqMoved",
2263 &event,
2264 binding,
2265 serde_json::json!({
2266 "event_id": event.id.0,
2267 "attempt_count": attempt,
2268 "final_error": dlq_entry.final_error,
2269 "replay_of_event_id": replay_of_event_id,
2270 }),
2271 replay_of_event_id.as_ref(),
2272 )
2273 .await?;
2274 self.append_topic_event(
2275 TRIGGER_DLQ_TOPIC,
2276 "dlq_moved",
2277 &event,
2278 Some(binding),
2279 Some(attempt),
2280 serde_json::to_value(&dlq_entry)
2281 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2282 replay_of_event_id.as_ref(),
2283 )
2284 .await?;
2285 finish_in_flight(
2286 binding.id.as_str(),
2287 binding.version,
2288 TriggerDispatchOutcome::Dlq,
2289 )
2290 .await
2291 .map_err(|registry_error| {
2292 DispatchError::Registry(registry_error.to_string())
2293 })?;
2294 self.release_flow_control(&acquired_flow).await?;
2295 decrement_in_flight(&self.state);
2296 self.append_dispatch_trust_record(
2297 binding,
2298 &route,
2299 &event,
2300 replay_of_event_id.as_ref(),
2301 autonomy_tier,
2302 TrustOutcome::Failure,
2303 "dlq",
2304 attempt,
2305 Some(error.to_string()),
2306 )
2307 .await?;
2308 return Ok(DispatchOutcome {
2309 trigger_id: binding.id.as_str().to_string(),
2310 binding_key: binding.binding_key(),
2311 event_id: event.id.0,
2312 attempt_count: attempt,
2313 status: DispatchStatus::Dlq,
2314 handler_kind: route.kind().to_string(),
2315 target_uri: route.target_uri(),
2316 replay_of_event_id,
2317 result: None,
2318 error: Some(error.to_string()),
2319 });
2320 }
2321 }
2322 }
2323
2324 finish_in_flight(
2325 binding.id.as_str(),
2326 binding.version,
2327 TriggerDispatchOutcome::Failed,
2328 )
2329 .await
2330 .map_err(|error| DispatchError::Registry(error.to_string()))?;
2331 self.release_flow_control(&acquired_flow).await?;
2332 decrement_in_flight(&self.state);
2333 self.append_dispatch_trust_record(
2334 binding,
2335 &route,
2336 &event,
2337 replay_of_event_id.as_ref(),
2338 autonomy_tier,
2339 TrustOutcome::Failure,
2340 "failed",
2341 max_attempts,
2342 Some("dispatch exhausted without terminal outcome".to_string()),
2343 )
2344 .await?;
2345 Ok(DispatchOutcome {
2346 trigger_id: binding.id.as_str().to_string(),
2347 binding_key: binding.binding_key(),
2348 event_id: event.id.0,
2349 attempt_count: max_attempts,
2350 status: DispatchStatus::Failed,
2351 handler_kind: route.kind().to_string(),
2352 target_uri: route.target_uri(),
2353 replay_of_event_id,
2354 result: None,
2355 error: Some("dispatch exhausted without terminal outcome".to_string()),
2356 })
2357 }
2358
2359 async fn dispatch_once(
2360 &self,
2361 binding: &TriggerBinding,
2362 route: &DispatchUri,
2363 event: &TriggerEvent,
2364 autonomy_tier: AutonomyTier,
2365 wait_lease: Option<DispatchWaitLease>,
2366 cancel_rx: &mut broadcast::Receiver<()>,
2367 ) -> Result<serde_json::Value, DispatchError> {
2368 match route {
2369 DispatchUri::Local { .. } => {
2370 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2371 return Err(DispatchError::Local(format!(
2372 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2373 binding.id.as_str()
2374 )));
2375 };
2376 let value = self
2377 .invoke_vm_callable(
2378 closure,
2379 &binding.binding_key(),
2380 event,
2381 None,
2382 binding.id.as_str(),
2383 &format!("{}.{}", event.provider.as_str(), event.kind),
2384 autonomy_tier,
2385 wait_lease,
2386 cancel_rx,
2387 )
2388 .await?;
2389 Ok(vm_value_to_json(&value))
2390 }
2391 DispatchUri::A2a {
2392 target,
2393 allow_cleartext,
2394 } => {
2395 if self.state.shutting_down.load(Ordering::SeqCst) {
2396 return Err(DispatchError::Cancelled(
2397 "dispatcher shutdown cancelled A2A dispatch".to_string(),
2398 ));
2399 }
2400 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2401 target,
2402 *allow_cleartext,
2403 binding.id.as_str(),
2404 &binding.binding_key(),
2405 event,
2406 cancel_rx,
2407 )
2408 .await
2409 .map_err(|error| match error {
2410 crate::a2a::A2aClientError::Cancelled(message) => {
2411 DispatchError::Cancelled(message)
2412 }
2413 other => DispatchError::A2a(other.to_string()),
2414 })?;
2415 match ack {
2416 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2417 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2418 }
2419 }
2420 DispatchUri::Worker { queue } => {
2421 let receipt = crate::WorkerQueue::new(self.event_log.clone())
2422 .enqueue(&crate::WorkerQueueJob {
2423 queue: queue.clone(),
2424 trigger_id: binding.id.as_str().to_string(),
2425 binding_key: binding.binding_key(),
2426 binding_version: binding.version,
2427 event: event.clone(),
2428 replay_of_event_id: current_dispatch_context()
2429 .and_then(|context| context.replay_of_event_id),
2430 priority: worker_queue_priority(binding, event),
2431 })
2432 .await
2433 .map_err(DispatchError::from)?;
2434 Ok(serde_json::to_value(receipt)
2435 .map_err(|error| DispatchError::Serde(error.to_string()))?)
2436 }
2437 }
2438 }
2439
2440 async fn apply_flow_control(
2441 &self,
2442 binding: &TriggerBinding,
2443 event: &TriggerEvent,
2444 replay_of_event_id: Option<&String>,
2445 ) -> Result<FlowControlOutcome, DispatchError> {
2446 let flow = &binding.flow_control;
2447 let mut managed_event = event.clone();
2448
2449 if let Some(batch) = &flow.batch {
2450 let gate = self
2451 .resolve_flow_gate(
2452 &binding.binding_key(),
2453 batch.key.as_ref(),
2454 &managed_event,
2455 replay_of_event_id,
2456 )
2457 .await?;
2458 match self
2459 .state
2460 .flow_control
2461 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2462 .await
2463 .map_err(DispatchError::from)?
2464 {
2465 BatchDecision::Dispatch(events) => {
2466 managed_event = build_batched_event(events)?;
2467 }
2468 BatchDecision::Merged => {
2469 return Ok(FlowControlOutcome::Skip {
2470 reason: "batch_merged".to_string(),
2471 })
2472 }
2473 }
2474 }
2475
2476 if let Some(debounce) = &flow.debounce {
2477 let gate = self
2478 .resolve_flow_gate(
2479 &binding.binding_key(),
2480 Some(&debounce.key),
2481 &managed_event,
2482 replay_of_event_id,
2483 )
2484 .await?;
2485 let latest = self
2486 .state
2487 .flow_control
2488 .debounce(&gate, debounce.period)
2489 .await
2490 .map_err(DispatchError::from)?;
2491 if !latest {
2492 return Ok(FlowControlOutcome::Skip {
2493 reason: "debounced".to_string(),
2494 });
2495 }
2496 }
2497
2498 if let Some(rate_limit) = &flow.rate_limit {
2499 let gate = self
2500 .resolve_flow_gate(
2501 &binding.binding_key(),
2502 rate_limit.key.as_ref(),
2503 &managed_event,
2504 replay_of_event_id,
2505 )
2506 .await?;
2507 let allowed = self
2508 .state
2509 .flow_control
2510 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2511 .await
2512 .map_err(DispatchError::from)?;
2513 if !allowed {
2514 return Ok(FlowControlOutcome::Skip {
2515 reason: "rate_limited".to_string(),
2516 });
2517 }
2518 }
2519
2520 if let Some(throttle) = &flow.throttle {
2521 let gate = self
2522 .resolve_flow_gate(
2523 &binding.binding_key(),
2524 throttle.key.as_ref(),
2525 &managed_event,
2526 replay_of_event_id,
2527 )
2528 .await?;
2529 self.state
2530 .flow_control
2531 .wait_for_throttle(&gate, throttle.period, throttle.max)
2532 .await
2533 .map_err(DispatchError::from)?;
2534 }
2535
2536 let mut acquired = AcquiredFlowControl::default();
2537 if let Some(singleton) = &flow.singleton {
2538 let gate = self
2539 .resolve_flow_gate(
2540 &binding.binding_key(),
2541 singleton.key.as_ref(),
2542 &managed_event,
2543 replay_of_event_id,
2544 )
2545 .await?;
2546 let acquired_singleton = self
2547 .state
2548 .flow_control
2549 .try_acquire_singleton(&gate)
2550 .await
2551 .map_err(DispatchError::from)?;
2552 if !acquired_singleton {
2553 return Ok(FlowControlOutcome::Skip {
2554 reason: "singleton_active".to_string(),
2555 });
2556 }
2557 acquired.singleton = Some(SingletonLease { gate, held: true });
2558 }
2559
2560 if let Some(concurrency) = &flow.concurrency {
2561 let gate = self
2562 .resolve_flow_gate(
2563 &binding.binding_key(),
2564 concurrency.key.as_ref(),
2565 &managed_event,
2566 replay_of_event_id,
2567 )
2568 .await?;
2569 let priority_rank = self
2570 .resolve_priority_rank(
2571 &binding.binding_key(),
2572 flow.priority.as_ref(),
2573 &managed_event,
2574 replay_of_event_id,
2575 )
2576 .await?;
2577 let permit = self
2578 .state
2579 .flow_control
2580 .acquire_concurrency(&gate, concurrency.max, priority_rank)
2581 .await
2582 .map_err(DispatchError::from)?;
2583 acquired.concurrency = Some(ConcurrencyLease {
2584 gate,
2585 max: concurrency.max,
2586 priority_rank,
2587 permit: Some(permit),
2588 });
2589 }
2590
2591 Ok(FlowControlOutcome::Dispatch {
2592 event: Box::new(managed_event),
2593 acquired,
2594 })
2595 }
2596
2597 async fn release_flow_control(
2598 &self,
2599 acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2600 ) -> Result<(), DispatchError> {
2601 let (singleton_gate, concurrency_permit) = {
2602 let mut acquired = acquired.lock().await;
2603 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2604 if lease.held {
2605 lease.held = false;
2606 Some(lease.gate.clone())
2607 } else {
2608 None
2609 }
2610 });
2611 let concurrency_permit = acquired
2612 .concurrency
2613 .as_mut()
2614 .and_then(|lease| lease.permit.take());
2615 (singleton_gate, concurrency_permit)
2616 };
2617 if let Some(gate) = singleton_gate {
2618 self.state
2619 .flow_control
2620 .release_singleton(&gate)
2621 .await
2622 .map_err(DispatchError::from)?;
2623 }
2624 if let Some(permit) = concurrency_permit {
2625 self.state
2626 .flow_control
2627 .release_concurrency(permit)
2628 .await
2629 .map_err(DispatchError::from)?;
2630 }
2631 Ok(())
2632 }
2633
2634 async fn resolve_flow_gate(
2635 &self,
2636 binding_key: &str,
2637 expr: Option<&crate::triggers::TriggerExpressionSpec>,
2638 event: &TriggerEvent,
2639 replay_of_event_id: Option<&String>,
2640 ) -> Result<String, DispatchError> {
2641 let key = match expr {
2642 Some(expr) => {
2643 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2644 .await?
2645 }
2646 None => "_global".to_string(),
2647 };
2648 Ok(format!("{binding_key}:{key}"))
2649 }
2650
2651 async fn resolve_priority_rank(
2652 &self,
2653 binding_key: &str,
2654 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2655 event: &TriggerEvent,
2656 replay_of_event_id: Option<&String>,
2657 ) -> Result<usize, DispatchError> {
2658 let Some(priority) = priority else {
2659 return Ok(0);
2660 };
2661 let value = self
2662 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2663 .await?;
2664 Ok(priority
2665 .order
2666 .iter()
2667 .position(|candidate| candidate == &value)
2668 .unwrap_or(priority.order.len()))
2669 }
2670
2671 async fn evaluate_flow_expression(
2672 &self,
2673 binding_key: &str,
2674 expr: &crate::triggers::TriggerExpressionSpec,
2675 event: &TriggerEvent,
2676 replay_of_event_id: Option<&String>,
2677 ) -> Result<String, DispatchError> {
2678 let value = self
2679 .invoke_vm_callable(
2680 &expr.closure,
2681 binding_key,
2682 event,
2683 replay_of_event_id,
2684 "",
2685 "flow_control",
2686 AutonomyTier::Suggest,
2687 None,
2688 &mut self.cancel_tx.subscribe(),
2689 )
2690 .await?;
2691 Ok(json_value_to_gate(&vm_value_to_json(&value)))
2692 }
2693
2694 #[allow(clippy::too_many_arguments)]
2695 async fn invoke_vm_callable(
2696 &self,
2697 closure: &crate::value::VmClosure,
2698 binding_key: &str,
2699 event: &TriggerEvent,
2700 replay_of_event_id: Option<&String>,
2701 agent_id: &str,
2702 action: &str,
2703 autonomy_tier: AutonomyTier,
2704 wait_lease: Option<DispatchWaitLease>,
2705 cancel_rx: &mut broadcast::Receiver<()>,
2706 ) -> Result<VmValue, DispatchError> {
2707 let mut vm = self.base_vm.child_vm();
2708 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2709 if self.state.shutting_down.load(Ordering::SeqCst) {
2710 cancel_token.store(true, Ordering::SeqCst);
2711 }
2712 self.state
2713 .cancel_tokens
2714 .lock()
2715 .expect("dispatcher cancel tokens poisoned")
2716 .push(cancel_token.clone());
2717 vm.install_cancel_token(cancel_token.clone());
2718 let arg = event_to_handler_value(event)?;
2719 let args = [arg];
2720 let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2721 let effective_policy = match crate::orchestration::current_execution_policy() {
2722 Some(parent) => parent
2723 .intersect(&tier_policy)
2724 .map_err(|error| DispatchError::Local(error.to_string()))?,
2725 None => tier_policy,
2726 };
2727 crate::orchestration::push_execution_policy(effective_policy);
2728 let _policy_guard = DispatchExecutionPolicyGuard;
2729 let future = vm.call_closure_pub(closure, &args);
2730 pin_mut!(future);
2731 let (binding_id, binding_version) = split_binding_key(binding_key);
2732 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2733 slot.borrow_mut().replace(DispatchContext {
2734 trigger_event: event.clone(),
2735 replay_of_event_id: replay_of_event_id.cloned(),
2736 binding_id,
2737 binding_version,
2738 agent_id: agent_id.to_string(),
2739 action: action.to_string(),
2740 autonomy_tier,
2741 })
2742 });
2743 let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2744 .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2745 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2746 crate::stdlib::hitl::reset_hitl_state();
2747 let mut poll = tokio::time::interval(Duration::from_millis(100));
2748 let result = loop {
2749 tokio::select! {
2750 result = &mut future => break result,
2751 _ = recv_cancel(cancel_rx) => {
2752 cancel_token.store(true, Ordering::SeqCst);
2753 }
2754 _ = poll.tick() => {
2755 if dispatch_cancel_requested(
2756 &self.event_log,
2757 binding_key,
2758 &event.id.0,
2759 replay_of_event_id,
2760 )
2761 .await? {
2762 cancel_token.store(true, Ordering::SeqCst);
2763 }
2764 }
2765 }
2766 };
2767 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2768 *slot.borrow_mut() = prior_context;
2769 });
2770 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2771 *slot.borrow_mut() = prior_wait_lease;
2772 });
2773 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2774 {
2775 let mut tokens = self
2776 .state
2777 .cancel_tokens
2778 .lock()
2779 .expect("dispatcher cancel tokens poisoned");
2780 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2781 }
2782
2783 if cancel_token.load(Ordering::SeqCst) {
2784 if dispatch_cancel_requested(
2785 &self.event_log,
2786 binding_key,
2787 &event.id.0,
2788 replay_of_event_id,
2789 )
2790 .await?
2791 {
2792 Err(DispatchError::Cancelled(
2793 "trigger cancel request cancelled local handler".to_string(),
2794 ))
2795 } else {
2796 Err(DispatchError::Cancelled(
2797 "dispatcher shutdown cancelled local handler".to_string(),
2798 ))
2799 }
2800 } else {
2801 result.map_err(dispatch_error_from_vm_error)
2802 }
2803 }
2804
2805 #[allow(clippy::too_many_arguments)]
2806 async fn invoke_vm_callable_with_timeout(
2807 &self,
2808 closure: &crate::value::VmClosure,
2809 binding_key: &str,
2810 event: &TriggerEvent,
2811 replay_of_event_id: Option<&String>,
2812 agent_id: &str,
2813 action: &str,
2814 autonomy_tier: AutonomyTier,
2815 cancel_rx: &mut broadcast::Receiver<()>,
2816 timeout: Option<Duration>,
2817 ) -> Result<VmValue, DispatchError> {
2818 let future = self.invoke_vm_callable(
2819 closure,
2820 binding_key,
2821 event,
2822 replay_of_event_id,
2823 agent_id,
2824 action,
2825 autonomy_tier,
2826 None,
2827 cancel_rx,
2828 );
2829 pin_mut!(future);
2830 if let Some(timeout) = timeout {
2831 match tokio::time::timeout(timeout, future).await {
2832 Ok(result) => result,
2833 Err(_) => Err(DispatchError::Local(
2834 "predicate evaluation timed out".to_string(),
2835 )),
2836 }
2837 } else {
2838 future.await
2839 }
2840 }
2841
2842 #[allow(clippy::too_many_arguments)]
2843 async fn append_dispatch_trust_record(
2844 &self,
2845 binding: &TriggerBinding,
2846 route: &DispatchUri,
2847 event: &TriggerEvent,
2848 replay_of_event_id: Option<&String>,
2849 autonomy_tier: AutonomyTier,
2850 outcome: TrustOutcome,
2851 terminal_status: &str,
2852 attempt_count: u32,
2853 error: Option<String>,
2854 ) -> Result<(), DispatchError> {
2855 let mut record = TrustRecord::new(
2856 binding.id.as_str().to_string(),
2857 format!("{}.{}", event.provider.as_str(), event.kind),
2858 None,
2859 outcome,
2860 event.trace_id.0.clone(),
2861 autonomy_tier,
2862 );
2863 record.metadata.insert(
2864 "binding_key".to_string(),
2865 serde_json::json!(binding.binding_key()),
2866 );
2867 record.metadata.insert(
2868 "binding_version".to_string(),
2869 serde_json::json!(binding.version),
2870 );
2871 record.metadata.insert(
2872 "provider".to_string(),
2873 serde_json::json!(event.provider.as_str()),
2874 );
2875 record
2876 .metadata
2877 .insert("event_kind".to_string(), serde_json::json!(event.kind));
2878 record
2879 .metadata
2880 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2881 record.metadata.insert(
2882 "target_uri".to_string(),
2883 serde_json::json!(route.target_uri()),
2884 );
2885 record.metadata.insert(
2886 "terminal_status".to_string(),
2887 serde_json::json!(terminal_status),
2888 );
2889 record.metadata.insert(
2890 "attempt_count".to_string(),
2891 serde_json::json!(attempt_count),
2892 );
2893 if let Some(replay_of_event_id) = replay_of_event_id {
2894 record.metadata.insert(
2895 "replay_of_event_id".to_string(),
2896 serde_json::json!(replay_of_event_id),
2897 );
2898 }
2899 if let Some(error) = error {
2900 record
2901 .metadata
2902 .insert("error".to_string(), serde_json::json!(error));
2903 }
2904 append_trust_record(&self.event_log, &record)
2905 .await
2906 .map(|_| ())
2907 .map_err(DispatchError::from)
2908 }
2909
2910 async fn append_attempt_record(
2911 &self,
2912 event: &TriggerEvent,
2913 binding: &TriggerBinding,
2914 attempt: &DispatchAttemptRecord,
2915 replay_of_event_id: Option<&String>,
2916 ) -> Result<(), DispatchError> {
2917 self.append_topic_event(
2918 TRIGGER_ATTEMPTS_TOPIC,
2919 "attempt_recorded",
2920 event,
2921 Some(binding),
2922 Some(attempt.attempt),
2923 serde_json::to_value(attempt)
2924 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2925 replay_of_event_id,
2926 )
2927 .await
2928 }
2929
2930 async fn append_lifecycle_event(
2931 &self,
2932 kind: &str,
2933 event: &TriggerEvent,
2934 binding: &TriggerBinding,
2935 payload: serde_json::Value,
2936 replay_of_event_id: Option<&String>,
2937 ) -> Result<(), DispatchError> {
2938 self.append_topic_event(
2939 TRIGGERS_LIFECYCLE_TOPIC,
2940 kind,
2941 event,
2942 Some(binding),
2943 None,
2944 payload,
2945 replay_of_event_id,
2946 )
2947 .await
2948 }
2949
2950 async fn append_autonomy_budget_approval_request(
2951 &self,
2952 binding: &TriggerBinding,
2953 route: &DispatchUri,
2954 event: &TriggerEvent,
2955 replay_of_event_id: Option<&String>,
2956 reason: &str,
2957 ) -> Result<String, DispatchError> {
2958 let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
2959 let detail = serde_json::json!({
2960 "trigger_id": binding.id.as_str(),
2961 "binding_key": binding.binding_key(),
2962 "event_id": event.id.0,
2963 "reason": reason,
2964 "from_tier": AutonomyTier::ActAuto.as_str(),
2965 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
2966 "handler_kind": route.kind(),
2967 "target_uri": route.target_uri(),
2968 "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
2969 "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
2970 "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
2971 "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
2972 "replay_of_event_id": replay_of_event_id,
2973 });
2974 let request_id = crate::stdlib::hitl::append_approval_request_on(
2975 &self.event_log,
2976 binding.id.as_str().to_string(),
2977 event.trace_id.0.clone(),
2978 format!(
2979 "approve autonomous dispatch for trigger '{}' after {}",
2980 binding.id.as_str(),
2981 reason
2982 ),
2983 detail.clone(),
2984 reviewers.clone(),
2985 )
2986 .await
2987 .map_err(dispatch_error_from_vm_error)?;
2988 self.append_lifecycle_event(
2989 "autonomy.budget_exceeded",
2990 event,
2991 binding,
2992 serde_json::json!({
2993 "trigger_id": binding.id.as_str(),
2994 "event_id": event.id.0,
2995 "reason": reason,
2996 "request_id": request_id,
2997 "reviewers": reviewers,
2998 "from_tier": AutonomyTier::ActAuto.as_str(),
2999 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3000 "replay_of_event_id": replay_of_event_id,
3001 }),
3002 replay_of_event_id,
3003 )
3004 .await?;
3005 Ok(request_id)
3006 }
3007
3008 #[allow(clippy::too_many_arguments)]
3009 async fn emit_autonomy_budget_approval_action_graph(
3010 &self,
3011 binding: &TriggerBinding,
3012 route: &DispatchUri,
3013 event: &TriggerEvent,
3014 source_node_id: &str,
3015 replay_of_event_id: Option<&String>,
3016 reason: &str,
3017 request_id: &str,
3018 ) -> Result<(), DispatchError> {
3019 let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3020 self.emit_action_graph(
3021 event,
3022 vec![RunActionGraphNodeRecord {
3023 id: approval_node_id.clone(),
3024 label: format!("approval required: {reason}"),
3025 kind: "approval".to_string(),
3026 status: "waiting".to_string(),
3027 outcome: "request_approval".to_string(),
3028 trace_id: Some(event.trace_id.0.clone()),
3029 stage_id: None,
3030 node_id: None,
3031 worker_id: None,
3032 run_id: None,
3033 run_path: None,
3034 metadata: BTreeMap::from([
3035 (
3036 "trigger_id".to_string(),
3037 serde_json::json!(binding.id.as_str()),
3038 ),
3039 (
3040 "binding_key".to_string(),
3041 serde_json::json!(binding.binding_key()),
3042 ),
3043 ("event_id".to_string(), serde_json::json!(event.id.0)),
3044 ("reason".to_string(), serde_json::json!(reason)),
3045 ("request_id".to_string(), serde_json::json!(request_id)),
3046 (
3047 "reviewers".to_string(),
3048 serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3049 ),
3050 ("handler_kind".to_string(), serde_json::json!(route.kind())),
3051 (
3052 "target_uri".to_string(),
3053 serde_json::json!(route.target_uri()),
3054 ),
3055 (
3056 "from_tier".to_string(),
3057 serde_json::json!(AutonomyTier::ActAuto.as_str()),
3058 ),
3059 (
3060 "requested_tier".to_string(),
3061 serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3062 ),
3063 (
3064 "replay_of_event_id".to_string(),
3065 serde_json::json!(replay_of_event_id),
3066 ),
3067 ]),
3068 }],
3069 vec![RunActionGraphEdgeRecord {
3070 from_id: source_node_id.to_string(),
3071 to_id: approval_node_id,
3072 kind: "approval_gate".to_string(),
3073 label: Some("autonomy budget".to_string()),
3074 }],
3075 serde_json::json!({
3076 "source": "dispatcher",
3077 "trigger_id": binding.id.as_str(),
3078 "binding_key": binding.binding_key(),
3079 "event_id": event.id.0,
3080 "reason": reason,
3081 "request_id": request_id,
3082 "replay_of_event_id": replay_of_event_id,
3083 }),
3084 )
3085 .await
3086 }
3087
3088 #[allow(clippy::too_many_arguments)]
3089 async fn append_tier_transition_trust_record(
3090 &self,
3091 binding: &TriggerBinding,
3092 event: &TriggerEvent,
3093 replay_of_event_id: Option<&String>,
3094 from_tier: AutonomyTier,
3095 to_tier: AutonomyTier,
3096 reason: &str,
3097 request_id: &str,
3098 ) -> Result<(), DispatchError> {
3099 let mut record = TrustRecord::new(
3100 binding.id.as_str().to_string(),
3101 "autonomy.tier_transition",
3102 Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3103 TrustOutcome::Denied,
3104 event.trace_id.0.clone(),
3105 to_tier,
3106 );
3107 record.metadata.insert(
3108 "binding_key".to_string(),
3109 serde_json::json!(binding.binding_key()),
3110 );
3111 record
3112 .metadata
3113 .insert("event_id".to_string(), serde_json::json!(event.id.0));
3114 record.metadata.insert(
3115 "from_tier".to_string(),
3116 serde_json::json!(from_tier.as_str()),
3117 );
3118 record
3119 .metadata
3120 .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3121 record
3122 .metadata
3123 .insert("reason".to_string(), serde_json::json!(reason));
3124 record
3125 .metadata
3126 .insert("request_id".to_string(), serde_json::json!(request_id));
3127 if let Some(replay_of_event_id) = replay_of_event_id {
3128 record.metadata.insert(
3129 "replay_of_event_id".to_string(),
3130 serde_json::json!(replay_of_event_id),
3131 );
3132 }
3133 append_trust_record(&self.event_log, &record)
3134 .await
3135 .map(|_| ())
3136 .map_err(DispatchError::from)
3137 }
3138
3139 async fn append_budget_deferred_event(
3140 &self,
3141 binding: &TriggerBinding,
3142 route: &DispatchUri,
3143 event: &TriggerEvent,
3144 replay_of_event_id: Option<&String>,
3145 reason: &str,
3146 ) -> Result<(), DispatchError> {
3147 self.append_topic_event(
3148 TRIGGER_ATTEMPTS_TOPIC,
3149 "budget_deferred",
3150 event,
3151 Some(binding),
3152 None,
3153 serde_json::json!({
3154 "event_id": event.id.0,
3155 "trigger_id": binding.id.as_str(),
3156 "binding_key": binding.binding_key(),
3157 "handler_kind": route.kind(),
3158 "target_uri": route.target_uri(),
3159 "reason": reason,
3160 "retry_at": next_budget_reset_rfc3339(binding),
3161 "replay_of_event_id": replay_of_event_id,
3162 }),
3163 replay_of_event_id,
3164 )
3165 .await?;
3166 self.append_skipped_outbox_event(
3167 binding,
3168 route,
3169 event,
3170 replay_of_event_id,
3171 DispatchSkipStage::Predicate,
3172 serde_json::json!({
3173 "deferred": true,
3174 "reason": reason,
3175 "retry_at": next_budget_reset_rfc3339(binding),
3176 }),
3177 )
3178 .await
3179 }
3180
3181 async fn append_skipped_outbox_event(
3182 &self,
3183 binding: &TriggerBinding,
3184 route: &DispatchUri,
3185 event: &TriggerEvent,
3186 replay_of_event_id: Option<&String>,
3187 stage: DispatchSkipStage,
3188 detail: serde_json::Value,
3189 ) -> Result<(), DispatchError> {
3190 self.append_topic_event(
3191 TRIGGER_OUTBOX_TOPIC,
3192 "dispatch_skipped",
3193 event,
3194 Some(binding),
3195 None,
3196 serde_json::json!({
3197 "event_id": event.id.0,
3198 "trigger_id": binding.id.as_str(),
3199 "binding_key": binding.binding_key(),
3200 "handler_kind": route.kind(),
3201 "target_uri": route.target_uri(),
3202 "skip_stage": stage.as_str(),
3203 "detail": detail,
3204 "replay_of_event_id": replay_of_event_id,
3205 }),
3206 replay_of_event_id,
3207 )
3208 .await
3209 }
3210
3211 async fn append_topic_event(
3212 &self,
3213 topic_name: &str,
3214 kind: &str,
3215 event: &TriggerEvent,
3216 binding: Option<&TriggerBinding>,
3217 attempt: Option<u32>,
3218 payload: serde_json::Value,
3219 replay_of_event_id: Option<&String>,
3220 ) -> Result<(), DispatchError> {
3221 let topic = topic_for_event(event, topic_name)?;
3222 let headers = event_headers(event, binding, attempt, replay_of_event_id);
3223 self.event_log
3224 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3225 .await
3226 .map_err(DispatchError::from)
3227 .map(|_| ())
3228 }
3229
3230 async fn emit_action_graph(
3231 &self,
3232 event: &TriggerEvent,
3233 nodes: Vec<RunActionGraphNodeRecord>,
3234 edges: Vec<RunActionGraphEdgeRecord>,
3235 extra: serde_json::Value,
3236 ) -> Result<(), DispatchError> {
3237 let mut headers = BTreeMap::new();
3238 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3239 headers.insert("event_id".to_string(), event.id.0.clone());
3240 let observability = RunObservabilityRecord {
3241 schema_version: 1,
3242 action_graph_nodes: nodes,
3243 action_graph_edges: edges,
3244 ..Default::default()
3245 };
3246 append_action_graph_update(
3247 headers,
3248 serde_json::json!({
3249 "source": "dispatcher",
3250 "trace_id": event.trace_id.0,
3251 "event_id": event.id.0,
3252 "observability": observability,
3253 "context": extra,
3254 }),
3255 )
3256 .await
3257 .map_err(DispatchError::from)
3258 }
3259}
3260
3261async fn dispatch_cancel_requested(
3262 event_log: &Arc<AnyEventLog>,
3263 binding_key: &str,
3264 event_id: &str,
3265 replay_of_event_id: Option<&String>,
3266) -> Result<bool, DispatchError> {
3267 if replay_of_event_id.is_some() {
3268 return Ok(false);
3269 }
3270 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3271 .expect("static trigger cancel topic should always be valid");
3272 let events = event_log.read_range(&topic, None, usize::MAX).await?;
3273 let requested = events
3274 .into_iter()
3275 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3276 .filter_map(|(_, event)| {
3277 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3278 })
3279 .collect::<BTreeSet<_>>();
3280 Ok(requested
3281 .iter()
3282 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3283}
3284
3285async fn sleep_or_cancel_or_request(
3286 event_log: &Arc<AnyEventLog>,
3287 delay: Duration,
3288 binding_key: &str,
3289 event_id: &str,
3290 replay_of_event_id: Option<&String>,
3291 cancel_rx: &mut broadcast::Receiver<()>,
3292) -> Result<(), DispatchError> {
3293 let sleep = tokio::time::sleep(delay);
3294 pin_mut!(sleep);
3295 let mut poll = tokio::time::interval(Duration::from_millis(100));
3296 loop {
3297 tokio::select! {
3298 _ = &mut sleep => return Ok(()),
3299 _ = recv_cancel(cancel_rx) => {
3300 return Err(DispatchError::Cancelled(
3301 "dispatcher shutdown cancelled retry wait".to_string(),
3302 ));
3303 }
3304 _ = poll.tick() => {
3305 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
3306 return Err(DispatchError::Cancelled(
3307 "trigger cancel request cancelled retry wait".to_string(),
3308 ));
3309 }
3310 }
3311 }
3312 }
3313}
3314
3315fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
3316 let mut iter = events.into_iter();
3317 let Some(mut root) = iter.next() else {
3318 return Err(DispatchError::Registry(
3319 "batch dispatch produced an empty event list".to_string(),
3320 ));
3321 };
3322 let mut batch = Vec::new();
3323 batch.push(
3324 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
3325 );
3326 for event in iter {
3327 batch.push(
3328 serde_json::to_value(&event)
3329 .map_err(|error| DispatchError::Serde(error.to_string()))?,
3330 );
3331 }
3332 root.batch = Some(batch);
3333 Ok(root)
3334}
3335
3336fn json_value_to_gate(value: &serde_json::Value) -> String {
3337 match value {
3338 serde_json::Value::Null => "null".to_string(),
3339 serde_json::Value::String(text) => text.clone(),
3340 serde_json::Value::Bool(value) => value.to_string(),
3341 serde_json::Value::Number(value) => value.to_string(),
3342 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
3343 }
3344}
3345
3346fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
3347 let json =
3348 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3349 let value = json_to_vm_value(&json);
3350 match (&event.raw_body, value) {
3351 (Some(raw_body), VmValue::Dict(dict)) => {
3352 let mut map = (*dict).clone();
3353 map.insert(
3354 "raw_body".to_string(),
3355 VmValue::Bytes(Rc::new(raw_body.clone())),
3356 );
3357 Ok(VmValue::Dict(Rc::new(map)))
3358 }
3359 (_, other) => Ok(other),
3360 }
3361}
3362
3363fn decrement_in_flight(state: &DispatcherRuntimeState) {
3364 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
3365 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
3366 state.idle_notify.notify_waiters();
3367 }
3368}
3369
3370fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
3371 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
3372 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
3373 state.idle_notify.notify_waiters();
3374 }
3375}
3376
/// Test-only: arms a one-shot signal that the test build of `notify_test_inbox_dequeued` fires.
/// Installing again overwrites (and drops) any sender that has not fired yet.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(move |slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(tx);
    });
}

/// No-op in non-test builds; only the `cfg(test)` variant does anything.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test-only: fires and consumes the signal armed by `install_test_inbox_dequeued_signal`,
/// if one is installed.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(tx) = pending {
        // The receiving side may already be gone; that is not an error for this hook.
        let _ = tx.send(());
    }
}
3395
3396pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
3397 event_log: &L,
3398 event: &TriggerEvent,
3399) -> Result<u64, DispatchError> {
3400 let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
3401 let headers = event_headers(event, None, None, None);
3402 let payload =
3403 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3404 event_log
3405 .append(
3406 &topic,
3407 LogEvent::new("event_ingested", payload).with_headers(headers),
3408 )
3409 .await
3410 .map_err(DispatchError::from)
3411}
3412
3413pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
3414 ACTIVE_DISPATCHER_STATE.with(|slot| {
3415 slot.borrow()
3416 .as_ref()
3417 .map(|state| DispatcherStatsSnapshot {
3418 in_flight: state.in_flight.load(Ordering::Relaxed),
3419 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
3420 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
3421 })
3422 .unwrap_or_default()
3423 })
3424}
3425
3426pub fn clear_dispatcher_state() {
3427 ACTIVE_DISPATCHER_STATE.with(|slot| {
3428 *slot.borrow_mut() = None;
3429 });
3430 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
3431 *slot.borrow_mut() = None;
3432 });
3433}
3434
3435fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
3436 if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
3437 return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
3438 }
3439 if is_cancelled_vm_error(&error) {
3440 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
3441 }
3442 if let VmError::Thrown(VmValue::String(message)) = &error {
3443 return DispatchError::Local(message.to_string());
3444 }
3445 match error_to_category(&error) {
3446 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
3447 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
3448 ErrorCategory::Cancelled => {
3449 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
3450 }
3451 _ => DispatchError::Local(error.to_string()),
3452 }
3453}
3454
3455fn dispatch_error_label(error: &DispatchError) -> &'static str {
3456 match error {
3457 DispatchError::Denied(_) => "denied",
3458 DispatchError::Timeout(_) => "timeout",
3459 DispatchError::Waiting(_) => "waiting",
3460 DispatchError::Cancelled(_) => "cancelled",
3461 _ => "failed",
3462 }
3463}
3464
3465fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
3466 match route {
3467 DispatchUri::Worker { .. } => "enqueued",
3468 DispatchUri::A2a { .. }
3469 if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
3470 {
3471 "pending"
3472 }
3473 DispatchUri::A2a { .. } => "completed",
3474 DispatchUri::Local { .. } => "success",
3475 }
3476}
3477
3478fn dispatch_node_id(
3479 route: &DispatchUri,
3480 binding_key: &str,
3481 event_id: &str,
3482 attempt: u32,
3483) -> String {
3484 let prefix = match route {
3485 DispatchUri::A2a { .. } => "a2a",
3486 _ => "dispatch",
3487 };
3488 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
3489}
3490
3491fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3492 match route {
3493 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3494 DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3495 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3496 }
3497}
3498
3499fn dispatch_node_label(route: &DispatchUri) -> String {
3500 match route {
3501 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3502 _ => route.target_uri(),
3503 }
3504}
3505
3506fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3507 match route {
3508 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3509 _ => None,
3510 }
3511}
3512
3513fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3514 match route {
3515 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3516 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3517 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3518 }
3519}
3520
3521fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3522 match status {
3523 crate::triggers::SignatureStatus::Verified => "verified",
3524 crate::triggers::SignatureStatus::Unsigned => "unsigned",
3525 crate::triggers::SignatureStatus::Failed { .. } => "failed",
3526 }
3527}
3528
3529fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3530 let mut metadata = BTreeMap::new();
3531 metadata.insert(
3532 "provider".to_string(),
3533 serde_json::json!(event.provider.as_str()),
3534 );
3535 metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3536 metadata.insert(
3537 "dedupe_key".to_string(),
3538 serde_json::json!(event.dedupe_key),
3539 );
3540 metadata.insert(
3541 "signature_status".to_string(),
3542 serde_json::json!(signature_status_label(&event.signature_status)),
3543 );
3544 metadata
3545}
3546
3547fn dispatch_node_metadata(
3548 route: &DispatchUri,
3549 binding: &TriggerBinding,
3550 event: &TriggerEvent,
3551 attempt: u32,
3552) -> BTreeMap<String, serde_json::Value> {
3553 let mut metadata = BTreeMap::new();
3554 metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3555 metadata.insert(
3556 "target_uri".to_string(),
3557 serde_json::json!(route.target_uri()),
3558 );
3559 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3560 metadata.insert(
3561 "trigger_id".to_string(),
3562 serde_json::json!(binding.id.as_str()),
3563 );
3564 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3565 if let Some(target_agent) = dispatch_target_agent(route) {
3566 metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3567 }
3568 if let DispatchUri::Worker { queue } = route {
3569 metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3570 }
3571 metadata
3572}
3573
3574fn dispatch_success_metadata(
3575 route: &DispatchUri,
3576 binding: &TriggerBinding,
3577 event: &TriggerEvent,
3578 attempt: u32,
3579 result: &serde_json::Value,
3580) -> BTreeMap<String, serde_json::Value> {
3581 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3582 match route {
3583 DispatchUri::A2a { .. } => {
3584 if let Some(task_id) = result
3585 .get("task_id")
3586 .or_else(|| result.get("id"))
3587 .and_then(|value| value.as_str())
3588 {
3589 metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3590 }
3591 if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3592 metadata.insert("state".to_string(), serde_json::json!(state));
3593 }
3594 }
3595 DispatchUri::Worker { .. } => {
3596 if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3597 {
3598 metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3599 }
3600 if let Some(response_topic) = result
3601 .get("response_topic")
3602 .and_then(|value| value.as_str())
3603 {
3604 metadata.insert(
3605 "response_topic".to_string(),
3606 serde_json::json!(response_topic),
3607 );
3608 }
3609 }
3610 DispatchUri::Local { .. } => {}
3611 }
3612 metadata
3613}
3614
3615fn dispatch_error_metadata(
3616 route: &DispatchUri,
3617 binding: &TriggerBinding,
3618 event: &TriggerEvent,
3619 attempt: u32,
3620 error: &DispatchError,
3621) -> BTreeMap<String, serde_json::Value> {
3622 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3623 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3624 metadata
3625}
3626
3627fn retry_node_metadata(
3628 binding: &TriggerBinding,
3629 event: &TriggerEvent,
3630 attempt: u32,
3631 delay: Duration,
3632 error: &DispatchError,
3633) -> BTreeMap<String, serde_json::Value> {
3634 let mut metadata = BTreeMap::new();
3635 metadata.insert(
3636 "trigger_id".to_string(),
3637 serde_json::json!(binding.id.as_str()),
3638 );
3639 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3640 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3641 metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3642 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3643 metadata
3644}
3645
/// Splits a binding key of the form `<trigger_id>@v<version>` into `(trigger_id, version)`.
///
/// A suffix after the last `@v` counts as a version only when it is one or more ASCII digits
/// that fit in a `u32`. Any other key is returned whole with version `0`. This keeps trigger
/// ids that merely contain `@v` (e.g. `deploy@vault`) from being truncated. It also makes this
/// the inverse of `binding_key_from_parts` for any key built with `Some(version)`.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    binding_key
        .rsplit_once("@v")
        // `u32::from_str` also accepts a leading `+`; require plain digits so only the exact
        // shape produced by `binding_key_from_parts` is treated as versioned.
        .filter(|(_, suffix)| suffix.bytes().all(|byte| byte.is_ascii_digit()))
        .and_then(|(binding_id, suffix)| {
            suffix
                .parse::<u32>()
                .ok()
                .map(|version| (binding_id.to_string(), version))
        })
        .unwrap_or_else(|| (binding_key.to_string(), 0))
}
3653
/// Builds a binding key: `<trigger_id>@v<version>` when a version is given, otherwise just the
/// trigger id.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    binding_version.map_or_else(
        || trigger_id.to_string(),
        |version| format!("{trigger_id}@v{version}"),
    )
}
3660
3661fn tenant_id(event: &TriggerEvent) -> Option<&str> {
3662 event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
3663}
3664
3665fn current_unix_ms() -> i64 {
3666 unix_ms(time::OffsetDateTime::now_utc())
3667}
3668
3669fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
3670 (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
3671}
3672
3673fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3674 lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
3675 .unwrap_or_else(|| unix_ms(event.received_at))
3676}
3677
3678fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3679 lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
3680 .unwrap_or_else(|| accepted_at_ms(headers, event))
3681}
3682
/// Looks up header `name` and parses it as an `i64` millisecond value.
///
/// Returns `None` when there are no headers, the header is missing, or it does not parse.
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw = headers?.get(name)?;
    raw.parse().ok()
}
3688
/// Elapsed time from `earlier_ms` to `later_ms`, clamped to zero when `later_ms` is earlier.
/// The subtraction saturates instead of overflowing.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let delta = later_ms.saturating_sub(earlier_ms);
    // A negative delta fails the conversion and maps to zero.
    u64::try_from(delta).map_or(Duration::ZERO, Duration::from_millis)
}
3692
3693fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
3694 match result {
3695 Ok(_) => "succeeded",
3696 Err(DispatchError::Waiting(_)) => "waiting",
3697 Err(DispatchError::Cancelled(_)) => "cancelled",
3698 Err(DispatchError::Denied(_)) => "denied",
3699 Err(DispatchError::Timeout(_)) => "timeout",
3700 Err(_) => "failed",
3701 }
3702}
3703
3704fn is_cancelled_vm_error(error: &VmError) -> bool {
3705 matches!(
3706 error,
3707 VmError::Thrown(VmValue::String(message))
3708 if message.starts_with("kind:cancelled:")
3709 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3710}
3711
3712fn event_headers(
3713 event: &TriggerEvent,
3714 binding: Option<&TriggerBinding>,
3715 attempt: Option<u32>,
3716 replay_of_event_id: Option<&String>,
3717) -> BTreeMap<String, String> {
3718 let mut headers = BTreeMap::new();
3719 headers.insert("event_id".to_string(), event.id.0.clone());
3720 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3721 headers.insert("provider".to_string(), event.provider.as_str().to_string());
3722 headers.insert("kind".to_string(), event.kind.clone());
3723 if let Some(replay_of_event_id) = replay_of_event_id {
3724 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3725 }
3726 if let Some(tenant_id) = event.tenant_id.as_ref() {
3727 headers.insert("tenant_id".to_string(), tenant_id.0.clone());
3728 }
3729 if let Some(binding) = binding {
3730 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3731 headers.insert("binding_key".to_string(), binding.binding_key());
3732 headers.insert(
3733 "handler_kind".to_string(),
3734 DispatchUri::from(&binding.handler).kind().to_string(),
3735 );
3736 }
3737 if let Some(attempt) = attempt {
3738 headers.insert("attempt".to_string(), attempt.to_string());
3739 }
3740 headers
3741}
3742
3743fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
3744 let topic = Topic::new(topic_name)
3745 .expect("static trigger dispatcher topic names should always be valid");
3746 match event.tenant_id.as_ref() {
3747 Some(tenant_id) => crate::tenant_topic(tenant_id, &topic).map_err(DispatchError::from),
3748 None => Ok(topic),
3749 }
3750}
3751
3752fn worker_queue_priority(
3753 binding: &super::registry::TriggerBinding,
3754 event: &TriggerEvent,
3755) -> crate::WorkerQueuePriority {
3756 match event
3757 .headers
3758 .get("priority")
3759 .map(|value| value.trim().to_ascii_lowercase())
3760 .as_deref()
3761 {
3762 Some("high") => crate::WorkerQueuePriority::High,
3763 Some("low") => crate::WorkerQueuePriority::Low,
3764 _ => binding.dispatch_priority,
3765 }
3766}
3767
/// Environment variable that, when set to any value, makes `maybe_fail_before_outbox` exit the
/// process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Crash-injection hook: exits the process with status 86 if `TEST_FAIL_BEFORE_OUTBOX_ENV` is
/// set, and does nothing otherwise.
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_none() {
        return;
    }
    // A distinctive exit status, presumably so a test harness can recognise this injected crash.
    std::process::exit(86);
}
3775
3776fn now_rfc3339() -> String {
3777 time::OffsetDateTime::now_utc()
3778 .format(&time::format_description::well_known::Rfc3339)
3779 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3780}
3781
3782fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
3783 let now = time::OffsetDateTime::now_utc();
3784 let reset = if binding.hourly_cost_usd.is_some() {
3785 let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
3786 time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
3787 } else {
3788 let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
3789 time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
3790 };
3791 reset
3792 .format(&time::format_description::well_known::Rfc3339)
3793 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3794}
3795
3796fn now_unix_ms() -> i64 {
3797 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3798}
3799
3800fn cancelled_dispatch_outcome(
3801 binding: &TriggerBinding,
3802 route: &DispatchUri,
3803 event: &TriggerEvent,
3804 replay_of_event_id: Option<String>,
3805 attempt_count: u32,
3806 error: String,
3807) -> DispatchOutcome {
3808 DispatchOutcome {
3809 trigger_id: binding.id.as_str().to_string(),
3810 binding_key: binding.binding_key(),
3811 event_id: event.id.0.clone(),
3812 attempt_count,
3813 status: DispatchStatus::Cancelled,
3814 handler_kind: route.kind().to_string(),
3815 target_uri: route.target_uri(),
3816 replay_of_event_id,
3817 result: None,
3818 error: Some(error),
3819 }
3820}
3821
3822async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3823 let _ = cancel_rx.recv().await;
3824}
3825
// Unit tests for this module are declared out of line: `mod tests;` loads them from a
// separate `tests` module file, compiled only for test builds.
#[cfg(test)]
mod tests;