1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::vm_value_to_json;
16use crate::orchestration::{
17 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
18 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
19 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
20 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
21 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
22 ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
23 ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
24};
25use crate::stdlib::json_to_vm_value;
26use crate::trust_graph::{
27 append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
28};
29use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
30use crate::vm::Vm;
31
32use self::uri::DispatchUri;
33use super::registry::{
34 binding_autonomy_budget_would_exceed, matching_bindings, note_autonomous_decision,
35 TriggerBinding, TriggerBudgetExhaustionStrategy, TriggerHandlerSpec,
36};
37use super::{
38 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
39 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
40 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_OUTBOX_TOPIC,
41};
42use circuits::{
43 destination_circuit_key, dlq_node_metadata, DestinationCircuitProbe,
44 DestinationCircuitRegistry, DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
45};
46use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
47use predicate_eval::predicate_node_metadata;
48
49mod circuits;
50mod flow_control;
51mod predicate_eval;
52pub mod retry;
53pub mod uri;
54
55pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
56
/// Header key holding the Unix-millisecond time at which a trigger event was accepted.
///
/// When absent on enqueue it is filled in from the event's `received_at`.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";

/// Header key for the normalization timestamp in milliseconds.
///
/// NOTE(review): not written or read in the visible part of this file — confirm the producer.
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";

/// Header key holding the millisecond time at which the event was appended to the inbox queue.
///
/// When absent on enqueue it is filled in from the log event's `occurred_at_ms`.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
60
61thread_local! {
62 static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
63 static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
64 static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
65 #[cfg(test)]
66 static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
67}
68
69tokio::task_local! {
70 static ACTIVE_DISPATCH_IS_REPLAY: bool;
71}
72
73#[derive(Clone, Debug)]
74pub(crate) struct DispatchContext {
75 pub trigger_event: TriggerEvent,
76 pub replay_of_event_id: Option<String>,
77 pub binding_id: String,
78 pub binding_version: u32,
79 pub agent_id: String,
80 pub action: String,
81 pub autonomy_tier: AutonomyTier,
82}
83
84struct DispatchExecutionPolicyGuard;
85
86impl Drop for DispatchExecutionPolicyGuard {
87 fn drop(&mut self) {
88 crate::orchestration::pop_execution_policy();
89 }
90}
91
/// Reviewer name used when no explicit autonomy-budget reviewer is supplied.
///
/// NOTE(review): the consumer is outside the visible part of this file — confirm usage.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
93
94pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
95 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
96}
97
98pub(crate) fn current_dispatch_is_replay() -> bool {
99 ACTIVE_DISPATCH_IS_REPLAY
100 .try_with(|is_replay| *is_replay)
101 .unwrap_or(false)
102}
103
104pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
105 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
106}
107
108#[derive(Clone)]
109pub struct Dispatcher {
110 base_vm: Rc<Vm>,
111 event_log: Arc<AnyEventLog>,
112 cancel_tx: broadcast::Sender<()>,
113 state: Arc<DispatcherRuntimeState>,
114 metrics: Option<Arc<crate::MetricsRegistry>>,
115 a2a_client: Arc<dyn crate::a2a::A2aClient>,
116}
117
118#[derive(Debug)]
119struct DispatcherRuntimeState {
120 in_flight: AtomicU64,
121 retry_queue_depth: AtomicU64,
122 dlq: Mutex<Vec<DlqEntry>>,
123 cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
124 shutting_down: std::sync::atomic::AtomicBool,
125 idle_notify: Notify,
126 flow_control: FlowControlManager,
127 destination_circuits: DestinationCircuitRegistry,
128}
129
130impl DispatcherRuntimeState {
131 fn new(event_log: Arc<AnyEventLog>) -> Self {
132 Self {
133 in_flight: AtomicU64::new(0),
134 retry_queue_depth: AtomicU64::new(0),
135 dlq: Mutex::new(Vec::new()),
136 cancel_tokens: Mutex::new(Vec::new()),
137 shutting_down: std::sync::atomic::AtomicBool::new(false),
138 idle_notify: Notify::new(),
139 flow_control: FlowControlManager::new(event_log),
140 destination_circuits: DestinationCircuitRegistry::default(),
141 }
142 }
143}
144
145#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
146#[serde(default)]
147pub struct DispatcherStatsSnapshot {
148 pub in_flight: u64,
149 pub retry_queue_depth: u64,
150 pub dlq_depth: u64,
151}
152
153#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
154#[serde(rename_all = "snake_case")]
155pub enum DispatchStatus {
156 Succeeded,
157 Failed,
158 Dlq,
159 Skipped,
160 Waiting,
161 Cancelled,
162}
163
164impl DispatchStatus {
165 pub fn as_str(&self) -> &'static str {
166 match self {
167 Self::Succeeded => "succeeded",
168 Self::Failed => "failed",
169 Self::Dlq => "dlq",
170 Self::Skipped => "skipped",
171 Self::Waiting => "waiting",
172 Self::Cancelled => "cancelled",
173 }
174 }
175}
176
177#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
178#[serde(default)]
179pub struct DispatchOutcome {
180 pub trigger_id: String,
181 pub binding_key: String,
182 pub event_id: String,
183 pub attempt_count: u32,
184 pub status: DispatchStatus,
185 pub handler_kind: String,
186 pub target_uri: String,
187 pub replay_of_event_id: Option<String>,
188 pub result: Option<serde_json::Value>,
189 pub error: Option<String>,
190}
191
192#[derive(Clone, Debug, Serialize, Deserialize)]
193pub struct InboxEnvelope {
194 pub trigger_id: Option<String>,
195 pub binding_version: Option<u32>,
196 pub event: TriggerEvent,
197}
198
199#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
200#[serde(default)]
201pub struct DispatcherDrainReport {
202 pub drained: bool,
203 pub in_flight: u64,
204 pub retry_queue_depth: u64,
205 pub dlq_depth: u64,
206}
207
208impl Default for DispatchOutcome {
209 fn default() -> Self {
210 Self {
211 trigger_id: String::new(),
212 binding_key: String::new(),
213 event_id: String::new(),
214 attempt_count: 0,
215 status: DispatchStatus::Failed,
216 handler_kind: String::new(),
217 target_uri: String::new(),
218 replay_of_event_id: None,
219 result: None,
220 error: None,
221 }
222 }
223}
224
225#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
226#[serde(default)]
227pub struct DispatchAttemptRecord {
228 pub trigger_id: String,
229 pub binding_key: String,
230 pub event_id: String,
231 pub attempt: u32,
232 pub handler_kind: String,
233 pub started_at: String,
234 pub completed_at: String,
235 pub outcome: String,
236 pub error_msg: Option<String>,
237}
238
239#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
240pub struct DispatchCancelRequest {
241 pub binding_key: String,
242 pub event_id: String,
243 #[serde(with = "time::serde::rfc3339")]
244 pub requested_at: time::OffsetDateTime,
245 #[serde(default)]
246 pub requested_by: Option<String>,
247 #[serde(default)]
248 pub audit_id: Option<String>,
249}
250
251#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
252pub struct DlqEntry {
253 pub trigger_id: String,
254 pub binding_key: String,
255 pub event: TriggerEvent,
256 pub attempt_count: u32,
257 pub final_error: String,
258 #[serde(default = "default_dlq_error_class")]
259 pub error_class: String,
260 pub attempts: Vec<DispatchAttemptRecord>,
261}
262
/// Serde default for `DlqEntry::error_class`: the literal `"unknown"`.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
266
/// Bookkeeping for a singleton gate taken by a dispatch.
#[derive(Debug, Clone)]
struct SingletonLease {
    /// Name of the singleton gate.
    gate: String,
    /// `true` while the gate is held; cleared while the dispatch is suspended.
    held: bool,
}
272
273#[derive(Clone, Debug)]
274struct ConcurrencyLease {
275 gate: String,
276 max: u32,
277 priority_rank: usize,
278 permit: Option<ConcurrencyPermit>,
279}
280
281#[derive(Default, Debug)]
282struct AcquiredFlowControl {
283 singleton: Option<SingletonLease>,
284 concurrency: Option<ConcurrencyLease>,
285}
286
287#[derive(Clone)]
288pub(crate) struct DispatchWaitLease {
289 state: Arc<DispatcherRuntimeState>,
290 acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
291 suspended: Arc<AtomicBool>,
292}
293
294impl DispatchWaitLease {
295 fn new(
296 state: Arc<DispatcherRuntimeState>,
297 acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
298 ) -> Self {
299 Self {
300 state,
301 acquired,
302 suspended: Arc::new(AtomicBool::new(false)),
303 }
304 }
305
306 pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
307 if self.suspended.swap(true, Ordering::SeqCst) {
308 return Ok(());
309 }
310 let (singleton_gate, concurrency_permit) = {
311 let mut acquired = self.acquired.lock().await;
312 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
313 if lease.held {
314 lease.held = false;
315 Some(lease.gate.clone())
316 } else {
317 None
318 }
319 });
320 let concurrency_permit = acquired
321 .concurrency
322 .as_mut()
323 .and_then(|lease| lease.permit.take());
324 (singleton_gate, concurrency_permit)
325 };
326
327 if let Some(gate) = singleton_gate {
328 self.state
329 .flow_control
330 .release_singleton(&gate)
331 .await
332 .map_err(DispatchError::from)?;
333 }
334 if let Some(permit) = concurrency_permit {
335 self.state
336 .flow_control
337 .release_concurrency(permit)
338 .await
339 .map_err(DispatchError::from)?;
340 }
341 Ok(())
342 }
343
344 pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
345 if !self.suspended.swap(false, Ordering::SeqCst) {
346 return Ok(());
347 }
348
349 let singleton_gate = {
350 let acquired = self.acquired.lock().await;
351 acquired.singleton.as_ref().and_then(|lease| {
352 if lease.held {
353 None
354 } else {
355 Some(lease.gate.clone())
356 }
357 })
358 };
359 if let Some(gate) = singleton_gate {
360 self.state
361 .flow_control
362 .acquire_singleton(&gate)
363 .await
364 .map_err(DispatchError::from)?;
365 let mut acquired = self.acquired.lock().await;
366 if let Some(lease) = acquired.singleton.as_mut() {
367 lease.held = true;
368 }
369 }
370
371 let concurrency_spec = {
372 let acquired = self.acquired.lock().await;
373 acquired.concurrency.as_ref().and_then(|lease| {
374 if lease.permit.is_some() {
375 None
376 } else {
377 Some((lease.gate.clone(), lease.max, lease.priority_rank))
378 }
379 })
380 };
381 if let Some((gate, max, priority_rank)) = concurrency_spec {
382 let permit = self
383 .state
384 .flow_control
385 .acquire_concurrency(&gate, max, priority_rank)
386 .await
387 .map_err(DispatchError::from)?;
388 let mut acquired = self.acquired.lock().await;
389 if let Some(lease) = acquired.concurrency.as_mut() {
390 lease.permit = Some(permit);
391 }
392 }
393 Ok(())
394 }
395}
396
397enum FlowControlOutcome {
398 Dispatch {
399 event: Box<TriggerEvent>,
400 acquired: AcquiredFlowControl,
401 },
402 Skip {
403 reason: String,
404 },
405}
406
/// Pipeline stage at which a dispatch was skipped.
#[derive(Debug, Clone)]
enum DispatchSkipStage {
    /// Skipped at the predicate stage.
    Predicate,
    /// Skipped at the flow-control stage.
    FlowControl,
}
412
/// Errors produced while enqueueing or dispatching trigger events.
///
/// Every variant carries a human-readable message, which is also its `Display` output.
#[derive(Debug)]
pub enum DispatchError {
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    /// Writes the message carried by the variant, without any prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every variant wraps exactly one message, so this pattern is irrefutable.
        let (Self::EventLog(text)
        | Self::Registry(text)
        | Self::Serde(text)
        | Self::Local(text)
        | Self::A2a(text)
        | Self::Denied(text)
        | Self::Timeout(text)
        | Self::Waiting(text)
        | Self::Cancelled(text)
        | Self::NotImplemented(text)) = self;
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Returns `true` for errors that are worth retrying; cancellation, denial,
    /// "not implemented" and "waiting" are never retried.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}
454
455impl DispatchSkipStage {
456 fn as_str(&self) -> &'static str {
457 match self {
458 Self::Predicate => "predicate",
459 Self::FlowControl => "flow_control",
460 }
461 }
462}
463
464impl From<LogError> for DispatchError {
465 fn from(value: LogError) -> Self {
466 Self::EventLog(value.to_string())
467 }
468}
469
470pub async fn append_dispatch_cancel_request(
471 event_log: &Arc<AnyEventLog>,
472 request: &DispatchCancelRequest,
473) -> Result<u64, DispatchError> {
474 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
475 .expect("static trigger cancel topic should always be valid");
476 event_log
477 .append(
478 &topic,
479 LogEvent::new(
480 "dispatch_cancel_requested",
481 serde_json::to_value(request)
482 .map_err(|error| DispatchError::Serde(error.to_string()))?,
483 ),
484 )
485 .await
486 .map_err(DispatchError::from)
487}
488
489impl Dispatcher {
490 pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
491 self.event_log.clone()
492 }
493
494 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
495 let event_log = active_event_log().ok_or_else(|| {
496 DispatchError::EventLog("dispatcher requires an active event log".to_string())
497 })?;
498 Ok(Self::with_event_log(base_vm, event_log))
499 }
500
501 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
502 Self::with_event_log_and_metrics(base_vm, event_log, None)
503 }
504
505 pub fn with_event_log_and_metrics(
506 base_vm: Vm,
507 event_log: Arc<AnyEventLog>,
508 metrics: Option<Arc<crate::MetricsRegistry>>,
509 ) -> Self {
510 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
511 ACTIVE_DISPATCHER_STATE.with(|slot| {
512 *slot.borrow_mut() = Some(state.clone());
513 });
514 let (cancel_tx, _) = broadcast::channel(32);
515 Self {
516 base_vm: Rc::new(base_vm),
517 event_log,
518 cancel_tx,
519 state,
520 metrics,
521 a2a_client: Arc::new(crate::a2a::RealA2aClient),
522 }
523 }
524
525 #[cfg(test)]
526 pub fn with_a2a_client(mut self, client: Arc<dyn crate::a2a::A2aClient>) -> Self {
527 self.a2a_client = client;
528 self
529 }
530
531 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
532 DispatcherStatsSnapshot {
533 in_flight: self.state.in_flight.load(Ordering::Relaxed),
534 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
535 dlq_depth: self
536 .state
537 .dlq
538 .lock()
539 .expect("dispatcher dlq poisoned")
540 .len() as u64,
541 }
542 }
543
544 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
545 self.state
546 .dlq
547 .lock()
548 .expect("dispatcher dlq poisoned")
549 .clone()
550 }
551
552 pub fn shutdown(&self) {
553 self.state.shutting_down.store(true, Ordering::SeqCst);
554 for token in self
555 .state
556 .cancel_tokens
557 .lock()
558 .expect("dispatcher cancel tokens poisoned")
559 .iter()
560 {
561 token.store(true, Ordering::SeqCst);
562 }
563 let _ = self.cancel_tx.send(());
564 }
565
566 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
567 self.enqueue_targeted(None, None, event).await
568 }
569
570 pub async fn enqueue_targeted(
571 &self,
572 trigger_id: Option<String>,
573 binding_version: Option<u32>,
574 event: TriggerEvent,
575 ) -> Result<u64, DispatchError> {
576 self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
577 .await
578 }
579
580 pub async fn enqueue_targeted_with_headers(
581 &self,
582 trigger_id: Option<String>,
583 binding_version: Option<u32>,
584 event: TriggerEvent,
585 parent_headers: Option<&BTreeMap<String, String>>,
586 ) -> Result<u64, DispatchError> {
587 let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
588 let trigger_id_for_metrics = trigger_id.clone();
589 let mut headers = parent_headers.cloned().unwrap_or_default();
590 headers.extend(event_headers(&event, None, None, None));
591 if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
592 headers.insert("trigger_id".to_string(), trigger_id.clone());
593 headers.insert(
594 "binding_key".to_string(),
595 binding_key_from_parts(trigger_id, binding_version),
596 );
597 }
598 headers
599 .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
600 .or_insert_with(|| unix_ms(event.received_at).to_string());
601 let payload = serde_json::to_value(InboxEnvelope {
602 trigger_id,
603 binding_version,
604 event: event.clone(),
605 })
606 .map_err(|error| DispatchError::Serde(error.to_string()))?;
607 let mut log_event = LogEvent::new("event_ingested", payload);
608 let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
609 let queue_appended_at_ms = headers
610 .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
611 .and_then(|value| value.parse::<i64>().ok())
612 .unwrap_or(log_event.occurred_at_ms);
613 headers
614 .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
615 .or_insert_with(|| log_event.occurred_at_ms.to_string());
616 if let (Some(metrics), Some(trigger_id)) =
617 (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
618 {
619 let binding_key = binding_key_from_parts(trigger_id, binding_version);
620 let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
621 if !had_queue_appended_at {
622 metrics.record_trigger_accepted_to_queue_append(
623 trigger_id,
624 &binding_key,
625 event.provider.as_str(),
626 tenant_id(&event),
627 "queued",
628 duration_between_ms(queue_appended_at_ms, accepted_at_ms),
629 );
630 }
631 metrics.note_trigger_pending_event(
632 event.id.0.as_str(),
633 trigger_id,
634 &binding_key,
635 event.provider.as_str(),
636 tenant_id(&event),
637 accepted_at_ms,
638 queue_appended_at_ms,
639 );
640 }
641 log_event.headers = headers;
642 self.event_log
643 .append(&topic, log_event)
644 .await
645 .map_err(DispatchError::from)
646 }
647
648 pub async fn run(&self) -> Result<(), DispatchError> {
649 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
650 .expect("static trigger inbox envelopes topic is valid");
651 let start_from = self.event_log.latest(&topic).await?;
652 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
653 pin_mut!(stream);
654 let mut cancel_rx = self.cancel_tx.subscribe();
655
656 loop {
657 tokio::select! {
658 received = stream.next() => {
659 let Some(received) = received else {
660 break;
661 };
662 let (_, event) = received.map_err(DispatchError::from)?;
663 if event.kind != "event_ingested" {
664 continue;
665 }
666 let parent_headers = event.headers.clone();
667 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
668 .map_err(|error| DispatchError::Serde(error.to_string()))?;
669 notify_test_inbox_dequeued();
670 let _ = self
671 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
672 .await;
673 }
674 _ = recv_cancel(&mut cancel_rx) => break,
675 }
676 }
677
678 Ok(())
679 }
680
681 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
682 let deadline = tokio::time::Instant::now() + timeout;
683 loop {
684 let snapshot = self.snapshot();
685 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
686 return Ok(DispatcherDrainReport {
687 drained: true,
688 in_flight: snapshot.in_flight,
689 retry_queue_depth: snapshot.retry_queue_depth,
690 dlq_depth: snapshot.dlq_depth,
691 });
692 }
693
694 let notified = self.state.idle_notify.notified();
695 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
696 if remaining.is_zero() {
697 return Ok(DispatcherDrainReport {
698 drained: false,
699 in_flight: snapshot.in_flight,
700 retry_queue_depth: snapshot.retry_queue_depth,
701 dlq_depth: snapshot.dlq_depth,
702 });
703 }
704 if tokio::time::timeout(remaining, notified).await.is_err() {
705 let snapshot = self.snapshot();
706 return Ok(DispatcherDrainReport {
707 drained: false,
708 in_flight: snapshot.in_flight,
709 retry_queue_depth: snapshot.retry_queue_depth,
710 dlq_depth: snapshot.dlq_depth,
711 });
712 }
713 }
714 }
715
716 pub async fn dispatch_inbox_envelope(
717 &self,
718 envelope: InboxEnvelope,
719 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
720 self.dispatch_inbox_envelope_with_headers(envelope, None)
721 .await
722 }
723
724 pub async fn dispatch_inbox_envelope_with_parent_headers(
725 &self,
726 envelope: InboxEnvelope,
727 parent_headers: &BTreeMap<String, String>,
728 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
729 self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
730 .await
731 }
732
733 async fn dispatch_inbox_envelope_with_headers(
734 &self,
735 envelope: InboxEnvelope,
736 parent_headers: Option<&BTreeMap<String, String>>,
737 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
738 if let Some(trigger_id) = envelope.trigger_id {
739 let binding = super::registry::resolve_live_trigger_binding(
740 &trigger_id,
741 envelope.binding_version,
742 )
743 .map_err(|error| DispatchError::Registry(error.to_string()))?;
744 return Ok(vec![
745 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
746 .await?,
747 ]);
748 }
749
750 let cron_target = match &envelope.event.provider_payload {
751 crate::triggers::ProviderPayload::Known(
752 crate::triggers::event::KnownProviderPayload::Cron(payload),
753 ) => payload.cron_id.clone(),
754 _ => None,
755 };
756 if let Some(trigger_id) = cron_target {
757 let binding = super::registry::resolve_live_trigger_binding(
758 &trigger_id,
759 envelope.binding_version,
760 )
761 .map_err(|error| DispatchError::Registry(error.to_string()))?;
762 return Ok(vec![
763 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
764 .await?,
765 ]);
766 }
767
768 self.dispatch_event_with_headers(envelope.event, parent_headers)
769 .await
770 }
771
772 pub async fn dispatch_event(
773 &self,
774 event: TriggerEvent,
775 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
776 self.dispatch_event_with_headers(event, None).await
777 }
778
779 async fn dispatch_event_with_headers(
780 &self,
781 event: TriggerEvent,
782 parent_headers: Option<&BTreeMap<String, String>>,
783 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
784 let bindings = matching_bindings(&event);
785 let mut outcomes = Vec::new();
786 for binding in bindings {
787 outcomes.push(
788 self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
789 .await?,
790 );
791 }
792 Ok(outcomes)
793 }
794
795 pub async fn dispatch(
796 &self,
797 binding: &TriggerBinding,
798 event: TriggerEvent,
799 ) -> Result<DispatchOutcome, DispatchError> {
800 self.dispatch_with_replay(binding, event, None, None, None)
801 .await
802 }
803
804 pub async fn dispatch_replay(
805 &self,
806 binding: &TriggerBinding,
807 event: TriggerEvent,
808 replay_of_event_id: String,
809 ) -> Result<DispatchOutcome, DispatchError> {
810 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
811 .await
812 }
813
814 pub async fn dispatch_with_parent_span_id(
815 &self,
816 binding: &TriggerBinding,
817 event: TriggerEvent,
818 parent_span_id: Option<String>,
819 ) -> Result<DispatchOutcome, DispatchError> {
820 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
821 .await
822 }
823
824 async fn dispatch_with_replay(
825 &self,
826 binding: &TriggerBinding,
827 event: TriggerEvent,
828 replay_of_event_id: Option<String>,
829 parent_span_id: Option<String>,
830 parent_headers: Option<&BTreeMap<String, String>>,
831 ) -> Result<DispatchOutcome, DispatchError> {
832 let parent_headers_for_metrics = parent_headers.cloned();
833 let admitted_at_ms = current_unix_ms();
834 if let Some(metrics) = self.metrics.as_ref() {
835 let binding_key = binding.binding_key();
836 let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
837 metrics.record_trigger_queue_age_at_dispatch_admission(
838 binding.id.as_str(),
839 &binding_key,
840 event.provider.as_str(),
841 tenant_id(&event),
842 "admitted",
843 duration_between_ms(admitted_at_ms, queue_appended_at_ms),
844 );
845 metrics.clear_trigger_pending_event(
846 event.id.0.as_str(),
847 binding.id.as_str(),
848 &binding_key,
849 event.provider.as_str(),
850 tenant_id(&event),
851 admitted_at_ms,
852 );
853 }
854 let span = tracing::info_span!(
855 "dispatch",
856 trigger_id = %binding.id.as_str(),
857 binding_version = binding.version,
858 trace_id = %event.trace_id.0
859 );
860 #[cfg(feature = "otel")]
861 let span_for_otel = span.clone();
862 let _ = if let Some(headers) = parent_headers {
863 crate::observability::otel::set_span_parent_from_headers(
864 &span,
865 headers,
866 &event.trace_id,
867 parent_span_id.as_deref(),
868 )
869 } else {
870 crate::observability::otel::set_span_parent(
871 &span,
872 &event.trace_id,
873 parent_span_id.as_deref(),
874 )
875 };
876 #[cfg(feature = "otel")]
877 let started_at = Instant::now();
878 let metrics = self.metrics.clone();
879 let outcome = ACTIVE_DISPATCH_IS_REPLAY
880 .scope(
881 replay_of_event_id.is_some(),
882 self.dispatch_with_replay_inner(
883 binding,
884 event,
885 replay_of_event_id,
886 parent_headers_for_metrics,
887 )
888 .instrument(span),
889 )
890 .await;
891 if let Some(metrics) = metrics.as_ref() {
892 match &outcome {
893 Ok(dispatch_outcome) => match dispatch_outcome.status {
894 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
895 metrics.record_dispatch_succeeded();
896 }
897 DispatchStatus::Waiting => {}
898 _ => metrics.record_dispatch_failed(),
899 },
900 Err(_) => metrics.record_dispatch_failed(),
901 }
902 let outcome_label = match &outcome {
903 Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
904 Err(DispatchError::Cancelled(_)) => "cancelled",
905 Err(_) => "failed",
906 };
907 metrics.record_trigger_dispatched(
908 binding.id.as_str(),
909 binding.handler.kind(),
910 outcome_label,
911 );
912 metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
913 }
914 #[cfg(feature = "otel")]
915 {
916 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
917
918 let duration_ms = started_at.elapsed().as_millis() as i64;
919 let status = match &outcome {
920 Ok(dispatch_outcome) => match dispatch_outcome.status {
921 DispatchStatus::Succeeded => "succeeded",
922 DispatchStatus::Skipped => "skipped",
923 DispatchStatus::Waiting => "waiting",
924 DispatchStatus::Cancelled => "cancelled",
925 DispatchStatus::Failed => "failed",
926 DispatchStatus::Dlq => "dlq",
927 },
928 Err(DispatchError::Cancelled(_)) => "cancelled",
929 Err(_) => "failed",
930 };
931 span_for_otel.set_attribute("result.status", status);
932 span_for_otel.set_attribute("result.duration_ms", duration_ms);
933 }
934 outcome
935 }
936
937 async fn dispatch_with_replay_inner(
938 &self,
939 binding: &TriggerBinding,
940 event: TriggerEvent,
941 replay_of_event_id: Option<String>,
942 parent_headers: Option<BTreeMap<String, String>>,
943 ) -> Result<DispatchOutcome, DispatchError> {
944 let autonomy_tier = crate::resolve_agent_autonomy_tier(
945 &self.event_log,
946 binding.id.as_str(),
947 binding.autonomy_tier,
948 )
949 .await
950 .unwrap_or(binding.autonomy_tier);
951 let binding_key = binding.binding_key();
952 let route = DispatchUri::from(&binding.handler);
953 let trigger_id = binding.id.as_str().to_string();
954 let event_id = event.id.0.clone();
955 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
956 let begin = if replay_of_event_id.is_some() {
957 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
958 } else {
959 begin_in_flight(binding.id.as_str(), binding.version)
960 };
961 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
962
963 let mut attempts = Vec::new();
964 let mut source_node_id = format!("trigger:{}", event.id.0);
965 let mut initial_nodes = Vec::new();
966 let mut initial_edges = Vec::new();
967 if let Some(original_event_id) = replay_of_event_id.as_ref() {
968 let original_node_id = format!("trigger:{original_event_id}");
969 initial_nodes.push(RunActionGraphNodeRecord {
970 id: original_node_id.clone(),
971 label: format!(
972 "{}:{} (original {})",
973 event.provider.as_str(),
974 event.kind,
975 original_event_id
976 ),
977 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
978 status: "historical".to_string(),
979 outcome: "replayed_from".to_string(),
980 trace_id: Some(event.trace_id.0.clone()),
981 stage_id: None,
982 node_id: None,
983 worker_id: None,
984 run_id: None,
985 run_path: None,
986 metadata: trigger_node_metadata(&event),
987 });
988 initial_edges.push(RunActionGraphEdgeRecord {
989 from_id: original_node_id,
990 to_id: source_node_id.clone(),
991 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
992 label: Some("replay chain".to_string()),
993 });
994 }
995 initial_nodes.push(RunActionGraphNodeRecord {
996 id: source_node_id.clone(),
997 label: format!("{}:{}", event.provider.as_str(), event.kind),
998 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
999 status: "received".to_string(),
1000 outcome: "received".to_string(),
1001 trace_id: Some(event.trace_id.0.clone()),
1002 stage_id: None,
1003 node_id: None,
1004 worker_id: None,
1005 run_id: None,
1006 run_path: None,
1007 metadata: trigger_node_metadata(&event),
1008 });
1009 self.emit_action_graph(
1010 &event,
1011 initial_nodes,
1012 initial_edges,
1013 serde_json::json!({
1014 "source": "dispatcher",
1015 "trigger_id": trigger_id,
1016 "binding_key": binding_key,
1017 "event_id": event_id,
1018 "replay_of_event_id": replay_of_event_id,
1019 }),
1020 )
1021 .await?;
1022
1023 if dispatch_cancel_requested(
1024 &self.event_log,
1025 &binding_key,
1026 &event.id.0,
1027 replay_of_event_id.as_ref(),
1028 )
1029 .await?
1030 {
1031 finish_in_flight(
1032 binding.id.as_str(),
1033 binding.version,
1034 TriggerDispatchOutcome::Failed,
1035 )
1036 .await
1037 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1038 decrement_in_flight(&self.state);
1039 return Ok(cancelled_dispatch_outcome(
1040 binding,
1041 &route,
1042 &event,
1043 replay_of_event_id,
1044 0,
1045 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1046 ));
1047 }
1048
1049 if let Some(predicate) = binding.when.as_ref() {
1050 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1051 let evaluation = self
1052 .evaluate_predicate(
1053 binding,
1054 predicate,
1055 &event,
1056 replay_of_event_id.as_ref(),
1057 autonomy_tier,
1058 )
1059 .await?;
1060 let passed = evaluation.result;
1061 self.emit_action_graph(
1062 &event,
1063 vec![RunActionGraphNodeRecord {
1064 id: predicate_node_id.clone(),
1065 label: predicate.raw.clone(),
1066 kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1067 status: "completed".to_string(),
1068 outcome: passed.to_string(),
1069 trace_id: Some(event.trace_id.0.clone()),
1070 stage_id: None,
1071 node_id: None,
1072 worker_id: None,
1073 run_id: None,
1074 run_path: None,
1075 metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1076 }],
1077 vec![RunActionGraphEdgeRecord {
1078 from_id: source_node_id.clone(),
1079 to_id: predicate_node_id.clone(),
1080 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1081 label: None,
1082 }],
1083 serde_json::json!({
1084 "source": "dispatcher",
1085 "trigger_id": binding.id.as_str(),
1086 "binding_key": binding.binding_key(),
1087 "event_id": event.id.0,
1088 "predicate": predicate.raw,
1089 "reason": evaluation.reason,
1090 "cached": evaluation.cached,
1091 "cost_usd": evaluation.cost_usd,
1092 "tokens": evaluation.tokens,
1093 "latency_ms": evaluation.latency_ms,
1094 "replay_of_event_id": replay_of_event_id,
1095 }),
1096 )
1097 .await?;
1098
1099 if !passed {
1100 if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1101 let final_error = format!(
1102 "trigger budget exhausted: {}",
1103 evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1104 );
1105 self.move_budget_exhausted_to_dlq(
1106 binding,
1107 &route,
1108 &event,
1109 replay_of_event_id.as_ref(),
1110 &final_error,
1111 )
1112 .await?;
1113 finish_in_flight(
1114 binding.id.as_str(),
1115 binding.version,
1116 TriggerDispatchOutcome::Dlq,
1117 )
1118 .await
1119 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1120 decrement_in_flight(&self.state);
1121 self.append_dispatch_trust_record(
1122 binding,
1123 &route,
1124 &event,
1125 replay_of_event_id.as_ref(),
1126 autonomy_tier,
1127 TrustOutcome::Failure,
1128 "dlq",
1129 0,
1130 Some(final_error.clone()),
1131 )
1132 .await?;
1133 return Ok(DispatchOutcome {
1134 trigger_id: binding.id.as_str().to_string(),
1135 binding_key: binding.binding_key(),
1136 event_id: event.id.0,
1137 attempt_count: 0,
1138 status: DispatchStatus::Dlq,
1139 handler_kind: route.kind().to_string(),
1140 target_uri: route.target_uri(),
1141 replay_of_event_id,
1142 result: None,
1143 error: Some(final_error),
1144 });
1145 }
1146
1147 if evaluation.exhaustion_strategy
1148 == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1149 {
1150 self.append_budget_deferred_event(
1151 binding,
1152 &route,
1153 &event,
1154 replay_of_event_id.as_ref(),
1155 evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1156 )
1157 .await?;
1158 finish_in_flight(
1159 binding.id.as_str(),
1160 binding.version,
1161 TriggerDispatchOutcome::Dispatched,
1162 )
1163 .await
1164 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1165 decrement_in_flight(&self.state);
1166 self.append_dispatch_trust_record(
1167 binding,
1168 &route,
1169 &event,
1170 replay_of_event_id.as_ref(),
1171 autonomy_tier,
1172 TrustOutcome::Denied,
1173 "waiting",
1174 0,
1175 evaluation.reason.clone(),
1176 )
1177 .await?;
1178 return Ok(DispatchOutcome {
1179 trigger_id: binding.id.as_str().to_string(),
1180 binding_key: binding.binding_key(),
1181 event_id: event.id.0,
1182 attempt_count: 0,
1183 status: DispatchStatus::Waiting,
1184 handler_kind: route.kind().to_string(),
1185 target_uri: route.target_uri(),
1186 replay_of_event_id,
1187 result: Some(serde_json::json!({
1188 "deferred": true,
1189 "predicate": predicate.raw,
1190 "reason": evaluation.reason,
1191 })),
1192 error: None,
1193 });
1194 }
1195
1196 self.append_skipped_outbox_event(
1197 binding,
1198 &route,
1199 &event,
1200 replay_of_event_id.as_ref(),
1201 DispatchSkipStage::Predicate,
1202 serde_json::json!({
1203 "predicate": predicate.raw,
1204 "reason": evaluation.reason,
1205 }),
1206 )
1207 .await?;
1208 finish_in_flight(
1209 binding.id.as_str(),
1210 binding.version,
1211 TriggerDispatchOutcome::Dispatched,
1212 )
1213 .await
1214 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1215 decrement_in_flight(&self.state);
1216 self.append_dispatch_trust_record(
1217 binding,
1218 &route,
1219 &event,
1220 replay_of_event_id.as_ref(),
1221 autonomy_tier,
1222 TrustOutcome::Denied,
1223 "skipped",
1224 0,
1225 None,
1226 )
1227 .await?;
1228 return Ok(DispatchOutcome {
1229 trigger_id: binding.id.as_str().to_string(),
1230 binding_key: binding.binding_key(),
1231 event_id: event.id.0,
1232 attempt_count: 0,
1233 status: DispatchStatus::Skipped,
1234 handler_kind: route.kind().to_string(),
1235 target_uri: route.target_uri(),
1236 replay_of_event_id,
1237 result: Some(serde_json::json!({
1238 "skipped": true,
1239 "predicate": predicate.raw,
1240 "reason": evaluation.reason,
1241 })),
1242 error: None,
1243 });
1244 }
1245
1246 source_node_id = predicate_node_id;
1247 }
1248
1249 if autonomy_tier == AutonomyTier::ActAuto {
1250 if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1251 let request_id = self
1252 .append_autonomy_budget_approval_request(
1253 binding,
1254 &route,
1255 &event,
1256 replay_of_event_id.as_ref(),
1257 reason,
1258 )
1259 .await?;
1260 self.emit_autonomy_budget_approval_action_graph(
1261 binding,
1262 &route,
1263 &event,
1264 &source_node_id,
1265 replay_of_event_id.as_ref(),
1266 reason,
1267 &request_id,
1268 )
1269 .await?;
1270 finish_in_flight(
1271 binding.id.as_str(),
1272 binding.version,
1273 TriggerDispatchOutcome::Dispatched,
1274 )
1275 .await
1276 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1277 decrement_in_flight(&self.state);
1278 self.append_tier_transition_trust_record(
1279 binding,
1280 &event,
1281 replay_of_event_id.as_ref(),
1282 autonomy_tier,
1283 AutonomyTier::ActWithApproval,
1284 reason,
1285 &request_id,
1286 )
1287 .await?;
1288 self.append_dispatch_trust_record(
1289 binding,
1290 &route,
1291 &event,
1292 replay_of_event_id.as_ref(),
1293 autonomy_tier,
1294 TrustOutcome::Denied,
1295 "waiting",
1296 0,
1297 Some(reason.to_string()),
1298 )
1299 .await?;
1300 return Ok(DispatchOutcome {
1301 trigger_id: binding.id.as_str().to_string(),
1302 binding_key: binding.binding_key(),
1303 event_id: event.id.0,
1304 attempt_count: 0,
1305 status: DispatchStatus::Waiting,
1306 handler_kind: route.kind().to_string(),
1307 target_uri: route.target_uri(),
1308 replay_of_event_id,
1309 result: Some(serde_json::json!({
1310 "approval_required": true,
1311 "request_id": request_id,
1312 "reason": reason,
1313 "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1314 })),
1315 error: None,
1316 });
1317 }
1318 note_autonomous_decision(binding);
1319 }
1320
1321 let (event, acquired_flow) = match self
1322 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1323 .await?
1324 {
1325 FlowControlOutcome::Dispatch { event, acquired } => {
1326 (*event, Arc::new(AsyncMutex::new(acquired)))
1327 }
1328 FlowControlOutcome::Skip { reason } => {
1329 self.append_skipped_outbox_event(
1330 binding,
1331 &route,
1332 &event,
1333 replay_of_event_id.as_ref(),
1334 DispatchSkipStage::FlowControl,
1335 serde_json::json!({
1336 "flow_control": reason,
1337 }),
1338 )
1339 .await?;
1340 finish_in_flight(
1341 binding.id.as_str(),
1342 binding.version,
1343 TriggerDispatchOutcome::Dispatched,
1344 )
1345 .await
1346 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1347 decrement_in_flight(&self.state);
1348 return Ok(DispatchOutcome {
1349 trigger_id: binding.id.as_str().to_string(),
1350 binding_key: binding.binding_key(),
1351 event_id: event.id.0,
1352 attempt_count: 0,
1353 status: DispatchStatus::Skipped,
1354 handler_kind: route.kind().to_string(),
1355 target_uri: route.target_uri(),
1356 replay_of_event_id,
1357 result: Some(serde_json::json!({
1358 "skipped": true,
1359 "flow_control": reason,
1360 })),
1361 error: None,
1362 });
1363 }
1364 };
1365
1366 let destination_key = destination_circuit_key(&route);
1367 let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1368 DestinationCircuitProbe::Allow { half_open } => {
1369 if half_open {
1370 if let Some(metrics) = self.metrics.as_ref() {
1371 metrics.record_backpressure_event("circuit", "half_open_probe");
1372 }
1373 }
1374 half_open
1375 }
1376 DestinationCircuitProbe::Block { retry_after } => {
1377 if let Some(metrics) = self.metrics.as_ref() {
1378 metrics.record_backpressure_event("circuit", "fail_fast");
1379 }
1380 let final_error = format!(
1381 "destination circuit open for {}; retry after {}s",
1382 destination_key,
1383 retry_after.as_secs().max(1)
1384 );
1385 self.move_circuit_open_to_dlq(
1386 binding,
1387 &route,
1388 &event,
1389 replay_of_event_id.as_ref(),
1390 &final_error,
1391 &destination_key,
1392 )
1393 .await?;
1394 finish_in_flight(
1395 binding.id.as_str(),
1396 binding.version,
1397 TriggerDispatchOutcome::Dlq,
1398 )
1399 .await
1400 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1401 self.release_flow_control(&acquired_flow).await?;
1402 decrement_in_flight(&self.state);
1403 self.append_dispatch_trust_record(
1404 binding,
1405 &route,
1406 &event,
1407 replay_of_event_id.as_ref(),
1408 autonomy_tier,
1409 TrustOutcome::Failure,
1410 "dlq",
1411 0,
1412 Some(final_error.clone()),
1413 )
1414 .await?;
1415 return Ok(DispatchOutcome {
1416 trigger_id: binding.id.as_str().to_string(),
1417 binding_key: binding.binding_key(),
1418 event_id: event.id.0,
1419 attempt_count: 0,
1420 status: DispatchStatus::Dlq,
1421 handler_kind: route.kind().to_string(),
1422 target_uri: route.target_uri(),
1423 replay_of_event_id,
1424 result: None,
1425 error: Some(final_error),
1426 });
1427 }
1428 };
1429
1430 let mut previous_retry_node = None;
1431 let max_attempts = binding.retry.max_attempts();
1432 for attempt in 1..=max_attempts {
1433 if dispatch_cancel_requested(
1434 &self.event_log,
1435 &binding_key,
1436 &event.id.0,
1437 replay_of_event_id.as_ref(),
1438 )
1439 .await?
1440 {
1441 finish_in_flight(
1442 binding.id.as_str(),
1443 binding.version,
1444 TriggerDispatchOutcome::Failed,
1445 )
1446 .await
1447 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1448 decrement_in_flight(&self.state);
1449 return Ok(cancelled_dispatch_outcome(
1450 binding,
1451 &route,
1452 &event,
1453 replay_of_event_id,
1454 attempt.saturating_sub(1),
1455 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1456 ));
1457 }
1458 maybe_fail_before_outbox();
1459 let attempt_started_instant = Instant::now();
1460 let attempt_started_at_ms = current_unix_ms();
1461 let queue_age_at_start = duration_between_ms(
1462 attempt_started_at_ms,
1463 queue_appended_at_ms(parent_headers.as_ref(), &event),
1464 );
1465 if attempt == 1 {
1466 if let Some(metrics) = self.metrics.as_ref() {
1467 metrics.record_trigger_queue_age_at_dispatch_start(
1468 binding.id.as_str(),
1469 &binding_key,
1470 event.provider.as_str(),
1471 tenant_id(&event),
1472 "started",
1473 queue_age_at_start,
1474 );
1475 }
1476 }
1477 tracing::info!(
1478 component = "dispatcher",
1479 lifecycle = "dispatch_started",
1480 trigger_id = %binding.id.as_str(),
1481 binding_key = %binding_key,
1482 event_id = %event.id.0,
1483 attempt,
1484 queue_age_ms = queue_age_at_start.as_millis(),
1485 trace_id = %event.trace_id.0
1486 );
1487 let started_at = now_rfc3339();
1488 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1489 self.append_lifecycle_event(
1490 "DispatchStarted",
1491 &event,
1492 binding,
1493 serde_json::json!({
1494 "event_id": event.id.0,
1495 "attempt": attempt,
1496 "handler_kind": route.kind(),
1497 "target_uri": route.target_uri(),
1498 "replay_of_event_id": replay_of_event_id,
1499 }),
1500 replay_of_event_id.as_ref(),
1501 )
1502 .await?;
1503 self.append_topic_event(
1504 TRIGGER_OUTBOX_TOPIC,
1505 "dispatch_started",
1506 &event,
1507 Some(binding),
1508 Some(attempt),
1509 serde_json::json!({
1510 "event_id": event.id.0,
1511 "attempt": attempt,
1512 "trigger_id": binding.id.as_str(),
1513 "binding_key": binding.binding_key(),
1514 "handler_kind": route.kind(),
1515 "target_uri": route.target_uri(),
1516 "replay_of_event_id": replay_of_event_id,
1517 }),
1518 replay_of_event_id.as_ref(),
1519 )
1520 .await?;
1521
1522 let mut dispatch_edges = Vec::new();
1523 if attempt == 1 {
1524 dispatch_edges.push(RunActionGraphEdgeRecord {
1525 from_id: source_node_id.clone(),
1526 to_id: attempt_node_id.clone(),
1527 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1528 label: binding.when.as_ref().map(|_| "true".to_string()),
1529 });
1530 } else if let Some(retry_node_id) = previous_retry_node.take() {
1531 dispatch_edges.push(RunActionGraphEdgeRecord {
1532 from_id: retry_node_id,
1533 to_id: attempt_node_id.clone(),
1534 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1535 label: Some(format!("attempt {attempt}")),
1536 });
1537 }
1538
1539 self.emit_action_graph(
1540 &event,
1541 vec![RunActionGraphNodeRecord {
1542 id: attempt_node_id.clone(),
1543 label: dispatch_node_label(&route),
1544 kind: dispatch_node_kind(&route).to_string(),
1545 status: "running".to_string(),
1546 outcome: format!("attempt_{attempt}"),
1547 trace_id: Some(event.trace_id.0.clone()),
1548 stage_id: None,
1549 node_id: None,
1550 worker_id: None,
1551 run_id: None,
1552 run_path: None,
1553 metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1554 }],
1555 dispatch_edges,
1556 serde_json::json!({
1557 "source": "dispatcher",
1558 "trigger_id": binding.id.as_str(),
1559 "binding_key": binding.binding_key(),
1560 "event_id": event.id.0,
1561 "attempt": attempt,
1562 "handler_kind": route.kind(),
1563 "target_uri": route.target_uri(),
1564 "target_agent": dispatch_target_agent(&route),
1565 "replay_of_event_id": replay_of_event_id,
1566 }),
1567 )
1568 .await?;
1569
1570 let result = self
1571 .dispatch_once(
1572 binding,
1573 &route,
1574 &event,
1575 autonomy_tier,
1576 Some(DispatchWaitLease::new(
1577 self.state.clone(),
1578 acquired_flow.clone(),
1579 )),
1580 &mut self.cancel_tx.subscribe(),
1581 )
1582 .await;
1583 let attempt_runtime = attempt_started_instant.elapsed();
1584 let attempt_status = dispatch_result_status(&result);
1585 if let Some(metrics) = self.metrics.as_ref() {
1586 metrics.record_trigger_dispatch_runtime(
1587 binding.id.as_str(),
1588 &binding_key,
1589 event.provider.as_str(),
1590 tenant_id(&event),
1591 attempt_status,
1592 attempt_runtime,
1593 );
1594 }
1595 tracing::info!(
1596 component = "dispatcher",
1597 lifecycle = "handler_completed",
1598 trigger_id = %binding.id.as_str(),
1599 binding_key = %binding_key,
1600 event_id = %event.id.0,
1601 attempt,
1602 status = attempt_status,
1603 runtime_ms = attempt_runtime.as_millis(),
1604 trace_id = %event.trace_id.0
1605 );
1606 let completed_at = now_rfc3339();
1607
1608 match result {
1609 Ok(result) => {
1610 let attempt_record = DispatchAttemptRecord {
1611 trigger_id: binding.id.as_str().to_string(),
1612 binding_key: binding.binding_key(),
1613 event_id: event.id.0.clone(),
1614 attempt,
1615 handler_kind: route.kind().to_string(),
1616 started_at,
1617 completed_at,
1618 outcome: "success".to_string(),
1619 error_msg: None,
1620 };
1621 attempts.push(attempt_record.clone());
1622 self.append_attempt_record(
1623 &event,
1624 binding,
1625 &attempt_record,
1626 replay_of_event_id.as_ref(),
1627 )
1628 .await?;
1629 self.append_lifecycle_event(
1630 "DispatchSucceeded",
1631 &event,
1632 binding,
1633 serde_json::json!({
1634 "event_id": event.id.0,
1635 "attempt": attempt,
1636 "handler_kind": route.kind(),
1637 "target_uri": route.target_uri(),
1638 "result": result,
1639 "replay_of_event_id": replay_of_event_id,
1640 }),
1641 replay_of_event_id.as_ref(),
1642 )
1643 .await?;
1644 self.append_topic_event(
1645 TRIGGER_OUTBOX_TOPIC,
1646 "dispatch_succeeded",
1647 &event,
1648 Some(binding),
1649 Some(attempt),
1650 serde_json::json!({
1651 "event_id": event.id.0,
1652 "attempt": attempt,
1653 "trigger_id": binding.id.as_str(),
1654 "binding_key": binding.binding_key(),
1655 "handler_kind": route.kind(),
1656 "target_uri": route.target_uri(),
1657 "result": result,
1658 "replay_of_event_id": replay_of_event_id,
1659 }),
1660 replay_of_event_id.as_ref(),
1661 )
1662 .await?;
1663 self.emit_action_graph(
1664 &event,
1665 vec![RunActionGraphNodeRecord {
1666 id: attempt_node_id.clone(),
1667 label: dispatch_node_label(&route),
1668 kind: dispatch_node_kind(&route).to_string(),
1669 status: "completed".to_string(),
1670 outcome: dispatch_success_outcome(&route, &result).to_string(),
1671 trace_id: Some(event.trace_id.0.clone()),
1672 stage_id: None,
1673 node_id: None,
1674 worker_id: None,
1675 run_id: None,
1676 run_path: None,
1677 metadata: dispatch_success_metadata(
1678 &route, binding, &event, attempt, &result,
1679 ),
1680 }],
1681 Vec::new(),
1682 serde_json::json!({
1683 "source": "dispatcher",
1684 "trigger_id": binding.id.as_str(),
1685 "binding_key": binding.binding_key(),
1686 "event_id": event.id.0,
1687 "attempt": attempt,
1688 "handler_kind": route.kind(),
1689 "target_uri": route.target_uri(),
1690 "result": result,
1691 "replay_of_event_id": replay_of_event_id,
1692 }),
1693 )
1694 .await?;
1695 self.state
1696 .destination_circuits
1697 .record_success(&destination_key);
1698 if half_open_probe {
1699 if let Some(metrics) = self.metrics.as_ref() {
1700 metrics.record_backpressure_event("circuit", "closed");
1701 }
1702 }
1703 finish_in_flight(
1704 binding.id.as_str(),
1705 binding.version,
1706 TriggerDispatchOutcome::Dispatched,
1707 )
1708 .await
1709 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1710 self.release_flow_control(&acquired_flow).await?;
1711 decrement_in_flight(&self.state);
1712 self.append_dispatch_trust_record(
1713 binding,
1714 &route,
1715 &event,
1716 replay_of_event_id.as_ref(),
1717 autonomy_tier,
1718 TrustOutcome::Success,
1719 "succeeded",
1720 attempt,
1721 None,
1722 )
1723 .await?;
1724 return Ok(DispatchOutcome {
1725 trigger_id: binding.id.as_str().to_string(),
1726 binding_key: binding.binding_key(),
1727 event_id: event.id.0,
1728 attempt_count: attempt,
1729 status: DispatchStatus::Succeeded,
1730 handler_kind: route.kind().to_string(),
1731 target_uri: route.target_uri(),
1732 replay_of_event_id,
1733 result: Some(result),
1734 error: None,
1735 });
1736 }
1737 Err(error) => {
1738 let attempt_record = DispatchAttemptRecord {
1739 trigger_id: binding.id.as_str().to_string(),
1740 binding_key: binding.binding_key(),
1741 event_id: event.id.0.clone(),
1742 attempt,
1743 handler_kind: route.kind().to_string(),
1744 started_at,
1745 completed_at,
1746 outcome: dispatch_error_label(&error).to_string(),
1747 error_msg: Some(error.to_string()),
1748 };
1749 attempts.push(attempt_record.clone());
1750 self.append_attempt_record(
1751 &event,
1752 binding,
1753 &attempt_record,
1754 replay_of_event_id.as_ref(),
1755 )
1756 .await?;
1757 if let DispatchError::Waiting(message) = &error {
1758 self.append_lifecycle_event(
1759 "DispatchWaiting",
1760 &event,
1761 binding,
1762 serde_json::json!({
1763 "event_id": event.id.0,
1764 "attempt": attempt,
1765 "handler_kind": route.kind(),
1766 "target_uri": route.target_uri(),
1767 "message": message,
1768 "replay_of_event_id": replay_of_event_id,
1769 }),
1770 replay_of_event_id.as_ref(),
1771 )
1772 .await?;
1773 self.append_topic_event(
1774 TRIGGER_OUTBOX_TOPIC,
1775 "dispatch_waiting",
1776 &event,
1777 Some(binding),
1778 Some(attempt),
1779 serde_json::json!({
1780 "event_id": event.id.0,
1781 "attempt": attempt,
1782 "trigger_id": binding.id.as_str(),
1783 "binding_key": binding.binding_key(),
1784 "handler_kind": route.kind(),
1785 "target_uri": route.target_uri(),
1786 "message": message,
1787 "replay_of_event_id": replay_of_event_id,
1788 }),
1789 replay_of_event_id.as_ref(),
1790 )
1791 .await?;
1792 finish_in_flight(
1793 binding.id.as_str(),
1794 binding.version,
1795 TriggerDispatchOutcome::Dispatched,
1796 )
1797 .await
1798 .map_err(|registry_error| {
1799 DispatchError::Registry(registry_error.to_string())
1800 })?;
1801 self.release_flow_control(&acquired_flow).await?;
1802 decrement_in_flight(&self.state);
1803 return Ok(DispatchOutcome {
1804 trigger_id: binding.id.as_str().to_string(),
1805 binding_key: binding.binding_key(),
1806 event_id: event.id.0,
1807 attempt_count: attempt,
1808 status: DispatchStatus::Waiting,
1809 handler_kind: route.kind().to_string(),
1810 target_uri: route.target_uri(),
1811 replay_of_event_id,
1812 result: Some(serde_json::json!({
1813 "waiting": true,
1814 "message": message,
1815 })),
1816 error: None,
1817 });
1818 }
1819
1820 self.append_lifecycle_event(
1821 "DispatchFailed",
1822 &event,
1823 binding,
1824 serde_json::json!({
1825 "event_id": event.id.0,
1826 "attempt": attempt,
1827 "handler_kind": route.kind(),
1828 "target_uri": route.target_uri(),
1829 "error": error.to_string(),
1830 "replay_of_event_id": replay_of_event_id,
1831 }),
1832 replay_of_event_id.as_ref(),
1833 )
1834 .await?;
1835 self.append_topic_event(
1836 TRIGGER_OUTBOX_TOPIC,
1837 "dispatch_failed",
1838 &event,
1839 Some(binding),
1840 Some(attempt),
1841 serde_json::json!({
1842 "event_id": event.id.0,
1843 "attempt": attempt,
1844 "trigger_id": binding.id.as_str(),
1845 "binding_key": binding.binding_key(),
1846 "handler_kind": route.kind(),
1847 "target_uri": route.target_uri(),
1848 "error": error.to_string(),
1849 "replay_of_event_id": replay_of_event_id,
1850 }),
1851 replay_of_event_id.as_ref(),
1852 )
1853 .await?;
1854 self.emit_action_graph(
1855 &event,
1856 vec![RunActionGraphNodeRecord {
1857 id: attempt_node_id.clone(),
1858 label: dispatch_node_label(&route),
1859 kind: dispatch_node_kind(&route).to_string(),
1860 status: if matches!(error, DispatchError::Cancelled(_)) {
1861 "cancelled".to_string()
1862 } else {
1863 "failed".to_string()
1864 },
1865 outcome: dispatch_error_label(&error).to_string(),
1866 trace_id: Some(event.trace_id.0.clone()),
1867 stage_id: None,
1868 node_id: None,
1869 worker_id: None,
1870 run_id: None,
1871 run_path: None,
1872 metadata: dispatch_error_metadata(
1873 &route, binding, &event, attempt, &error,
1874 ),
1875 }],
1876 Vec::new(),
1877 serde_json::json!({
1878 "source": "dispatcher",
1879 "trigger_id": binding.id.as_str(),
1880 "binding_key": binding.binding_key(),
1881 "event_id": event.id.0,
1882 "attempt": attempt,
1883 "handler_kind": route.kind(),
1884 "target_uri": route.target_uri(),
1885 "error": error.to_string(),
1886 "replay_of_event_id": replay_of_event_id,
1887 }),
1888 )
1889 .await?;
1890
1891 let circuit_opened = if error.retryable() {
1892 self.state
1893 .destination_circuits
1894 .record_failure(&destination_key)
1895 } else {
1896 false
1897 };
1898 if circuit_opened {
1899 if let Some(metrics) = self.metrics.as_ref() {
1900 metrics.record_backpressure_event("circuit", "opened");
1901 metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1902 metrics.record_trigger_accepted_to_dlq(
1903 binding.id.as_str(),
1904 &binding_key,
1905 event.provider.as_str(),
1906 tenant_id(&event),
1907 "circuit_open",
1908 duration_between_ms(
1909 current_unix_ms(),
1910 accepted_at_ms(parent_headers.as_ref(), &event),
1911 ),
1912 );
1913 }
1914 let final_error = format!(
1915 "destination circuit opened for {} after {} consecutive failures: {}",
1916 destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
1917 );
1918 let dlq_entry = DlqEntry {
1919 trigger_id: binding.id.as_str().to_string(),
1920 binding_key: binding.binding_key(),
1921 event: event.clone(),
1922 attempt_count: attempt,
1923 final_error: final_error.clone(),
1924 error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
1925 .to_string(),
1926 attempts: attempts.clone(),
1927 };
1928 self.state
1929 .dlq
1930 .lock()
1931 .expect("dispatcher dlq poisoned")
1932 .push(dlq_entry.clone());
1933 self.append_lifecycle_event(
1934 "DlqMoved",
1935 &event,
1936 binding,
1937 serde_json::json!({
1938 "event_id": event.id.0,
1939 "attempt_count": attempt,
1940 "final_error": dlq_entry.final_error,
1941 "reason": "circuit_open",
1942 "destination": destination_key,
1943 "replay_of_event_id": replay_of_event_id,
1944 }),
1945 replay_of_event_id.as_ref(),
1946 )
1947 .await?;
1948 self.append_topic_event(
1949 TRIGGER_DLQ_TOPIC,
1950 "dlq_moved",
1951 &event,
1952 Some(binding),
1953 Some(attempt),
1954 serde_json::to_value(&dlq_entry).map_err(|serde_error| {
1955 DispatchError::Serde(serde_error.to_string())
1956 })?,
1957 replay_of_event_id.as_ref(),
1958 )
1959 .await?;
1960 finish_in_flight(
1961 binding.id.as_str(),
1962 binding.version,
1963 TriggerDispatchOutcome::Dlq,
1964 )
1965 .await
1966 .map_err(|registry_error| {
1967 DispatchError::Registry(registry_error.to_string())
1968 })?;
1969 self.release_flow_control(&acquired_flow).await?;
1970 decrement_in_flight(&self.state);
1971 self.append_dispatch_trust_record(
1972 binding,
1973 &route,
1974 &event,
1975 replay_of_event_id.as_ref(),
1976 autonomy_tier,
1977 TrustOutcome::Failure,
1978 "dlq",
1979 attempt,
1980 Some(final_error.clone()),
1981 )
1982 .await?;
1983 return Ok(DispatchOutcome {
1984 trigger_id: binding.id.as_str().to_string(),
1985 binding_key: binding.binding_key(),
1986 event_id: event.id.0,
1987 attempt_count: attempt,
1988 status: DispatchStatus::Dlq,
1989 handler_kind: route.kind().to_string(),
1990 target_uri: route.target_uri(),
1991 replay_of_event_id,
1992 result: None,
1993 error: Some(final_error),
1994 });
1995 }
1996
1997 if !error.retryable() {
1998 finish_in_flight(
1999 binding.id.as_str(),
2000 binding.version,
2001 TriggerDispatchOutcome::Failed,
2002 )
2003 .await
2004 .map_err(|registry_error| {
2005 DispatchError::Registry(registry_error.to_string())
2006 })?;
2007 self.release_flow_control(&acquired_flow).await?;
2008 decrement_in_flight(&self.state);
2009 let trust_outcome = match error {
2010 DispatchError::Denied(_) => TrustOutcome::Denied,
2011 DispatchError::Timeout(_) => TrustOutcome::Timeout,
2012 _ => TrustOutcome::Failure,
2013 };
2014 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2015 "cancelled"
2016 } else {
2017 "failed"
2018 };
2019 self.append_dispatch_trust_record(
2020 binding,
2021 &route,
2022 &event,
2023 replay_of_event_id.as_ref(),
2024 autonomy_tier,
2025 trust_outcome,
2026 terminal_status,
2027 attempt,
2028 Some(error.to_string()),
2029 )
2030 .await?;
2031 return Ok(DispatchOutcome {
2032 trigger_id: binding.id.as_str().to_string(),
2033 binding_key: binding.binding_key(),
2034 event_id: event.id.0,
2035 attempt_count: attempt,
2036 status: if matches!(error, DispatchError::Cancelled(_)) {
2037 DispatchStatus::Cancelled
2038 } else {
2039 DispatchStatus::Failed
2040 },
2041 handler_kind: route.kind().to_string(),
2042 target_uri: route.target_uri(),
2043 replay_of_event_id,
2044 result: None,
2045 error: Some(error.to_string()),
2046 });
2047 }
2048
2049 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2050 if let Some(metrics) = self.metrics.as_ref() {
2051 metrics.record_retry_scheduled();
2052 metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2053 metrics.record_trigger_retry_delay(
2054 binding.id.as_str(),
2055 &binding_key,
2056 event.provider.as_str(),
2057 tenant_id(&event),
2058 "scheduled",
2059 delay,
2060 );
2061 }
2062 tracing::info!(
2063 component = "dispatcher",
2064 lifecycle = "retry_scheduled",
2065 trigger_id = %binding.id.as_str(),
2066 binding_key = %binding_key,
2067 event_id = %event.id.0,
2068 attempt = attempt + 1,
2069 delay_ms = delay.as_millis(),
2070 trace_id = %event.trace_id.0
2071 );
2072 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2073 previous_retry_node = Some(retry_node_id.clone());
2074 self.emit_action_graph(
2075 &event,
2076 vec![RunActionGraphNodeRecord {
2077 id: retry_node_id.clone(),
2078 label: format!("retry in {}ms", delay.as_millis()),
2079 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2080 status: "scheduled".to_string(),
2081 outcome: format!("attempt_{}", attempt + 1),
2082 trace_id: Some(event.trace_id.0.clone()),
2083 stage_id: None,
2084 node_id: None,
2085 worker_id: None,
2086 run_id: None,
2087 run_path: None,
2088 metadata: retry_node_metadata(
2089 binding,
2090 &event,
2091 attempt + 1,
2092 delay,
2093 &error,
2094 ),
2095 }],
2096 vec![RunActionGraphEdgeRecord {
2097 from_id: attempt_node_id,
2098 to_id: retry_node_id.clone(),
2099 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2100 label: Some(format!("attempt {}", attempt + 1)),
2101 }],
2102 serde_json::json!({
2103 "source": "dispatcher",
2104 "trigger_id": binding.id.as_str(),
2105 "binding_key": binding.binding_key(),
2106 "event_id": event.id.0,
2107 "attempt": attempt + 1,
2108 "delay_ms": delay.as_millis(),
2109 "replay_of_event_id": replay_of_event_id,
2110 }),
2111 )
2112 .await?;
2113 self.append_lifecycle_event(
2114 "RetryScheduled",
2115 &event,
2116 binding,
2117 serde_json::json!({
2118 "event_id": event.id.0,
2119 "attempt": attempt + 1,
2120 "delay_ms": delay.as_millis(),
2121 "error": error.to_string(),
2122 "replay_of_event_id": replay_of_event_id,
2123 }),
2124 replay_of_event_id.as_ref(),
2125 )
2126 .await?;
2127 self.append_topic_event(
2128 TRIGGER_ATTEMPTS_TOPIC,
2129 "retry_scheduled",
2130 &event,
2131 Some(binding),
2132 Some(attempt + 1),
2133 serde_json::json!({
2134 "event_id": event.id.0,
2135 "attempt": attempt + 1,
2136 "trigger_id": binding.id.as_str(),
2137 "binding_key": binding.binding_key(),
2138 "delay_ms": delay.as_millis(),
2139 "error": error.to_string(),
2140 "replay_of_event_id": replay_of_event_id,
2141 }),
2142 replay_of_event_id.as_ref(),
2143 )
2144 .await?;
2145 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2146 let sleep_result = sleep_or_cancel_or_request(
2147 &self.event_log,
2148 delay,
2149 &binding_key,
2150 &event.id.0,
2151 replay_of_event_id.as_ref(),
2152 &mut self.cancel_tx.subscribe(),
2153 )
2154 .await;
2155 decrement_retry_queue_depth(&self.state);
2156 if sleep_result.is_err() {
2157 finish_in_flight(
2158 binding.id.as_str(),
2159 binding.version,
2160 TriggerDispatchOutcome::Failed,
2161 )
2162 .await
2163 .map_err(|registry_error| {
2164 DispatchError::Registry(registry_error.to_string())
2165 })?;
2166 self.release_flow_control(&acquired_flow).await?;
2167 decrement_in_flight(&self.state);
2168 self.append_dispatch_trust_record(
2169 binding,
2170 &route,
2171 &event,
2172 replay_of_event_id.as_ref(),
2173 autonomy_tier,
2174 TrustOutcome::Failure,
2175 "cancelled",
2176 attempt,
2177 Some("dispatcher shutdown cancelled retry wait".to_string()),
2178 )
2179 .await?;
2180 return Ok(DispatchOutcome {
2181 trigger_id: binding.id.as_str().to_string(),
2182 binding_key: binding.binding_key(),
2183 event_id: event.id.0,
2184 attempt_count: attempt,
2185 status: DispatchStatus::Cancelled,
2186 handler_kind: route.kind().to_string(),
2187 target_uri: route.target_uri(),
2188 replay_of_event_id,
2189 result: None,
2190 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2191 });
2192 }
2193 continue;
2194 }
2195
2196 let final_error = error.to_string();
2197 let dlq_entry = DlqEntry {
2198 trigger_id: binding.id.as_str().to_string(),
2199 binding_key: binding.binding_key(),
2200 event: event.clone(),
2201 attempt_count: attempt,
2202 final_error: final_error.clone(),
2203 error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2204 .to_string(),
2205 attempts: attempts.clone(),
2206 };
2207 self.state
2208 .dlq
2209 .lock()
2210 .expect("dispatcher dlq poisoned")
2211 .push(dlq_entry.clone());
2212 if let Some(metrics) = self.metrics.as_ref() {
2213 metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2214 metrics.record_trigger_accepted_to_dlq(
2215 binding.id.as_str(),
2216 &binding_key,
2217 event.provider.as_str(),
2218 tenant_id(&event),
2219 "retry_exhausted",
2220 duration_between_ms(
2221 current_unix_ms(),
2222 accepted_at_ms(parent_headers.as_ref(), &event),
2223 ),
2224 );
2225 }
2226 tracing::info!(
2227 component = "dispatcher",
2228 lifecycle = "dlq_moved",
2229 trigger_id = %binding.id.as_str(),
2230 binding_key = %binding_key,
2231 event_id = %event.id.0,
2232 attempt_count = attempt,
2233 reason = "retry_exhausted",
2234 trace_id = %event.trace_id.0
2235 );
2236 self.emit_action_graph(
2237 &event,
2238 vec![RunActionGraphNodeRecord {
2239 id: format!("dlq:{binding_key}:{}", event.id.0),
2240 label: binding.id.as_str().to_string(),
2241 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2242 status: "queued".to_string(),
2243 outcome: "retry_exhausted".to_string(),
2244 trace_id: Some(event.trace_id.0.clone()),
2245 stage_id: None,
2246 node_id: None,
2247 worker_id: None,
2248 run_id: None,
2249 run_path: None,
2250 metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2251 }],
2252 vec![RunActionGraphEdgeRecord {
2253 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2254 to_id: format!("dlq:{binding_key}:{}", event.id.0),
2255 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2256 label: Some(format!("{attempt} attempts")),
2257 }],
2258 serde_json::json!({
2259 "source": "dispatcher",
2260 "trigger_id": binding.id.as_str(),
2261 "binding_key": binding.binding_key(),
2262 "event_id": event.id.0,
2263 "attempt_count": attempt,
2264 "final_error": final_error,
2265 "replay_of_event_id": replay_of_event_id,
2266 }),
2267 )
2268 .await?;
2269 self.append_lifecycle_event(
2270 "DlqMoved",
2271 &event,
2272 binding,
2273 serde_json::json!({
2274 "event_id": event.id.0,
2275 "attempt_count": attempt,
2276 "final_error": dlq_entry.final_error,
2277 "replay_of_event_id": replay_of_event_id,
2278 }),
2279 replay_of_event_id.as_ref(),
2280 )
2281 .await?;
2282 self.append_topic_event(
2283 TRIGGER_DLQ_TOPIC,
2284 "dlq_moved",
2285 &event,
2286 Some(binding),
2287 Some(attempt),
2288 serde_json::to_value(&dlq_entry)
2289 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2290 replay_of_event_id.as_ref(),
2291 )
2292 .await?;
2293 finish_in_flight(
2294 binding.id.as_str(),
2295 binding.version,
2296 TriggerDispatchOutcome::Dlq,
2297 )
2298 .await
2299 .map_err(|registry_error| {
2300 DispatchError::Registry(registry_error.to_string())
2301 })?;
2302 self.release_flow_control(&acquired_flow).await?;
2303 decrement_in_flight(&self.state);
2304 self.append_dispatch_trust_record(
2305 binding,
2306 &route,
2307 &event,
2308 replay_of_event_id.as_ref(),
2309 autonomy_tier,
2310 TrustOutcome::Failure,
2311 "dlq",
2312 attempt,
2313 Some(error.to_string()),
2314 )
2315 .await?;
2316 return Ok(DispatchOutcome {
2317 trigger_id: binding.id.as_str().to_string(),
2318 binding_key: binding.binding_key(),
2319 event_id: event.id.0,
2320 attempt_count: attempt,
2321 status: DispatchStatus::Dlq,
2322 handler_kind: route.kind().to_string(),
2323 target_uri: route.target_uri(),
2324 replay_of_event_id,
2325 result: None,
2326 error: Some(error.to_string()),
2327 });
2328 }
2329 }
2330 }
2331
2332 finish_in_flight(
2333 binding.id.as_str(),
2334 binding.version,
2335 TriggerDispatchOutcome::Failed,
2336 )
2337 .await
2338 .map_err(|error| DispatchError::Registry(error.to_string()))?;
2339 self.release_flow_control(&acquired_flow).await?;
2340 decrement_in_flight(&self.state);
2341 self.append_dispatch_trust_record(
2342 binding,
2343 &route,
2344 &event,
2345 replay_of_event_id.as_ref(),
2346 autonomy_tier,
2347 TrustOutcome::Failure,
2348 "failed",
2349 max_attempts,
2350 Some("dispatch exhausted without terminal outcome".to_string()),
2351 )
2352 .await?;
2353 Ok(DispatchOutcome {
2354 trigger_id: binding.id.as_str().to_string(),
2355 binding_key: binding.binding_key(),
2356 event_id: event.id.0,
2357 attempt_count: max_attempts,
2358 status: DispatchStatus::Failed,
2359 handler_kind: route.kind().to_string(),
2360 target_uri: route.target_uri(),
2361 replay_of_event_id,
2362 result: None,
2363 error: Some("dispatch exhausted without terminal outcome".to_string()),
2364 })
2365 }
2366
2367 async fn dispatch_once(
2368 &self,
2369 binding: &TriggerBinding,
2370 route: &DispatchUri,
2371 event: &TriggerEvent,
2372 autonomy_tier: AutonomyTier,
2373 wait_lease: Option<DispatchWaitLease>,
2374 cancel_rx: &mut broadcast::Receiver<()>,
2375 ) -> Result<serde_json::Value, DispatchError> {
2376 match route {
2377 DispatchUri::Local { .. } => {
2378 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2379 return Err(DispatchError::Local(format!(
2380 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2381 binding.id.as_str()
2382 )));
2383 };
2384 let value = self
2385 .invoke_vm_callable(
2386 closure,
2387 &binding.binding_key(),
2388 event,
2389 None,
2390 binding.id.as_str(),
2391 &format!("{}.{}", event.provider.as_str(), event.kind),
2392 autonomy_tier,
2393 wait_lease,
2394 cancel_rx,
2395 )
2396 .await?;
2397 Ok(vm_value_to_json(&value))
2398 }
2399 DispatchUri::A2a {
2400 target,
2401 allow_cleartext,
2402 } => {
2403 if self.state.shutting_down.load(Ordering::SeqCst) {
2404 return Err(DispatchError::Cancelled(
2405 "dispatcher shutdown cancelled A2A dispatch".to_string(),
2406 ));
2407 }
2408 let (_endpoint, ack) = self
2409 .a2a_client
2410 .dispatch(
2411 target,
2412 *allow_cleartext,
2413 binding.id.as_str(),
2414 &binding.binding_key(),
2415 event,
2416 cancel_rx,
2417 )
2418 .await
2419 .map_err(|error| match error {
2420 crate::a2a::A2aClientError::Cancelled(message) => {
2421 DispatchError::Cancelled(message)
2422 }
2423 other => DispatchError::A2a(other.to_string()),
2424 })?;
2425 match ack {
2426 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2427 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2428 }
2429 }
2430 DispatchUri::Worker { queue } => {
2431 let receipt = crate::WorkerQueue::new(self.event_log.clone())
2432 .enqueue(&crate::WorkerQueueJob {
2433 queue: queue.clone(),
2434 trigger_id: binding.id.as_str().to_string(),
2435 binding_key: binding.binding_key(),
2436 binding_version: binding.version,
2437 event: event.clone(),
2438 replay_of_event_id: current_dispatch_context()
2439 .and_then(|context| context.replay_of_event_id),
2440 priority: worker_queue_priority(binding, event),
2441 })
2442 .await
2443 .map_err(DispatchError::from)?;
2444 Ok(serde_json::to_value(receipt)
2445 .map_err(|error| DispatchError::Serde(error.to_string()))?)
2446 }
2447 DispatchUri::Persona { .. } => {
2448 let TriggerHandlerSpec::Persona {
2449 binding: persona_binding,
2450 } = &binding.handler
2451 else {
2452 return Err(DispatchError::Local(format!(
2453 "trigger '{}' resolved to a persona dispatch URI but does not carry a persona binding",
2454 binding.id.as_str()
2455 )));
2456 };
2457 let receipt = crate::fire_persona_trigger(
2458 &self.event_log,
2459 persona_binding,
2460 event.provider.as_str(),
2461 &event.kind,
2462 trigger_event_persona_metadata(event),
2463 crate::PersonaRunCost::default(),
2464 crate::persona_now_ms(),
2465 )
2466 .await
2467 .map_err(DispatchError::Local)?;
2468 Ok(serde_json::to_value(receipt)
2469 .map_err(|error| DispatchError::Serde(error.to_string()))?)
2470 }
2471 }
2472 }
2473
2474 async fn apply_flow_control(
2475 &self,
2476 binding: &TriggerBinding,
2477 event: &TriggerEvent,
2478 replay_of_event_id: Option<&String>,
2479 ) -> Result<FlowControlOutcome, DispatchError> {
2480 let flow = &binding.flow_control;
2481 let mut managed_event = event.clone();
2482
2483 if let Some(batch) = &flow.batch {
2484 let gate = self
2485 .resolve_flow_gate(
2486 &binding.binding_key(),
2487 batch.key.as_ref(),
2488 &managed_event,
2489 replay_of_event_id,
2490 )
2491 .await?;
2492 match self
2493 .state
2494 .flow_control
2495 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2496 .await
2497 .map_err(DispatchError::from)?
2498 {
2499 BatchDecision::Dispatch(events) => {
2500 managed_event = build_batched_event(events)?;
2501 }
2502 BatchDecision::Merged => {
2503 return Ok(FlowControlOutcome::Skip {
2504 reason: "batch_merged".to_string(),
2505 })
2506 }
2507 }
2508 }
2509
2510 if let Some(debounce) = &flow.debounce {
2511 let gate = self
2512 .resolve_flow_gate(
2513 &binding.binding_key(),
2514 Some(&debounce.key),
2515 &managed_event,
2516 replay_of_event_id,
2517 )
2518 .await?;
2519 let latest = self
2520 .state
2521 .flow_control
2522 .debounce(&gate, debounce.period)
2523 .await
2524 .map_err(DispatchError::from)?;
2525 if !latest {
2526 return Ok(FlowControlOutcome::Skip {
2527 reason: "debounced".to_string(),
2528 });
2529 }
2530 }
2531
2532 if let Some(rate_limit) = &flow.rate_limit {
2533 let gate = self
2534 .resolve_flow_gate(
2535 &binding.binding_key(),
2536 rate_limit.key.as_ref(),
2537 &managed_event,
2538 replay_of_event_id,
2539 )
2540 .await?;
2541 let allowed = self
2542 .state
2543 .flow_control
2544 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2545 .await
2546 .map_err(DispatchError::from)?;
2547 if !allowed {
2548 return Ok(FlowControlOutcome::Skip {
2549 reason: "rate_limited".to_string(),
2550 });
2551 }
2552 }
2553
2554 if let Some(throttle) = &flow.throttle {
2555 let gate = self
2556 .resolve_flow_gate(
2557 &binding.binding_key(),
2558 throttle.key.as_ref(),
2559 &managed_event,
2560 replay_of_event_id,
2561 )
2562 .await?;
2563 self.state
2564 .flow_control
2565 .wait_for_throttle(&gate, throttle.period, throttle.max)
2566 .await
2567 .map_err(DispatchError::from)?;
2568 }
2569
2570 let mut acquired = AcquiredFlowControl::default();
2571 if let Some(singleton) = &flow.singleton {
2572 let gate = self
2573 .resolve_flow_gate(
2574 &binding.binding_key(),
2575 singleton.key.as_ref(),
2576 &managed_event,
2577 replay_of_event_id,
2578 )
2579 .await?;
2580 let acquired_singleton = self
2581 .state
2582 .flow_control
2583 .try_acquire_singleton(&gate)
2584 .await
2585 .map_err(DispatchError::from)?;
2586 if !acquired_singleton {
2587 return Ok(FlowControlOutcome::Skip {
2588 reason: "singleton_active".to_string(),
2589 });
2590 }
2591 acquired.singleton = Some(SingletonLease { gate, held: true });
2592 }
2593
2594 if let Some(concurrency) = &flow.concurrency {
2595 let gate = self
2596 .resolve_flow_gate(
2597 &binding.binding_key(),
2598 concurrency.key.as_ref(),
2599 &managed_event,
2600 replay_of_event_id,
2601 )
2602 .await?;
2603 let priority_rank = self
2604 .resolve_priority_rank(
2605 &binding.binding_key(),
2606 flow.priority.as_ref(),
2607 &managed_event,
2608 replay_of_event_id,
2609 )
2610 .await?;
2611 let permit = self
2612 .state
2613 .flow_control
2614 .acquire_concurrency(&gate, concurrency.max, priority_rank)
2615 .await
2616 .map_err(DispatchError::from)?;
2617 acquired.concurrency = Some(ConcurrencyLease {
2618 gate,
2619 max: concurrency.max,
2620 priority_rank,
2621 permit: Some(permit),
2622 });
2623 }
2624
2625 Ok(FlowControlOutcome::Dispatch {
2626 event: Box::new(managed_event),
2627 acquired,
2628 })
2629 }
2630
2631 async fn release_flow_control(
2632 &self,
2633 acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2634 ) -> Result<(), DispatchError> {
2635 let (singleton_gate, concurrency_permit) = {
2636 let mut acquired = acquired.lock().await;
2637 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2638 if lease.held {
2639 lease.held = false;
2640 Some(lease.gate.clone())
2641 } else {
2642 None
2643 }
2644 });
2645 let concurrency_permit = acquired
2646 .concurrency
2647 .as_mut()
2648 .and_then(|lease| lease.permit.take());
2649 (singleton_gate, concurrency_permit)
2650 };
2651 if let Some(gate) = singleton_gate {
2652 self.state
2653 .flow_control
2654 .release_singleton(&gate)
2655 .await
2656 .map_err(DispatchError::from)?;
2657 }
2658 if let Some(permit) = concurrency_permit {
2659 self.state
2660 .flow_control
2661 .release_concurrency(permit)
2662 .await
2663 .map_err(DispatchError::from)?;
2664 }
2665 Ok(())
2666 }
2667
2668 async fn resolve_flow_gate(
2669 &self,
2670 binding_key: &str,
2671 expr: Option<&crate::triggers::TriggerExpressionSpec>,
2672 event: &TriggerEvent,
2673 replay_of_event_id: Option<&String>,
2674 ) -> Result<String, DispatchError> {
2675 let key = match expr {
2676 Some(expr) => {
2677 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2678 .await?
2679 }
2680 None => "_global".to_string(),
2681 };
2682 Ok(format!("{binding_key}:{key}"))
2683 }
2684
2685 async fn resolve_priority_rank(
2686 &self,
2687 binding_key: &str,
2688 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2689 event: &TriggerEvent,
2690 replay_of_event_id: Option<&String>,
2691 ) -> Result<usize, DispatchError> {
2692 let Some(priority) = priority else {
2693 return Ok(0);
2694 };
2695 let value = self
2696 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2697 .await?;
2698 Ok(priority
2699 .order
2700 .iter()
2701 .position(|candidate| candidate == &value)
2702 .unwrap_or(priority.order.len()))
2703 }
2704
2705 async fn evaluate_flow_expression(
2706 &self,
2707 binding_key: &str,
2708 expr: &crate::triggers::TriggerExpressionSpec,
2709 event: &TriggerEvent,
2710 replay_of_event_id: Option<&String>,
2711 ) -> Result<String, DispatchError> {
2712 let value = self
2713 .invoke_vm_callable(
2714 &expr.closure,
2715 binding_key,
2716 event,
2717 replay_of_event_id,
2718 "",
2719 "flow_control",
2720 AutonomyTier::Suggest,
2721 None,
2722 &mut self.cancel_tx.subscribe(),
2723 )
2724 .await?;
2725 Ok(json_value_to_gate(&vm_value_to_json(&value)))
2726 }
2727
2728 #[allow(clippy::too_many_arguments)]
2729 async fn invoke_vm_callable(
2730 &self,
2731 closure: &crate::value::VmClosure,
2732 binding_key: &str,
2733 event: &TriggerEvent,
2734 replay_of_event_id: Option<&String>,
2735 agent_id: &str,
2736 action: &str,
2737 autonomy_tier: AutonomyTier,
2738 wait_lease: Option<DispatchWaitLease>,
2739 cancel_rx: &mut broadcast::Receiver<()>,
2740 ) -> Result<VmValue, DispatchError> {
2741 let mut vm = self.base_vm.child_vm();
2742 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2743 if self.state.shutting_down.load(Ordering::SeqCst) {
2744 cancel_token.store(true, Ordering::SeqCst);
2745 }
2746 self.state
2747 .cancel_tokens
2748 .lock()
2749 .expect("dispatcher cancel tokens poisoned")
2750 .push(cancel_token.clone());
2751 vm.install_cancel_token(cancel_token.clone());
2752 let arg = event_to_handler_value(event)?;
2753 let args = [arg];
2754 let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2755 let effective_policy = match crate::orchestration::current_execution_policy() {
2756 Some(parent) => parent
2757 .intersect(&tier_policy)
2758 .map_err(|error| DispatchError::Local(error.to_string()))?,
2759 None => tier_policy,
2760 };
2761 crate::orchestration::push_execution_policy(effective_policy);
2762 let _policy_guard = DispatchExecutionPolicyGuard;
2763 let future = vm.call_closure_pub(closure, &args);
2764 pin_mut!(future);
2765 let (binding_id, binding_version) = split_binding_key(binding_key);
2766 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2767 slot.borrow_mut().replace(DispatchContext {
2768 trigger_event: event.clone(),
2769 replay_of_event_id: replay_of_event_id.cloned(),
2770 binding_id,
2771 binding_version,
2772 agent_id: agent_id.to_string(),
2773 action: action.to_string(),
2774 autonomy_tier,
2775 })
2776 });
2777 let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2778 .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2779 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2780 crate::stdlib::hitl::reset_hitl_state();
2781 let mut poll = tokio::time::interval(Duration::from_millis(100));
2782 let result = loop {
2783 tokio::select! {
2784 result = &mut future => break result,
2785 _ = recv_cancel(cancel_rx) => {
2786 cancel_token.store(true, Ordering::SeqCst);
2787 }
2788 _ = poll.tick() => {
2789 if dispatch_cancel_requested(
2790 &self.event_log,
2791 binding_key,
2792 &event.id.0,
2793 replay_of_event_id,
2794 )
2795 .await? {
2796 cancel_token.store(true, Ordering::SeqCst);
2797 }
2798 }
2799 }
2800 };
2801 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2802 *slot.borrow_mut() = prior_context;
2803 });
2804 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2805 *slot.borrow_mut() = prior_wait_lease;
2806 });
2807 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2808 {
2809 let mut tokens = self
2810 .state
2811 .cancel_tokens
2812 .lock()
2813 .expect("dispatcher cancel tokens poisoned");
2814 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2815 }
2816
2817 if cancel_token.load(Ordering::SeqCst) {
2818 if dispatch_cancel_requested(
2819 &self.event_log,
2820 binding_key,
2821 &event.id.0,
2822 replay_of_event_id,
2823 )
2824 .await?
2825 {
2826 Err(DispatchError::Cancelled(
2827 "trigger cancel request cancelled local handler".to_string(),
2828 ))
2829 } else {
2830 Err(DispatchError::Cancelled(
2831 "dispatcher shutdown cancelled local handler".to_string(),
2832 ))
2833 }
2834 } else {
2835 result.map_err(dispatch_error_from_vm_error)
2836 }
2837 }
2838
2839 #[allow(clippy::too_many_arguments)]
2840 async fn invoke_vm_callable_with_timeout(
2841 &self,
2842 closure: &crate::value::VmClosure,
2843 binding_key: &str,
2844 event: &TriggerEvent,
2845 replay_of_event_id: Option<&String>,
2846 agent_id: &str,
2847 action: &str,
2848 autonomy_tier: AutonomyTier,
2849 cancel_rx: &mut broadcast::Receiver<()>,
2850 timeout: Option<Duration>,
2851 ) -> Result<VmValue, DispatchError> {
2852 let future = self.invoke_vm_callable(
2853 closure,
2854 binding_key,
2855 event,
2856 replay_of_event_id,
2857 agent_id,
2858 action,
2859 autonomy_tier,
2860 None,
2861 cancel_rx,
2862 );
2863 pin_mut!(future);
2864 if let Some(timeout) = timeout {
2865 match tokio::time::timeout(timeout, future).await {
2866 Ok(result) => result,
2867 Err(_) => Err(DispatchError::Local(
2868 "predicate evaluation timed out".to_string(),
2869 )),
2870 }
2871 } else {
2872 future.await
2873 }
2874 }
2875
2876 #[allow(clippy::too_many_arguments)]
2877 async fn append_dispatch_trust_record(
2878 &self,
2879 binding: &TriggerBinding,
2880 route: &DispatchUri,
2881 event: &TriggerEvent,
2882 replay_of_event_id: Option<&String>,
2883 autonomy_tier: AutonomyTier,
2884 outcome: TrustOutcome,
2885 terminal_status: &str,
2886 attempt_count: u32,
2887 error: Option<String>,
2888 ) -> Result<(), DispatchError> {
2889 let mut record = TrustRecord::new(
2890 binding.id.as_str().to_string(),
2891 format!("{}.{}", event.provider.as_str(), event.kind),
2892 None,
2893 outcome,
2894 event.trace_id.0.clone(),
2895 autonomy_tier,
2896 );
2897 record.metadata.insert(
2898 "binding_key".to_string(),
2899 serde_json::json!(binding.binding_key()),
2900 );
2901 record.metadata.insert(
2902 "binding_version".to_string(),
2903 serde_json::json!(binding.version),
2904 );
2905 record.metadata.insert(
2906 "provider".to_string(),
2907 serde_json::json!(event.provider.as_str()),
2908 );
2909 record
2910 .metadata
2911 .insert("event_kind".to_string(), serde_json::json!(event.kind));
2912 record
2913 .metadata
2914 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2915 record.metadata.insert(
2916 "target_uri".to_string(),
2917 serde_json::json!(route.target_uri()),
2918 );
2919 record.metadata.insert(
2920 "terminal_status".to_string(),
2921 serde_json::json!(terminal_status),
2922 );
2923 record.metadata.insert(
2924 "attempt_count".to_string(),
2925 serde_json::json!(attempt_count),
2926 );
2927 if let Some(replay_of_event_id) = replay_of_event_id {
2928 record.metadata.insert(
2929 "replay_of_event_id".to_string(),
2930 serde_json::json!(replay_of_event_id),
2931 );
2932 }
2933 if let Some(error) = error {
2934 record
2935 .metadata
2936 .insert("error".to_string(), serde_json::json!(error));
2937 }
2938 append_trust_record(&self.event_log, &record)
2939 .await
2940 .map(|_| ())
2941 .map_err(DispatchError::from)
2942 }
2943
2944 async fn append_attempt_record(
2945 &self,
2946 event: &TriggerEvent,
2947 binding: &TriggerBinding,
2948 attempt: &DispatchAttemptRecord,
2949 replay_of_event_id: Option<&String>,
2950 ) -> Result<(), DispatchError> {
2951 self.append_topic_event(
2952 TRIGGER_ATTEMPTS_TOPIC,
2953 "attempt_recorded",
2954 event,
2955 Some(binding),
2956 Some(attempt.attempt),
2957 serde_json::to_value(attempt)
2958 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2959 replay_of_event_id,
2960 )
2961 .await
2962 }
2963
2964 async fn append_lifecycle_event(
2965 &self,
2966 kind: &str,
2967 event: &TriggerEvent,
2968 binding: &TriggerBinding,
2969 payload: serde_json::Value,
2970 replay_of_event_id: Option<&String>,
2971 ) -> Result<(), DispatchError> {
2972 self.append_topic_event(
2973 TRIGGERS_LIFECYCLE_TOPIC,
2974 kind,
2975 event,
2976 Some(binding),
2977 None,
2978 payload,
2979 replay_of_event_id,
2980 )
2981 .await
2982 }
2983
2984 async fn append_autonomy_budget_approval_request(
2985 &self,
2986 binding: &TriggerBinding,
2987 route: &DispatchUri,
2988 event: &TriggerEvent,
2989 replay_of_event_id: Option<&String>,
2990 reason: &str,
2991 ) -> Result<String, DispatchError> {
2992 let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
2993 let detail = serde_json::json!({
2994 "trigger_id": binding.id.as_str(),
2995 "binding_key": binding.binding_key(),
2996 "event_id": event.id.0,
2997 "reason": reason,
2998 "from_tier": AutonomyTier::ActAuto.as_str(),
2999 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3000 "handler_kind": route.kind(),
3001 "target_uri": route.target_uri(),
3002 "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
3003 "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
3004 "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
3005 "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
3006 "replay_of_event_id": replay_of_event_id,
3007 });
3008 let request_id = crate::stdlib::hitl::append_approval_request_on(
3009 &self.event_log,
3010 binding.id.as_str().to_string(),
3011 event.trace_id.0.clone(),
3012 format!(
3013 "approve autonomous dispatch for trigger '{}' after {}",
3014 binding.id.as_str(),
3015 reason
3016 ),
3017 detail.clone(),
3018 reviewers.clone(),
3019 )
3020 .await
3021 .map_err(dispatch_error_from_vm_error)?;
3022 self.append_lifecycle_event(
3023 "autonomy.budget_exceeded",
3024 event,
3025 binding,
3026 serde_json::json!({
3027 "trigger_id": binding.id.as_str(),
3028 "event_id": event.id.0,
3029 "reason": reason,
3030 "request_id": request_id,
3031 "reviewers": reviewers,
3032 "from_tier": AutonomyTier::ActAuto.as_str(),
3033 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3034 "replay_of_event_id": replay_of_event_id,
3035 }),
3036 replay_of_event_id,
3037 )
3038 .await?;
3039 Ok(request_id)
3040 }
3041
3042 #[allow(clippy::too_many_arguments)]
3043 async fn emit_autonomy_budget_approval_action_graph(
3044 &self,
3045 binding: &TriggerBinding,
3046 route: &DispatchUri,
3047 event: &TriggerEvent,
3048 source_node_id: &str,
3049 replay_of_event_id: Option<&String>,
3050 reason: &str,
3051 request_id: &str,
3052 ) -> Result<(), DispatchError> {
3053 let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3054 self.emit_action_graph(
3055 event,
3056 vec![RunActionGraphNodeRecord {
3057 id: approval_node_id.clone(),
3058 label: format!("approval required: {reason}"),
3059 kind: "approval".to_string(),
3060 status: "waiting".to_string(),
3061 outcome: "request_approval".to_string(),
3062 trace_id: Some(event.trace_id.0.clone()),
3063 stage_id: None,
3064 node_id: None,
3065 worker_id: None,
3066 run_id: None,
3067 run_path: None,
3068 metadata: BTreeMap::from([
3069 (
3070 "trigger_id".to_string(),
3071 serde_json::json!(binding.id.as_str()),
3072 ),
3073 (
3074 "binding_key".to_string(),
3075 serde_json::json!(binding.binding_key()),
3076 ),
3077 ("event_id".to_string(), serde_json::json!(event.id.0)),
3078 ("reason".to_string(), serde_json::json!(reason)),
3079 ("request_id".to_string(), serde_json::json!(request_id)),
3080 (
3081 "reviewers".to_string(),
3082 serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3083 ),
3084 ("handler_kind".to_string(), serde_json::json!(route.kind())),
3085 (
3086 "target_uri".to_string(),
3087 serde_json::json!(route.target_uri()),
3088 ),
3089 (
3090 "from_tier".to_string(),
3091 serde_json::json!(AutonomyTier::ActAuto.as_str()),
3092 ),
3093 (
3094 "requested_tier".to_string(),
3095 serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3096 ),
3097 (
3098 "replay_of_event_id".to_string(),
3099 serde_json::json!(replay_of_event_id),
3100 ),
3101 ]),
3102 }],
3103 vec![RunActionGraphEdgeRecord {
3104 from_id: source_node_id.to_string(),
3105 to_id: approval_node_id,
3106 kind: "approval_gate".to_string(),
3107 label: Some("autonomy budget".to_string()),
3108 }],
3109 serde_json::json!({
3110 "source": "dispatcher",
3111 "trigger_id": binding.id.as_str(),
3112 "binding_key": binding.binding_key(),
3113 "event_id": event.id.0,
3114 "reason": reason,
3115 "request_id": request_id,
3116 "replay_of_event_id": replay_of_event_id,
3117 }),
3118 )
3119 .await
3120 }
3121
3122 #[allow(clippy::too_many_arguments)]
3123 async fn append_tier_transition_trust_record(
3124 &self,
3125 binding: &TriggerBinding,
3126 event: &TriggerEvent,
3127 replay_of_event_id: Option<&String>,
3128 from_tier: AutonomyTier,
3129 to_tier: AutonomyTier,
3130 reason: &str,
3131 request_id: &str,
3132 ) -> Result<(), DispatchError> {
3133 let mut record = TrustRecord::new(
3134 binding.id.as_str().to_string(),
3135 "autonomy.tier_transition",
3136 Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3137 TrustOutcome::Denied,
3138 event.trace_id.0.clone(),
3139 to_tier,
3140 );
3141 record.metadata.insert(
3142 "binding_key".to_string(),
3143 serde_json::json!(binding.binding_key()),
3144 );
3145 record
3146 .metadata
3147 .insert("event_id".to_string(), serde_json::json!(event.id.0));
3148 record.metadata.insert(
3149 "from_tier".to_string(),
3150 serde_json::json!(from_tier.as_str()),
3151 );
3152 record
3153 .metadata
3154 .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3155 record
3156 .metadata
3157 .insert("reason".to_string(), serde_json::json!(reason));
3158 record
3159 .metadata
3160 .insert("request_id".to_string(), serde_json::json!(request_id));
3161 if let Some(replay_of_event_id) = replay_of_event_id {
3162 record.metadata.insert(
3163 "replay_of_event_id".to_string(),
3164 serde_json::json!(replay_of_event_id),
3165 );
3166 }
3167 append_trust_record(&self.event_log, &record)
3168 .await
3169 .map(|_| ())
3170 .map_err(DispatchError::from)
3171 }
3172
3173 async fn append_budget_deferred_event(
3174 &self,
3175 binding: &TriggerBinding,
3176 route: &DispatchUri,
3177 event: &TriggerEvent,
3178 replay_of_event_id: Option<&String>,
3179 reason: &str,
3180 ) -> Result<(), DispatchError> {
3181 self.append_topic_event(
3182 TRIGGER_ATTEMPTS_TOPIC,
3183 "budget_deferred",
3184 event,
3185 Some(binding),
3186 None,
3187 serde_json::json!({
3188 "event_id": event.id.0,
3189 "trigger_id": binding.id.as_str(),
3190 "binding_key": binding.binding_key(),
3191 "handler_kind": route.kind(),
3192 "target_uri": route.target_uri(),
3193 "reason": reason,
3194 "retry_at": next_budget_reset_rfc3339(binding),
3195 "replay_of_event_id": replay_of_event_id,
3196 }),
3197 replay_of_event_id,
3198 )
3199 .await?;
3200 self.append_skipped_outbox_event(
3201 binding,
3202 route,
3203 event,
3204 replay_of_event_id,
3205 DispatchSkipStage::Predicate,
3206 serde_json::json!({
3207 "deferred": true,
3208 "reason": reason,
3209 "retry_at": next_budget_reset_rfc3339(binding),
3210 }),
3211 )
3212 .await
3213 }
3214
3215 async fn append_skipped_outbox_event(
3216 &self,
3217 binding: &TriggerBinding,
3218 route: &DispatchUri,
3219 event: &TriggerEvent,
3220 replay_of_event_id: Option<&String>,
3221 stage: DispatchSkipStage,
3222 detail: serde_json::Value,
3223 ) -> Result<(), DispatchError> {
3224 self.append_topic_event(
3225 TRIGGER_OUTBOX_TOPIC,
3226 "dispatch_skipped",
3227 event,
3228 Some(binding),
3229 None,
3230 serde_json::json!({
3231 "event_id": event.id.0,
3232 "trigger_id": binding.id.as_str(),
3233 "binding_key": binding.binding_key(),
3234 "handler_kind": route.kind(),
3235 "target_uri": route.target_uri(),
3236 "skip_stage": stage.as_str(),
3237 "detail": detail,
3238 "replay_of_event_id": replay_of_event_id,
3239 }),
3240 replay_of_event_id,
3241 )
3242 .await
3243 }
3244
3245 async fn append_topic_event(
3246 &self,
3247 topic_name: &str,
3248 kind: &str,
3249 event: &TriggerEvent,
3250 binding: Option<&TriggerBinding>,
3251 attempt: Option<u32>,
3252 payload: serde_json::Value,
3253 replay_of_event_id: Option<&String>,
3254 ) -> Result<(), DispatchError> {
3255 let topic = topic_for_event(event, topic_name)?;
3256 let headers = event_headers(event, binding, attempt, replay_of_event_id);
3257 self.event_log
3258 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3259 .await
3260 .map_err(DispatchError::from)
3261 .map(|_| ())
3262 }
3263
3264 async fn emit_action_graph(
3265 &self,
3266 event: &TriggerEvent,
3267 nodes: Vec<RunActionGraphNodeRecord>,
3268 edges: Vec<RunActionGraphEdgeRecord>,
3269 extra: serde_json::Value,
3270 ) -> Result<(), DispatchError> {
3271 let mut headers = BTreeMap::new();
3272 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3273 headers.insert("event_id".to_string(), event.id.0.clone());
3274 let observability = RunObservabilityRecord {
3275 schema_version: 1,
3276 action_graph_nodes: nodes,
3277 action_graph_edges: edges,
3278 ..Default::default()
3279 };
3280 append_action_graph_update(
3281 headers,
3282 serde_json::json!({
3283 "source": "dispatcher",
3284 "trace_id": event.trace_id.0,
3285 "event_id": event.id.0,
3286 "observability": observability,
3287 "context": extra,
3288 }),
3289 )
3290 .await
3291 .map_err(DispatchError::from)
3292 }
3293}
3294
3295async fn dispatch_cancel_requested(
3296 event_log: &Arc<AnyEventLog>,
3297 binding_key: &str,
3298 event_id: &str,
3299 replay_of_event_id: Option<&String>,
3300) -> Result<bool, DispatchError> {
3301 if replay_of_event_id.is_some() {
3302 return Ok(false);
3303 }
3304 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3305 .expect("static trigger cancel topic should always be valid");
3306 let events = event_log.read_range(&topic, None, usize::MAX).await?;
3307 let requested = events
3308 .into_iter()
3309 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3310 .filter_map(|(_, event)| {
3311 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3312 })
3313 .collect::<BTreeSet<_>>();
3314 Ok(requested
3315 .iter()
3316 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3317}
3318
3319async fn sleep_or_cancel_or_request(
3320 event_log: &Arc<AnyEventLog>,
3321 delay: Duration,
3322 binding_key: &str,
3323 event_id: &str,
3324 replay_of_event_id: Option<&String>,
3325 cancel_rx: &mut broadcast::Receiver<()>,
3326) -> Result<(), DispatchError> {
3327 let sleep = tokio::time::sleep(delay);
3328 pin_mut!(sleep);
3329 let mut poll = tokio::time::interval(Duration::from_millis(100));
3330 loop {
3331 tokio::select! {
3332 _ = &mut sleep => return Ok(()),
3333 _ = recv_cancel(cancel_rx) => {
3334 return Err(DispatchError::Cancelled(
3335 "dispatcher shutdown cancelled retry wait".to_string(),
3336 ));
3337 }
3338 _ = poll.tick() => {
3339 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
3340 return Err(DispatchError::Cancelled(
3341 "trigger cancel request cancelled retry wait".to_string(),
3342 ));
3343 }
3344 }
3345 }
3346 }
3347}
3348
3349fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
3350 let mut iter = events.into_iter();
3351 let Some(mut root) = iter.next() else {
3352 return Err(DispatchError::Registry(
3353 "batch dispatch produced an empty event list".to_string(),
3354 ));
3355 };
3356 let mut batch = Vec::new();
3357 batch.push(
3358 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
3359 );
3360 for event in iter {
3361 batch.push(
3362 serde_json::to_value(&event)
3363 .map_err(|error| DispatchError::Serde(error.to_string()))?,
3364 );
3365 }
3366 root.batch = Some(batch);
3367 Ok(root)
3368}
3369
3370fn json_value_to_gate(value: &serde_json::Value) -> String {
3371 match value {
3372 serde_json::Value::Null => "null".to_string(),
3373 serde_json::Value::String(text) => text.clone(),
3374 serde_json::Value::Bool(value) => value.to_string(),
3375 serde_json::Value::Number(value) => value.to_string(),
3376 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
3377 }
3378}
3379
3380fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
3381 let json =
3382 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3383 let value = json_to_vm_value(&json);
3384 match (&event.raw_body, value) {
3385 (Some(raw_body), VmValue::Dict(dict)) => {
3386 let mut map = (*dict).clone();
3387 map.insert(
3388 "raw_body".to_string(),
3389 VmValue::Bytes(Rc::new(raw_body.clone())),
3390 );
3391 Ok(VmValue::Dict(Rc::new(map)))
3392 }
3393 (_, other) => Ok(other),
3394 }
3395}
3396
3397fn decrement_in_flight(state: &DispatcherRuntimeState) {
3398 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
3399 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
3400 state.idle_notify.notify_waiters();
3401 }
3402}
3403
3404fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
3405 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
3406 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
3407 state.idle_notify.notify_waiters();
3408 }
3409}
3410
/// Test-only: stores `tx` in `TEST_INBOX_DEQUEUED_SIGNAL`, replacing any
/// sender that was stored there before.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut installed = slot.borrow_mut();
        *installed = Some(tx);
    });
}
3417
/// Non-test builds: nothing to signal.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test builds: takes the sender out of `TEST_INBOX_DEQUEUED_SIGNAL`, if one
/// is stored, and sends `()` on it. A failed send is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(tx) = pending {
        let _ = tx.send(());
    }
}
3429
3430pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
3431 event_log: &L,
3432 event: &TriggerEvent,
3433) -> Result<u64, DispatchError> {
3434 let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
3435 let headers = event_headers(event, None, None, None);
3436 let payload =
3437 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3438 event_log
3439 .append(
3440 &topic,
3441 LogEvent::new("event_ingested", payload).with_headers(headers),
3442 )
3443 .await
3444 .map_err(DispatchError::from)
3445}
3446
3447pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
3448 ACTIVE_DISPATCHER_STATE.with(|slot| {
3449 slot.borrow()
3450 .as_ref()
3451 .map(|state| DispatcherStatsSnapshot {
3452 in_flight: state.in_flight.load(Ordering::Relaxed),
3453 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
3454 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
3455 })
3456 .unwrap_or_default()
3457 })
3458}
3459
3460pub fn clear_dispatcher_state() {
3461 ACTIVE_DISPATCHER_STATE.with(|slot| {
3462 *slot.borrow_mut() = None;
3463 });
3464 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
3465 *slot.borrow_mut() = None;
3466 });
3467}
3468
3469fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
3470 if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
3471 return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
3472 }
3473 if is_cancelled_vm_error(&error) {
3474 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
3475 }
3476 if let VmError::Thrown(VmValue::String(message)) = &error {
3477 return DispatchError::Local(message.to_string());
3478 }
3479 match error_to_category(&error) {
3480 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
3481 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
3482 ErrorCategory::Cancelled => {
3483 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
3484 }
3485 _ => DispatchError::Local(error.to_string()),
3486 }
3487}
3488
3489fn dispatch_error_label(error: &DispatchError) -> &'static str {
3490 match error {
3491 DispatchError::Denied(_) => "denied",
3492 DispatchError::Timeout(_) => "timeout",
3493 DispatchError::Waiting(_) => "waiting",
3494 DispatchError::Cancelled(_) => "cancelled",
3495 _ => "failed",
3496 }
3497}
3498
3499fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
3500 match route {
3501 DispatchUri::Worker { .. } => "enqueued",
3502 DispatchUri::Persona { .. } => "recorded",
3503 DispatchUri::A2a { .. }
3504 if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
3505 {
3506 "pending"
3507 }
3508 DispatchUri::A2a { .. } => "completed",
3509 DispatchUri::Local { .. } => "success",
3510 }
3511}
3512
3513fn dispatch_node_id(
3514 route: &DispatchUri,
3515 binding_key: &str,
3516 event_id: &str,
3517 attempt: u32,
3518) -> String {
3519 let prefix = match route {
3520 DispatchUri::A2a { .. } => "a2a",
3521 _ => "dispatch",
3522 };
3523 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
3524}
3525
3526fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3527 match route {
3528 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3529 DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3530 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3531 }
3532}
3533
3534fn dispatch_node_label(route: &DispatchUri) -> String {
3535 match route {
3536 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3537 _ => route.target_uri(),
3538 }
3539}
3540
3541fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3542 match route {
3543 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3544 _ => None,
3545 }
3546}
3547
3548fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3549 match route {
3550 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3551 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3552 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3553 }
3554}
3555
3556fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3557 match status {
3558 crate::triggers::SignatureStatus::Verified => "verified",
3559 crate::triggers::SignatureStatus::Unsigned => "unsigned",
3560 crate::triggers::SignatureStatus::Failed { .. } => "failed",
3561 }
3562}
3563
3564fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3565 let mut metadata = BTreeMap::new();
3566 metadata.insert(
3567 "provider".to_string(),
3568 serde_json::json!(event.provider.as_str()),
3569 );
3570 metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3571 metadata.insert(
3572 "dedupe_key".to_string(),
3573 serde_json::json!(event.dedupe_key),
3574 );
3575 metadata.insert(
3576 "signature_status".to_string(),
3577 serde_json::json!(signature_status_label(&event.signature_status)),
3578 );
3579 metadata
3580}
3581
3582fn trigger_event_persona_metadata(event: &TriggerEvent) -> BTreeMap<String, String> {
3583 let mut metadata = BTreeMap::new();
3584 metadata.insert("trigger_event_id".to_string(), event.id.0.clone());
3585 metadata.insert("event_id".to_string(), event.id.0.clone());
3586 metadata.insert("dedupe_key".to_string(), event.dedupe_key.clone());
3587 metadata.insert("trace_id".to_string(), event.trace_id.0.clone());
3588 if let Some(tenant_id) = &event.tenant_id {
3589 metadata.insert("tenant_id".to_string(), tenant_id.0.clone());
3590 }
3591 for (key, value) in &event.headers {
3592 metadata.insert(format!("header.{key}"), value.clone());
3593 }
3594 if let Ok(payload) = serde_json::to_value(&event.provider_payload) {
3595 collect_persona_payload_metadata("", &payload, &mut metadata);
3596 if let Some(raw) = payload.get("raw") {
3597 collect_persona_payload_metadata("", raw, &mut metadata);
3598 }
3599 }
3600 metadata
3601}
3602
3603fn collect_persona_payload_metadata(
3604 prefix: &str,
3605 value: &serde_json::Value,
3606 metadata: &mut BTreeMap<String, String>,
3607) {
3608 let serde_json::Value::Object(object) = value else {
3609 return;
3610 };
3611 for (key, value) in object {
3612 let path = if prefix.is_empty() {
3613 key.clone()
3614 } else {
3615 format!("{prefix}.{key}")
3616 };
3617 match value {
3618 serde_json::Value::String(text) => {
3619 metadata.insert(path, text.clone());
3620 }
3621 serde_json::Value::Number(number) => {
3622 metadata.insert(path, number.to_string());
3623 }
3624 serde_json::Value::Bool(flag) => {
3625 metadata.insert(path, flag.to_string());
3626 }
3627 serde_json::Value::Object(_) if prefix.is_empty() => {
3628 collect_persona_payload_metadata(&path, value, metadata);
3629 }
3630 _ => {}
3631 }
3632 }
3633}
3634
3635fn dispatch_node_metadata(
3636 route: &DispatchUri,
3637 binding: &TriggerBinding,
3638 event: &TriggerEvent,
3639 attempt: u32,
3640) -> BTreeMap<String, serde_json::Value> {
3641 let mut metadata = BTreeMap::new();
3642 metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3643 metadata.insert(
3644 "target_uri".to_string(),
3645 serde_json::json!(route.target_uri()),
3646 );
3647 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3648 metadata.insert(
3649 "trigger_id".to_string(),
3650 serde_json::json!(binding.id.as_str()),
3651 );
3652 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3653 if let Some(target_agent) = dispatch_target_agent(route) {
3654 metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3655 }
3656 if let DispatchUri::Worker { queue } = route {
3657 metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3658 }
3659 if let DispatchUri::Persona { name } = route {
3660 metadata.insert("persona".to_string(), serde_json::json!(name));
3661 }
3662 metadata
3663}
3664
3665fn dispatch_success_metadata(
3666 route: &DispatchUri,
3667 binding: &TriggerBinding,
3668 event: &TriggerEvent,
3669 attempt: u32,
3670 result: &serde_json::Value,
3671) -> BTreeMap<String, serde_json::Value> {
3672 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3673 match route {
3674 DispatchUri::A2a { .. } => {
3675 if let Some(task_id) = result
3676 .get("task_id")
3677 .or_else(|| result.get("id"))
3678 .and_then(|value| value.as_str())
3679 {
3680 metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3681 }
3682 if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3683 metadata.insert("state".to_string(), serde_json::json!(state));
3684 }
3685 }
3686 DispatchUri::Worker { .. } => {
3687 if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3688 {
3689 metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3690 }
3691 if let Some(response_topic) = result
3692 .get("response_topic")
3693 .and_then(|value| value.as_str())
3694 {
3695 metadata.insert(
3696 "response_topic".to_string(),
3697 serde_json::json!(response_topic),
3698 );
3699 }
3700 }
3701 DispatchUri::Persona { .. } => {
3702 if let Some(receipt_id) = result.get("receipt_id").and_then(|value| value.as_str()) {
3703 metadata.insert("receipt_id".to_string(), serde_json::json!(receipt_id));
3704 }
3705 if let Some(status) = result.get("status").and_then(|value| value.as_str()) {
3706 metadata.insert("status".to_string(), serde_json::json!(status));
3707 }
3708 }
3709 DispatchUri::Local { .. } => {}
3710 }
3711 metadata
3712}
3713
3714fn dispatch_error_metadata(
3715 route: &DispatchUri,
3716 binding: &TriggerBinding,
3717 event: &TriggerEvent,
3718 attempt: u32,
3719 error: &DispatchError,
3720) -> BTreeMap<String, serde_json::Value> {
3721 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3722 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3723 metadata
3724}
3725
3726fn retry_node_metadata(
3727 binding: &TriggerBinding,
3728 event: &TriggerEvent,
3729 attempt: u32,
3730 delay: Duration,
3731 error: &DispatchError,
3732) -> BTreeMap<String, serde_json::Value> {
3733 let mut metadata = BTreeMap::new();
3734 metadata.insert(
3735 "trigger_id".to_string(),
3736 serde_json::json!(binding.id.as_str()),
3737 );
3738 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3739 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3740 metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3741 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3742 metadata
3743}
3744
/// Splits a binding key of the form `"{binding_id}@v{version}"` into its
/// binding id and version.
///
/// The split happens at the last `"@v"`, and only when the text after it
/// parses as a `u32`. Any other key is returned whole with version `0`. That
/// includes keys with no `"@v"` and keys where `"@v"` is simply part of the id
/// (for example `"alerts@vendor"`), so an unversioned id is never truncated.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    binding_key
        .rsplit_once("@v")
        .and_then(|(binding_id, suffix)| {
            // A non-numeric suffix means "@v" was not a version marker.
            let version = suffix.parse::<u32>().ok()?;
            Some((binding_id.to_string(), version))
        })
        .unwrap_or_else(|| (binding_key.to_string(), 0))
}
3752
/// Builds a binding key from a trigger id and an optional version.
///
/// With a version the key is `"{trigger_id}@v{version}"`; without one it is
/// the trigger id itself.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    binding_version.map_or_else(
        || trigger_id.to_string(),
        |version| format!("{trigger_id}@v{version}"),
    )
}
3759
3760fn tenant_id(event: &TriggerEvent) -> Option<&str> {
3761 event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
3762}
3763
3764fn current_unix_ms() -> i64 {
3765 unix_ms(crate::triggers::test_util::clock::now_utc())
3766}
3767
3768fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
3769 (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
3770}
3771
3772fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3773 lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
3774 .unwrap_or_else(|| unix_ms(event.received_at))
3775}
3776
3777fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
3778 lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
3779 .unwrap_or_else(|| accepted_at_ms(headers, event))
3780}
3781
/// Looks up header `name` and parses it as an `i64`.
///
/// Returns `None` when there are no headers, the header is absent, or its
/// value does not parse.
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw = headers?.get(name)?;
    raw.parse::<i64>().ok()
}
3787
/// Elapsed time from `earlier_ms` to `later_ms` as a `Duration`.
///
/// The subtraction saturates, and a negative difference is reported as zero.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    // Conversion fails only for negative differences, which clamp to zero.
    let clamped_ms = u64::try_from(elapsed_ms).unwrap_or(0);
    Duration::from_millis(clamped_ms)
}
3791
3792fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
3793 match result {
3794 Ok(_) => "succeeded",
3795 Err(DispatchError::Waiting(_)) => "waiting",
3796 Err(DispatchError::Cancelled(_)) => "cancelled",
3797 Err(DispatchError::Denied(_)) => "denied",
3798 Err(DispatchError::Timeout(_)) => "timeout",
3799 Err(_) => "failed",
3800 }
3801}
3802
3803fn is_cancelled_vm_error(error: &VmError) -> bool {
3804 matches!(
3805 error,
3806 VmError::Thrown(VmValue::String(message))
3807 if message.starts_with("kind:cancelled:")
3808 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3809}
3810
3811fn event_headers(
3812 event: &TriggerEvent,
3813 binding: Option<&TriggerBinding>,
3814 attempt: Option<u32>,
3815 replay_of_event_id: Option<&String>,
3816) -> BTreeMap<String, String> {
3817 let mut headers = BTreeMap::new();
3818 headers.insert("event_id".to_string(), event.id.0.clone());
3819 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3820 headers.insert("provider".to_string(), event.provider.as_str().to_string());
3821 headers.insert("kind".to_string(), event.kind.clone());
3822 if let Some(replay_of_event_id) = replay_of_event_id {
3823 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3824 }
3825 if let Some(tenant_id) = event.tenant_id.as_ref() {
3826 headers.insert("tenant_id".to_string(), tenant_id.0.clone());
3827 }
3828 if let Some(binding) = binding {
3829 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3830 headers.insert("binding_key".to_string(), binding.binding_key());
3831 headers.insert(
3832 "handler_kind".to_string(),
3833 DispatchUri::from(&binding.handler).kind().to_string(),
3834 );
3835 }
3836 if let Some(attempt) = attempt {
3837 headers.insert("attempt".to_string(), attempt.to_string());
3838 }
3839 headers
3840}
3841
3842fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
3843 let topic = Topic::new(topic_name)
3844 .expect("static trigger dispatcher topic names should always be valid");
3845 match event.tenant_id.as_ref() {
3846 Some(tenant_id) => crate::tenant_topic(tenant_id, &topic).map_err(DispatchError::from),
3847 None => Ok(topic),
3848 }
3849}
3850
3851fn worker_queue_priority(
3852 binding: &super::registry::TriggerBinding,
3853 event: &TriggerEvent,
3854) -> crate::WorkerQueuePriority {
3855 match event
3856 .headers
3857 .get("priority")
3858 .map(|value| value.trim().to_ascii_lowercase())
3859 .as_deref()
3860 {
3861 Some("high") => crate::WorkerQueuePriority::High,
3862 Some("low") => crate::WorkerQueuePriority::Low,
3863 _ => binding.dispatch_priority,
3864 }
3865}
3866
/// Name of the environment variable checked by `maybe_fail_before_outbox`.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Exits the process with status 86 when `TEST_FAIL_BEFORE_OUTBOX_ENV` is set
/// to any value; does nothing otherwise.
fn maybe_fail_before_outbox() {
    let fault_requested = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if fault_requested {
        std::process::exit(86);
    }
}
3874
3875fn now_rfc3339() -> String {
3876 crate::triggers::test_util::clock::now_utc()
3877 .format(&time::format_description::well_known::Rfc3339)
3878 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3879}
3880
3881fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
3882 let now = crate::triggers::test_util::clock::now_utc();
3883 let reset = if binding.hourly_cost_usd.is_some() {
3884 let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
3885 time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
3886 } else {
3887 let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
3888 time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
3889 };
3890 reset
3891 .format(&time::format_description::well_known::Rfc3339)
3892 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3893}
3894
3895fn now_unix_ms() -> i64 {
3896 (crate::triggers::test_util::clock::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3897}
3898
3899fn cancelled_dispatch_outcome(
3900 binding: &TriggerBinding,
3901 route: &DispatchUri,
3902 event: &TriggerEvent,
3903 replay_of_event_id: Option<String>,
3904 attempt_count: u32,
3905 error: String,
3906) -> DispatchOutcome {
3907 DispatchOutcome {
3908 trigger_id: binding.id.as_str().to_string(),
3909 binding_key: binding.binding_key(),
3910 event_id: event.id.0.clone(),
3911 attempt_count,
3912 status: DispatchStatus::Cancelled,
3913 handler_kind: route.kind().to_string(),
3914 target_uri: route.target_uri(),
3915 replay_of_event_id,
3916 result: None,
3917 error: Some(error),
3918 }
3919}
3920
3921async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3922 let _ = cancel_rx.recv().await;
3923}
3924
3925#[cfg(test)]
3926mod tests;