1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
16use crate::llm::vm_value_to_json;
17use crate::orchestration::{
18 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
19 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
20 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
21 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
22 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
23 ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
24 ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
25};
26use crate::stdlib::json_to_vm_value;
27use crate::trust_graph::{
28 append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
29};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::{
35 binding_autonomy_budget_would_exceed, binding_budget_would_exceed,
36 expected_predicate_cost_usd_micros, matching_bindings, micros_to_usd, note_autonomous_decision,
37 note_orchestrator_budget_cost, orchestrator_budget_would_exceed, record_predicate_cost_sample,
38 reset_binding_budget_windows, usd_to_micros, TriggerBinding, TriggerBudgetExhaustionStrategy,
39 TriggerHandlerSpec,
40};
41use super::{
42 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
43 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
44 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
45 TRIGGER_OUTBOX_TOPIC,
46};
47use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
48
49mod flow_control;
50pub mod retry;
51pub mod uri;
52
53pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
54
/// Header key carrying the accepted-at time in milliseconds. When missing at
/// enqueue time the dispatcher fills it from the event's `received_at`.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Header key `harn_trigger_normalized_at_ms` (not written by the code visible
/// in this part of the file).
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Header key carrying the queue-append time in milliseconds. When missing at
/// enqueue time the dispatcher fills it from the inbox log event's
/// `occurred_at_ms`.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
/// Failure count at which a default `DestinationCircuitRegistry` opens a
/// destination's circuit.
const DESTINATION_CIRCUIT_FAILURE_THRESHOLD: u32 = 5;
/// How long a default `DestinationCircuitRegistry` blocks a destination once
/// its circuit has opened.
const DESTINATION_CIRCUIT_BACKOFF: Duration = Duration::from_secs(60);
60
// Per-thread ambient dispatcher state.
thread_local! {
    // Runtime state of the most recently constructed `Dispatcher` on this
    // thread (written by `Dispatcher::with_event_log_and_metrics`).
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context of the dispatch currently running on this thread, if any; read
    // through `current_dispatch_context`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Flow-control wait lease of the dispatch currently running on this
    // thread, if any; read through `current_dispatch_wait_lease`.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only one-shot sender; presumably fired when the run loop dequeues
    // an inbox envelope (see `notify_test_inbox_dequeued`, defined elsewhere).
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
68
// Task-local flag scoped around each dispatch: `true` while the running
// dispatch is a replay of an earlier event. Read through
// `current_dispatch_is_replay`, which falls back to `false` outside a scope.
tokio::task_local! {
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
72
/// Snapshot describing the dispatch that is currently executing, as exposed
/// by [`current_dispatch_context`].
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay.
    pub replay_of_event_id: Option<String>,
    /// Id of the binding handling the event.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    /// Agent identifier associated with the dispatch.
    pub agent_id: String,
    /// Action name associated with the dispatch.
    pub action: String,
    /// Autonomy tier the dispatch runs under.
    pub autonomy_tier: AutonomyTier,
}
83
/// Guard whose `Drop` pops the current execution policy.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    fn drop(&mut self) {
        // Presumably balances a push made where the guard is created (not
        // visible in this part of the file).
        crate::orchestration::pop_execution_policy();
    }
}
91
/// Serializable record tying a list of predicate cache entries to one
/// trigger/event pair.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    /// Trigger the entries belong to.
    trigger_id: String,
    /// Event the entries belong to.
    event_id: String,
    /// The cached predicate entries.
    entries: Vec<PredicateCacheEntry>,
}
98
/// Result of evaluating a binding's `when` predicate for one event.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed.
    result: bool,
    /// Cost of the evaluation in USD.
    cost_usd: f64,
    /// Token count attributed to the evaluation.
    tokens: u64,
    /// Evaluation latency in milliseconds.
    latency_ms: u64,
    /// Whether the result came from a cache.
    cached: bool,
    /// Optional explanation attached to the result.
    reason: Option<String>,
    /// Budget-exhaustion strategy attached to the result, if any.
    exhaustion_strategy: Option<TriggerBudgetExhaustionStrategy>,
}
109
/// Default reviewer name for autonomy-budget decisions (usage is outside the
/// visible part of this file).
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
111
112pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
113 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
114}
115
116pub(crate) fn current_dispatch_is_replay() -> bool {
117 ACTIVE_DISPATCH_IS_REPLAY
118 .try_with(|is_replay| *is_replay)
119 .unwrap_or(false)
120}
121
122pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
123 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
124}
125
/// Consumes trigger events and dispatches them to matching bindings.
///
/// Clones share the same event log, cancel channel, runtime state and metrics
/// registry (all held behind `Rc`/`Arc` or a cloneable sender).
#[derive(Clone)]
pub struct Dispatcher {
    /// VM supplied at construction time.
    base_vm: Rc<Vm>,
    /// Event log used for inbox, cancel-request and other dispatcher topics.
    event_log: Arc<AnyEventLog>,
    /// Broadcast channel signalled by [`Dispatcher::shutdown`].
    cancel_tx: broadcast::Sender<()>,
    /// Shared counters, DLQ, cancel tokens and flow-control state.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink; metrics are skipped when `None`.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
134
135#[derive(Debug)]
136struct DispatcherRuntimeState {
137 in_flight: AtomicU64,
138 retry_queue_depth: AtomicU64,
139 dlq: Mutex<Vec<DlqEntry>>,
140 cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
141 shutting_down: std::sync::atomic::AtomicBool,
142 idle_notify: Notify,
143 flow_control: FlowControlManager,
144 destination_circuits: DestinationCircuitRegistry,
145}
146
147impl DispatcherRuntimeState {
148 fn new(event_log: Arc<AnyEventLog>) -> Self {
149 Self {
150 in_flight: AtomicU64::new(0),
151 retry_queue_depth: AtomicU64::new(0),
152 dlq: Mutex::new(Vec::new()),
153 cancel_tokens: Mutex::new(Vec::new()),
154 shutting_down: std::sync::atomic::AtomicBool::new(false),
155 idle_notify: Notify::new(),
156 flow_control: FlowControlManager::new(event_log),
157 destination_circuits: DestinationCircuitRegistry::default(),
158 }
159 }
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq)]
163enum DestinationCircuitProbe {
164 Allow { half_open: bool },
165 Block { retry_after: Duration },
166}
167
168#[derive(Debug)]
169struct DestinationCircuitRegistry {
170 threshold: u32,
171 backoff: Duration,
172 states: Mutex<BTreeMap<String, DestinationCircuitState>>,
173}
174
175#[derive(Clone, Debug)]
176struct DestinationCircuitState {
177 failures: u32,
178 opened_at: Option<Instant>,
179}
180
181impl Default for DestinationCircuitRegistry {
182 fn default() -> Self {
183 Self {
184 threshold: DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
185 backoff: DESTINATION_CIRCUIT_BACKOFF,
186 states: Mutex::new(BTreeMap::new()),
187 }
188 }
189}
190
191impl DestinationCircuitRegistry {
192 fn check(&self, destination: &str) -> DestinationCircuitProbe {
193 let mut states = self
194 .states
195 .lock()
196 .expect("destination circuit registry poisoned");
197 let Some(state) = states.get_mut(destination) else {
198 return DestinationCircuitProbe::Allow { half_open: false };
199 };
200 let Some(opened_at) = state.opened_at else {
201 return DestinationCircuitProbe::Allow { half_open: false };
202 };
203 let elapsed = opened_at.elapsed();
204 if elapsed >= self.backoff {
205 DestinationCircuitProbe::Allow { half_open: true }
206 } else {
207 DestinationCircuitProbe::Block {
208 retry_after: self.backoff.saturating_sub(elapsed),
209 }
210 }
211 }
212
213 fn record_success(&self, destination: &str) {
214 let mut states = self
215 .states
216 .lock()
217 .expect("destination circuit registry poisoned");
218 states.remove(destination);
219 }
220
221 fn record_failure(&self, destination: &str) -> bool {
222 let mut states = self
223 .states
224 .lock()
225 .expect("destination circuit registry poisoned");
226 let state = states
227 .entry(destination.to_string())
228 .or_insert(DestinationCircuitState {
229 failures: 0,
230 opened_at: None,
231 });
232 if state.opened_at.is_some() {
233 state.failures = self.threshold;
234 state.opened_at = Some(Instant::now());
235 return true;
236 }
237 state.failures = state.failures.saturating_add(1);
238 if state.failures >= self.threshold {
239 state.opened_at = Some(Instant::now());
240 true
241 } else {
242 false
243 }
244 }
245}
246
/// Point-in-time copy of the dispatcher's counters, as produced by
/// `Dispatcher::snapshot`. Missing fields deserialize to their defaults.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches currently in flight.
    pub in_flight: u64,
    /// Dispatches waiting in the retry queue.
    pub retry_queue_depth: u64,
    /// Number of entries in the in-memory dead-letter queue.
    pub dlq_depth: u64,
}
254
/// Final status of one dispatch; serialized in `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch failed.
    Failed,
    /// The dispatch ended in the dead-letter queue.
    Dlq,
    /// The dispatch was skipped (see `DispatchSkipStage` for the stages that
    /// can skip).
    Skipped,
    /// The dispatch is waiting.
    Waiting,
    /// The dispatch was cancelled.
    Cancelled,
}
265
266impl DispatchStatus {
267 pub fn as_str(&self) -> &'static str {
268 match self {
269 Self::Succeeded => "succeeded",
270 Self::Failed => "failed",
271 Self::Dlq => "dlq",
272 Self::Skipped => "skipped",
273 Self::Waiting => "waiting",
274 Self::Cancelled => "cancelled",
275 }
276 }
277}
278
/// Result of dispatching one event to one binding. Missing fields deserialize
/// to the values of `DispatchOutcome::default()`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Binding key of the binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Final status of the dispatch.
    pub status: DispatchStatus,
    /// Kind of handler the binding uses.
    pub handler_kind: String,
    /// URI of the dispatch target.
    pub target_uri: String,
    /// Id of the original event when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result as JSON, if any.
    pub result: Option<serde_json::Value>,
    /// Error message, if any.
    pub error: Option<String>,
}
293
/// Payload of an `event_ingested` record on the trigger inbox topic.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to this trigger's binding.
    pub trigger_id: Option<String>,
    /// Optional binding version passed to binding resolution.
    pub binding_version: Option<u32>,
    /// The event itself.
    pub event: TriggerEvent,
}
300
/// Outcome of `Dispatcher::drain`. Missing fields deserialize to their
/// defaults.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when both `in_flight` and `retry_queue_depth` reached zero.
    pub drained: bool,
    /// In-flight dispatches at the time of the report.
    pub in_flight: u64,
    /// Retry-queue depth at the time of the report.
    pub retry_queue_depth: u64,
    /// DLQ depth at the time of the report.
    pub dlq_depth: u64,
}
309
310impl Default for DispatchOutcome {
311 fn default() -> Self {
312 Self {
313 trigger_id: String::new(),
314 binding_key: String::new(),
315 event_id: String::new(),
316 attempt_count: 0,
317 status: DispatchStatus::Failed,
318 handler_kind: String::new(),
319 target_uri: String::new(),
320 replay_of_event_id: None,
321 result: None,
322 error: None,
323 }
324 }
325}
326
/// Record of a single dispatch attempt. Missing fields deserialize to their
/// defaults.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Binding key of the binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Attempt number.
    pub attempt: u32,
    /// Kind of handler invoked.
    pub handler_kind: String,
    /// Start timestamp as a string (format not visible here).
    pub started_at: String,
    /// Completion timestamp as a string (format not visible here).
    pub completed_at: String,
    /// Outcome label of the attempt.
    pub outcome: String,
    /// Error message for a failed attempt.
    pub error_msg: Option<String>,
}
340
/// Request to cancel the dispatch of one event for one binding; appended to
/// the cancel-requests topic by `append_dispatch_cancel_request`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Binding key the cancellation targets.
    pub binding_key: String,
    /// Event id the cancellation targets.
    pub event_id: String,
    /// When the cancellation was requested (serialized as RFC 3339).
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who requested the cancellation, if known.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit identifier.
    #[serde(default)]
    pub audit_id: Option<String>,
}
352
/// Dead-letter record for an event whose dispatch was given up on.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Binding key of the binding.
    pub binding_key: String,
    /// The event that could not be dispatched.
    pub event: TriggerEvent,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Error message of the final failure.
    pub final_error: String,
    /// Error classification; deserializes to `"unknown"` when absent.
    #[serde(default = "default_dlq_error_class")]
    pub error_class: String,
    /// Attempt history.
    pub attempts: Vec<DispatchAttemptRecord>,
}
364
/// Serde default for `DlqEntry::error_class`.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
368
/// Bookkeeping for a singleton gate taken by a dispatch.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Name of the singleton gate.
    gate: String,
    /// `true` while the gate is held; cleared while the dispatch is suspended.
    held: bool,
}
374
/// Bookkeeping for a concurrency gate taken by a dispatch. The gate name,
/// limit and priority are kept so the permit can be re-acquired on resume.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    /// Name of the concurrency gate.
    gate: String,
    /// `max` value passed to `acquire_concurrency`.
    max: u32,
    /// Priority rank passed to `acquire_concurrency`.
    priority_rank: usize,
    /// The permit while held; `None` while the dispatch is suspended.
    permit: Option<ConcurrencyPermit>,
}
382
/// Flow-control resources acquired for one dispatch; both are optional.
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    /// Singleton gate lease, if the dispatch took one.
    singleton: Option<SingletonLease>,
    /// Concurrency gate lease, if the dispatch took one.
    concurrency: Option<ConcurrencyLease>,
}
388
/// Handle that lets a running dispatch give up its flow-control resources
/// while it waits (`suspend`) and take them back afterwards (`resume`).
/// Clones share the same state, leases and suspended flag.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state that owns the flow-control manager.
    state: Arc<DispatcherRuntimeState>,
    /// Resources acquired for the dispatch.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// `true` between a `suspend` and the matching `resume`.
    suspended: Arc<AtomicBool>,
}
395
impl DispatchWaitLease {
    /// Creates a lease over `acquired` that starts out not suspended.
    fn new(
        state: Arc<DispatcherRuntimeState>,
        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    ) -> Self {
        Self {
            state,
            acquired,
            suspended: Arc::new(AtomicBool::new(false)),
        }
    }

    /// Releases the singleton gate and concurrency permit held by the
    /// dispatch, if any.
    ///
    /// Calling this while already suspended is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    /// Returns the error of a failed release converted into `DispatchError`.
    /// The `suspended` flag and the lease bookkeeping are updated before the
    /// releases run, so they stay in the "suspended" state on error.
    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
        // `swap` returns the previous value: already suspended means nothing to do.
        if self.suspended.swap(true, Ordering::SeqCst) {
            return Ok(());
        }
        // Update the bookkeeping under the lock, but drop the lock before the
        // release calls are awaited.
        let (singleton_gate, concurrency_permit) = {
            let mut acquired = self.acquired.lock().await;
            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
                if lease.held {
                    lease.held = false;
                    Some(lease.gate.clone())
                } else {
                    None
                }
            });
            let concurrency_permit = acquired
                .concurrency
                .as_mut()
                .and_then(|lease| lease.permit.take());
            (singleton_gate, concurrency_permit)
        };

        if let Some(gate) = singleton_gate {
            self.state
                .flow_control
                .release_singleton(&gate)
                .await
                .map_err(DispatchError::from)?;
        }
        if let Some(permit) = concurrency_permit {
            self.state
                .flow_control
                .release_concurrency(permit)
                .await
                .map_err(DispatchError::from)?;
        }
        Ok(())
    }

    /// Re-acquires whatever `suspend` released: first the singleton gate,
    /// then the concurrency permit.
    ///
    /// Calling this while not suspended is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    /// Returns the error of a failed acquire converted into `DispatchError`.
    /// The `suspended` flag has already been cleared at that point.
    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
        // `swap` returns the previous value: not suspended means nothing to do.
        if !self.suspended.swap(false, Ordering::SeqCst) {
            return Ok(());
        }

        // Singleton gate: needed only when a lease exists and is not held.
        let singleton_gate = {
            let acquired = self.acquired.lock().await;
            acquired.singleton.as_ref().and_then(|lease| {
                if lease.held {
                    None
                } else {
                    Some(lease.gate.clone())
                }
            })
        };
        if let Some(gate) = singleton_gate {
            // The lock is not held while waiting for the gate.
            self.state
                .flow_control
                .acquire_singleton(&gate)
                .await
                .map_err(DispatchError::from)?;
            let mut acquired = self.acquired.lock().await;
            if let Some(lease) = acquired.singleton.as_mut() {
                lease.held = true;
            }
        }

        // Concurrency permit: needed only when a lease exists without a permit.
        let concurrency_spec = {
            let acquired = self.acquired.lock().await;
            acquired.concurrency.as_ref().and_then(|lease| {
                if lease.permit.is_some() {
                    None
                } else {
                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
                }
            })
        };
        if let Some((gate, max, priority_rank)) = concurrency_spec {
            let permit = self
                .state
                .flow_control
                .acquire_concurrency(&gate, max, priority_rank)
                .await
                .map_err(DispatchError::from)?;
            let mut acquired = self.acquired.lock().await;
            if let Some(lease) = acquired.concurrency.as_mut() {
                lease.permit = Some(permit);
            }
        }
        Ok(())
    }
}
498
/// Decision produced by the flow-control stage for one event.
enum FlowControlOutcome {
    /// Proceed with dispatch.
    Dispatch {
        /// The event to dispatch (boxed).
        event: Box<TriggerEvent>,
        /// Flow-control resources acquired for the dispatch.
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch.
    Skip {
        /// Why the event was skipped.
        reason: String,
    },
}
508
/// Stage of the pipeline at which a dispatch was skipped.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    /// Skipped at the predicate stage (label `"predicate"`).
    Predicate,
    /// Skipped at the flow-control stage (label `"flow_control"`).
    FlowControl,
}
514
/// Errors produced by the dispatcher. Every variant carries a human-readable
/// message, which is exactly what `Display` prints.
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log failure.
    EventLog(String),
    /// Trigger-registry failure.
    Registry(String),
    /// Serialization or deserialization failure.
    Serde(String),
    /// Local handler failure.
    Local(String),
    /// A2A handler failure.
    A2a(String),
    /// The dispatch was denied.
    Denied(String),
    /// The dispatch timed out.
    Timeout(String),
    /// The dispatch is waiting.
    Waiting(String),
    /// The dispatch was cancelled.
    Cancelled(String),
    /// The requested behaviour is not implemented.
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    /// Writes the variant's message verbatim, without any prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message: &str = match self {
            DispatchError::EventLog(text) => text,
            DispatchError::Registry(text) => text,
            DispatchError::Serde(text) => text,
            DispatchError::Local(text) => text,
            DispatchError::A2a(text) => text,
            DispatchError::Denied(text) => text,
            DispatchError::Timeout(text) => text,
            DispatchError::Waiting(text) => text,
            DispatchError::Cancelled(text) => text,
            DispatchError::NotImplemented(text) => text,
        };
        f.write_str(message)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Whether a failed attempt with this error may be retried. Cancelled,
    /// denied, not-implemented and waiting errors are never retried.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}
556
557impl DispatchSkipStage {
558 fn as_str(&self) -> &'static str {
559 match self {
560 Self::Predicate => "predicate",
561 Self::FlowControl => "flow_control",
562 }
563 }
564}
565
566impl From<LogError> for DispatchError {
567 fn from(value: LogError) -> Self {
568 Self::EventLog(value.to_string())
569 }
570}
571
572pub async fn append_dispatch_cancel_request(
573 event_log: &Arc<AnyEventLog>,
574 request: &DispatchCancelRequest,
575) -> Result<u64, DispatchError> {
576 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
577 .expect("static trigger cancel topic should always be valid");
578 event_log
579 .append(
580 &topic,
581 LogEvent::new(
582 "dispatch_cancel_requested",
583 serde_json::to_value(request)
584 .map_err(|error| DispatchError::Serde(error.to_string()))?,
585 ),
586 )
587 .await
588 .map_err(DispatchError::from)
589}
590
591impl Dispatcher {
592 pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
593 self.event_log.clone()
594 }
595
596 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
597 let event_log = active_event_log().ok_or_else(|| {
598 DispatchError::EventLog("dispatcher requires an active event log".to_string())
599 })?;
600 Ok(Self::with_event_log(base_vm, event_log))
601 }
602
603 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
604 Self::with_event_log_and_metrics(base_vm, event_log, None)
605 }
606
607 pub fn with_event_log_and_metrics(
608 base_vm: Vm,
609 event_log: Arc<AnyEventLog>,
610 metrics: Option<Arc<crate::MetricsRegistry>>,
611 ) -> Self {
612 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
613 ACTIVE_DISPATCHER_STATE.with(|slot| {
614 *slot.borrow_mut() = Some(state.clone());
615 });
616 let (cancel_tx, _) = broadcast::channel(32);
617 Self {
618 base_vm: Rc::new(base_vm),
619 event_log,
620 cancel_tx,
621 state,
622 metrics,
623 }
624 }
625
626 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
627 DispatcherStatsSnapshot {
628 in_flight: self.state.in_flight.load(Ordering::Relaxed),
629 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
630 dlq_depth: self
631 .state
632 .dlq
633 .lock()
634 .expect("dispatcher dlq poisoned")
635 .len() as u64,
636 }
637 }
638
639 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
640 self.state
641 .dlq
642 .lock()
643 .expect("dispatcher dlq poisoned")
644 .clone()
645 }
646
647 pub fn shutdown(&self) {
648 self.state.shutting_down.store(true, Ordering::SeqCst);
649 for token in self
650 .state
651 .cancel_tokens
652 .lock()
653 .expect("dispatcher cancel tokens poisoned")
654 .iter()
655 {
656 token.store(true, Ordering::SeqCst);
657 }
658 let _ = self.cancel_tx.send(());
659 }
660
661 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
662 self.enqueue_targeted(None, None, event).await
663 }
664
665 pub async fn enqueue_targeted(
666 &self,
667 trigger_id: Option<String>,
668 binding_version: Option<u32>,
669 event: TriggerEvent,
670 ) -> Result<u64, DispatchError> {
671 self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
672 .await
673 }
674
    /// Appends `event` to the inbox as an `event_ingested` record wrapping an
    /// [`InboxEnvelope`], and returns the value returned by the event-log
    /// append.
    ///
    /// Headers start from `parent_headers` (if any), are extended with the
    /// event's own headers, and gain `trigger_id` / `binding_key` when a
    /// trigger is targeted. The accepted-at and queue-appended-at headers are
    /// filled in only when not already present.
    ///
    /// # Errors
    /// Propagates errors from `topic_for_event`, `DispatchError::Serde` when
    /// the envelope cannot be serialized, and `DispatchError::EventLog` when
    /// the append fails.
    pub async fn enqueue_targeted_with_headers(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
        parent_headers: Option<&BTreeMap<String, String>>,
    ) -> Result<u64, DispatchError> {
        let topic = topic_for_event(&event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
        // `trigger_id` is moved into the envelope below; keep a copy for
        // headers and metrics.
        let trigger_id_for_metrics = trigger_id.clone();
        let mut headers = parent_headers.cloned().unwrap_or_default();
        // Event headers overwrite parent headers with the same key.
        headers.extend(event_headers(&event, None, None, None));
        if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
            headers.insert("trigger_id".to_string(), trigger_id.clone());
            headers.insert(
                "binding_key".to_string(),
                binding_key_from_parts(trigger_id, binding_version),
            );
        }
        // Keep an inherited accepted-at stamp; otherwise use the event's
        // receive time.
        headers
            .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
            .or_insert_with(|| unix_ms(event.received_at).to_string());
        let payload = serde_json::to_value(InboxEnvelope {
            trigger_id,
            binding_version,
            event: event.clone(),
        })
        .map_err(|error| DispatchError::Serde(error.to_string()))?;
        let mut log_event = LogEvent::new("event_ingested", payload);
        // Remember whether the stamp was inherited: the accepted-to-append
        // metric is recorded only for a fresh stamp.
        let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
        // Unparseable inherited values fall back to the log event's time.
        let queue_appended_at_ms = headers
            .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
            .and_then(|value| value.parse::<i64>().ok())
            .unwrap_or(log_event.occurred_at_ms);
        headers
            .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
            .or_insert_with(|| log_event.occurred_at_ms.to_string());
        // Metrics are recorded only for targeted events and only when a
        // registry is configured. NOTE(review): they are recorded before the
        // append below, so a failed append still leaves a pending-event note.
        if let (Some(metrics), Some(trigger_id)) =
            (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
        {
            let binding_key = binding_key_from_parts(trigger_id, binding_version);
            let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
            if !had_queue_appended_at {
                metrics.record_trigger_accepted_to_queue_append(
                    trigger_id,
                    &binding_key,
                    event.provider.as_str(),
                    tenant_id(&event),
                    "queued",
                    duration_between_ms(queue_appended_at_ms, accepted_at_ms),
                );
            }
            metrics.note_trigger_pending_event(
                event.id.0.as_str(),
                trigger_id,
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                accepted_at_ms,
                queue_appended_at_ms,
            );
        }
        log_event.headers = headers;
        self.event_log
            .append(&topic, log_event)
            .await
            .map_err(DispatchError::from)
    }
742
743 pub async fn run(&self) -> Result<(), DispatchError> {
744 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
745 .expect("static trigger inbox envelopes topic is valid");
746 let start_from = self.event_log.latest(&topic).await?;
747 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
748 pin_mut!(stream);
749 let mut cancel_rx = self.cancel_tx.subscribe();
750
751 loop {
752 tokio::select! {
753 received = stream.next() => {
754 let Some(received) = received else {
755 break;
756 };
757 let (_, event) = received.map_err(DispatchError::from)?;
758 if event.kind != "event_ingested" {
759 continue;
760 }
761 let parent_headers = event.headers.clone();
762 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
763 .map_err(|error| DispatchError::Serde(error.to_string()))?;
764 notify_test_inbox_dequeued();
765 let _ = self
766 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
767 .await;
768 }
769 _ = recv_cancel(&mut cancel_rx) => break,
770 }
771 }
772
773 Ok(())
774 }
775
776 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
777 let deadline = tokio::time::Instant::now() + timeout;
778 loop {
779 let snapshot = self.snapshot();
780 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
781 return Ok(DispatcherDrainReport {
782 drained: true,
783 in_flight: snapshot.in_flight,
784 retry_queue_depth: snapshot.retry_queue_depth,
785 dlq_depth: snapshot.dlq_depth,
786 });
787 }
788
789 let notified = self.state.idle_notify.notified();
790 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
791 if remaining.is_zero() {
792 return Ok(DispatcherDrainReport {
793 drained: false,
794 in_flight: snapshot.in_flight,
795 retry_queue_depth: snapshot.retry_queue_depth,
796 dlq_depth: snapshot.dlq_depth,
797 });
798 }
799 if tokio::time::timeout(remaining, notified).await.is_err() {
800 let snapshot = self.snapshot();
801 return Ok(DispatcherDrainReport {
802 drained: false,
803 in_flight: snapshot.in_flight,
804 retry_queue_depth: snapshot.retry_queue_depth,
805 dlq_depth: snapshot.dlq_depth,
806 });
807 }
808 }
809 }
810
811 pub async fn dispatch_inbox_envelope(
812 &self,
813 envelope: InboxEnvelope,
814 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
815 self.dispatch_inbox_envelope_with_headers(envelope, None)
816 .await
817 }
818
819 pub async fn dispatch_inbox_envelope_with_parent_headers(
820 &self,
821 envelope: InboxEnvelope,
822 parent_headers: &BTreeMap<String, String>,
823 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
824 self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
825 .await
826 }
827
828 async fn dispatch_inbox_envelope_with_headers(
829 &self,
830 envelope: InboxEnvelope,
831 parent_headers: Option<&BTreeMap<String, String>>,
832 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
833 if let Some(trigger_id) = envelope.trigger_id {
834 let binding = super::registry::resolve_live_trigger_binding(
835 &trigger_id,
836 envelope.binding_version,
837 )
838 .map_err(|error| DispatchError::Registry(error.to_string()))?;
839 return Ok(vec![
840 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
841 .await?,
842 ]);
843 }
844
845 let cron_target = match &envelope.event.provider_payload {
846 crate::triggers::ProviderPayload::Known(
847 crate::triggers::event::KnownProviderPayload::Cron(payload),
848 ) => payload.cron_id.clone(),
849 _ => None,
850 };
851 if let Some(trigger_id) = cron_target {
852 let binding = super::registry::resolve_live_trigger_binding(
853 &trigger_id,
854 envelope.binding_version,
855 )
856 .map_err(|error| DispatchError::Registry(error.to_string()))?;
857 return Ok(vec![
858 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
859 .await?,
860 ]);
861 }
862
863 self.dispatch_event_with_headers(envelope.event, parent_headers)
864 .await
865 }
866
867 pub async fn dispatch_event(
868 &self,
869 event: TriggerEvent,
870 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
871 self.dispatch_event_with_headers(event, None).await
872 }
873
874 async fn dispatch_event_with_headers(
875 &self,
876 event: TriggerEvent,
877 parent_headers: Option<&BTreeMap<String, String>>,
878 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
879 let bindings = matching_bindings(&event);
880 let mut outcomes = Vec::new();
881 for binding in bindings {
882 outcomes.push(
883 self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
884 .await?,
885 );
886 }
887 Ok(outcomes)
888 }
889
890 pub async fn dispatch(
891 &self,
892 binding: &TriggerBinding,
893 event: TriggerEvent,
894 ) -> Result<DispatchOutcome, DispatchError> {
895 self.dispatch_with_replay(binding, event, None, None, None)
896 .await
897 }
898
899 pub async fn dispatch_replay(
900 &self,
901 binding: &TriggerBinding,
902 event: TriggerEvent,
903 replay_of_event_id: String,
904 ) -> Result<DispatchOutcome, DispatchError> {
905 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
906 .await
907 }
908
909 pub async fn dispatch_with_parent_span_id(
910 &self,
911 binding: &TriggerBinding,
912 event: TriggerEvent,
913 parent_span_id: Option<String>,
914 ) -> Result<DispatchOutcome, DispatchError> {
915 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
916 .await
917 }
918
    /// Shared entry point for every dispatch of one event to one binding.
    ///
    /// Wraps `dispatch_with_replay_inner` with admission metrics, a `dispatch`
    /// tracing span, the task-local replay flag, and outcome metrics; the
    /// inner result is returned unchanged.
    ///
    /// * `replay_of_event_id` - id of the original event when replaying.
    /// * `parent_span_id` - optional span id used as the tracing parent.
    /// * `parent_headers` - headers of the inbox record, when there is one.
    async fn dispatch_with_replay(
        &self,
        binding: &TriggerBinding,
        event: TriggerEvent,
        replay_of_event_id: Option<String>,
        parent_span_id: Option<String>,
        parent_headers: Option<&BTreeMap<String, String>>,
    ) -> Result<DispatchOutcome, DispatchError> {
        // The inner call takes owned headers.
        let parent_headers_for_metrics = parent_headers.cloned();
        let admitted_at_ms = current_unix_ms();
        // Admission metrics: queue age at admission, then clear the pending
        // marker noted at enqueue time.
        if let Some(metrics) = self.metrics.as_ref() {
            let binding_key = binding.binding_key();
            let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
            metrics.record_trigger_queue_age_at_dispatch_admission(
                binding.id.as_str(),
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                "admitted",
                duration_between_ms(admitted_at_ms, queue_appended_at_ms),
            );
            metrics.clear_trigger_pending_event(
                event.id.0.as_str(),
                binding.id.as_str(),
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                admitted_at_ms,
            );
        }
        let span = tracing::info_span!(
            "dispatch",
            trigger_id = %binding.id.as_str(),
            binding_version = binding.version,
            trace_id = %event.trace_id.0
        );
        // `span` is moved into `instrument` below; keep a handle for the
        // result attributes set afterwards.
        #[cfg(feature = "otel")]
        let span_for_otel = span.clone();
        // Parent the span from the inbox headers when present, otherwise from
        // the trace id / explicit parent span id. The result is ignored.
        let _ = if let Some(headers) = parent_headers {
            crate::observability::otel::set_span_parent_from_headers(
                &span,
                headers,
                &event.trace_id,
                parent_span_id.as_deref(),
            )
        } else {
            crate::observability::otel::set_span_parent(
                &span,
                &event.trace_id,
                parent_span_id.as_deref(),
            )
        };
        #[cfg(feature = "otel")]
        let started_at = Instant::now();
        let metrics = self.metrics.clone();
        // Run the dispatch with the replay flag visible to
        // `current_dispatch_is_replay` for the duration of the call.
        let outcome = ACTIVE_DISPATCH_IS_REPLAY
            .scope(
                replay_of_event_id.is_some(),
                self.dispatch_with_replay_inner(
                    binding,
                    event,
                    replay_of_event_id,
                    parent_headers_for_metrics,
                )
                .instrument(span),
            )
            .await;
        if let Some(metrics) = metrics.as_ref() {
            // Success/failure counters: skipped counts as success, waiting
            // counts as neither, everything else (including `Err`) as failure.
            match &outcome {
                Ok(dispatch_outcome) => match dispatch_outcome.status {
                    DispatchStatus::Succeeded | DispatchStatus::Skipped => {
                        metrics.record_dispatch_succeeded();
                    }
                    DispatchStatus::Waiting => {}
                    _ => metrics.record_dispatch_failed(),
                },
                Err(_) => metrics.record_dispatch_failed(),
            }
            let outcome_label = match &outcome {
                Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
                Err(DispatchError::Cancelled(_)) => "cancelled",
                Err(_) => "failed",
            };
            metrics.record_trigger_dispatched(
                binding.id.as_str(),
                binding.handler.kind(),
                outcome_label,
            );
            metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
        }
        // With the `otel` feature, attach the final status and duration to
        // the dispatch span.
        #[cfg(feature = "otel")]
        {
            use tracing_opentelemetry::OpenTelemetrySpanExt as _;

            let duration_ms = started_at.elapsed().as_millis() as i64;
            let status = match &outcome {
                Ok(dispatch_outcome) => match dispatch_outcome.status {
                    DispatchStatus::Succeeded => "succeeded",
                    DispatchStatus::Skipped => "skipped",
                    DispatchStatus::Waiting => "waiting",
                    DispatchStatus::Cancelled => "cancelled",
                    DispatchStatus::Failed => "failed",
                    DispatchStatus::Dlq => "dlq",
                },
                Err(DispatchError::Cancelled(_)) => "cancelled",
                Err(_) => "failed",
            };
            span_for_otel.set_attribute("result.status", status);
            span_for_otel.set_attribute("result.duration_ms", duration_ms);
        }
        outcome
    }
1031
1032 async fn dispatch_with_replay_inner(
1033 &self,
1034 binding: &TriggerBinding,
1035 event: TriggerEvent,
1036 replay_of_event_id: Option<String>,
1037 parent_headers: Option<BTreeMap<String, String>>,
1038 ) -> Result<DispatchOutcome, DispatchError> {
1039 let autonomy_tier = crate::resolve_agent_autonomy_tier(
1040 &self.event_log,
1041 binding.id.as_str(),
1042 binding.autonomy_tier,
1043 )
1044 .await
1045 .unwrap_or(binding.autonomy_tier);
1046 let binding_key = binding.binding_key();
1047 let route = DispatchUri::from(&binding.handler);
1048 let trigger_id = binding.id.as_str().to_string();
1049 let event_id = event.id.0.clone();
1050 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
1051 let begin = if replay_of_event_id.is_some() {
1052 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
1053 } else {
1054 begin_in_flight(binding.id.as_str(), binding.version)
1055 };
1056 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
1057
1058 let mut attempts = Vec::new();
1059 let mut source_node_id = format!("trigger:{}", event.id.0);
1060 let mut initial_nodes = Vec::new();
1061 let mut initial_edges = Vec::new();
1062 if let Some(original_event_id) = replay_of_event_id.as_ref() {
1063 let original_node_id = format!("trigger:{original_event_id}");
1064 initial_nodes.push(RunActionGraphNodeRecord {
1065 id: original_node_id.clone(),
1066 label: format!(
1067 "{}:{} (original {})",
1068 event.provider.as_str(),
1069 event.kind,
1070 original_event_id
1071 ),
1072 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1073 status: "historical".to_string(),
1074 outcome: "replayed_from".to_string(),
1075 trace_id: Some(event.trace_id.0.clone()),
1076 stage_id: None,
1077 node_id: None,
1078 worker_id: None,
1079 run_id: None,
1080 run_path: None,
1081 metadata: trigger_node_metadata(&event),
1082 });
1083 initial_edges.push(RunActionGraphEdgeRecord {
1084 from_id: original_node_id,
1085 to_id: source_node_id.clone(),
1086 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1087 label: Some("replay chain".to_string()),
1088 });
1089 }
1090 initial_nodes.push(RunActionGraphNodeRecord {
1091 id: source_node_id.clone(),
1092 label: format!("{}:{}", event.provider.as_str(), event.kind),
1093 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1094 status: "received".to_string(),
1095 outcome: "received".to_string(),
1096 trace_id: Some(event.trace_id.0.clone()),
1097 stage_id: None,
1098 node_id: None,
1099 worker_id: None,
1100 run_id: None,
1101 run_path: None,
1102 metadata: trigger_node_metadata(&event),
1103 });
1104 self.emit_action_graph(
1105 &event,
1106 initial_nodes,
1107 initial_edges,
1108 serde_json::json!({
1109 "source": "dispatcher",
1110 "trigger_id": trigger_id,
1111 "binding_key": binding_key,
1112 "event_id": event_id,
1113 "replay_of_event_id": replay_of_event_id,
1114 }),
1115 )
1116 .await?;
1117
1118 if dispatch_cancel_requested(
1119 &self.event_log,
1120 &binding_key,
1121 &event.id.0,
1122 replay_of_event_id.as_ref(),
1123 )
1124 .await?
1125 {
1126 finish_in_flight(
1127 binding.id.as_str(),
1128 binding.version,
1129 TriggerDispatchOutcome::Failed,
1130 )
1131 .await
1132 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1133 decrement_in_flight(&self.state);
1134 return Ok(cancelled_dispatch_outcome(
1135 binding,
1136 &route,
1137 &event,
1138 replay_of_event_id,
1139 0,
1140 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1141 ));
1142 }
1143
1144 if let Some(predicate) = binding.when.as_ref() {
1145 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1146 let evaluation = self
1147 .evaluate_predicate(
1148 binding,
1149 predicate,
1150 &event,
1151 replay_of_event_id.as_ref(),
1152 autonomy_tier,
1153 )
1154 .await?;
1155 let passed = evaluation.result;
1156 self.emit_action_graph(
1157 &event,
1158 vec![RunActionGraphNodeRecord {
1159 id: predicate_node_id.clone(),
1160 label: predicate.raw.clone(),
1161 kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1162 status: "completed".to_string(),
1163 outcome: passed.to_string(),
1164 trace_id: Some(event.trace_id.0.clone()),
1165 stage_id: None,
1166 node_id: None,
1167 worker_id: None,
1168 run_id: None,
1169 run_path: None,
1170 metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1171 }],
1172 vec![RunActionGraphEdgeRecord {
1173 from_id: source_node_id.clone(),
1174 to_id: predicate_node_id.clone(),
1175 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1176 label: None,
1177 }],
1178 serde_json::json!({
1179 "source": "dispatcher",
1180 "trigger_id": binding.id.as_str(),
1181 "binding_key": binding.binding_key(),
1182 "event_id": event.id.0,
1183 "predicate": predicate.raw,
1184 "reason": evaluation.reason,
1185 "cached": evaluation.cached,
1186 "cost_usd": evaluation.cost_usd,
1187 "tokens": evaluation.tokens,
1188 "latency_ms": evaluation.latency_ms,
1189 "replay_of_event_id": replay_of_event_id,
1190 }),
1191 )
1192 .await?;
1193
1194 if !passed {
1195 if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1196 let final_error = format!(
1197 "trigger budget exhausted: {}",
1198 evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1199 );
1200 self.move_budget_exhausted_to_dlq(
1201 binding,
1202 &route,
1203 &event,
1204 replay_of_event_id.as_ref(),
1205 &final_error,
1206 )
1207 .await?;
1208 finish_in_flight(
1209 binding.id.as_str(),
1210 binding.version,
1211 TriggerDispatchOutcome::Dlq,
1212 )
1213 .await
1214 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1215 decrement_in_flight(&self.state);
1216 self.append_dispatch_trust_record(
1217 binding,
1218 &route,
1219 &event,
1220 replay_of_event_id.as_ref(),
1221 autonomy_tier,
1222 TrustOutcome::Failure,
1223 "dlq",
1224 0,
1225 Some(final_error.clone()),
1226 )
1227 .await?;
1228 return Ok(DispatchOutcome {
1229 trigger_id: binding.id.as_str().to_string(),
1230 binding_key: binding.binding_key(),
1231 event_id: event.id.0,
1232 attempt_count: 0,
1233 status: DispatchStatus::Dlq,
1234 handler_kind: route.kind().to_string(),
1235 target_uri: route.target_uri(),
1236 replay_of_event_id,
1237 result: None,
1238 error: Some(final_error),
1239 });
1240 }
1241
1242 if evaluation.exhaustion_strategy
1243 == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1244 {
1245 self.append_budget_deferred_event(
1246 binding,
1247 &route,
1248 &event,
1249 replay_of_event_id.as_ref(),
1250 evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1251 )
1252 .await?;
1253 finish_in_flight(
1254 binding.id.as_str(),
1255 binding.version,
1256 TriggerDispatchOutcome::Dispatched,
1257 )
1258 .await
1259 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1260 decrement_in_flight(&self.state);
1261 self.append_dispatch_trust_record(
1262 binding,
1263 &route,
1264 &event,
1265 replay_of_event_id.as_ref(),
1266 autonomy_tier,
1267 TrustOutcome::Denied,
1268 "waiting",
1269 0,
1270 evaluation.reason.clone(),
1271 )
1272 .await?;
1273 return Ok(DispatchOutcome {
1274 trigger_id: binding.id.as_str().to_string(),
1275 binding_key: binding.binding_key(),
1276 event_id: event.id.0,
1277 attempt_count: 0,
1278 status: DispatchStatus::Waiting,
1279 handler_kind: route.kind().to_string(),
1280 target_uri: route.target_uri(),
1281 replay_of_event_id,
1282 result: Some(serde_json::json!({
1283 "deferred": true,
1284 "predicate": predicate.raw,
1285 "reason": evaluation.reason,
1286 })),
1287 error: None,
1288 });
1289 }
1290
1291 self.append_skipped_outbox_event(
1292 binding,
1293 &route,
1294 &event,
1295 replay_of_event_id.as_ref(),
1296 DispatchSkipStage::Predicate,
1297 serde_json::json!({
1298 "predicate": predicate.raw,
1299 "reason": evaluation.reason,
1300 }),
1301 )
1302 .await?;
1303 finish_in_flight(
1304 binding.id.as_str(),
1305 binding.version,
1306 TriggerDispatchOutcome::Dispatched,
1307 )
1308 .await
1309 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1310 decrement_in_flight(&self.state);
1311 self.append_dispatch_trust_record(
1312 binding,
1313 &route,
1314 &event,
1315 replay_of_event_id.as_ref(),
1316 autonomy_tier,
1317 TrustOutcome::Denied,
1318 "skipped",
1319 0,
1320 None,
1321 )
1322 .await?;
1323 return Ok(DispatchOutcome {
1324 trigger_id: binding.id.as_str().to_string(),
1325 binding_key: binding.binding_key(),
1326 event_id: event.id.0,
1327 attempt_count: 0,
1328 status: DispatchStatus::Skipped,
1329 handler_kind: route.kind().to_string(),
1330 target_uri: route.target_uri(),
1331 replay_of_event_id,
1332 result: Some(serde_json::json!({
1333 "skipped": true,
1334 "predicate": predicate.raw,
1335 "reason": evaluation.reason,
1336 })),
1337 error: None,
1338 });
1339 }
1340
1341 source_node_id = predicate_node_id;
1342 }
1343
1344 if autonomy_tier == AutonomyTier::ActAuto {
1345 if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1346 let request_id = self
1347 .append_autonomy_budget_approval_request(
1348 binding,
1349 &route,
1350 &event,
1351 replay_of_event_id.as_ref(),
1352 reason,
1353 )
1354 .await?;
1355 self.emit_autonomy_budget_approval_action_graph(
1356 binding,
1357 &route,
1358 &event,
1359 &source_node_id,
1360 replay_of_event_id.as_ref(),
1361 reason,
1362 &request_id,
1363 )
1364 .await?;
1365 finish_in_flight(
1366 binding.id.as_str(),
1367 binding.version,
1368 TriggerDispatchOutcome::Dispatched,
1369 )
1370 .await
1371 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1372 decrement_in_flight(&self.state);
1373 self.append_tier_transition_trust_record(
1374 binding,
1375 &event,
1376 replay_of_event_id.as_ref(),
1377 autonomy_tier,
1378 AutonomyTier::ActWithApproval,
1379 reason,
1380 &request_id,
1381 )
1382 .await?;
1383 self.append_dispatch_trust_record(
1384 binding,
1385 &route,
1386 &event,
1387 replay_of_event_id.as_ref(),
1388 autonomy_tier,
1389 TrustOutcome::Denied,
1390 "waiting",
1391 0,
1392 Some(reason.to_string()),
1393 )
1394 .await?;
1395 return Ok(DispatchOutcome {
1396 trigger_id: binding.id.as_str().to_string(),
1397 binding_key: binding.binding_key(),
1398 event_id: event.id.0,
1399 attempt_count: 0,
1400 status: DispatchStatus::Waiting,
1401 handler_kind: route.kind().to_string(),
1402 target_uri: route.target_uri(),
1403 replay_of_event_id,
1404 result: Some(serde_json::json!({
1405 "approval_required": true,
1406 "request_id": request_id,
1407 "reason": reason,
1408 "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1409 })),
1410 error: None,
1411 });
1412 }
1413 note_autonomous_decision(binding);
1414 }
1415
1416 let (event, acquired_flow) = match self
1417 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1418 .await?
1419 {
1420 FlowControlOutcome::Dispatch { event, acquired } => {
1421 (*event, Arc::new(AsyncMutex::new(acquired)))
1422 }
1423 FlowControlOutcome::Skip { reason } => {
1424 self.append_skipped_outbox_event(
1425 binding,
1426 &route,
1427 &event,
1428 replay_of_event_id.as_ref(),
1429 DispatchSkipStage::FlowControl,
1430 serde_json::json!({
1431 "flow_control": reason,
1432 }),
1433 )
1434 .await?;
1435 finish_in_flight(
1436 binding.id.as_str(),
1437 binding.version,
1438 TriggerDispatchOutcome::Dispatched,
1439 )
1440 .await
1441 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1442 decrement_in_flight(&self.state);
1443 return Ok(DispatchOutcome {
1444 trigger_id: binding.id.as_str().to_string(),
1445 binding_key: binding.binding_key(),
1446 event_id: event.id.0,
1447 attempt_count: 0,
1448 status: DispatchStatus::Skipped,
1449 handler_kind: route.kind().to_string(),
1450 target_uri: route.target_uri(),
1451 replay_of_event_id,
1452 result: Some(serde_json::json!({
1453 "skipped": true,
1454 "flow_control": reason,
1455 })),
1456 error: None,
1457 });
1458 }
1459 };
1460
1461 let destination_key = destination_circuit_key(&route);
1462 let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1463 DestinationCircuitProbe::Allow { half_open } => {
1464 if half_open {
1465 if let Some(metrics) = self.metrics.as_ref() {
1466 metrics.record_backpressure_event("circuit", "half_open_probe");
1467 }
1468 }
1469 half_open
1470 }
1471 DestinationCircuitProbe::Block { retry_after } => {
1472 if let Some(metrics) = self.metrics.as_ref() {
1473 metrics.record_backpressure_event("circuit", "fail_fast");
1474 }
1475 let final_error = format!(
1476 "destination circuit open for {}; retry after {}s",
1477 destination_key,
1478 retry_after.as_secs().max(1)
1479 );
1480 self.move_circuit_open_to_dlq(
1481 binding,
1482 &route,
1483 &event,
1484 replay_of_event_id.as_ref(),
1485 &final_error,
1486 &destination_key,
1487 )
1488 .await?;
1489 finish_in_flight(
1490 binding.id.as_str(),
1491 binding.version,
1492 TriggerDispatchOutcome::Dlq,
1493 )
1494 .await
1495 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1496 self.release_flow_control(&acquired_flow).await?;
1497 decrement_in_flight(&self.state);
1498 self.append_dispatch_trust_record(
1499 binding,
1500 &route,
1501 &event,
1502 replay_of_event_id.as_ref(),
1503 autonomy_tier,
1504 TrustOutcome::Failure,
1505 "dlq",
1506 0,
1507 Some(final_error.clone()),
1508 )
1509 .await?;
1510 return Ok(DispatchOutcome {
1511 trigger_id: binding.id.as_str().to_string(),
1512 binding_key: binding.binding_key(),
1513 event_id: event.id.0,
1514 attempt_count: 0,
1515 status: DispatchStatus::Dlq,
1516 handler_kind: route.kind().to_string(),
1517 target_uri: route.target_uri(),
1518 replay_of_event_id,
1519 result: None,
1520 error: Some(final_error),
1521 });
1522 }
1523 };
1524
1525 let mut previous_retry_node = None;
1526 let max_attempts = binding.retry.max_attempts();
1527 for attempt in 1..=max_attempts {
1528 if dispatch_cancel_requested(
1529 &self.event_log,
1530 &binding_key,
1531 &event.id.0,
1532 replay_of_event_id.as_ref(),
1533 )
1534 .await?
1535 {
1536 finish_in_flight(
1537 binding.id.as_str(),
1538 binding.version,
1539 TriggerDispatchOutcome::Failed,
1540 )
1541 .await
1542 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1543 decrement_in_flight(&self.state);
1544 return Ok(cancelled_dispatch_outcome(
1545 binding,
1546 &route,
1547 &event,
1548 replay_of_event_id,
1549 attempt.saturating_sub(1),
1550 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1551 ));
1552 }
1553 maybe_fail_before_outbox();
1554 let attempt_started_instant = Instant::now();
1555 let attempt_started_at_ms = current_unix_ms();
1556 let queue_age_at_start = duration_between_ms(
1557 attempt_started_at_ms,
1558 queue_appended_at_ms(parent_headers.as_ref(), &event),
1559 );
1560 if attempt == 1 {
1561 if let Some(metrics) = self.metrics.as_ref() {
1562 metrics.record_trigger_queue_age_at_dispatch_start(
1563 binding.id.as_str(),
1564 &binding_key,
1565 event.provider.as_str(),
1566 tenant_id(&event),
1567 "started",
1568 queue_age_at_start,
1569 );
1570 }
1571 }
1572 tracing::info!(
1573 component = "dispatcher",
1574 lifecycle = "dispatch_started",
1575 trigger_id = %binding.id.as_str(),
1576 binding_key = %binding_key,
1577 event_id = %event.id.0,
1578 attempt,
1579 queue_age_ms = queue_age_at_start.as_millis(),
1580 trace_id = %event.trace_id.0
1581 );
1582 let started_at = now_rfc3339();
1583 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1584 self.append_lifecycle_event(
1585 "DispatchStarted",
1586 &event,
1587 binding,
1588 serde_json::json!({
1589 "event_id": event.id.0,
1590 "attempt": attempt,
1591 "handler_kind": route.kind(),
1592 "target_uri": route.target_uri(),
1593 "replay_of_event_id": replay_of_event_id,
1594 }),
1595 replay_of_event_id.as_ref(),
1596 )
1597 .await?;
1598 self.append_topic_event(
1599 TRIGGER_OUTBOX_TOPIC,
1600 "dispatch_started",
1601 &event,
1602 Some(binding),
1603 Some(attempt),
1604 serde_json::json!({
1605 "event_id": event.id.0,
1606 "attempt": attempt,
1607 "trigger_id": binding.id.as_str(),
1608 "binding_key": binding.binding_key(),
1609 "handler_kind": route.kind(),
1610 "target_uri": route.target_uri(),
1611 "replay_of_event_id": replay_of_event_id,
1612 }),
1613 replay_of_event_id.as_ref(),
1614 )
1615 .await?;
1616
1617 let mut dispatch_edges = Vec::new();
1618 if attempt == 1 {
1619 dispatch_edges.push(RunActionGraphEdgeRecord {
1620 from_id: source_node_id.clone(),
1621 to_id: attempt_node_id.clone(),
1622 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1623 label: binding.when.as_ref().map(|_| "true".to_string()),
1624 });
1625 } else if let Some(retry_node_id) = previous_retry_node.take() {
1626 dispatch_edges.push(RunActionGraphEdgeRecord {
1627 from_id: retry_node_id,
1628 to_id: attempt_node_id.clone(),
1629 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1630 label: Some(format!("attempt {attempt}")),
1631 });
1632 }
1633
1634 self.emit_action_graph(
1635 &event,
1636 vec![RunActionGraphNodeRecord {
1637 id: attempt_node_id.clone(),
1638 label: dispatch_node_label(&route),
1639 kind: dispatch_node_kind(&route).to_string(),
1640 status: "running".to_string(),
1641 outcome: format!("attempt_{attempt}"),
1642 trace_id: Some(event.trace_id.0.clone()),
1643 stage_id: None,
1644 node_id: None,
1645 worker_id: None,
1646 run_id: None,
1647 run_path: None,
1648 metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1649 }],
1650 dispatch_edges,
1651 serde_json::json!({
1652 "source": "dispatcher",
1653 "trigger_id": binding.id.as_str(),
1654 "binding_key": binding.binding_key(),
1655 "event_id": event.id.0,
1656 "attempt": attempt,
1657 "handler_kind": route.kind(),
1658 "target_uri": route.target_uri(),
1659 "target_agent": dispatch_target_agent(&route),
1660 "replay_of_event_id": replay_of_event_id,
1661 }),
1662 )
1663 .await?;
1664
1665 let result = self
1666 .dispatch_once(
1667 binding,
1668 &route,
1669 &event,
1670 autonomy_tier,
1671 Some(DispatchWaitLease::new(
1672 self.state.clone(),
1673 acquired_flow.clone(),
1674 )),
1675 &mut self.cancel_tx.subscribe(),
1676 )
1677 .await;
1678 let attempt_runtime = attempt_started_instant.elapsed();
1679 let attempt_status = dispatch_result_status(&result);
1680 if let Some(metrics) = self.metrics.as_ref() {
1681 metrics.record_trigger_dispatch_runtime(
1682 binding.id.as_str(),
1683 &binding_key,
1684 event.provider.as_str(),
1685 tenant_id(&event),
1686 attempt_status,
1687 attempt_runtime,
1688 );
1689 }
1690 tracing::info!(
1691 component = "dispatcher",
1692 lifecycle = "handler_completed",
1693 trigger_id = %binding.id.as_str(),
1694 binding_key = %binding_key,
1695 event_id = %event.id.0,
1696 attempt,
1697 status = attempt_status,
1698 runtime_ms = attempt_runtime.as_millis(),
1699 trace_id = %event.trace_id.0
1700 );
1701 let completed_at = now_rfc3339();
1702
1703 match result {
1704 Ok(result) => {
1705 let attempt_record = DispatchAttemptRecord {
1706 trigger_id: binding.id.as_str().to_string(),
1707 binding_key: binding.binding_key(),
1708 event_id: event.id.0.clone(),
1709 attempt,
1710 handler_kind: route.kind().to_string(),
1711 started_at,
1712 completed_at,
1713 outcome: "success".to_string(),
1714 error_msg: None,
1715 };
1716 attempts.push(attempt_record.clone());
1717 self.append_attempt_record(
1718 &event,
1719 binding,
1720 &attempt_record,
1721 replay_of_event_id.as_ref(),
1722 )
1723 .await?;
1724 self.append_lifecycle_event(
1725 "DispatchSucceeded",
1726 &event,
1727 binding,
1728 serde_json::json!({
1729 "event_id": event.id.0,
1730 "attempt": attempt,
1731 "handler_kind": route.kind(),
1732 "target_uri": route.target_uri(),
1733 "result": result,
1734 "replay_of_event_id": replay_of_event_id,
1735 }),
1736 replay_of_event_id.as_ref(),
1737 )
1738 .await?;
1739 self.append_topic_event(
1740 TRIGGER_OUTBOX_TOPIC,
1741 "dispatch_succeeded",
1742 &event,
1743 Some(binding),
1744 Some(attempt),
1745 serde_json::json!({
1746 "event_id": event.id.0,
1747 "attempt": attempt,
1748 "trigger_id": binding.id.as_str(),
1749 "binding_key": binding.binding_key(),
1750 "handler_kind": route.kind(),
1751 "target_uri": route.target_uri(),
1752 "result": result,
1753 "replay_of_event_id": replay_of_event_id,
1754 }),
1755 replay_of_event_id.as_ref(),
1756 )
1757 .await?;
1758 self.emit_action_graph(
1759 &event,
1760 vec![RunActionGraphNodeRecord {
1761 id: attempt_node_id.clone(),
1762 label: dispatch_node_label(&route),
1763 kind: dispatch_node_kind(&route).to_string(),
1764 status: "completed".to_string(),
1765 outcome: dispatch_success_outcome(&route, &result).to_string(),
1766 trace_id: Some(event.trace_id.0.clone()),
1767 stage_id: None,
1768 node_id: None,
1769 worker_id: None,
1770 run_id: None,
1771 run_path: None,
1772 metadata: dispatch_success_metadata(
1773 &route, binding, &event, attempt, &result,
1774 ),
1775 }],
1776 Vec::new(),
1777 serde_json::json!({
1778 "source": "dispatcher",
1779 "trigger_id": binding.id.as_str(),
1780 "binding_key": binding.binding_key(),
1781 "event_id": event.id.0,
1782 "attempt": attempt,
1783 "handler_kind": route.kind(),
1784 "target_uri": route.target_uri(),
1785 "result": result,
1786 "replay_of_event_id": replay_of_event_id,
1787 }),
1788 )
1789 .await?;
1790 self.state
1791 .destination_circuits
1792 .record_success(&destination_key);
1793 if half_open_probe {
1794 if let Some(metrics) = self.metrics.as_ref() {
1795 metrics.record_backpressure_event("circuit", "closed");
1796 }
1797 }
1798 finish_in_flight(
1799 binding.id.as_str(),
1800 binding.version,
1801 TriggerDispatchOutcome::Dispatched,
1802 )
1803 .await
1804 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1805 self.release_flow_control(&acquired_flow).await?;
1806 decrement_in_flight(&self.state);
1807 self.append_dispatch_trust_record(
1808 binding,
1809 &route,
1810 &event,
1811 replay_of_event_id.as_ref(),
1812 autonomy_tier,
1813 TrustOutcome::Success,
1814 "succeeded",
1815 attempt,
1816 None,
1817 )
1818 .await?;
1819 return Ok(DispatchOutcome {
1820 trigger_id: binding.id.as_str().to_string(),
1821 binding_key: binding.binding_key(),
1822 event_id: event.id.0,
1823 attempt_count: attempt,
1824 status: DispatchStatus::Succeeded,
1825 handler_kind: route.kind().to_string(),
1826 target_uri: route.target_uri(),
1827 replay_of_event_id,
1828 result: Some(result),
1829 error: None,
1830 });
1831 }
1832 Err(error) => {
1833 let attempt_record = DispatchAttemptRecord {
1834 trigger_id: binding.id.as_str().to_string(),
1835 binding_key: binding.binding_key(),
1836 event_id: event.id.0.clone(),
1837 attempt,
1838 handler_kind: route.kind().to_string(),
1839 started_at,
1840 completed_at,
1841 outcome: dispatch_error_label(&error).to_string(),
1842 error_msg: Some(error.to_string()),
1843 };
1844 attempts.push(attempt_record.clone());
1845 self.append_attempt_record(
1846 &event,
1847 binding,
1848 &attempt_record,
1849 replay_of_event_id.as_ref(),
1850 )
1851 .await?;
1852 if let DispatchError::Waiting(message) = &error {
1853 self.append_lifecycle_event(
1854 "DispatchWaiting",
1855 &event,
1856 binding,
1857 serde_json::json!({
1858 "event_id": event.id.0,
1859 "attempt": attempt,
1860 "handler_kind": route.kind(),
1861 "target_uri": route.target_uri(),
1862 "message": message,
1863 "replay_of_event_id": replay_of_event_id,
1864 }),
1865 replay_of_event_id.as_ref(),
1866 )
1867 .await?;
1868 self.append_topic_event(
1869 TRIGGER_OUTBOX_TOPIC,
1870 "dispatch_waiting",
1871 &event,
1872 Some(binding),
1873 Some(attempt),
1874 serde_json::json!({
1875 "event_id": event.id.0,
1876 "attempt": attempt,
1877 "trigger_id": binding.id.as_str(),
1878 "binding_key": binding.binding_key(),
1879 "handler_kind": route.kind(),
1880 "target_uri": route.target_uri(),
1881 "message": message,
1882 "replay_of_event_id": replay_of_event_id,
1883 }),
1884 replay_of_event_id.as_ref(),
1885 )
1886 .await?;
1887 finish_in_flight(
1888 binding.id.as_str(),
1889 binding.version,
1890 TriggerDispatchOutcome::Dispatched,
1891 )
1892 .await
1893 .map_err(|registry_error| {
1894 DispatchError::Registry(registry_error.to_string())
1895 })?;
1896 self.release_flow_control(&acquired_flow).await?;
1897 decrement_in_flight(&self.state);
1898 return Ok(DispatchOutcome {
1899 trigger_id: binding.id.as_str().to_string(),
1900 binding_key: binding.binding_key(),
1901 event_id: event.id.0,
1902 attempt_count: attempt,
1903 status: DispatchStatus::Waiting,
1904 handler_kind: route.kind().to_string(),
1905 target_uri: route.target_uri(),
1906 replay_of_event_id,
1907 result: Some(serde_json::json!({
1908 "waiting": true,
1909 "message": message,
1910 })),
1911 error: None,
1912 });
1913 }
1914
1915 self.append_lifecycle_event(
1916 "DispatchFailed",
1917 &event,
1918 binding,
1919 serde_json::json!({
1920 "event_id": event.id.0,
1921 "attempt": attempt,
1922 "handler_kind": route.kind(),
1923 "target_uri": route.target_uri(),
1924 "error": error.to_string(),
1925 "replay_of_event_id": replay_of_event_id,
1926 }),
1927 replay_of_event_id.as_ref(),
1928 )
1929 .await?;
1930 self.append_topic_event(
1931 TRIGGER_OUTBOX_TOPIC,
1932 "dispatch_failed",
1933 &event,
1934 Some(binding),
1935 Some(attempt),
1936 serde_json::json!({
1937 "event_id": event.id.0,
1938 "attempt": attempt,
1939 "trigger_id": binding.id.as_str(),
1940 "binding_key": binding.binding_key(),
1941 "handler_kind": route.kind(),
1942 "target_uri": route.target_uri(),
1943 "error": error.to_string(),
1944 "replay_of_event_id": replay_of_event_id,
1945 }),
1946 replay_of_event_id.as_ref(),
1947 )
1948 .await?;
1949 self.emit_action_graph(
1950 &event,
1951 vec![RunActionGraphNodeRecord {
1952 id: attempt_node_id.clone(),
1953 label: dispatch_node_label(&route),
1954 kind: dispatch_node_kind(&route).to_string(),
1955 status: if matches!(error, DispatchError::Cancelled(_)) {
1956 "cancelled".to_string()
1957 } else {
1958 "failed".to_string()
1959 },
1960 outcome: dispatch_error_label(&error).to_string(),
1961 trace_id: Some(event.trace_id.0.clone()),
1962 stage_id: None,
1963 node_id: None,
1964 worker_id: None,
1965 run_id: None,
1966 run_path: None,
1967 metadata: dispatch_error_metadata(
1968 &route, binding, &event, attempt, &error,
1969 ),
1970 }],
1971 Vec::new(),
1972 serde_json::json!({
1973 "source": "dispatcher",
1974 "trigger_id": binding.id.as_str(),
1975 "binding_key": binding.binding_key(),
1976 "event_id": event.id.0,
1977 "attempt": attempt,
1978 "handler_kind": route.kind(),
1979 "target_uri": route.target_uri(),
1980 "error": error.to_string(),
1981 "replay_of_event_id": replay_of_event_id,
1982 }),
1983 )
1984 .await?;
1985
1986 let circuit_opened = if error.retryable() {
1987 self.state
1988 .destination_circuits
1989 .record_failure(&destination_key)
1990 } else {
1991 false
1992 };
1993 if circuit_opened {
1994 if let Some(metrics) = self.metrics.as_ref() {
1995 metrics.record_backpressure_event("circuit", "opened");
1996 metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1997 metrics.record_trigger_accepted_to_dlq(
1998 binding.id.as_str(),
1999 &binding_key,
2000 event.provider.as_str(),
2001 tenant_id(&event),
2002 "circuit_open",
2003 duration_between_ms(
2004 current_unix_ms(),
2005 accepted_at_ms(parent_headers.as_ref(), &event),
2006 ),
2007 );
2008 }
2009 let final_error = format!(
2010 "destination circuit opened for {} after {} consecutive failures: {}",
2011 destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
2012 );
2013 let dlq_entry = DlqEntry {
2014 trigger_id: binding.id.as_str().to_string(),
2015 binding_key: binding.binding_key(),
2016 event: event.clone(),
2017 attempt_count: attempt,
2018 final_error: final_error.clone(),
2019 error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2020 .to_string(),
2021 attempts: attempts.clone(),
2022 };
2023 self.state
2024 .dlq
2025 .lock()
2026 .expect("dispatcher dlq poisoned")
2027 .push(dlq_entry.clone());
2028 self.append_lifecycle_event(
2029 "DlqMoved",
2030 &event,
2031 binding,
2032 serde_json::json!({
2033 "event_id": event.id.0,
2034 "attempt_count": attempt,
2035 "final_error": dlq_entry.final_error,
2036 "reason": "circuit_open",
2037 "destination": destination_key,
2038 "replay_of_event_id": replay_of_event_id,
2039 }),
2040 replay_of_event_id.as_ref(),
2041 )
2042 .await?;
2043 self.append_topic_event(
2044 TRIGGER_DLQ_TOPIC,
2045 "dlq_moved",
2046 &event,
2047 Some(binding),
2048 Some(attempt),
2049 serde_json::to_value(&dlq_entry).map_err(|serde_error| {
2050 DispatchError::Serde(serde_error.to_string())
2051 })?,
2052 replay_of_event_id.as_ref(),
2053 )
2054 .await?;
2055 finish_in_flight(
2056 binding.id.as_str(),
2057 binding.version,
2058 TriggerDispatchOutcome::Dlq,
2059 )
2060 .await
2061 .map_err(|registry_error| {
2062 DispatchError::Registry(registry_error.to_string())
2063 })?;
2064 self.release_flow_control(&acquired_flow).await?;
2065 decrement_in_flight(&self.state);
2066 self.append_dispatch_trust_record(
2067 binding,
2068 &route,
2069 &event,
2070 replay_of_event_id.as_ref(),
2071 autonomy_tier,
2072 TrustOutcome::Failure,
2073 "dlq",
2074 attempt,
2075 Some(final_error.clone()),
2076 )
2077 .await?;
2078 return Ok(DispatchOutcome {
2079 trigger_id: binding.id.as_str().to_string(),
2080 binding_key: binding.binding_key(),
2081 event_id: event.id.0,
2082 attempt_count: attempt,
2083 status: DispatchStatus::Dlq,
2084 handler_kind: route.kind().to_string(),
2085 target_uri: route.target_uri(),
2086 replay_of_event_id,
2087 result: None,
2088 error: Some(final_error),
2089 });
2090 }
2091
2092 if !error.retryable() {
2093 finish_in_flight(
2094 binding.id.as_str(),
2095 binding.version,
2096 TriggerDispatchOutcome::Failed,
2097 )
2098 .await
2099 .map_err(|registry_error| {
2100 DispatchError::Registry(registry_error.to_string())
2101 })?;
2102 self.release_flow_control(&acquired_flow).await?;
2103 decrement_in_flight(&self.state);
2104 let trust_outcome = match error {
2105 DispatchError::Denied(_) => TrustOutcome::Denied,
2106 DispatchError::Timeout(_) => TrustOutcome::Timeout,
2107 _ => TrustOutcome::Failure,
2108 };
2109 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2110 "cancelled"
2111 } else {
2112 "failed"
2113 };
2114 self.append_dispatch_trust_record(
2115 binding,
2116 &route,
2117 &event,
2118 replay_of_event_id.as_ref(),
2119 autonomy_tier,
2120 trust_outcome,
2121 terminal_status,
2122 attempt,
2123 Some(error.to_string()),
2124 )
2125 .await?;
2126 return Ok(DispatchOutcome {
2127 trigger_id: binding.id.as_str().to_string(),
2128 binding_key: binding.binding_key(),
2129 event_id: event.id.0,
2130 attempt_count: attempt,
2131 status: if matches!(error, DispatchError::Cancelled(_)) {
2132 DispatchStatus::Cancelled
2133 } else {
2134 DispatchStatus::Failed
2135 },
2136 handler_kind: route.kind().to_string(),
2137 target_uri: route.target_uri(),
2138 replay_of_event_id,
2139 result: None,
2140 error: Some(error.to_string()),
2141 });
2142 }
2143
2144 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2145 if let Some(metrics) = self.metrics.as_ref() {
2146 metrics.record_retry_scheduled();
2147 metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2148 metrics.record_trigger_retry_delay(
2149 binding.id.as_str(),
2150 &binding_key,
2151 event.provider.as_str(),
2152 tenant_id(&event),
2153 "scheduled",
2154 delay,
2155 );
2156 }
2157 tracing::info!(
2158 component = "dispatcher",
2159 lifecycle = "retry_scheduled",
2160 trigger_id = %binding.id.as_str(),
2161 binding_key = %binding_key,
2162 event_id = %event.id.0,
2163 attempt = attempt + 1,
2164 delay_ms = delay.as_millis(),
2165 trace_id = %event.trace_id.0
2166 );
2167 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2168 previous_retry_node = Some(retry_node_id.clone());
2169 self.emit_action_graph(
2170 &event,
2171 vec![RunActionGraphNodeRecord {
2172 id: retry_node_id.clone(),
2173 label: format!("retry in {}ms", delay.as_millis()),
2174 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2175 status: "scheduled".to_string(),
2176 outcome: format!("attempt_{}", attempt + 1),
2177 trace_id: Some(event.trace_id.0.clone()),
2178 stage_id: None,
2179 node_id: None,
2180 worker_id: None,
2181 run_id: None,
2182 run_path: None,
2183 metadata: retry_node_metadata(
2184 binding,
2185 &event,
2186 attempt + 1,
2187 delay,
2188 &error,
2189 ),
2190 }],
2191 vec![RunActionGraphEdgeRecord {
2192 from_id: attempt_node_id,
2193 to_id: retry_node_id.clone(),
2194 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2195 label: Some(format!("attempt {}", attempt + 1)),
2196 }],
2197 serde_json::json!({
2198 "source": "dispatcher",
2199 "trigger_id": binding.id.as_str(),
2200 "binding_key": binding.binding_key(),
2201 "event_id": event.id.0,
2202 "attempt": attempt + 1,
2203 "delay_ms": delay.as_millis(),
2204 "replay_of_event_id": replay_of_event_id,
2205 }),
2206 )
2207 .await?;
2208 self.append_lifecycle_event(
2209 "RetryScheduled",
2210 &event,
2211 binding,
2212 serde_json::json!({
2213 "event_id": event.id.0,
2214 "attempt": attempt + 1,
2215 "delay_ms": delay.as_millis(),
2216 "error": error.to_string(),
2217 "replay_of_event_id": replay_of_event_id,
2218 }),
2219 replay_of_event_id.as_ref(),
2220 )
2221 .await?;
2222 self.append_topic_event(
2223 TRIGGER_ATTEMPTS_TOPIC,
2224 "retry_scheduled",
2225 &event,
2226 Some(binding),
2227 Some(attempt + 1),
2228 serde_json::json!({
2229 "event_id": event.id.0,
2230 "attempt": attempt + 1,
2231 "trigger_id": binding.id.as_str(),
2232 "binding_key": binding.binding_key(),
2233 "delay_ms": delay.as_millis(),
2234 "error": error.to_string(),
2235 "replay_of_event_id": replay_of_event_id,
2236 }),
2237 replay_of_event_id.as_ref(),
2238 )
2239 .await?;
2240 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2241 let sleep_result = sleep_or_cancel_or_request(
2242 &self.event_log,
2243 delay,
2244 &binding_key,
2245 &event.id.0,
2246 replay_of_event_id.as_ref(),
2247 &mut self.cancel_tx.subscribe(),
2248 )
2249 .await;
2250 decrement_retry_queue_depth(&self.state);
2251 if sleep_result.is_err() {
2252 finish_in_flight(
2253 binding.id.as_str(),
2254 binding.version,
2255 TriggerDispatchOutcome::Failed,
2256 )
2257 .await
2258 .map_err(|registry_error| {
2259 DispatchError::Registry(registry_error.to_string())
2260 })?;
2261 self.release_flow_control(&acquired_flow).await?;
2262 decrement_in_flight(&self.state);
2263 self.append_dispatch_trust_record(
2264 binding,
2265 &route,
2266 &event,
2267 replay_of_event_id.as_ref(),
2268 autonomy_tier,
2269 TrustOutcome::Failure,
2270 "cancelled",
2271 attempt,
2272 Some("dispatcher shutdown cancelled retry wait".to_string()),
2273 )
2274 .await?;
2275 return Ok(DispatchOutcome {
2276 trigger_id: binding.id.as_str().to_string(),
2277 binding_key: binding.binding_key(),
2278 event_id: event.id.0,
2279 attempt_count: attempt,
2280 status: DispatchStatus::Cancelled,
2281 handler_kind: route.kind().to_string(),
2282 target_uri: route.target_uri(),
2283 replay_of_event_id,
2284 result: None,
2285 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2286 });
2287 }
2288 continue;
2289 }
2290
2291 let final_error = error.to_string();
2292 let dlq_entry = DlqEntry {
2293 trigger_id: binding.id.as_str().to_string(),
2294 binding_key: binding.binding_key(),
2295 event: event.clone(),
2296 attempt_count: attempt,
2297 final_error: final_error.clone(),
2298 error_class: crate::triggers::classify_trigger_dlq_error(&final_error)
2299 .to_string(),
2300 attempts: attempts.clone(),
2301 };
2302 self.state
2303 .dlq
2304 .lock()
2305 .expect("dispatcher dlq poisoned")
2306 .push(dlq_entry.clone());
2307 if let Some(metrics) = self.metrics.as_ref() {
2308 metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2309 metrics.record_trigger_accepted_to_dlq(
2310 binding.id.as_str(),
2311 &binding_key,
2312 event.provider.as_str(),
2313 tenant_id(&event),
2314 "retry_exhausted",
2315 duration_between_ms(
2316 current_unix_ms(),
2317 accepted_at_ms(parent_headers.as_ref(), &event),
2318 ),
2319 );
2320 }
2321 tracing::info!(
2322 component = "dispatcher",
2323 lifecycle = "dlq_moved",
2324 trigger_id = %binding.id.as_str(),
2325 binding_key = %binding_key,
2326 event_id = %event.id.0,
2327 attempt_count = attempt,
2328 reason = "retry_exhausted",
2329 trace_id = %event.trace_id.0
2330 );
2331 self.emit_action_graph(
2332 &event,
2333 vec![RunActionGraphNodeRecord {
2334 id: format!("dlq:{binding_key}:{}", event.id.0),
2335 label: binding.id.as_str().to_string(),
2336 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2337 status: "queued".to_string(),
2338 outcome: "retry_exhausted".to_string(),
2339 trace_id: Some(event.trace_id.0.clone()),
2340 stage_id: None,
2341 node_id: None,
2342 worker_id: None,
2343 run_id: None,
2344 run_path: None,
2345 metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2346 }],
2347 vec![RunActionGraphEdgeRecord {
2348 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2349 to_id: format!("dlq:{binding_key}:{}", event.id.0),
2350 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2351 label: Some(format!("{attempt} attempts")),
2352 }],
2353 serde_json::json!({
2354 "source": "dispatcher",
2355 "trigger_id": binding.id.as_str(),
2356 "binding_key": binding.binding_key(),
2357 "event_id": event.id.0,
2358 "attempt_count": attempt,
2359 "final_error": final_error,
2360 "replay_of_event_id": replay_of_event_id,
2361 }),
2362 )
2363 .await?;
2364 self.append_lifecycle_event(
2365 "DlqMoved",
2366 &event,
2367 binding,
2368 serde_json::json!({
2369 "event_id": event.id.0,
2370 "attempt_count": attempt,
2371 "final_error": dlq_entry.final_error,
2372 "replay_of_event_id": replay_of_event_id,
2373 }),
2374 replay_of_event_id.as_ref(),
2375 )
2376 .await?;
2377 self.append_topic_event(
2378 TRIGGER_DLQ_TOPIC,
2379 "dlq_moved",
2380 &event,
2381 Some(binding),
2382 Some(attempt),
2383 serde_json::to_value(&dlq_entry)
2384 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2385 replay_of_event_id.as_ref(),
2386 )
2387 .await?;
2388 finish_in_flight(
2389 binding.id.as_str(),
2390 binding.version,
2391 TriggerDispatchOutcome::Dlq,
2392 )
2393 .await
2394 .map_err(|registry_error| {
2395 DispatchError::Registry(registry_error.to_string())
2396 })?;
2397 self.release_flow_control(&acquired_flow).await?;
2398 decrement_in_flight(&self.state);
2399 self.append_dispatch_trust_record(
2400 binding,
2401 &route,
2402 &event,
2403 replay_of_event_id.as_ref(),
2404 autonomy_tier,
2405 TrustOutcome::Failure,
2406 "dlq",
2407 attempt,
2408 Some(error.to_string()),
2409 )
2410 .await?;
2411 return Ok(DispatchOutcome {
2412 trigger_id: binding.id.as_str().to_string(),
2413 binding_key: binding.binding_key(),
2414 event_id: event.id.0,
2415 attempt_count: attempt,
2416 status: DispatchStatus::Dlq,
2417 handler_kind: route.kind().to_string(),
2418 target_uri: route.target_uri(),
2419 replay_of_event_id,
2420 result: None,
2421 error: Some(error.to_string()),
2422 });
2423 }
2424 }
2425 }
2426
2427 finish_in_flight(
2428 binding.id.as_str(),
2429 binding.version,
2430 TriggerDispatchOutcome::Failed,
2431 )
2432 .await
2433 .map_err(|error| DispatchError::Registry(error.to_string()))?;
2434 self.release_flow_control(&acquired_flow).await?;
2435 decrement_in_flight(&self.state);
2436 self.append_dispatch_trust_record(
2437 binding,
2438 &route,
2439 &event,
2440 replay_of_event_id.as_ref(),
2441 autonomy_tier,
2442 TrustOutcome::Failure,
2443 "failed",
2444 max_attempts,
2445 Some("dispatch exhausted without terminal outcome".to_string()),
2446 )
2447 .await?;
2448 Ok(DispatchOutcome {
2449 trigger_id: binding.id.as_str().to_string(),
2450 binding_key: binding.binding_key(),
2451 event_id: event.id.0,
2452 attempt_count: max_attempts,
2453 status: DispatchStatus::Failed,
2454 handler_kind: route.kind().to_string(),
2455 target_uri: route.target_uri(),
2456 replay_of_event_id,
2457 result: None,
2458 error: Some("dispatch exhausted without terminal outcome".to_string()),
2459 })
2460 }
2461
2462 async fn dispatch_once(
2463 &self,
2464 binding: &TriggerBinding,
2465 route: &DispatchUri,
2466 event: &TriggerEvent,
2467 autonomy_tier: AutonomyTier,
2468 wait_lease: Option<DispatchWaitLease>,
2469 cancel_rx: &mut broadcast::Receiver<()>,
2470 ) -> Result<serde_json::Value, DispatchError> {
2471 match route {
2472 DispatchUri::Local { .. } => {
2473 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2474 return Err(DispatchError::Local(format!(
2475 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2476 binding.id.as_str()
2477 )));
2478 };
2479 let value = self
2480 .invoke_vm_callable(
2481 closure,
2482 &binding.binding_key(),
2483 event,
2484 None,
2485 binding.id.as_str(),
2486 &format!("{}.{}", event.provider.as_str(), event.kind),
2487 autonomy_tier,
2488 wait_lease,
2489 cancel_rx,
2490 )
2491 .await?;
2492 Ok(vm_value_to_json(&value))
2493 }
2494 DispatchUri::A2a {
2495 target,
2496 allow_cleartext,
2497 } => {
2498 if self.state.shutting_down.load(Ordering::SeqCst) {
2499 return Err(DispatchError::Cancelled(
2500 "dispatcher shutdown cancelled A2A dispatch".to_string(),
2501 ));
2502 }
2503 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2504 target,
2505 *allow_cleartext,
2506 binding.id.as_str(),
2507 &binding.binding_key(),
2508 event,
2509 cancel_rx,
2510 )
2511 .await
2512 .map_err(|error| match error {
2513 crate::a2a::A2aClientError::Cancelled(message) => {
2514 DispatchError::Cancelled(message)
2515 }
2516 other => DispatchError::A2a(other.to_string()),
2517 })?;
2518 match ack {
2519 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2520 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2521 }
2522 }
2523 DispatchUri::Worker { queue } => {
2524 let receipt = crate::WorkerQueue::new(self.event_log.clone())
2525 .enqueue(&crate::WorkerQueueJob {
2526 queue: queue.clone(),
2527 trigger_id: binding.id.as_str().to_string(),
2528 binding_key: binding.binding_key(),
2529 binding_version: binding.version,
2530 event: event.clone(),
2531 replay_of_event_id: current_dispatch_context()
2532 .and_then(|context| context.replay_of_event_id),
2533 priority: worker_queue_priority(binding, event),
2534 })
2535 .await
2536 .map_err(DispatchError::from)?;
2537 Ok(serde_json::to_value(receipt)
2538 .map_err(|error| DispatchError::Serde(error.to_string()))?)
2539 }
2540 }
2541 }
2542
2543 async fn apply_flow_control(
2544 &self,
2545 binding: &TriggerBinding,
2546 event: &TriggerEvent,
2547 replay_of_event_id: Option<&String>,
2548 ) -> Result<FlowControlOutcome, DispatchError> {
2549 let flow = &binding.flow_control;
2550 let mut managed_event = event.clone();
2551
2552 if let Some(batch) = &flow.batch {
2553 let gate = self
2554 .resolve_flow_gate(
2555 &binding.binding_key(),
2556 batch.key.as_ref(),
2557 &managed_event,
2558 replay_of_event_id,
2559 )
2560 .await?;
2561 match self
2562 .state
2563 .flow_control
2564 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2565 .await
2566 .map_err(DispatchError::from)?
2567 {
2568 BatchDecision::Dispatch(events) => {
2569 managed_event = build_batched_event(events)?;
2570 }
2571 BatchDecision::Merged => {
2572 return Ok(FlowControlOutcome::Skip {
2573 reason: "batch_merged".to_string(),
2574 })
2575 }
2576 }
2577 }
2578
2579 if let Some(debounce) = &flow.debounce {
2580 let gate = self
2581 .resolve_flow_gate(
2582 &binding.binding_key(),
2583 Some(&debounce.key),
2584 &managed_event,
2585 replay_of_event_id,
2586 )
2587 .await?;
2588 let latest = self
2589 .state
2590 .flow_control
2591 .debounce(&gate, debounce.period)
2592 .await
2593 .map_err(DispatchError::from)?;
2594 if !latest {
2595 return Ok(FlowControlOutcome::Skip {
2596 reason: "debounced".to_string(),
2597 });
2598 }
2599 }
2600
2601 if let Some(rate_limit) = &flow.rate_limit {
2602 let gate = self
2603 .resolve_flow_gate(
2604 &binding.binding_key(),
2605 rate_limit.key.as_ref(),
2606 &managed_event,
2607 replay_of_event_id,
2608 )
2609 .await?;
2610 let allowed = self
2611 .state
2612 .flow_control
2613 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2614 .await
2615 .map_err(DispatchError::from)?;
2616 if !allowed {
2617 return Ok(FlowControlOutcome::Skip {
2618 reason: "rate_limited".to_string(),
2619 });
2620 }
2621 }
2622
2623 if let Some(throttle) = &flow.throttle {
2624 let gate = self
2625 .resolve_flow_gate(
2626 &binding.binding_key(),
2627 throttle.key.as_ref(),
2628 &managed_event,
2629 replay_of_event_id,
2630 )
2631 .await?;
2632 self.state
2633 .flow_control
2634 .wait_for_throttle(&gate, throttle.period, throttle.max)
2635 .await
2636 .map_err(DispatchError::from)?;
2637 }
2638
2639 let mut acquired = AcquiredFlowControl::default();
2640 if let Some(singleton) = &flow.singleton {
2641 let gate = self
2642 .resolve_flow_gate(
2643 &binding.binding_key(),
2644 singleton.key.as_ref(),
2645 &managed_event,
2646 replay_of_event_id,
2647 )
2648 .await?;
2649 let acquired_singleton = self
2650 .state
2651 .flow_control
2652 .try_acquire_singleton(&gate)
2653 .await
2654 .map_err(DispatchError::from)?;
2655 if !acquired_singleton {
2656 return Ok(FlowControlOutcome::Skip {
2657 reason: "singleton_active".to_string(),
2658 });
2659 }
2660 acquired.singleton = Some(SingletonLease { gate, held: true });
2661 }
2662
2663 if let Some(concurrency) = &flow.concurrency {
2664 let gate = self
2665 .resolve_flow_gate(
2666 &binding.binding_key(),
2667 concurrency.key.as_ref(),
2668 &managed_event,
2669 replay_of_event_id,
2670 )
2671 .await?;
2672 let priority_rank = self
2673 .resolve_priority_rank(
2674 &binding.binding_key(),
2675 flow.priority.as_ref(),
2676 &managed_event,
2677 replay_of_event_id,
2678 )
2679 .await?;
2680 let permit = self
2681 .state
2682 .flow_control
2683 .acquire_concurrency(&gate, concurrency.max, priority_rank)
2684 .await
2685 .map_err(DispatchError::from)?;
2686 acquired.concurrency = Some(ConcurrencyLease {
2687 gate,
2688 max: concurrency.max,
2689 priority_rank,
2690 permit: Some(permit),
2691 });
2692 }
2693
2694 Ok(FlowControlOutcome::Dispatch {
2695 event: Box::new(managed_event),
2696 acquired,
2697 })
2698 }
2699
2700 async fn release_flow_control(
2701 &self,
2702 acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2703 ) -> Result<(), DispatchError> {
2704 let (singleton_gate, concurrency_permit) = {
2705 let mut acquired = acquired.lock().await;
2706 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2707 if lease.held {
2708 lease.held = false;
2709 Some(lease.gate.clone())
2710 } else {
2711 None
2712 }
2713 });
2714 let concurrency_permit = acquired
2715 .concurrency
2716 .as_mut()
2717 .and_then(|lease| lease.permit.take());
2718 (singleton_gate, concurrency_permit)
2719 };
2720 if let Some(gate) = singleton_gate {
2721 self.state
2722 .flow_control
2723 .release_singleton(&gate)
2724 .await
2725 .map_err(DispatchError::from)?;
2726 }
2727 if let Some(permit) = concurrency_permit {
2728 self.state
2729 .flow_control
2730 .release_concurrency(permit)
2731 .await
2732 .map_err(DispatchError::from)?;
2733 }
2734 Ok(())
2735 }
2736
2737 async fn resolve_flow_gate(
2738 &self,
2739 binding_key: &str,
2740 expr: Option<&crate::triggers::TriggerExpressionSpec>,
2741 event: &TriggerEvent,
2742 replay_of_event_id: Option<&String>,
2743 ) -> Result<String, DispatchError> {
2744 let key = match expr {
2745 Some(expr) => {
2746 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2747 .await?
2748 }
2749 None => "_global".to_string(),
2750 };
2751 Ok(format!("{binding_key}:{key}"))
2752 }
2753
2754 async fn resolve_priority_rank(
2755 &self,
2756 binding_key: &str,
2757 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2758 event: &TriggerEvent,
2759 replay_of_event_id: Option<&String>,
2760 ) -> Result<usize, DispatchError> {
2761 let Some(priority) = priority else {
2762 return Ok(0);
2763 };
2764 let value = self
2765 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2766 .await?;
2767 Ok(priority
2768 .order
2769 .iter()
2770 .position(|candidate| candidate == &value)
2771 .unwrap_or(priority.order.len()))
2772 }
2773
2774 async fn evaluate_flow_expression(
2775 &self,
2776 binding_key: &str,
2777 expr: &crate::triggers::TriggerExpressionSpec,
2778 event: &TriggerEvent,
2779 replay_of_event_id: Option<&String>,
2780 ) -> Result<String, DispatchError> {
2781 let value = self
2782 .invoke_vm_callable(
2783 &expr.closure,
2784 binding_key,
2785 event,
2786 replay_of_event_id,
2787 "",
2788 "flow_control",
2789 AutonomyTier::Suggest,
2790 None,
2791 &mut self.cancel_tx.subscribe(),
2792 )
2793 .await?;
2794 Ok(json_value_to_gate(&vm_value_to_json(&value)))
2795 }
2796
2797 #[allow(clippy::too_many_arguments)]
2798 async fn invoke_vm_callable(
2799 &self,
2800 closure: &crate::value::VmClosure,
2801 binding_key: &str,
2802 event: &TriggerEvent,
2803 replay_of_event_id: Option<&String>,
2804 agent_id: &str,
2805 action: &str,
2806 autonomy_tier: AutonomyTier,
2807 wait_lease: Option<DispatchWaitLease>,
2808 cancel_rx: &mut broadcast::Receiver<()>,
2809 ) -> Result<VmValue, DispatchError> {
2810 let mut vm = self.base_vm.child_vm();
2811 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2812 if self.state.shutting_down.load(Ordering::SeqCst) {
2813 cancel_token.store(true, Ordering::SeqCst);
2814 }
2815 self.state
2816 .cancel_tokens
2817 .lock()
2818 .expect("dispatcher cancel tokens poisoned")
2819 .push(cancel_token.clone());
2820 vm.install_cancel_token(cancel_token.clone());
2821 let arg = event_to_handler_value(event)?;
2822 let args = [arg];
2823 let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2824 let effective_policy = match crate::orchestration::current_execution_policy() {
2825 Some(parent) => parent
2826 .intersect(&tier_policy)
2827 .map_err(|error| DispatchError::Local(error.to_string()))?,
2828 None => tier_policy,
2829 };
2830 crate::orchestration::push_execution_policy(effective_policy);
2831 let _policy_guard = DispatchExecutionPolicyGuard;
2832 let future = vm.call_closure_pub(closure, &args);
2833 pin_mut!(future);
2834 let (binding_id, binding_version) = split_binding_key(binding_key);
2835 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2836 slot.borrow_mut().replace(DispatchContext {
2837 trigger_event: event.clone(),
2838 replay_of_event_id: replay_of_event_id.cloned(),
2839 binding_id,
2840 binding_version,
2841 agent_id: agent_id.to_string(),
2842 action: action.to_string(),
2843 autonomy_tier,
2844 })
2845 });
2846 let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2847 .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2848 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2849 crate::stdlib::hitl::reset_hitl_state();
2850 let mut poll = tokio::time::interval(Duration::from_millis(100));
2851 let result = loop {
2852 tokio::select! {
2853 result = &mut future => break result,
2854 _ = recv_cancel(cancel_rx) => {
2855 cancel_token.store(true, Ordering::SeqCst);
2856 }
2857 _ = poll.tick() => {
2858 if dispatch_cancel_requested(
2859 &self.event_log,
2860 binding_key,
2861 &event.id.0,
2862 replay_of_event_id,
2863 )
2864 .await? {
2865 cancel_token.store(true, Ordering::SeqCst);
2866 }
2867 }
2868 }
2869 };
2870 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2871 *slot.borrow_mut() = prior_context;
2872 });
2873 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2874 *slot.borrow_mut() = prior_wait_lease;
2875 });
2876 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2877 {
2878 let mut tokens = self
2879 .state
2880 .cancel_tokens
2881 .lock()
2882 .expect("dispatcher cancel tokens poisoned");
2883 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2884 }
2885
2886 if cancel_token.load(Ordering::SeqCst) {
2887 if dispatch_cancel_requested(
2888 &self.event_log,
2889 binding_key,
2890 &event.id.0,
2891 replay_of_event_id,
2892 )
2893 .await?
2894 {
2895 Err(DispatchError::Cancelled(
2896 "trigger cancel request cancelled local handler".to_string(),
2897 ))
2898 } else {
2899 Err(DispatchError::Cancelled(
2900 "dispatcher shutdown cancelled local handler".to_string(),
2901 ))
2902 }
2903 } else {
2904 result.map_err(dispatch_error_from_vm_error)
2905 }
2906 }
2907
2908 async fn evaluate_predicate(
2909 &self,
2910 binding: &TriggerBinding,
2911 predicate: &super::registry::TriggerPredicateSpec,
2912 event: &TriggerEvent,
2913 replay_of_event_id: Option<&String>,
2914 autonomy_tier: AutonomyTier,
2915 ) -> Result<PredicateEvaluationRecord, DispatchError> {
2916 let event_id = event.id.0.clone();
2917 let trigger_id = binding.id.as_str().to_string();
2918 let now_ms = now_unix_ms();
2919 reset_binding_budget_windows(binding);
2920
2921 let breaker_open_until = {
2922 let state = binding
2923 .predicate_state
2924 .lock()
2925 .expect("trigger predicate state poisoned");
2926 if state
2927 .breaker_open_until_ms
2928 .is_some_and(|until_ms| until_ms > now_ms)
2929 {
2930 state.breaker_open_until_ms
2931 } else {
2932 None
2933 }
2934 };
2935
2936 if breaker_open_until.is_some() {
2937 let mut metadata = BTreeMap::new();
2938 metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2939 metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2940 metadata.insert(
2941 "breaker_open_until_ms".to_string(),
2942 serde_json::json!(breaker_open_until),
2943 );
2944 crate::events::log_warn_meta(
2945 "trigger.predicate.circuit_breaker",
2946 "trigger predicate circuit breaker is open; short-circuiting to false",
2947 metadata,
2948 );
2949 let record = PredicateEvaluationRecord {
2950 result: false,
2951 reason: Some("circuit_open".to_string()),
2952 ..Default::default()
2953 };
2954 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2955 .await?;
2956 return Ok(record);
2957 }
2958
2959 let expected_cost = expected_predicate_cost_usd_micros(binding);
2960 if let Some(reason) = binding_budget_would_exceed(binding, expected_cost)
2961 .or_else(|| orchestrator_budget_would_exceed(expected_cost))
2962 {
2963 self.append_budget_exhausted_event(
2964 binding,
2965 event,
2966 reason,
2967 expected_cost,
2968 None,
2969 replay_of_event_id,
2970 )
2971 .await?;
2972 let record = PredicateEvaluationRecord {
2973 result: binding.on_budget_exhausted == TriggerBudgetExhaustionStrategy::Warn,
2974 reason: Some(reason.to_string()),
2975 exhaustion_strategy: Some(binding.on_budget_exhausted),
2976 ..Default::default()
2977 };
2978 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2979 .await?;
2980 return Ok(record);
2981 }
2982
2983 let replay_cache = self
2984 .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2985 .await?;
2986 let guard = start_predicate_evaluation(
2987 binding.when_budget.clone().unwrap_or_default(),
2988 replay_cache,
2989 );
2990 let started = std::time::Instant::now();
2991 let eval = self
2992 .invoke_vm_callable_with_timeout(
2993 &predicate.closure,
2994 &binding.binding_key(),
2995 event,
2996 replay_of_event_id,
2997 binding.id.as_str(),
2998 &format!("{}.{}", event.provider.as_str(), event.kind),
2999 autonomy_tier,
3000 &mut self.cancel_tx.subscribe(),
3001 binding
3002 .when_budget
3003 .as_ref()
3004 .and_then(|budget| budget.timeout()),
3005 )
3006 .await;
3007 let capture = guard.finish();
3008 let latency_ms = started.elapsed().as_millis() as u64;
3009 if replay_of_event_id.is_none() && !capture.entries.is_empty() {
3010 self.append_predicate_cache_record(binding, event, &capture.entries)
3011 .await?;
3012 }
3013
3014 let mut record = PredicateEvaluationRecord {
3015 result: false,
3016 cost_usd: capture.total_cost_usd,
3017 tokens: capture.total_tokens,
3018 latency_ms,
3019 cached: capture.cached,
3020 reason: None,
3021 exhaustion_strategy: None,
3022 };
3023
3024 let mut count_failure = false;
3025 let mut opened_breaker = false;
3026
3027 match eval {
3028 Ok(value) => match predicate_value_as_bool(value) {
3029 Ok(result) => {
3030 record.result = result;
3031 }
3032 Err(reason) => {
3033 count_failure = true;
3034 record.reason = Some(reason);
3035 }
3036 },
3037 Err(error) => {
3038 count_failure = true;
3039 record.reason = Some(error.to_string());
3040 }
3041 }
3042
3043 let cost_usd_micros = usd_to_micros(record.cost_usd);
3044 if cost_usd_micros > 0 {
3045 binding
3046 .metrics
3047 .cost_total_usd_micros
3048 .fetch_add(cost_usd_micros, Ordering::Relaxed);
3049 binding
3050 .metrics
3051 .cost_today_usd_micros
3052 .fetch_add(cost_usd_micros, Ordering::Relaxed);
3053 binding
3054 .metrics
3055 .cost_hour_usd_micros
3056 .fetch_add(cost_usd_micros, Ordering::Relaxed);
3057 note_orchestrator_budget_cost(cost_usd_micros);
3058 record_predicate_cost_sample(binding, cost_usd_micros);
3059 }
3060
3061 let timed_out = matches!(
3062 record.reason.as_deref(),
3063 Some("predicate evaluation timed out")
3064 );
3065 if capture.budget_exceeded || timed_out {
3066 if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3067 record.result = false;
3068 } else if timed_out {
3069 record.result = true;
3070 }
3071 record.reason = Some("budget_exceeded".to_string());
3072 record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3073 self.append_budget_exhausted_event(
3074 binding,
3075 event,
3076 "budget_exceeded",
3077 cost_usd_micros,
3078 Some(record.tokens),
3079 replay_of_event_id,
3080 )
3081 .await?;
3082 self.append_lifecycle_event(
3083 "predicate.invocation_budget_exceeded",
3084 event,
3085 binding,
3086 serde_json::json!({
3087 "trigger_id": binding.id.as_str(),
3088 "event_id": event.id.0,
3089 "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
3090 "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
3091 "cost_usd": record.cost_usd,
3092 "tokens": record.tokens,
3093 "replay_of_event_id": replay_of_event_id,
3094 }),
3095 replay_of_event_id,
3096 )
3097 .await?;
3098 }
3099
3100 if let Some(reason) =
3101 binding_budget_would_exceed(binding, 0).or_else(|| orchestrator_budget_would_exceed(0))
3102 {
3103 if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3104 record.result = false;
3105 }
3106 record.reason = Some(reason.to_string());
3107 record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3108 self.append_budget_exhausted_event(binding, event, reason, 0, None, replay_of_event_id)
3109 .await?;
3110 }
3111
3112 {
3113 let mut state = binding
3114 .predicate_state
3115 .lock()
3116 .expect("trigger predicate state poisoned");
3117 if count_failure {
3118 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
3119 if state.consecutive_failures >= 3 {
3120 state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
3121 opened_breaker = true;
3122 }
3123 } else {
3124 state.consecutive_failures = 0;
3125 state.breaker_open_until_ms = None;
3126 }
3127 }
3128
3129 if opened_breaker {
3130 let mut metadata = BTreeMap::new();
3131 metadata.insert(
3132 "trigger_id".to_string(),
3133 serde_json::json!(binding.id.as_str()),
3134 );
3135 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3136 metadata.insert("failure_count".to_string(), serde_json::json!(3));
3137 metadata.insert("reason".to_string(), serde_json::json!(record.reason));
3138 crate::events::log_warn_meta(
3139 "trigger.predicate.circuit_breaker",
3140 "trigger predicate circuit breaker opened for 5 minutes",
3141 metadata,
3142 );
3143 }
3144
3145 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
3146 .await?;
3147 Ok(record)
3148 }
3149
3150 #[allow(clippy::too_many_arguments)]
3151 #[allow(clippy::too_many_arguments)]
3152 async fn invoke_vm_callable_with_timeout(
3153 &self,
3154 closure: &crate::value::VmClosure,
3155 binding_key: &str,
3156 event: &TriggerEvent,
3157 replay_of_event_id: Option<&String>,
3158 agent_id: &str,
3159 action: &str,
3160 autonomy_tier: AutonomyTier,
3161 cancel_rx: &mut broadcast::Receiver<()>,
3162 timeout: Option<Duration>,
3163 ) -> Result<VmValue, DispatchError> {
3164 let future = self.invoke_vm_callable(
3165 closure,
3166 binding_key,
3167 event,
3168 replay_of_event_id,
3169 agent_id,
3170 action,
3171 autonomy_tier,
3172 None,
3173 cancel_rx,
3174 );
3175 pin_mut!(future);
3176 if let Some(timeout) = timeout {
3177 match tokio::time::timeout(timeout, future).await {
3178 Ok(result) => result,
3179 Err(_) => Err(DispatchError::Local(
3180 "predicate evaluation timed out".to_string(),
3181 )),
3182 }
3183 } else {
3184 future.await
3185 }
3186 }
3187
3188 async fn append_predicate_evaluated_event(
3189 &self,
3190 binding: &TriggerBinding,
3191 event: &TriggerEvent,
3192 record: &PredicateEvaluationRecord,
3193 replay_of_event_id: Option<&String>,
3194 ) -> Result<(), DispatchError> {
3195 if let Some(metrics) = self.metrics.as_ref() {
3196 metrics.record_trigger_predicate_evaluation(
3197 binding.id.as_str(),
3198 record.result,
3199 record.cost_usd,
3200 );
3201 metrics.set_trigger_budget_cost_today(
3202 binding.id.as_str(),
3203 current_predicate_daily_cost(binding),
3204 );
3205 if matches!(
3206 record.reason.as_deref(),
3207 Some(
3208 "budget_exceeded"
3209 | "daily_budget_exceeded"
3210 | "hourly_budget_exceeded"
3211 | "orchestrator_daily_budget_exceeded"
3212 | "orchestrator_hourly_budget_exceeded"
3213 )
3214 ) {
3215 metrics.record_trigger_budget_exhausted(
3216 binding.id.as_str(),
3217 record
3218 .exhaustion_strategy
3219 .map(TriggerBudgetExhaustionStrategy::as_str)
3220 .unwrap_or_else(|| record.reason.as_deref().unwrap_or("predicate")),
3221 );
3222 }
3223 }
3224 self.append_lifecycle_event(
3225 "predicate.evaluated",
3226 event,
3227 binding,
3228 serde_json::json!({
3229 "trigger_id": binding.id.as_str(),
3230 "event_id": event.id.0,
3231 "result": record.result,
3232 "cost_usd": record.cost_usd,
3233 "tokens": record.tokens,
3234 "latency_ms": record.latency_ms,
3235 "cached": record.cached,
3236 "reason": record.reason,
3237 "on_budget_exhausted": record.exhaustion_strategy.map(TriggerBudgetExhaustionStrategy::as_str),
3238 "replay_of_event_id": replay_of_event_id,
3239 }),
3240 replay_of_event_id,
3241 )
3242 .await
3243 }
3244
3245 async fn append_predicate_cache_record(
3246 &self,
3247 binding: &TriggerBinding,
3248 event: &TriggerEvent,
3249 entries: &[PredicateCacheEntry],
3250 ) -> Result<(), DispatchError> {
3251 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3252 .expect("static trigger inbox legacy topic name is valid");
3253 let payload = serde_json::to_value(PredicateCacheRecord {
3254 trigger_id: binding.id.as_str().to_string(),
3255 event_id: event.id.0.clone(),
3256 entries: entries.to_vec(),
3257 })
3258 .map_err(|error| DispatchError::Serde(error.to_string()))?;
3259 self.event_log
3260 .append(&topic, LogEvent::new("predicate_llm_cache", payload))
3261 .await
3262 .map_err(DispatchError::from)
3263 .map(|_| ())
3264 }
3265
3266 async fn read_predicate_cache_record(
3267 &self,
3268 event_id: &str,
3269 ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
3270 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3271 .expect("static trigger inbox legacy topic name is valid");
3272 let records = self
3273 .event_log
3274 .read_range(&topic, None, usize::MAX)
3275 .await
3276 .map_err(DispatchError::from)?;
3277 Ok(records
3278 .into_iter()
3279 .filter(|(_, event)| event.kind == "predicate_llm_cache")
3280 .filter_map(|(_, event)| {
3281 serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
3282 })
3283 .filter(|record| record.event_id == event_id)
3284 .flat_map(|record| record.entries)
3285 .collect())
3286 }
3287
3288 #[allow(clippy::too_many_arguments)]
3289 async fn append_dispatch_trust_record(
3290 &self,
3291 binding: &TriggerBinding,
3292 route: &DispatchUri,
3293 event: &TriggerEvent,
3294 replay_of_event_id: Option<&String>,
3295 autonomy_tier: AutonomyTier,
3296 outcome: TrustOutcome,
3297 terminal_status: &str,
3298 attempt_count: u32,
3299 error: Option<String>,
3300 ) -> Result<(), DispatchError> {
3301 let mut record = TrustRecord::new(
3302 binding.id.as_str().to_string(),
3303 format!("{}.{}", event.provider.as_str(), event.kind),
3304 None,
3305 outcome,
3306 event.trace_id.0.clone(),
3307 autonomy_tier,
3308 );
3309 record.metadata.insert(
3310 "binding_key".to_string(),
3311 serde_json::json!(binding.binding_key()),
3312 );
3313 record.metadata.insert(
3314 "binding_version".to_string(),
3315 serde_json::json!(binding.version),
3316 );
3317 record.metadata.insert(
3318 "provider".to_string(),
3319 serde_json::json!(event.provider.as_str()),
3320 );
3321 record
3322 .metadata
3323 .insert("event_kind".to_string(), serde_json::json!(event.kind));
3324 record
3325 .metadata
3326 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3327 record.metadata.insert(
3328 "target_uri".to_string(),
3329 serde_json::json!(route.target_uri()),
3330 );
3331 record.metadata.insert(
3332 "terminal_status".to_string(),
3333 serde_json::json!(terminal_status),
3334 );
3335 record.metadata.insert(
3336 "attempt_count".to_string(),
3337 serde_json::json!(attempt_count),
3338 );
3339 if let Some(replay_of_event_id) = replay_of_event_id {
3340 record.metadata.insert(
3341 "replay_of_event_id".to_string(),
3342 serde_json::json!(replay_of_event_id),
3343 );
3344 }
3345 if let Some(error) = error {
3346 record
3347 .metadata
3348 .insert("error".to_string(), serde_json::json!(error));
3349 }
3350 append_trust_record(&self.event_log, &record)
3351 .await
3352 .map(|_| ())
3353 .map_err(DispatchError::from)
3354 }
3355
3356 async fn append_attempt_record(
3357 &self,
3358 event: &TriggerEvent,
3359 binding: &TriggerBinding,
3360 attempt: &DispatchAttemptRecord,
3361 replay_of_event_id: Option<&String>,
3362 ) -> Result<(), DispatchError> {
3363 self.append_topic_event(
3364 TRIGGER_ATTEMPTS_TOPIC,
3365 "attempt_recorded",
3366 event,
3367 Some(binding),
3368 Some(attempt.attempt),
3369 serde_json::to_value(attempt)
3370 .map_err(|error| DispatchError::Serde(error.to_string()))?,
3371 replay_of_event_id,
3372 )
3373 .await
3374 }
3375
3376 async fn append_lifecycle_event(
3377 &self,
3378 kind: &str,
3379 event: &TriggerEvent,
3380 binding: &TriggerBinding,
3381 payload: serde_json::Value,
3382 replay_of_event_id: Option<&String>,
3383 ) -> Result<(), DispatchError> {
3384 self.append_topic_event(
3385 TRIGGERS_LIFECYCLE_TOPIC,
3386 kind,
3387 event,
3388 Some(binding),
3389 None,
3390 payload,
3391 replay_of_event_id,
3392 )
3393 .await
3394 }
3395
3396 async fn append_budget_exhausted_event(
3397 &self,
3398 binding: &TriggerBinding,
3399 event: &TriggerEvent,
3400 reason: &str,
3401 expected_cost_usd_micros: u64,
3402 tokens: Option<u64>,
3403 replay_of_event_id: Option<&String>,
3404 ) -> Result<(), DispatchError> {
3405 let payload = serde_json::json!({
3406 "trigger_id": binding.id.as_str(),
3407 "event_id": event.id.0,
3408 "reason": reason,
3409 "strategy": binding.on_budget_exhausted.as_str(),
3410 "expected_cost_usd": micros_to_usd(expected_cost_usd_micros),
3411 "cost_usd": micros_to_usd(expected_cost_usd_micros),
3412 "tokens": tokens,
3413 "daily_limit_usd": binding.daily_cost_usd,
3414 "hourly_limit_usd": binding.hourly_cost_usd,
3415 "cost_today_usd": current_predicate_daily_cost(binding),
3416 "cost_hour_usd": current_predicate_hourly_cost(binding),
3417 "replay_of_event_id": replay_of_event_id,
3418 });
3419 self.append_lifecycle_event(
3420 "predicate.budget_exceeded",
3421 event,
3422 binding,
3423 payload.clone(),
3424 replay_of_event_id,
3425 )
3426 .await?;
3427 let legacy_kind = match reason {
3428 "daily_budget_exceeded" => Some("predicate.daily_budget_exceeded"),
3429 "hourly_budget_exceeded" => Some("predicate.hourly_budget_exceeded"),
3430 "orchestrator_daily_budget_exceeded" => {
3431 Some("predicate.orchestrator_daily_budget_exceeded")
3432 }
3433 "orchestrator_hourly_budget_exceeded" => {
3434 Some("predicate.orchestrator_hourly_budget_exceeded")
3435 }
3436 _ => None,
3437 };
3438 if let Some(kind) = legacy_kind {
3439 self.append_lifecycle_event(kind, event, binding, payload, replay_of_event_id)
3440 .await?;
3441 }
3442 Ok(())
3443 }
3444
3445 async fn append_autonomy_budget_approval_request(
3446 &self,
3447 binding: &TriggerBinding,
3448 route: &DispatchUri,
3449 event: &TriggerEvent,
3450 replay_of_event_id: Option<&String>,
3451 reason: &str,
3452 ) -> Result<String, DispatchError> {
3453 let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
3454 let detail = serde_json::json!({
3455 "trigger_id": binding.id.as_str(),
3456 "binding_key": binding.binding_key(),
3457 "event_id": event.id.0,
3458 "reason": reason,
3459 "from_tier": AutonomyTier::ActAuto.as_str(),
3460 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3461 "handler_kind": route.kind(),
3462 "target_uri": route.target_uri(),
3463 "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
3464 "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
3465 "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
3466 "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
3467 "replay_of_event_id": replay_of_event_id,
3468 });
3469 let request_id = crate::stdlib::hitl::append_approval_request_on(
3470 &self.event_log,
3471 binding.id.as_str().to_string(),
3472 event.trace_id.0.clone(),
3473 format!(
3474 "approve autonomous dispatch for trigger '{}' after {}",
3475 binding.id.as_str(),
3476 reason
3477 ),
3478 detail.clone(),
3479 reviewers.clone(),
3480 )
3481 .await
3482 .map_err(dispatch_error_from_vm_error)?;
3483 self.append_lifecycle_event(
3484 "autonomy.budget_exceeded",
3485 event,
3486 binding,
3487 serde_json::json!({
3488 "trigger_id": binding.id.as_str(),
3489 "event_id": event.id.0,
3490 "reason": reason,
3491 "request_id": request_id,
3492 "reviewers": reviewers,
3493 "from_tier": AutonomyTier::ActAuto.as_str(),
3494 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3495 "replay_of_event_id": replay_of_event_id,
3496 }),
3497 replay_of_event_id,
3498 )
3499 .await?;
3500 Ok(request_id)
3501 }
3502
3503 #[allow(clippy::too_many_arguments)]
3504 async fn emit_autonomy_budget_approval_action_graph(
3505 &self,
3506 binding: &TriggerBinding,
3507 route: &DispatchUri,
3508 event: &TriggerEvent,
3509 source_node_id: &str,
3510 replay_of_event_id: Option<&String>,
3511 reason: &str,
3512 request_id: &str,
3513 ) -> Result<(), DispatchError> {
3514 let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3515 self.emit_action_graph(
3516 event,
3517 vec![RunActionGraphNodeRecord {
3518 id: approval_node_id.clone(),
3519 label: format!("approval required: {reason}"),
3520 kind: "approval".to_string(),
3521 status: "waiting".to_string(),
3522 outcome: "request_approval".to_string(),
3523 trace_id: Some(event.trace_id.0.clone()),
3524 stage_id: None,
3525 node_id: None,
3526 worker_id: None,
3527 run_id: None,
3528 run_path: None,
3529 metadata: BTreeMap::from([
3530 (
3531 "trigger_id".to_string(),
3532 serde_json::json!(binding.id.as_str()),
3533 ),
3534 (
3535 "binding_key".to_string(),
3536 serde_json::json!(binding.binding_key()),
3537 ),
3538 ("event_id".to_string(), serde_json::json!(event.id.0)),
3539 ("reason".to_string(), serde_json::json!(reason)),
3540 ("request_id".to_string(), serde_json::json!(request_id)),
3541 (
3542 "reviewers".to_string(),
3543 serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3544 ),
3545 ("handler_kind".to_string(), serde_json::json!(route.kind())),
3546 (
3547 "target_uri".to_string(),
3548 serde_json::json!(route.target_uri()),
3549 ),
3550 (
3551 "from_tier".to_string(),
3552 serde_json::json!(AutonomyTier::ActAuto.as_str()),
3553 ),
3554 (
3555 "requested_tier".to_string(),
3556 serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3557 ),
3558 (
3559 "replay_of_event_id".to_string(),
3560 serde_json::json!(replay_of_event_id),
3561 ),
3562 ]),
3563 }],
3564 vec![RunActionGraphEdgeRecord {
3565 from_id: source_node_id.to_string(),
3566 to_id: approval_node_id,
3567 kind: "approval_gate".to_string(),
3568 label: Some("autonomy budget".to_string()),
3569 }],
3570 serde_json::json!({
3571 "source": "dispatcher",
3572 "trigger_id": binding.id.as_str(),
3573 "binding_key": binding.binding_key(),
3574 "event_id": event.id.0,
3575 "reason": reason,
3576 "request_id": request_id,
3577 "replay_of_event_id": replay_of_event_id,
3578 }),
3579 )
3580 .await
3581 }
3582
3583 #[allow(clippy::too_many_arguments)]
3584 async fn append_tier_transition_trust_record(
3585 &self,
3586 binding: &TriggerBinding,
3587 event: &TriggerEvent,
3588 replay_of_event_id: Option<&String>,
3589 from_tier: AutonomyTier,
3590 to_tier: AutonomyTier,
3591 reason: &str,
3592 request_id: &str,
3593 ) -> Result<(), DispatchError> {
3594 let mut record = TrustRecord::new(
3595 binding.id.as_str().to_string(),
3596 "autonomy.tier_transition",
3597 Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3598 TrustOutcome::Denied,
3599 event.trace_id.0.clone(),
3600 to_tier,
3601 );
3602 record.metadata.insert(
3603 "binding_key".to_string(),
3604 serde_json::json!(binding.binding_key()),
3605 );
3606 record
3607 .metadata
3608 .insert("event_id".to_string(), serde_json::json!(event.id.0));
3609 record.metadata.insert(
3610 "from_tier".to_string(),
3611 serde_json::json!(from_tier.as_str()),
3612 );
3613 record
3614 .metadata
3615 .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3616 record
3617 .metadata
3618 .insert("reason".to_string(), serde_json::json!(reason));
3619 record
3620 .metadata
3621 .insert("request_id".to_string(), serde_json::json!(request_id));
3622 if let Some(replay_of_event_id) = replay_of_event_id {
3623 record.metadata.insert(
3624 "replay_of_event_id".to_string(),
3625 serde_json::json!(replay_of_event_id),
3626 );
3627 }
3628 append_trust_record(&self.event_log, &record)
3629 .await
3630 .map(|_| ())
3631 .map_err(DispatchError::from)
3632 }
3633
3634 async fn append_budget_deferred_event(
3635 &self,
3636 binding: &TriggerBinding,
3637 route: &DispatchUri,
3638 event: &TriggerEvent,
3639 replay_of_event_id: Option<&String>,
3640 reason: &str,
3641 ) -> Result<(), DispatchError> {
3642 self.append_topic_event(
3643 TRIGGER_ATTEMPTS_TOPIC,
3644 "budget_deferred",
3645 event,
3646 Some(binding),
3647 None,
3648 serde_json::json!({
3649 "event_id": event.id.0,
3650 "trigger_id": binding.id.as_str(),
3651 "binding_key": binding.binding_key(),
3652 "handler_kind": route.kind(),
3653 "target_uri": route.target_uri(),
3654 "reason": reason,
3655 "retry_at": next_budget_reset_rfc3339(binding),
3656 "replay_of_event_id": replay_of_event_id,
3657 }),
3658 replay_of_event_id,
3659 )
3660 .await?;
3661 self.append_skipped_outbox_event(
3662 binding,
3663 route,
3664 event,
3665 replay_of_event_id,
3666 DispatchSkipStage::Predicate,
3667 serde_json::json!({
3668 "deferred": true,
3669 "reason": reason,
3670 "retry_at": next_budget_reset_rfc3339(binding),
3671 }),
3672 )
3673 .await
3674 }
3675
3676 async fn move_budget_exhausted_to_dlq(
3677 &self,
3678 binding: &TriggerBinding,
3679 route: &DispatchUri,
3680 event: &TriggerEvent,
3681 replay_of_event_id: Option<&String>,
3682 final_error: &str,
3683 ) -> Result<(), DispatchError> {
3684 let dlq_entry = DlqEntry {
3685 trigger_id: binding.id.as_str().to_string(),
3686 binding_key: binding.binding_key(),
3687 event: event.clone(),
3688 attempt_count: 0,
3689 final_error: final_error.to_string(),
3690 error_class: crate::triggers::classify_trigger_dlq_error(final_error).to_string(),
3691 attempts: Vec::new(),
3692 };
3693 self.state
3694 .dlq
3695 .lock()
3696 .expect("dispatcher dlq poisoned")
3697 .push(dlq_entry.clone());
3698 if let Some(metrics) = self.metrics.as_ref() {
3699 metrics.record_trigger_dlq(binding.id.as_str(), "budget_exhausted");
3700 metrics.record_trigger_accepted_to_dlq(
3701 binding.id.as_str(),
3702 &binding.binding_key(),
3703 event.provider.as_str(),
3704 tenant_id(event),
3705 "budget_exhausted",
3706 duration_between_ms(current_unix_ms(), accepted_at_ms(None, event)),
3707 );
3708 }
3709 tracing::info!(
3710 component = "dispatcher",
3711 lifecycle = "dlq_moved",
3712 trigger_id = %binding.id.as_str(),
3713 binding_key = %binding.binding_key(),
3714 event_id = %event.id.0,
3715 reason = "budget_exhausted",
3716 trace_id = %event.trace_id.0
3717 );
3718 self.emit_action_graph(
3719 event,
3720 vec![RunActionGraphNodeRecord {
3721 id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3722 label: binding.id.as_str().to_string(),
3723 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3724 status: "queued".to_string(),
3725 outcome: "budget_exhausted".to_string(),
3726 trace_id: Some(event.trace_id.0.clone()),
3727 stage_id: None,
3728 node_id: None,
3729 worker_id: None,
3730 run_id: None,
3731 run_path: None,
3732 metadata: dlq_node_metadata(binding, event, 0, final_error),
3733 }],
3734 vec![RunActionGraphEdgeRecord {
3735 from_id: format!("predicate:{}:{}", binding.binding_key(), event.id.0),
3736 to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3737 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3738 label: Some("budget exhausted".to_string()),
3739 }],
3740 serde_json::json!({
3741 "source": "dispatcher",
3742 "trigger_id": binding.id.as_str(),
3743 "binding_key": binding.binding_key(),
3744 "event_id": event.id.0,
3745 "handler_kind": route.kind(),
3746 "target_uri": route.target_uri(),
3747 "final_error": final_error,
3748 "replay_of_event_id": replay_of_event_id,
3749 }),
3750 )
3751 .await?;
3752 self.append_lifecycle_event(
3753 "DlqMoved",
3754 event,
3755 binding,
3756 serde_json::json!({
3757 "event_id": event.id.0,
3758 "attempt_count": 0,
3759 "final_error": final_error,
3760 "reason": "budget_exhausted",
3761 "replay_of_event_id": replay_of_event_id,
3762 }),
3763 replay_of_event_id,
3764 )
3765 .await?;
3766 self.append_topic_event(
3767 TRIGGER_DLQ_TOPIC,
3768 "dlq_moved",
3769 event,
3770 Some(binding),
3771 None,
3772 serde_json::to_value(&dlq_entry)
3773 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3774 replay_of_event_id,
3775 )
3776 .await
3777 }
3778
3779 async fn move_circuit_open_to_dlq(
3780 &self,
3781 binding: &TriggerBinding,
3782 route: &DispatchUri,
3783 event: &TriggerEvent,
3784 replay_of_event_id: Option<&String>,
3785 final_error: &str,
3786 destination: &str,
3787 ) -> Result<(), DispatchError> {
3788 let dlq_entry = DlqEntry {
3789 trigger_id: binding.id.as_str().to_string(),
3790 binding_key: binding.binding_key(),
3791 event: event.clone(),
3792 attempt_count: 0,
3793 final_error: final_error.to_string(),
3794 error_class: crate::triggers::classify_trigger_dlq_error(final_error).to_string(),
3795 attempts: Vec::new(),
3796 };
3797 self.state
3798 .dlq
3799 .lock()
3800 .expect("dispatcher dlq poisoned")
3801 .push(dlq_entry.clone());
3802 if let Some(metrics) = self.metrics.as_ref() {
3803 metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
3804 metrics.record_trigger_accepted_to_dlq(
3805 binding.id.as_str(),
3806 &binding.binding_key(),
3807 event.provider.as_str(),
3808 tenant_id(event),
3809 "circuit_open",
3810 Duration::ZERO,
3811 );
3812 }
3813 tracing::info!(
3814 component = "dispatcher",
3815 lifecycle = "dlq_moved",
3816 trigger_id = %binding.id.as_str(),
3817 binding_key = %binding.binding_key(),
3818 event_id = %event.id.0,
3819 reason = "circuit_open",
3820 destination,
3821 trace_id = %event.trace_id.0
3822 );
3823 self.emit_action_graph(
3824 event,
3825 vec![RunActionGraphNodeRecord {
3826 id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3827 label: binding.id.as_str().to_string(),
3828 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3829 status: "queued".to_string(),
3830 outcome: "circuit_open".to_string(),
3831 trace_id: Some(event.trace_id.0.clone()),
3832 stage_id: None,
3833 node_id: None,
3834 worker_id: None,
3835 run_id: None,
3836 run_path: None,
3837 metadata: dlq_node_metadata(binding, event, 0, final_error),
3838 }],
3839 vec![RunActionGraphEdgeRecord {
3840 from_id: format!("trigger:{}", event.id.0),
3841 to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3842 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3843 label: Some("circuit open".to_string()),
3844 }],
3845 serde_json::json!({
3846 "source": "dispatcher",
3847 "trigger_id": binding.id.as_str(),
3848 "binding_key": binding.binding_key(),
3849 "event_id": event.id.0,
3850 "handler_kind": route.kind(),
3851 "target_uri": route.target_uri(),
3852 "destination": destination,
3853 "final_error": final_error,
3854 "replay_of_event_id": replay_of_event_id,
3855 }),
3856 )
3857 .await?;
3858 self.append_lifecycle_event(
3859 "DlqMoved",
3860 event,
3861 binding,
3862 serde_json::json!({
3863 "event_id": event.id.0,
3864 "attempt_count": 0,
3865 "final_error": final_error,
3866 "reason": "circuit_open",
3867 "destination": destination,
3868 "replay_of_event_id": replay_of_event_id,
3869 }),
3870 replay_of_event_id,
3871 )
3872 .await?;
3873 self.append_topic_event(
3874 TRIGGER_DLQ_TOPIC,
3875 "dlq_moved",
3876 event,
3877 Some(binding),
3878 None,
3879 serde_json::to_value(&dlq_entry)
3880 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3881 replay_of_event_id,
3882 )
3883 .await
3884 }
3885
3886 async fn append_skipped_outbox_event(
3887 &self,
3888 binding: &TriggerBinding,
3889 route: &DispatchUri,
3890 event: &TriggerEvent,
3891 replay_of_event_id: Option<&String>,
3892 stage: DispatchSkipStage,
3893 detail: serde_json::Value,
3894 ) -> Result<(), DispatchError> {
3895 self.append_topic_event(
3896 TRIGGER_OUTBOX_TOPIC,
3897 "dispatch_skipped",
3898 event,
3899 Some(binding),
3900 None,
3901 serde_json::json!({
3902 "event_id": event.id.0,
3903 "trigger_id": binding.id.as_str(),
3904 "binding_key": binding.binding_key(),
3905 "handler_kind": route.kind(),
3906 "target_uri": route.target_uri(),
3907 "skip_stage": stage.as_str(),
3908 "detail": detail,
3909 "replay_of_event_id": replay_of_event_id,
3910 }),
3911 replay_of_event_id,
3912 )
3913 .await
3914 }
3915
3916 async fn append_topic_event(
3917 &self,
3918 topic_name: &str,
3919 kind: &str,
3920 event: &TriggerEvent,
3921 binding: Option<&TriggerBinding>,
3922 attempt: Option<u32>,
3923 payload: serde_json::Value,
3924 replay_of_event_id: Option<&String>,
3925 ) -> Result<(), DispatchError> {
3926 let topic = topic_for_event(event, topic_name)?;
3927 let headers = event_headers(event, binding, attempt, replay_of_event_id);
3928 self.event_log
3929 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3930 .await
3931 .map_err(DispatchError::from)
3932 .map(|_| ())
3933 }
3934
3935 async fn emit_action_graph(
3936 &self,
3937 event: &TriggerEvent,
3938 nodes: Vec<RunActionGraphNodeRecord>,
3939 edges: Vec<RunActionGraphEdgeRecord>,
3940 extra: serde_json::Value,
3941 ) -> Result<(), DispatchError> {
3942 let mut headers = BTreeMap::new();
3943 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3944 headers.insert("event_id".to_string(), event.id.0.clone());
3945 let observability = RunObservabilityRecord {
3946 schema_version: 1,
3947 action_graph_nodes: nodes,
3948 action_graph_edges: edges,
3949 ..Default::default()
3950 };
3951 append_action_graph_update(
3952 headers,
3953 serde_json::json!({
3954 "source": "dispatcher",
3955 "trace_id": event.trace_id.0,
3956 "event_id": event.id.0,
3957 "observability": observability,
3958 "context": extra,
3959 }),
3960 )
3961 .await
3962 .map_err(DispatchError::from)
3963 }
3964}
3965
3966async fn dispatch_cancel_requested(
3967 event_log: &Arc<AnyEventLog>,
3968 binding_key: &str,
3969 event_id: &str,
3970 replay_of_event_id: Option<&String>,
3971) -> Result<bool, DispatchError> {
3972 if replay_of_event_id.is_some() {
3973 return Ok(false);
3974 }
3975 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3976 .expect("static trigger cancel topic should always be valid");
3977 let events = event_log.read_range(&topic, None, usize::MAX).await?;
3978 let requested = events
3979 .into_iter()
3980 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3981 .filter_map(|(_, event)| {
3982 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3983 })
3984 .collect::<BTreeSet<_>>();
3985 Ok(requested
3986 .iter()
3987 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3988}
3989
/// Waits out a retry `delay`, aborting early if the wait is cancelled.
///
/// Three things are raced in a loop:
/// - the delay elapsing, which returns `Ok(())`;
/// - `recv_cancel(cancel_rx)` completing, which returns `DispatchError::Cancelled`
///   with a shutdown message;
/// - a 100 ms poll tick, on which the event log is checked for a cancel request
///   for this `binding_key`/`event_id`; a hit returns `DispatchError::Cancelled`.
///
/// # Errors
///
/// Besides the two `Cancelled` cases, any error from reading the cancel
/// requests is propagated.
async fn sleep_or_cancel_or_request(
    event_log: &Arc<AnyEventLog>,
    delay: Duration,
    binding_key: &str,
    event_id: &str,
    replay_of_event_id: Option<&String>,
    cancel_rx: &mut broadcast::Receiver<()>,
) -> Result<(), DispatchError> {
    // Pinned once so the same sleep future is polled on every loop iteration
    // instead of restarting the delay.
    let sleep = tokio::time::sleep(delay);
    pin_mut!(sleep);
    // NOTE(review): a tokio interval presumably yields its first tick right
    // away, so the cancel log is likely checked once before any waiting — confirm.
    let mut poll = tokio::time::interval(Duration::from_millis(100));
    loop {
        tokio::select! {
            _ = &mut sleep => return Ok(()),
            _ = recv_cancel(cancel_rx) => {
                return Err(DispatchError::Cancelled(
                    "dispatcher shutdown cancelled retry wait".to_string(),
                ));
            }
            _ = poll.tick() => {
                if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
                    return Err(DispatchError::Cancelled(
                        "trigger cancel request cancelled retry wait".to_string(),
                    ));
                }
            }
        }
    }
}
4019
4020fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
4021 let mut iter = events.into_iter();
4022 let Some(mut root) = iter.next() else {
4023 return Err(DispatchError::Registry(
4024 "batch dispatch produced an empty event list".to_string(),
4025 ));
4026 };
4027 let mut batch = Vec::new();
4028 batch.push(
4029 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
4030 );
4031 for event in iter {
4032 batch.push(
4033 serde_json::to_value(&event)
4034 .map_err(|error| DispatchError::Serde(error.to_string()))?,
4035 );
4036 }
4037 root.batch = Some(batch);
4038 Ok(root)
4039}
4040
4041fn json_value_to_gate(value: &serde_json::Value) -> String {
4042 match value {
4043 serde_json::Value::Null => "null".to_string(),
4044 serde_json::Value::String(text) => text.clone(),
4045 serde_json::Value::Bool(value) => value.to_string(),
4046 serde_json::Value::Number(value) => value.to_string(),
4047 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
4048 }
4049}
4050
4051fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
4052 let json =
4053 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4054 let value = json_to_vm_value(&json);
4055 match (&event.raw_body, value) {
4056 (Some(raw_body), VmValue::Dict(dict)) => {
4057 let mut map = (*dict).clone();
4058 map.insert(
4059 "raw_body".to_string(),
4060 VmValue::Bytes(Rc::new(raw_body.clone())),
4061 );
4062 Ok(VmValue::Dict(Rc::new(map)))
4063 }
4064 (_, other) => Ok(other),
4065 }
4066}
4067
4068fn decrement_in_flight(state: &DispatcherRuntimeState) {
4069 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
4070 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
4071 state.idle_notify.notify_waiters();
4072 }
4073}
4074
4075fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
4076 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
4077 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
4078 state.idle_notify.notify_waiters();
4079 }
4080}
4081
/// Test-only: stores `tx` in the `TEST_INBOX_DEQUEUED_SIGNAL` slot, replacing
/// any sender that was installed before.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(tx);
    });
}
4088
/// No-op used outside test builds; a `#[cfg(test)]` variant with the same name
/// fires the installed test signal instead.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}
4091
/// Test-only: takes the sender out of `TEST_INBOX_DEQUEUED_SIGNAL`, if one is
/// installed, and fires it. A failed send is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(tx) = pending {
        let _ = tx.send(());
    }
}
4100
4101pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
4102 event_log: &L,
4103 event: &TriggerEvent,
4104) -> Result<u64, DispatchError> {
4105 let topic = topic_for_event(event, TRIGGER_INBOX_ENVELOPES_TOPIC)?;
4106 let headers = event_headers(event, None, None, None);
4107 let payload =
4108 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4109 event_log
4110 .append(
4111 &topic,
4112 LogEvent::new("event_ingested", payload).with_headers(headers),
4113 )
4114 .await
4115 .map_err(DispatchError::from)
4116}
4117
4118pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
4119 ACTIVE_DISPATCHER_STATE.with(|slot| {
4120 slot.borrow()
4121 .as_ref()
4122 .map(|state| DispatcherStatsSnapshot {
4123 in_flight: state.in_flight.load(Ordering::Relaxed),
4124 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
4125 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
4126 })
4127 .unwrap_or_default()
4128 })
4129}
4130
4131pub fn clear_dispatcher_state() {
4132 ACTIVE_DISPATCHER_STATE.with(|slot| {
4133 *slot.borrow_mut() = None;
4134 });
4135 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
4136 *slot.borrow_mut() = None;
4137 });
4138}
4139
4140fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
4141 if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
4142 return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
4143 }
4144 if is_cancelled_vm_error(&error) {
4145 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
4146 }
4147 if let VmError::Thrown(VmValue::String(message)) = &error {
4148 return DispatchError::Local(message.to_string());
4149 }
4150 match error_to_category(&error) {
4151 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
4152 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
4153 ErrorCategory::Cancelled => {
4154 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
4155 }
4156 _ => DispatchError::Local(error.to_string()),
4157 }
4158}
4159
4160fn dispatch_error_label(error: &DispatchError) -> &'static str {
4161 match error {
4162 DispatchError::Denied(_) => "denied",
4163 DispatchError::Timeout(_) => "timeout",
4164 DispatchError::Waiting(_) => "waiting",
4165 DispatchError::Cancelled(_) => "cancelled",
4166 _ => "failed",
4167 }
4168}
4169
4170fn destination_circuit_key(route: &DispatchUri) -> String {
4171 format!("{}:{}", route.kind(), route.target_uri())
4172}
4173
4174fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
4175 match route {
4176 DispatchUri::Worker { .. } => "enqueued",
4177 DispatchUri::A2a { .. }
4178 if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
4179 {
4180 "pending"
4181 }
4182 DispatchUri::A2a { .. } => "completed",
4183 DispatchUri::Local { .. } => "success",
4184 }
4185}
4186
4187fn dispatch_node_id(
4188 route: &DispatchUri,
4189 binding_key: &str,
4190 event_id: &str,
4191 attempt: u32,
4192) -> String {
4193 let prefix = match route {
4194 DispatchUri::A2a { .. } => "a2a",
4195 _ => "dispatch",
4196 };
4197 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
4198}
4199
4200fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
4201 match route {
4202 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
4203 DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
4204 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
4205 }
4206}
4207
4208fn dispatch_node_label(route: &DispatchUri) -> String {
4209 match route {
4210 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
4211 _ => route.target_uri(),
4212 }
4213}
4214
4215fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
4216 match route {
4217 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
4218 _ => None,
4219 }
4220}
4221
4222fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
4223 match route {
4224 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
4225 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
4226 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
4227 }
4228}
4229
4230fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
4231 match status {
4232 crate::triggers::SignatureStatus::Verified => "verified",
4233 crate::triggers::SignatureStatus::Unsigned => "unsigned",
4234 crate::triggers::SignatureStatus::Failed { .. } => "failed",
4235 }
4236}
4237
4238fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
4239 let mut metadata = BTreeMap::new();
4240 metadata.insert(
4241 "provider".to_string(),
4242 serde_json::json!(event.provider.as_str()),
4243 );
4244 metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
4245 metadata.insert(
4246 "dedupe_key".to_string(),
4247 serde_json::json!(event.dedupe_key),
4248 );
4249 metadata.insert(
4250 "signature_status".to_string(),
4251 serde_json::json!(signature_status_label(&event.signature_status)),
4252 );
4253 metadata
4254}
4255
4256fn predicate_node_metadata(
4257 binding: &TriggerBinding,
4258 predicate: &super::registry::TriggerPredicateSpec,
4259 event: &TriggerEvent,
4260 evaluation: &PredicateEvaluationRecord,
4261) -> BTreeMap<String, serde_json::Value> {
4262 let mut metadata = BTreeMap::new();
4263 metadata.insert(
4264 "trigger_id".to_string(),
4265 serde_json::json!(binding.id.as_str()),
4266 );
4267 metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
4268 metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
4269 metadata.insert(
4270 "cost_usd".to_string(),
4271 serde_json::json!(evaluation.cost_usd),
4272 );
4273 metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
4274 metadata.insert(
4275 "latency_ms".to_string(),
4276 serde_json::json!(evaluation.latency_ms),
4277 );
4278 metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
4279 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4280 if let Some(reason) = evaluation.reason.as_ref() {
4281 metadata.insert("reason".to_string(), serde_json::json!(reason));
4282 }
4283 metadata
4284}
4285
4286fn dispatch_node_metadata(
4287 route: &DispatchUri,
4288 binding: &TriggerBinding,
4289 event: &TriggerEvent,
4290 attempt: u32,
4291) -> BTreeMap<String, serde_json::Value> {
4292 let mut metadata = BTreeMap::new();
4293 metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
4294 metadata.insert(
4295 "target_uri".to_string(),
4296 serde_json::json!(route.target_uri()),
4297 );
4298 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4299 metadata.insert(
4300 "trigger_id".to_string(),
4301 serde_json::json!(binding.id.as_str()),
4302 );
4303 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4304 if let Some(target_agent) = dispatch_target_agent(route) {
4305 metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
4306 }
4307 if let DispatchUri::Worker { queue } = route {
4308 metadata.insert("queue_name".to_string(), serde_json::json!(queue));
4309 }
4310 metadata
4311}
4312
4313fn dispatch_success_metadata(
4314 route: &DispatchUri,
4315 binding: &TriggerBinding,
4316 event: &TriggerEvent,
4317 attempt: u32,
4318 result: &serde_json::Value,
4319) -> BTreeMap<String, serde_json::Value> {
4320 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4321 match route {
4322 DispatchUri::A2a { .. } => {
4323 if let Some(task_id) = result
4324 .get("task_id")
4325 .or_else(|| result.get("id"))
4326 .and_then(|value| value.as_str())
4327 {
4328 metadata.insert("task_id".to_string(), serde_json::json!(task_id));
4329 }
4330 if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
4331 metadata.insert("state".to_string(), serde_json::json!(state));
4332 }
4333 }
4334 DispatchUri::Worker { .. } => {
4335 if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
4336 {
4337 metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
4338 }
4339 if let Some(response_topic) = result
4340 .get("response_topic")
4341 .and_then(|value| value.as_str())
4342 {
4343 metadata.insert(
4344 "response_topic".to_string(),
4345 serde_json::json!(response_topic),
4346 );
4347 }
4348 }
4349 DispatchUri::Local { .. } => {}
4350 }
4351 metadata
4352}
4353
4354fn dispatch_error_metadata(
4355 route: &DispatchUri,
4356 binding: &TriggerBinding,
4357 event: &TriggerEvent,
4358 attempt: u32,
4359 error: &DispatchError,
4360) -> BTreeMap<String, serde_json::Value> {
4361 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4362 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4363 metadata
4364}
4365
4366fn retry_node_metadata(
4367 binding: &TriggerBinding,
4368 event: &TriggerEvent,
4369 attempt: u32,
4370 delay: Duration,
4371 error: &DispatchError,
4372) -> BTreeMap<String, serde_json::Value> {
4373 let mut metadata = BTreeMap::new();
4374 metadata.insert(
4375 "trigger_id".to_string(),
4376 serde_json::json!(binding.id.as_str()),
4377 );
4378 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4379 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4380 metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
4381 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4382 metadata
4383}
4384
4385fn dlq_node_metadata(
4386 binding: &TriggerBinding,
4387 event: &TriggerEvent,
4388 attempt_count: u32,
4389 final_error: &str,
4390) -> BTreeMap<String, serde_json::Value> {
4391 let mut metadata = BTreeMap::new();
4392 metadata.insert(
4393 "trigger_id".to_string(),
4394 serde_json::json!(binding.id.as_str()),
4395 );
4396 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4397 metadata.insert(
4398 "attempt_count".to_string(),
4399 serde_json::json!(attempt_count),
4400 );
4401 metadata.insert("final_error".to_string(), serde_json::json!(final_error));
4402 metadata.insert(
4403 "error_class".to_string(),
4404 serde_json::json!(crate::triggers::classify_trigger_dlq_error(final_error)),
4405 );
4406 metadata
4407}
4408
4409fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
4410 match value {
4411 VmValue::Bool(result) => Ok(result),
4412 VmValue::EnumVariant {
4413 enum_name,
4414 variant,
4415 fields,
4416 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Ok" => match fields.first() {
4417 Some(VmValue::Bool(result)) => Ok(*result),
4418 Some(other) => Err(format!(
4419 "predicate Result.Ok payload must be bool, got {}",
4420 other.type_name()
4421 )),
4422 None => Err("predicate Result.Ok payload is missing".to_string()),
4423 },
4424 VmValue::EnumVariant {
4425 enum_name,
4426 variant,
4427 fields,
4428 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => Err(fields
4429 .first()
4430 .map(VmValue::display)
4431 .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
4432 other => Err(format!(
4433 "predicate must return bool or Result<bool, _>, got {}",
4434 other.type_name()
4435 )),
4436 }
4437}
4438
4439fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
4440 micros_to_usd(
4441 binding
4442 .metrics
4443 .cost_today_usd_micros
4444 .load(Ordering::Relaxed),
4445 )
4446}
4447
4448fn current_predicate_hourly_cost(binding: &TriggerBinding) -> f64 {
4449 micros_to_usd(binding.metrics.cost_hour_usd_micros.load(Ordering::Relaxed))
4450}
4451
/// Splits a binding key of the form `<trigger_id>@v<version>` into its parts.
///
/// The split happens at the last `@v`, and only when the text after it parses
/// as a `u32`. In every other case the whole key is returned unchanged as the
/// id with version `0`, so an id that merely contains `@v` (for example
/// `team@vnext`) is never truncated.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    binding_key
        .rsplit_once("@v")
        .and_then(|(binding_id, suffix)| {
            suffix
                .parse::<u32>()
                .ok()
                .map(|version| (binding_id.to_string(), version))
        })
        .unwrap_or_else(|| (binding_key.to_string(), 0))
}
4459
/// Formats a binding key from a trigger id and an optional version.
///
/// With a version the key is `<trigger_id>@v<version>`; without one it is the
/// trigger id on its own.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    if let Some(version) = binding_version {
        return format!("{trigger_id}@v{version}");
    }
    trigger_id.to_owned()
}
4466
4467fn tenant_id(event: &TriggerEvent) -> Option<&str> {
4468 event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
4469}
4470
4471fn current_unix_ms() -> i64 {
4472 unix_ms(time::OffsetDateTime::now_utc())
4473}
4474
4475fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
4476 (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
4477}
4478
4479fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4480 lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
4481 .unwrap_or_else(|| unix_ms(event.received_at))
4482}
4483
4484fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4485 lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
4486 .unwrap_or_else(|| accepted_at_ms(headers, event))
4487}
4488
/// Looks up `name` in the optional header map and parses it as an `i64`.
///
/// Returns `None` when there is no header map, the header is absent, or its
/// value is not a valid `i64`.
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw = headers?.get(name)?;
    raw.parse::<i64>().ok()
}
4494
/// Returns the time elapsed from `earlier_ms` to `later_ms` as a `Duration`.
///
/// The subtraction saturates instead of overflowing, and a negative
/// difference (the "later" stamp precedes the "earlier" one) clamps to zero.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let elapsed_ms = later_ms.saturating_sub(earlier_ms);
    if elapsed_ms <= 0 {
        Duration::ZERO
    } else {
        // Strictly positive here, so the cast to u64 is lossless.
        Duration::from_millis(elapsed_ms as u64)
    }
}
4498
4499fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
4500 match result {
4501 Ok(_) => "succeeded",
4502 Err(DispatchError::Waiting(_)) => "waiting",
4503 Err(DispatchError::Cancelled(_)) => "cancelled",
4504 Err(DispatchError::Denied(_)) => "denied",
4505 Err(DispatchError::Timeout(_)) => "timeout",
4506 Err(_) => "failed",
4507 }
4508}
4509
4510fn is_cancelled_vm_error(error: &VmError) -> bool {
4511 matches!(
4512 error,
4513 VmError::Thrown(VmValue::String(message))
4514 if message.starts_with("kind:cancelled:")
4515 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
4516}
4517
4518fn event_headers(
4519 event: &TriggerEvent,
4520 binding: Option<&TriggerBinding>,
4521 attempt: Option<u32>,
4522 replay_of_event_id: Option<&String>,
4523) -> BTreeMap<String, String> {
4524 let mut headers = BTreeMap::new();
4525 headers.insert("event_id".to_string(), event.id.0.clone());
4526 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
4527 headers.insert("provider".to_string(), event.provider.as_str().to_string());
4528 headers.insert("kind".to_string(), event.kind.clone());
4529 if let Some(replay_of_event_id) = replay_of_event_id {
4530 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
4531 }
4532 if let Some(tenant_id) = event.tenant_id.as_ref() {
4533 headers.insert("tenant_id".to_string(), tenant_id.0.clone());
4534 }
4535 if let Some(binding) = binding {
4536 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
4537 headers.insert("binding_key".to_string(), binding.binding_key());
4538 headers.insert(
4539 "handler_kind".to_string(),
4540 DispatchUri::from(&binding.handler).kind().to_string(),
4541 );
4542 }
4543 if let Some(attempt) = attempt {
4544 headers.insert("attempt".to_string(), attempt.to_string());
4545 }
4546 headers
4547}
4548
4549fn topic_for_event(event: &TriggerEvent, topic_name: &str) -> Result<Topic, DispatchError> {
4550 let topic = Topic::new(topic_name)
4551 .expect("static trigger dispatcher topic names should always be valid");
4552 match event.tenant_id.as_ref() {
4553 Some(tenant_id) => crate::tenant_topic(tenant_id, &topic).map_err(DispatchError::from),
4554 None => Ok(topic),
4555 }
4556}
4557
4558fn worker_queue_priority(
4559 binding: &super::registry::TriggerBinding,
4560 event: &TriggerEvent,
4561) -> crate::WorkerQueuePriority {
4562 match event
4563 .headers
4564 .get("priority")
4565 .map(|value| value.trim().to_ascii_lowercase())
4566 .as_deref()
4567 {
4568 Some("high") => crate::WorkerQueuePriority::High,
4569 Some("low") => crate::WorkerQueuePriority::Low,
4570 _ => binding.dispatch_priority,
4571 }
4572}
4573
/// Environment variable that arms the crash-before-outbox test hook.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Test hook: terminates the process with exit code 86 when
/// `TEST_FAIL_BEFORE_OUTBOX_ENV` is set to any value; otherwise does nothing.
fn maybe_fail_before_outbox() {
    let hook_armed = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if hook_armed {
        std::process::exit(86);
    }
}
4581
4582fn now_rfc3339() -> String {
4583 time::OffsetDateTime::now_utc()
4584 .format(&time::format_description::well_known::Rfc3339)
4585 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4586}
4587
4588fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
4589 let now = time::OffsetDateTime::now_utc();
4590 let reset = if binding.hourly_cost_usd.is_some() {
4591 let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
4592 time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
4593 } else {
4594 let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
4595 time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
4596 };
4597 reset
4598 .format(&time::format_description::well_known::Rfc3339)
4599 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4600}
4601
4602fn now_unix_ms() -> i64 {
4603 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
4604}
4605
4606fn cancelled_dispatch_outcome(
4607 binding: &TriggerBinding,
4608 route: &DispatchUri,
4609 event: &TriggerEvent,
4610 replay_of_event_id: Option<String>,
4611 attempt_count: u32,
4612 error: String,
4613) -> DispatchOutcome {
4614 DispatchOutcome {
4615 trigger_id: binding.id.as_str().to_string(),
4616 binding_key: binding.binding_key(),
4617 event_id: event.id.0.clone(),
4618 attempt_count,
4619 status: DispatchStatus::Cancelled,
4620 handler_kind: route.kind().to_string(),
4621 target_uri: route.target_uri(),
4622 replay_of_event_id,
4623 result: None,
4624 error: Some(error),
4625 }
4626}
4627
4628async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
4629 let _ = cancel_rx.recv().await;
4630}
4631
4632#[cfg(test)]
4633mod tests;