1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::{Duration, Instant};
7
8use futures::{pin_mut, StreamExt};
9use serde::{Deserialize, Serialize};
10use tokio::sync::broadcast;
11use tokio::sync::{Mutex as AsyncMutex, Notify};
12use tracing::Instrument as _;
13
14use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
15use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
16use crate::llm::vm_value_to_json;
17use crate::orchestration::{
18 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
19 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
20 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
21 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
22 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
23 ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
24 ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
25};
26use crate::stdlib::json_to_vm_value;
27use crate::trust_graph::{
28 append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
29};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::{
35 binding_autonomy_budget_would_exceed, binding_budget_would_exceed,
36 expected_predicate_cost_usd_micros, matching_bindings, micros_to_usd, note_autonomous_decision,
37 note_orchestrator_budget_cost, orchestrator_budget_would_exceed, record_predicate_cost_sample,
38 reset_binding_budget_windows, usd_to_micros, TriggerBinding, TriggerBudgetExhaustionStrategy,
39 TriggerHandlerSpec,
40};
41use super::{
42 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
43 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
44 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
45 TRIGGER_OUTBOX_TOPIC,
46};
47use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
48
49mod flow_control;
50pub mod retry;
51pub mod uri;
52
53pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
54
/// Log-event header carrying the time a trigger event was accepted, as a Unix-millisecond
/// string. `Dispatcher::enqueue_targeted_with_headers` fills it from the event's
/// `received_at` only when the propagated headers do not already contain it.
pub const TRIGGER_ACCEPTED_AT_MS_HEADER: &str = "harn_trigger_accepted_at_ms";
/// Log-event header for the time a trigger event was normalized (presumably Unix
/// milliseconds, like the sibling headers — not written in this section; confirm).
pub const TRIGGER_NORMALIZED_AT_MS_HEADER: &str = "harn_trigger_normalized_at_ms";
/// Log-event header carrying the time an event was appended to the inbox queue; defaults
/// to the inbox log event's `occurred_at_ms` when not already present.
pub const TRIGGER_QUEUE_APPENDED_AT_MS_HEADER: &str = "harn_trigger_queue_appended_at_ms";
/// Consecutive failures after which a destination's circuit opens.
const DESTINATION_CIRCUIT_FAILURE_THRESHOLD: u32 = 5;
/// How long an opened destination circuit blocks sends before allowing a half-open trial.
const DESTINATION_CIRCUIT_BACKOFF: Duration = Duration::from_secs(60);
60
thread_local! {
    // Runtime state of the dispatcher most recently constructed on this thread
    // (overwritten by `Dispatcher::with_event_log_and_metrics`).
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context of the dispatch running on this thread, if any; cloned out by
    // `current_dispatch_context`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Flow-control lease of the running dispatch; cloned out by
    // `current_dispatch_wait_lease` (presumably so handler code can suspend/resume it).
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only hook: a one-shot sender, presumably fired when the inbox loop dequeues
    // an envelope — confirm against `notify_test_inbox_dequeued`.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
68
tokio::task_local! {
    // `true` while the current task runs a replay dispatch. It is scoped around the inner
    // dispatch future by `Dispatcher::dispatch_with_replay` and read (defaulting to
    // `false` outside any scope) by `current_dispatch_is_replay`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
72
/// Describes one dispatch. The dispatch active on the current thread is kept in the
/// `ACTIVE_DISPATCH_CONTEXT` thread-local and read via `current_dispatch_context()`.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The trigger event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay.
    pub replay_of_event_id: Option<String>,
    /// Id of the trigger binding handling the event.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    /// Agent attributed with the dispatch (NOTE(review): confirm how it is derived).
    pub agent_id: String,
    /// Action label for the dispatch (NOTE(review): confirm where it is set).
    pub action: String,
    /// Autonomy tier in effect for this dispatch.
    pub autonomy_tier: AutonomyTier,
}
83
/// Drop guard that calls `crate::orchestration::pop_execution_policy()` when it goes out
/// of scope. Presumably paired with a matching push of an execution policy, so the
/// policy is popped even on early return or panic — confirm at the construction site.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
91
/// Serializable bundle of cached predicate evaluations for one (trigger, event) pair.
/// NOTE(review): presumably persisted so replays can reuse earlier predicate results.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    trigger_id: String,
    event_id: String,
    entries: Vec<PredicateCacheEntry>,
}
98
/// Outcome of evaluating a binding's `when` predicate for one event.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed (the dispatcher treats this as "passed").
    result: bool,
    /// Evaluation cost in USD.
    cost_usd: f64,
    /// Tokens consumed by the evaluation.
    tokens: u64,
    /// Evaluation latency in milliseconds.
    latency_ms: u64,
    /// Whether the result came from a cache rather than a fresh evaluation.
    cached: bool,
    /// Optional explanation (e.g. why evaluation was skipped) — NOTE(review): confirm.
    reason: Option<String>,
    /// Budget-exhaustion strategy applied, if a budget limit was hit.
    exhaustion_strategy: Option<TriggerBudgetExhaustionStrategy>,
}
109
/// Reviewer name used by default for autonomy-budget decisions (NOTE(review): usage is
/// outside this section — confirm semantics).
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
111
112pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
113 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
114}
115
116pub(crate) fn current_dispatch_is_replay() -> bool {
117 ACTIVE_DISPATCH_IS_REPLAY
118 .try_with(|is_replay| *is_replay)
119 .unwrap_or(false)
120}
121
122pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
123 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
124}
125
/// Routes trigger events from the inbox to their bindings' handlers.
///
/// Clones share the VM through `Rc` (so a `Dispatcher` is not `Send`) and share the
/// event log and runtime state through `Arc`.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM the dispatcher was built with.
    base_vm: Rc<Vm>,
    /// Event log holding the inbox, cancel-request, and related topics.
    event_log: Arc<AnyEventLog>,
    /// Broadcast used by `shutdown` to stop the `run` loop.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ, flow control, and circuit breakers shared across clones.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink; when `None`, no dispatch metrics are recorded.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
134
135#[derive(Debug)]
136struct DispatcherRuntimeState {
137 in_flight: AtomicU64,
138 retry_queue_depth: AtomicU64,
139 dlq: Mutex<Vec<DlqEntry>>,
140 cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
141 shutting_down: std::sync::atomic::AtomicBool,
142 idle_notify: Notify,
143 flow_control: FlowControlManager,
144 destination_circuits: DestinationCircuitRegistry,
145}
146
147impl DispatcherRuntimeState {
148 fn new(event_log: Arc<AnyEventLog>) -> Self {
149 Self {
150 in_flight: AtomicU64::new(0),
151 retry_queue_depth: AtomicU64::new(0),
152 dlq: Mutex::new(Vec::new()),
153 cancel_tokens: Mutex::new(Vec::new()),
154 shutting_down: std::sync::atomic::AtomicBool::new(false),
155 idle_notify: Notify::new(),
156 flow_control: FlowControlManager::new(event_log),
157 destination_circuits: DestinationCircuitRegistry::default(),
158 }
159 }
160}
161
162#[derive(Clone, Copy, Debug, PartialEq, Eq)]
163enum DestinationCircuitProbe {
164 Allow { half_open: bool },
165 Block { retry_after: Duration },
166}
167
168#[derive(Debug)]
169struct DestinationCircuitRegistry {
170 threshold: u32,
171 backoff: Duration,
172 states: Mutex<BTreeMap<String, DestinationCircuitState>>,
173}
174
175#[derive(Clone, Debug)]
176struct DestinationCircuitState {
177 failures: u32,
178 opened_at: Option<Instant>,
179}
180
181impl Default for DestinationCircuitRegistry {
182 fn default() -> Self {
183 Self {
184 threshold: DESTINATION_CIRCUIT_FAILURE_THRESHOLD,
185 backoff: DESTINATION_CIRCUIT_BACKOFF,
186 states: Mutex::new(BTreeMap::new()),
187 }
188 }
189}
190
191impl DestinationCircuitRegistry {
192 fn check(&self, destination: &str) -> DestinationCircuitProbe {
193 let mut states = self
194 .states
195 .lock()
196 .expect("destination circuit registry poisoned");
197 let Some(state) = states.get_mut(destination) else {
198 return DestinationCircuitProbe::Allow { half_open: false };
199 };
200 let Some(opened_at) = state.opened_at else {
201 return DestinationCircuitProbe::Allow { half_open: false };
202 };
203 let elapsed = opened_at.elapsed();
204 if elapsed >= self.backoff {
205 DestinationCircuitProbe::Allow { half_open: true }
206 } else {
207 DestinationCircuitProbe::Block {
208 retry_after: self.backoff.saturating_sub(elapsed),
209 }
210 }
211 }
212
213 fn record_success(&self, destination: &str) {
214 let mut states = self
215 .states
216 .lock()
217 .expect("destination circuit registry poisoned");
218 states.remove(destination);
219 }
220
221 fn record_failure(&self, destination: &str) -> bool {
222 let mut states = self
223 .states
224 .lock()
225 .expect("destination circuit registry poisoned");
226 let state = states
227 .entry(destination.to_string())
228 .or_insert(DestinationCircuitState {
229 failures: 0,
230 opened_at: None,
231 });
232 if state.opened_at.is_some() {
233 state.failures = self.threshold;
234 state.opened_at = Some(Instant::now());
235 return true;
236 }
237 state.failures = state.failures.saturating_add(1);
238 if state.failures >= self.threshold {
239 state.opened_at = Some(Instant::now());
240 true
241 } else {
242 false
243 }
244 }
245}
246
/// Point-in-time copy of the dispatcher's queue counters, as returned by
/// `Dispatcher::snapshot`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches currently executing.
    pub in_flight: u64,
    /// Attempts waiting to be retried.
    pub retry_queue_depth: u64,
    /// Entries currently in the dead-letter queue.
    pub dlq_depth: u64,
}
254
/// Final status of one dispatch; serialized in snake_case (`"dlq"`, `"succeeded"`, ...).
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch failed.
    Failed,
    /// The dispatch was moved to the dead-letter queue.
    Dlq,
    /// The dispatch was skipped (e.g. by a predicate or flow control).
    Skipped,
    /// The dispatch is parked waiting; not counted as success or failure in metrics.
    Waiting,
    /// The dispatch was cancelled.
    Cancelled,
}
265
266impl DispatchStatus {
267 pub fn as_str(&self) -> &'static str {
268 match self {
269 Self::Succeeded => "succeeded",
270 Self::Failed => "failed",
271 Self::Dlq => "dlq",
272 Self::Skipped => "skipped",
273 Self::Waiting => "waiting",
274 Self::Cancelled => "cancelled",
275 }
276 }
277}
278
/// Result of dispatching one event to one binding.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Versioned key of the binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Number of handler attempts made.
    pub attempt_count: u32,
    /// Final status.
    pub status: DispatchStatus,
    /// Kind of handler that ran.
    pub handler_kind: String,
    /// Target URI of the handler.
    pub target_uri: String,
    /// Original event id when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result as JSON, when one was produced.
    pub result: Option<serde_json::Value>,
    /// Error message, when the dispatch did not succeed.
    pub error: Option<String>,
}
293
/// Payload of an `event_ingested` record on the trigger inbox topic.
///
/// When `trigger_id` is set, the event goes to that binding only. Otherwise a cron
/// payload's `cron_id` picks the binding, and failing that the event fans out to every
/// matching binding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// Explicit target binding, if any.
    pub trigger_id: Option<String>,
    /// Binding version to resolve; passed to `resolve_live_trigger_binding`.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
300
/// Result of `Dispatcher::drain`: whether the dispatcher went idle and the counters
/// observed when drain returned.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when nothing was in flight or queued for retry before the timeout.
    pub drained: bool,
    pub in_flight: u64,
    pub retry_queue_depth: u64,
    pub dlq_depth: u64,
}
309
310impl Default for DispatchOutcome {
311 fn default() -> Self {
312 Self {
313 trigger_id: String::new(),
314 binding_key: String::new(),
315 event_id: String::new(),
316 attempt_count: 0,
317 status: DispatchStatus::Failed,
318 handler_kind: String::new(),
319 target_uri: String::new(),
320 replay_of_event_id: None,
321 result: None,
322 error: None,
323 }
324 }
325}
326
/// Audit record for a single handler attempt within a dispatch.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// Attempt number (NOTE(review): presumably 1-based — confirm).
    pub attempt: u32,
    pub handler_kind: String,
    /// Start timestamp as text (format set by the writer — confirm).
    pub started_at: String,
    /// Completion timestamp as text.
    pub completed_at: String,
    /// Attempt outcome label.
    pub outcome: String,
    /// Error message when the attempt failed.
    pub error_msg: Option<String>,
}
340
/// Request to cancel the dispatch of one event for one binding, appended to the
/// trigger cancel-requests topic by `append_dispatch_cancel_request`.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    pub binding_key: String,
    pub event_id: String,
    /// When the cancel was requested; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who requested the cancel, if known.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Correlation id for audit trails, if any.
    #[serde(default)]
    pub audit_id: Option<String>,
}
352
/// A dispatch that ended in the dead-letter queue, with its full attempt history.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    pub trigger_id: String,
    pub binding_key: String,
    /// The event that could not be delivered.
    pub event: TriggerEvent,
    pub attempt_count: u32,
    /// Error from the last attempt.
    pub final_error: String,
    pub attempts: Vec<DispatchAttemptRecord>,
}
362
/// A singleton gate taken for a dispatch.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Gate key passed to the flow-control manager.
    gate: String,
    /// Whether the gate is currently held (`false` while the dispatch is suspended).
    held: bool,
}

/// A concurrency slot taken for a dispatch, plus what is needed to re-acquire it.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    /// Gate key passed to the flow-control manager.
    gate: String,
    /// Maximum concurrency for the gate.
    max: u32,
    /// Priority used when (re-)acquiring a slot.
    priority_rank: usize,
    /// The held permit; `None` while the dispatch is suspended.
    permit: Option<ConcurrencyPermit>,
}

/// Flow-control resources acquired for one dispatch (either may be absent).
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    singleton: Option<SingletonLease>,
    concurrency: Option<ConcurrencyLease>,
}
382
/// Shareable handle that lets a running dispatch temporarily give up its flow-control
/// slots (`suspend`) and take them back (`resume`). Clones share the same lease.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state owning the flow-control manager.
    state: Arc<DispatcherRuntimeState>,
    /// The slots acquired for this dispatch.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// `true` between a `suspend` and the next `resume`.
    suspended: Arc<AtomicBool>,
}
389
390impl DispatchWaitLease {
391 fn new(
392 state: Arc<DispatcherRuntimeState>,
393 acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
394 ) -> Self {
395 Self {
396 state,
397 acquired,
398 suspended: Arc::new(AtomicBool::new(false)),
399 }
400 }
401
402 pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
403 if self.suspended.swap(true, Ordering::SeqCst) {
404 return Ok(());
405 }
406 let (singleton_gate, concurrency_permit) = {
407 let mut acquired = self.acquired.lock().await;
408 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
409 if lease.held {
410 lease.held = false;
411 Some(lease.gate.clone())
412 } else {
413 None
414 }
415 });
416 let concurrency_permit = acquired
417 .concurrency
418 .as_mut()
419 .and_then(|lease| lease.permit.take());
420 (singleton_gate, concurrency_permit)
421 };
422
423 if let Some(gate) = singleton_gate {
424 self.state
425 .flow_control
426 .release_singleton(&gate)
427 .await
428 .map_err(DispatchError::from)?;
429 }
430 if let Some(permit) = concurrency_permit {
431 self.state
432 .flow_control
433 .release_concurrency(permit)
434 .await
435 .map_err(DispatchError::from)?;
436 }
437 Ok(())
438 }
439
440 pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
441 if !self.suspended.swap(false, Ordering::SeqCst) {
442 return Ok(());
443 }
444
445 let singleton_gate = {
446 let acquired = self.acquired.lock().await;
447 acquired.singleton.as_ref().and_then(|lease| {
448 if lease.held {
449 None
450 } else {
451 Some(lease.gate.clone())
452 }
453 })
454 };
455 if let Some(gate) = singleton_gate {
456 self.state
457 .flow_control
458 .acquire_singleton(&gate)
459 .await
460 .map_err(DispatchError::from)?;
461 let mut acquired = self.acquired.lock().await;
462 if let Some(lease) = acquired.singleton.as_mut() {
463 lease.held = true;
464 }
465 }
466
467 let concurrency_spec = {
468 let acquired = self.acquired.lock().await;
469 acquired.concurrency.as_ref().and_then(|lease| {
470 if lease.permit.is_some() {
471 None
472 } else {
473 Some((lease.gate.clone(), lease.max, lease.priority_rank))
474 }
475 })
476 };
477 if let Some((gate, max, priority_rank)) = concurrency_spec {
478 let permit = self
479 .state
480 .flow_control
481 .acquire_concurrency(&gate, max, priority_rank)
482 .await
483 .map_err(DispatchError::from)?;
484 let mut acquired = self.acquired.lock().await;
485 if let Some(lease) = acquired.concurrency.as_mut() {
486 lease.permit = Some(permit);
487 }
488 }
489 Ok(())
490 }
491}
492
/// Result of applying a binding's flow-control rules to an event (NOTE(review): producer
/// is outside this section).
enum FlowControlOutcome {
    /// Proceed with `event` (boxed to keep the enum small) while holding `acquired`.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; `reason` explains why.
    Skip {
        reason: String,
    },
}
502
/// Pipeline stage that skipped a dispatch; labelled `"predicate"` / `"flow_control"`.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    Predicate,
    FlowControl,
}
508
/// Errors produced while enqueuing or dispatching trigger events.
///
/// Every variant carries a human-readable message, which is its `Display` output.
/// `Cancelled`, `Denied`, `NotImplemented`, and `Waiting` are not retryable; every other
/// variant is.
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log failure (also produced from `LogError`).
    EventLog(String),
    /// Trigger-registry failure, e.g. resolving a binding.
    Registry(String),
    /// JSON (de)serialization failure.
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}
522
523impl std::fmt::Display for DispatchError {
524 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
525 match self {
526 Self::EventLog(message)
527 | Self::Registry(message)
528 | Self::Serde(message)
529 | Self::Local(message)
530 | Self::A2a(message)
531 | Self::Denied(message)
532 | Self::Timeout(message)
533 | Self::Waiting(message)
534 | Self::Cancelled(message)
535 | Self::NotImplemented(message) => f.write_str(message),
536 }
537 }
538}
539
540impl std::error::Error for DispatchError {}
541
542impl DispatchError {
543 fn retryable(&self) -> bool {
544 !matches!(
545 self,
546 Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_)
547 )
548 }
549}
550
551impl DispatchSkipStage {
552 fn as_str(&self) -> &'static str {
553 match self {
554 Self::Predicate => "predicate",
555 Self::FlowControl => "flow_control",
556 }
557 }
558}
559
560impl From<LogError> for DispatchError {
561 fn from(value: LogError) -> Self {
562 Self::EventLog(value.to_string())
563 }
564}
565
566pub async fn append_dispatch_cancel_request(
567 event_log: &Arc<AnyEventLog>,
568 request: &DispatchCancelRequest,
569) -> Result<u64, DispatchError> {
570 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
571 .expect("static trigger cancel topic should always be valid");
572 event_log
573 .append(
574 &topic,
575 LogEvent::new(
576 "dispatch_cancel_requested",
577 serde_json::to_value(request)
578 .map_err(|error| DispatchError::Serde(error.to_string()))?,
579 ),
580 )
581 .await
582 .map_err(DispatchError::from)
583}
584
585impl Dispatcher {
586 pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
587 self.event_log.clone()
588 }
589
590 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
591 let event_log = active_event_log().ok_or_else(|| {
592 DispatchError::EventLog("dispatcher requires an active event log".to_string())
593 })?;
594 Ok(Self::with_event_log(base_vm, event_log))
595 }
596
597 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
598 Self::with_event_log_and_metrics(base_vm, event_log, None)
599 }
600
601 pub fn with_event_log_and_metrics(
602 base_vm: Vm,
603 event_log: Arc<AnyEventLog>,
604 metrics: Option<Arc<crate::MetricsRegistry>>,
605 ) -> Self {
606 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
607 ACTIVE_DISPATCHER_STATE.with(|slot| {
608 *slot.borrow_mut() = Some(state.clone());
609 });
610 let (cancel_tx, _) = broadcast::channel(32);
611 Self {
612 base_vm: Rc::new(base_vm),
613 event_log,
614 cancel_tx,
615 state,
616 metrics,
617 }
618 }
619
620 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
621 DispatcherStatsSnapshot {
622 in_flight: self.state.in_flight.load(Ordering::Relaxed),
623 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
624 dlq_depth: self
625 .state
626 .dlq
627 .lock()
628 .expect("dispatcher dlq poisoned")
629 .len() as u64,
630 }
631 }
632
633 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
634 self.state
635 .dlq
636 .lock()
637 .expect("dispatcher dlq poisoned")
638 .clone()
639 }
640
641 pub fn shutdown(&self) {
642 self.state.shutting_down.store(true, Ordering::SeqCst);
643 for token in self
644 .state
645 .cancel_tokens
646 .lock()
647 .expect("dispatcher cancel tokens poisoned")
648 .iter()
649 {
650 token.store(true, Ordering::SeqCst);
651 }
652 let _ = self.cancel_tx.send(());
653 }
654
655 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
656 self.enqueue_targeted(None, None, event).await
657 }
658
659 pub async fn enqueue_targeted(
660 &self,
661 trigger_id: Option<String>,
662 binding_version: Option<u32>,
663 event: TriggerEvent,
664 ) -> Result<u64, DispatchError> {
665 self.enqueue_targeted_with_headers(trigger_id, binding_version, event, None)
666 .await
667 }
668
    /// Appends an `event_ingested` [`InboxEnvelope`] for `event` to the trigger inbox
    /// topic and returns the `u64` reported by the event log's `append`
    /// (NOTE(review): presumably the record's offset — confirm).
    ///
    /// Header precedence, lowest to highest: `parent_headers`, then the headers derived
    /// from the event, then `trigger_id` / `binding_key` (only when `trigger_id` is set).
    /// The accepted-at and queue-appended-at timestamp headers are only filled in when
    /// missing, so values propagated from a parent are kept.
    ///
    /// When metrics are configured and `trigger_id` is set, this also registers the event
    /// as pending. It records the accepted→queue-append latency only if this call is the
    /// one stamping the queue-appended-at header.
    ///
    /// # Errors
    /// [`DispatchError::Serde`] if the envelope cannot be serialized, or the event log's
    /// append error.
    pub async fn enqueue_targeted_with_headers(
        &self,
        trigger_id: Option<String>,
        binding_version: Option<u32>,
        event: TriggerEvent,
        parent_headers: Option<&BTreeMap<String, String>>,
    ) -> Result<u64, DispatchError> {
        let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
            .expect("static trigger inbox envelopes topic is valid");
        let trigger_id_for_metrics = trigger_id.clone();
        // Start from the caller's propagation headers; event-derived keys overwrite them.
        let mut headers = parent_headers.cloned().unwrap_or_default();
        headers.extend(event_headers(&event, None, None, None));
        if let Some(trigger_id) = trigger_id_for_metrics.as_ref() {
            headers.insert("trigger_id".to_string(), trigger_id.clone());
            headers.insert(
                "binding_key".to_string(),
                binding_key_from_parts(trigger_id, binding_version),
            );
        }
        // Keep an upstream accepted-at stamp; otherwise use the event's receive time.
        headers
            .entry(TRIGGER_ACCEPTED_AT_MS_HEADER.to_string())
            .or_insert_with(|| unix_ms(event.received_at).to_string());
        let payload = serde_json::to_value(InboxEnvelope {
            trigger_id,
            binding_version,
            event: event.clone(),
        })
        .map_err(|error| DispatchError::Serde(error.to_string()))?;
        let mut log_event = LogEvent::new("event_ingested", payload);
        // Whether an upstream hop already stamped the queue-append time; if so, the
        // accepted->queued latency is not recorded again below.
        let had_queue_appended_at = headers.contains_key(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER);
        // NOTE(review): an unparsable upstream value falls back to this log event's
        // timestamp for metrics, but the unparsable header itself is left in place.
        let queue_appended_at_ms = headers
            .get(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
            .and_then(|value| value.parse::<i64>().ok())
            .unwrap_or(log_event.occurred_at_ms);
        headers
            .entry(TRIGGER_QUEUE_APPENDED_AT_MS_HEADER.to_string())
            .or_insert_with(|| log_event.occurred_at_ms.to_string());
        if let (Some(metrics), Some(trigger_id)) =
            (self.metrics.as_ref(), trigger_id_for_metrics.as_ref())
        {
            let binding_key = binding_key_from_parts(trigger_id, binding_version);
            let accepted_at_ms = accepted_at_ms(Some(&headers), &event);
            if !had_queue_appended_at {
                metrics.record_trigger_accepted_to_queue_append(
                    trigger_id,
                    &binding_key,
                    event.provider.as_str(),
                    tenant_id(&event),
                    "queued",
                    duration_between_ms(queue_appended_at_ms, accepted_at_ms),
                );
            }
            // Marked pending here and cleared when the dispatcher admits the event.
            metrics.note_trigger_pending_event(
                event.id.0.as_str(),
                trigger_id,
                &binding_key,
                event.provider.as_str(),
                tenant_id(&event),
                accepted_at_ms,
                queue_appended_at_ms,
            );
        }
        log_event.headers = headers;
        self.event_log
            .append(&topic, log_event)
            .await
            .map_err(DispatchError::from)
    }
737
738 pub async fn run(&self) -> Result<(), DispatchError> {
739 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
740 .expect("static trigger inbox envelopes topic is valid");
741 let start_from = self.event_log.latest(&topic).await?;
742 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
743 pin_mut!(stream);
744 let mut cancel_rx = self.cancel_tx.subscribe();
745
746 loop {
747 tokio::select! {
748 received = stream.next() => {
749 let Some(received) = received else {
750 break;
751 };
752 let (_, event) = received.map_err(DispatchError::from)?;
753 if event.kind != "event_ingested" {
754 continue;
755 }
756 let parent_headers = event.headers.clone();
757 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
758 .map_err(|error| DispatchError::Serde(error.to_string()))?;
759 notify_test_inbox_dequeued();
760 let _ = self
761 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
762 .await;
763 }
764 _ = recv_cancel(&mut cancel_rx) => break,
765 }
766 }
767
768 Ok(())
769 }
770
771 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
772 let deadline = tokio::time::Instant::now() + timeout;
773 loop {
774 let snapshot = self.snapshot();
775 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
776 return Ok(DispatcherDrainReport {
777 drained: true,
778 in_flight: snapshot.in_flight,
779 retry_queue_depth: snapshot.retry_queue_depth,
780 dlq_depth: snapshot.dlq_depth,
781 });
782 }
783
784 let notified = self.state.idle_notify.notified();
785 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
786 if remaining.is_zero() {
787 return Ok(DispatcherDrainReport {
788 drained: false,
789 in_flight: snapshot.in_flight,
790 retry_queue_depth: snapshot.retry_queue_depth,
791 dlq_depth: snapshot.dlq_depth,
792 });
793 }
794 if tokio::time::timeout(remaining, notified).await.is_err() {
795 let snapshot = self.snapshot();
796 return Ok(DispatcherDrainReport {
797 drained: false,
798 in_flight: snapshot.in_flight,
799 retry_queue_depth: snapshot.retry_queue_depth,
800 dlq_depth: snapshot.dlq_depth,
801 });
802 }
803 }
804 }
805
806 pub async fn dispatch_inbox_envelope(
807 &self,
808 envelope: InboxEnvelope,
809 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
810 self.dispatch_inbox_envelope_with_headers(envelope, None)
811 .await
812 }
813
814 pub async fn dispatch_inbox_envelope_with_parent_headers(
815 &self,
816 envelope: InboxEnvelope,
817 parent_headers: &BTreeMap<String, String>,
818 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
819 self.dispatch_inbox_envelope_with_headers(envelope, Some(parent_headers))
820 .await
821 }
822
823 async fn dispatch_inbox_envelope_with_headers(
824 &self,
825 envelope: InboxEnvelope,
826 parent_headers: Option<&BTreeMap<String, String>>,
827 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
828 if let Some(trigger_id) = envelope.trigger_id {
829 let binding = super::registry::resolve_live_trigger_binding(
830 &trigger_id,
831 envelope.binding_version,
832 )
833 .map_err(|error| DispatchError::Registry(error.to_string()))?;
834 return Ok(vec![
835 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
836 .await?,
837 ]);
838 }
839
840 let cron_target = match &envelope.event.provider_payload {
841 crate::triggers::ProviderPayload::Known(
842 crate::triggers::event::KnownProviderPayload::Cron(payload),
843 ) => payload.cron_id.clone(),
844 _ => None,
845 };
846 if let Some(trigger_id) = cron_target {
847 let binding = super::registry::resolve_live_trigger_binding(
848 &trigger_id,
849 envelope.binding_version,
850 )
851 .map_err(|error| DispatchError::Registry(error.to_string()))?;
852 return Ok(vec![
853 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
854 .await?,
855 ]);
856 }
857
858 self.dispatch_event_with_headers(envelope.event, parent_headers)
859 .await
860 }
861
862 pub async fn dispatch_event(
863 &self,
864 event: TriggerEvent,
865 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
866 self.dispatch_event_with_headers(event, None).await
867 }
868
869 async fn dispatch_event_with_headers(
870 &self,
871 event: TriggerEvent,
872 parent_headers: Option<&BTreeMap<String, String>>,
873 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
874 let bindings = matching_bindings(&event);
875 let mut outcomes = Vec::new();
876 for binding in bindings {
877 outcomes.push(
878 self.dispatch_with_replay(&binding, event.clone(), None, None, parent_headers)
879 .await?,
880 );
881 }
882 Ok(outcomes)
883 }
884
885 pub async fn dispatch(
886 &self,
887 binding: &TriggerBinding,
888 event: TriggerEvent,
889 ) -> Result<DispatchOutcome, DispatchError> {
890 self.dispatch_with_replay(binding, event, None, None, None)
891 .await
892 }
893
894 pub async fn dispatch_replay(
895 &self,
896 binding: &TriggerBinding,
897 event: TriggerEvent,
898 replay_of_event_id: String,
899 ) -> Result<DispatchOutcome, DispatchError> {
900 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
901 .await
902 }
903
904 pub async fn dispatch_with_parent_span_id(
905 &self,
906 binding: &TriggerBinding,
907 event: TriggerEvent,
908 parent_span_id: Option<String>,
909 ) -> Result<DispatchOutcome, DispatchError> {
910 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
911 .await
912 }
913
914 async fn dispatch_with_replay(
915 &self,
916 binding: &TriggerBinding,
917 event: TriggerEvent,
918 replay_of_event_id: Option<String>,
919 parent_span_id: Option<String>,
920 parent_headers: Option<&BTreeMap<String, String>>,
921 ) -> Result<DispatchOutcome, DispatchError> {
922 let parent_headers_for_metrics = parent_headers.cloned();
923 let admitted_at_ms = current_unix_ms();
924 if let Some(metrics) = self.metrics.as_ref() {
925 let binding_key = binding.binding_key();
926 let queue_appended_at_ms = queue_appended_at_ms(parent_headers, &event);
927 metrics.record_trigger_queue_age_at_dispatch_admission(
928 binding.id.as_str(),
929 &binding_key,
930 event.provider.as_str(),
931 tenant_id(&event),
932 "admitted",
933 duration_between_ms(admitted_at_ms, queue_appended_at_ms),
934 );
935 metrics.clear_trigger_pending_event(
936 event.id.0.as_str(),
937 binding.id.as_str(),
938 &binding_key,
939 event.provider.as_str(),
940 tenant_id(&event),
941 admitted_at_ms,
942 );
943 }
944 let span = tracing::info_span!(
945 "dispatch",
946 trigger_id = %binding.id.as_str(),
947 binding_version = binding.version,
948 trace_id = %event.trace_id.0
949 );
950 #[cfg(feature = "otel")]
951 let span_for_otel = span.clone();
952 let _ = if let Some(headers) = parent_headers {
953 crate::observability::otel::set_span_parent_from_headers(
954 &span,
955 headers,
956 &event.trace_id,
957 parent_span_id.as_deref(),
958 )
959 } else {
960 crate::observability::otel::set_span_parent(
961 &span,
962 &event.trace_id,
963 parent_span_id.as_deref(),
964 )
965 };
966 #[cfg(feature = "otel")]
967 let started_at = Instant::now();
968 let metrics = self.metrics.clone();
969 let outcome = ACTIVE_DISPATCH_IS_REPLAY
970 .scope(
971 replay_of_event_id.is_some(),
972 self.dispatch_with_replay_inner(
973 binding,
974 event,
975 replay_of_event_id,
976 parent_headers_for_metrics,
977 )
978 .instrument(span),
979 )
980 .await;
981 if let Some(metrics) = metrics.as_ref() {
982 match &outcome {
983 Ok(dispatch_outcome) => match dispatch_outcome.status {
984 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
985 metrics.record_dispatch_succeeded();
986 }
987 DispatchStatus::Waiting => {}
988 _ => metrics.record_dispatch_failed(),
989 },
990 Err(_) => metrics.record_dispatch_failed(),
991 }
992 let outcome_label = match &outcome {
993 Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
994 Err(DispatchError::Cancelled(_)) => "cancelled",
995 Err(_) => "failed",
996 };
997 metrics.record_trigger_dispatched(
998 binding.id.as_str(),
999 binding.handler.kind(),
1000 outcome_label,
1001 );
1002 metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
1003 }
1004 #[cfg(feature = "otel")]
1005 {
1006 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
1007
1008 let duration_ms = started_at.elapsed().as_millis() as i64;
1009 let status = match &outcome {
1010 Ok(dispatch_outcome) => match dispatch_outcome.status {
1011 DispatchStatus::Succeeded => "succeeded",
1012 DispatchStatus::Skipped => "skipped",
1013 DispatchStatus::Waiting => "waiting",
1014 DispatchStatus::Cancelled => "cancelled",
1015 DispatchStatus::Failed => "failed",
1016 DispatchStatus::Dlq => "dlq",
1017 },
1018 Err(DispatchError::Cancelled(_)) => "cancelled",
1019 Err(_) => "failed",
1020 };
1021 span_for_otel.set_attribute("result.status", status);
1022 span_for_otel.set_attribute("result.duration_ms", duration_ms);
1023 }
1024 outcome
1025 }
1026
1027 async fn dispatch_with_replay_inner(
1028 &self,
1029 binding: &TriggerBinding,
1030 event: TriggerEvent,
1031 replay_of_event_id: Option<String>,
1032 parent_headers: Option<BTreeMap<String, String>>,
1033 ) -> Result<DispatchOutcome, DispatchError> {
1034 let autonomy_tier = crate::resolve_agent_autonomy_tier(
1035 &self.event_log,
1036 binding.id.as_str(),
1037 binding.autonomy_tier,
1038 )
1039 .await
1040 .unwrap_or(binding.autonomy_tier);
1041 let binding_key = binding.binding_key();
1042 let route = DispatchUri::from(&binding.handler);
1043 let trigger_id = binding.id.as_str().to_string();
1044 let event_id = event.id.0.clone();
1045 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
1046 let begin = if replay_of_event_id.is_some() {
1047 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
1048 } else {
1049 begin_in_flight(binding.id.as_str(), binding.version)
1050 };
1051 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
1052
1053 let mut attempts = Vec::new();
1054 let mut source_node_id = format!("trigger:{}", event.id.0);
1055 let mut initial_nodes = Vec::new();
1056 let mut initial_edges = Vec::new();
1057 if let Some(original_event_id) = replay_of_event_id.as_ref() {
1058 let original_node_id = format!("trigger:{original_event_id}");
1059 initial_nodes.push(RunActionGraphNodeRecord {
1060 id: original_node_id.clone(),
1061 label: format!(
1062 "{}:{} (original {})",
1063 event.provider.as_str(),
1064 event.kind,
1065 original_event_id
1066 ),
1067 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1068 status: "historical".to_string(),
1069 outcome: "replayed_from".to_string(),
1070 trace_id: Some(event.trace_id.0.clone()),
1071 stage_id: None,
1072 node_id: None,
1073 worker_id: None,
1074 run_id: None,
1075 run_path: None,
1076 metadata: trigger_node_metadata(&event),
1077 });
1078 initial_edges.push(RunActionGraphEdgeRecord {
1079 from_id: original_node_id,
1080 to_id: source_node_id.clone(),
1081 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
1082 label: Some("replay chain".to_string()),
1083 });
1084 }
1085 initial_nodes.push(RunActionGraphNodeRecord {
1086 id: source_node_id.clone(),
1087 label: format!("{}:{}", event.provider.as_str(), event.kind),
1088 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
1089 status: "received".to_string(),
1090 outcome: "received".to_string(),
1091 trace_id: Some(event.trace_id.0.clone()),
1092 stage_id: None,
1093 node_id: None,
1094 worker_id: None,
1095 run_id: None,
1096 run_path: None,
1097 metadata: trigger_node_metadata(&event),
1098 });
1099 self.emit_action_graph(
1100 &event,
1101 initial_nodes,
1102 initial_edges,
1103 serde_json::json!({
1104 "source": "dispatcher",
1105 "trigger_id": trigger_id,
1106 "binding_key": binding_key,
1107 "event_id": event_id,
1108 "replay_of_event_id": replay_of_event_id,
1109 }),
1110 )
1111 .await?;
1112
1113 if dispatch_cancel_requested(
1114 &self.event_log,
1115 &binding_key,
1116 &event.id.0,
1117 replay_of_event_id.as_ref(),
1118 )
1119 .await?
1120 {
1121 finish_in_flight(
1122 binding.id.as_str(),
1123 binding.version,
1124 TriggerDispatchOutcome::Failed,
1125 )
1126 .await
1127 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1128 decrement_in_flight(&self.state);
1129 return Ok(cancelled_dispatch_outcome(
1130 binding,
1131 &route,
1132 &event,
1133 replay_of_event_id,
1134 0,
1135 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
1136 ));
1137 }
1138
1139 if let Some(predicate) = binding.when.as_ref() {
1140 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
1141 let evaluation = self
1142 .evaluate_predicate(
1143 binding,
1144 predicate,
1145 &event,
1146 replay_of_event_id.as_ref(),
1147 autonomy_tier,
1148 )
1149 .await?;
1150 let passed = evaluation.result;
1151 self.emit_action_graph(
1152 &event,
1153 vec![RunActionGraphNodeRecord {
1154 id: predicate_node_id.clone(),
1155 label: predicate.raw.clone(),
1156 kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
1157 status: "completed".to_string(),
1158 outcome: passed.to_string(),
1159 trace_id: Some(event.trace_id.0.clone()),
1160 stage_id: None,
1161 node_id: None,
1162 worker_id: None,
1163 run_id: None,
1164 run_path: None,
1165 metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
1166 }],
1167 vec![RunActionGraphEdgeRecord {
1168 from_id: source_node_id.clone(),
1169 to_id: predicate_node_id.clone(),
1170 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
1171 label: None,
1172 }],
1173 serde_json::json!({
1174 "source": "dispatcher",
1175 "trigger_id": binding.id.as_str(),
1176 "binding_key": binding.binding_key(),
1177 "event_id": event.id.0,
1178 "predicate": predicate.raw,
1179 "reason": evaluation.reason,
1180 "cached": evaluation.cached,
1181 "cost_usd": evaluation.cost_usd,
1182 "tokens": evaluation.tokens,
1183 "latency_ms": evaluation.latency_ms,
1184 "replay_of_event_id": replay_of_event_id,
1185 }),
1186 )
1187 .await?;
1188
1189 if !passed {
1190 if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
1191 let final_error = format!(
1192 "trigger budget exhausted: {}",
1193 evaluation.reason.as_deref().unwrap_or("budget_exhausted")
1194 );
1195 self.move_budget_exhausted_to_dlq(
1196 binding,
1197 &route,
1198 &event,
1199 replay_of_event_id.as_ref(),
1200 &final_error,
1201 )
1202 .await?;
1203 finish_in_flight(
1204 binding.id.as_str(),
1205 binding.version,
1206 TriggerDispatchOutcome::Dlq,
1207 )
1208 .await
1209 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1210 decrement_in_flight(&self.state);
1211 self.append_dispatch_trust_record(
1212 binding,
1213 &route,
1214 &event,
1215 replay_of_event_id.as_ref(),
1216 autonomy_tier,
1217 TrustOutcome::Failure,
1218 "dlq",
1219 0,
1220 Some(final_error.clone()),
1221 )
1222 .await?;
1223 return Ok(DispatchOutcome {
1224 trigger_id: binding.id.as_str().to_string(),
1225 binding_key: binding.binding_key(),
1226 event_id: event.id.0,
1227 attempt_count: 0,
1228 status: DispatchStatus::Dlq,
1229 handler_kind: route.kind().to_string(),
1230 target_uri: route.target_uri(),
1231 replay_of_event_id,
1232 result: None,
1233 error: Some(final_error),
1234 });
1235 }
1236
1237 if evaluation.exhaustion_strategy
1238 == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1239 {
1240 self.append_budget_deferred_event(
1241 binding,
1242 &route,
1243 &event,
1244 replay_of_event_id.as_ref(),
1245 evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1246 )
1247 .await?;
1248 finish_in_flight(
1249 binding.id.as_str(),
1250 binding.version,
1251 TriggerDispatchOutcome::Dispatched,
1252 )
1253 .await
1254 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1255 decrement_in_flight(&self.state);
1256 self.append_dispatch_trust_record(
1257 binding,
1258 &route,
1259 &event,
1260 replay_of_event_id.as_ref(),
1261 autonomy_tier,
1262 TrustOutcome::Denied,
1263 "waiting",
1264 0,
1265 evaluation.reason.clone(),
1266 )
1267 .await?;
1268 return Ok(DispatchOutcome {
1269 trigger_id: binding.id.as_str().to_string(),
1270 binding_key: binding.binding_key(),
1271 event_id: event.id.0,
1272 attempt_count: 0,
1273 status: DispatchStatus::Waiting,
1274 handler_kind: route.kind().to_string(),
1275 target_uri: route.target_uri(),
1276 replay_of_event_id,
1277 result: Some(serde_json::json!({
1278 "deferred": true,
1279 "predicate": predicate.raw,
1280 "reason": evaluation.reason,
1281 })),
1282 error: None,
1283 });
1284 }
1285
1286 self.append_skipped_outbox_event(
1287 binding,
1288 &route,
1289 &event,
1290 replay_of_event_id.as_ref(),
1291 DispatchSkipStage::Predicate,
1292 serde_json::json!({
1293 "predicate": predicate.raw,
1294 "reason": evaluation.reason,
1295 }),
1296 )
1297 .await?;
1298 finish_in_flight(
1299 binding.id.as_str(),
1300 binding.version,
1301 TriggerDispatchOutcome::Dispatched,
1302 )
1303 .await
1304 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1305 decrement_in_flight(&self.state);
1306 self.append_dispatch_trust_record(
1307 binding,
1308 &route,
1309 &event,
1310 replay_of_event_id.as_ref(),
1311 autonomy_tier,
1312 TrustOutcome::Denied,
1313 "skipped",
1314 0,
1315 None,
1316 )
1317 .await?;
1318 return Ok(DispatchOutcome {
1319 trigger_id: binding.id.as_str().to_string(),
1320 binding_key: binding.binding_key(),
1321 event_id: event.id.0,
1322 attempt_count: 0,
1323 status: DispatchStatus::Skipped,
1324 handler_kind: route.kind().to_string(),
1325 target_uri: route.target_uri(),
1326 replay_of_event_id,
1327 result: Some(serde_json::json!({
1328 "skipped": true,
1329 "predicate": predicate.raw,
1330 "reason": evaluation.reason,
1331 })),
1332 error: None,
1333 });
1334 }
1335
1336 source_node_id = predicate_node_id;
1337 }
1338
1339 if autonomy_tier == AutonomyTier::ActAuto {
1340 if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1341 let request_id = self
1342 .append_autonomy_budget_approval_request(
1343 binding,
1344 &route,
1345 &event,
1346 replay_of_event_id.as_ref(),
1347 reason,
1348 )
1349 .await?;
1350 self.emit_autonomy_budget_approval_action_graph(
1351 binding,
1352 &route,
1353 &event,
1354 &source_node_id,
1355 replay_of_event_id.as_ref(),
1356 reason,
1357 &request_id,
1358 )
1359 .await?;
1360 finish_in_flight(
1361 binding.id.as_str(),
1362 binding.version,
1363 TriggerDispatchOutcome::Dispatched,
1364 )
1365 .await
1366 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1367 decrement_in_flight(&self.state);
1368 self.append_tier_transition_trust_record(
1369 binding,
1370 &event,
1371 replay_of_event_id.as_ref(),
1372 autonomy_tier,
1373 AutonomyTier::ActWithApproval,
1374 reason,
1375 &request_id,
1376 )
1377 .await?;
1378 self.append_dispatch_trust_record(
1379 binding,
1380 &route,
1381 &event,
1382 replay_of_event_id.as_ref(),
1383 autonomy_tier,
1384 TrustOutcome::Denied,
1385 "waiting",
1386 0,
1387 Some(reason.to_string()),
1388 )
1389 .await?;
1390 return Ok(DispatchOutcome {
1391 trigger_id: binding.id.as_str().to_string(),
1392 binding_key: binding.binding_key(),
1393 event_id: event.id.0,
1394 attempt_count: 0,
1395 status: DispatchStatus::Waiting,
1396 handler_kind: route.kind().to_string(),
1397 target_uri: route.target_uri(),
1398 replay_of_event_id,
1399 result: Some(serde_json::json!({
1400 "approval_required": true,
1401 "request_id": request_id,
1402 "reason": reason,
1403 "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1404 })),
1405 error: None,
1406 });
1407 }
1408 note_autonomous_decision(binding);
1409 }
1410
1411 let (event, acquired_flow) = match self
1412 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1413 .await?
1414 {
1415 FlowControlOutcome::Dispatch { event, acquired } => {
1416 (*event, Arc::new(AsyncMutex::new(acquired)))
1417 }
1418 FlowControlOutcome::Skip { reason } => {
1419 self.append_skipped_outbox_event(
1420 binding,
1421 &route,
1422 &event,
1423 replay_of_event_id.as_ref(),
1424 DispatchSkipStage::FlowControl,
1425 serde_json::json!({
1426 "flow_control": reason,
1427 }),
1428 )
1429 .await?;
1430 finish_in_flight(
1431 binding.id.as_str(),
1432 binding.version,
1433 TriggerDispatchOutcome::Dispatched,
1434 )
1435 .await
1436 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1437 decrement_in_flight(&self.state);
1438 return Ok(DispatchOutcome {
1439 trigger_id: binding.id.as_str().to_string(),
1440 binding_key: binding.binding_key(),
1441 event_id: event.id.0,
1442 attempt_count: 0,
1443 status: DispatchStatus::Skipped,
1444 handler_kind: route.kind().to_string(),
1445 target_uri: route.target_uri(),
1446 replay_of_event_id,
1447 result: Some(serde_json::json!({
1448 "skipped": true,
1449 "flow_control": reason,
1450 })),
1451 error: None,
1452 });
1453 }
1454 };
1455
1456 let destination_key = destination_circuit_key(&route);
1457 let half_open_probe = match self.state.destination_circuits.check(&destination_key) {
1458 DestinationCircuitProbe::Allow { half_open } => {
1459 if half_open {
1460 if let Some(metrics) = self.metrics.as_ref() {
1461 metrics.record_backpressure_event("circuit", "half_open_probe");
1462 }
1463 }
1464 half_open
1465 }
1466 DestinationCircuitProbe::Block { retry_after } => {
1467 if let Some(metrics) = self.metrics.as_ref() {
1468 metrics.record_backpressure_event("circuit", "fail_fast");
1469 }
1470 let final_error = format!(
1471 "destination circuit open for {}; retry after {}s",
1472 destination_key,
1473 retry_after.as_secs().max(1)
1474 );
1475 self.move_circuit_open_to_dlq(
1476 binding,
1477 &route,
1478 &event,
1479 replay_of_event_id.as_ref(),
1480 &final_error,
1481 &destination_key,
1482 )
1483 .await?;
1484 finish_in_flight(
1485 binding.id.as_str(),
1486 binding.version,
1487 TriggerDispatchOutcome::Dlq,
1488 )
1489 .await
1490 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1491 self.release_flow_control(&acquired_flow).await?;
1492 decrement_in_flight(&self.state);
1493 self.append_dispatch_trust_record(
1494 binding,
1495 &route,
1496 &event,
1497 replay_of_event_id.as_ref(),
1498 autonomy_tier,
1499 TrustOutcome::Failure,
1500 "dlq",
1501 0,
1502 Some(final_error.clone()),
1503 )
1504 .await?;
1505 return Ok(DispatchOutcome {
1506 trigger_id: binding.id.as_str().to_string(),
1507 binding_key: binding.binding_key(),
1508 event_id: event.id.0,
1509 attempt_count: 0,
1510 status: DispatchStatus::Dlq,
1511 handler_kind: route.kind().to_string(),
1512 target_uri: route.target_uri(),
1513 replay_of_event_id,
1514 result: None,
1515 error: Some(final_error),
1516 });
1517 }
1518 };
1519
1520 let mut previous_retry_node = None;
1521 let max_attempts = binding.retry.max_attempts();
1522 for attempt in 1..=max_attempts {
1523 if dispatch_cancel_requested(
1524 &self.event_log,
1525 &binding_key,
1526 &event.id.0,
1527 replay_of_event_id.as_ref(),
1528 )
1529 .await?
1530 {
1531 finish_in_flight(
1532 binding.id.as_str(),
1533 binding.version,
1534 TriggerDispatchOutcome::Failed,
1535 )
1536 .await
1537 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1538 decrement_in_flight(&self.state);
1539 return Ok(cancelled_dispatch_outcome(
1540 binding,
1541 &route,
1542 &event,
1543 replay_of_event_id,
1544 attempt.saturating_sub(1),
1545 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1546 ));
1547 }
1548 maybe_fail_before_outbox();
1549 let attempt_started_instant = Instant::now();
1550 let attempt_started_at_ms = current_unix_ms();
1551 let queue_age_at_start = duration_between_ms(
1552 attempt_started_at_ms,
1553 queue_appended_at_ms(parent_headers.as_ref(), &event),
1554 );
1555 if attempt == 1 {
1556 if let Some(metrics) = self.metrics.as_ref() {
1557 metrics.record_trigger_queue_age_at_dispatch_start(
1558 binding.id.as_str(),
1559 &binding_key,
1560 event.provider.as_str(),
1561 tenant_id(&event),
1562 "started",
1563 queue_age_at_start,
1564 );
1565 }
1566 }
1567 tracing::info!(
1568 component = "dispatcher",
1569 lifecycle = "dispatch_started",
1570 trigger_id = %binding.id.as_str(),
1571 binding_key = %binding_key,
1572 event_id = %event.id.0,
1573 attempt,
1574 queue_age_ms = queue_age_at_start.as_millis(),
1575 trace_id = %event.trace_id.0
1576 );
1577 let started_at = now_rfc3339();
1578 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1579 self.append_lifecycle_event(
1580 "DispatchStarted",
1581 &event,
1582 binding,
1583 serde_json::json!({
1584 "event_id": event.id.0,
1585 "attempt": attempt,
1586 "handler_kind": route.kind(),
1587 "target_uri": route.target_uri(),
1588 "replay_of_event_id": replay_of_event_id,
1589 }),
1590 replay_of_event_id.as_ref(),
1591 )
1592 .await?;
1593 self.append_topic_event(
1594 TRIGGER_OUTBOX_TOPIC,
1595 "dispatch_started",
1596 &event,
1597 Some(binding),
1598 Some(attempt),
1599 serde_json::json!({
1600 "event_id": event.id.0,
1601 "attempt": attempt,
1602 "trigger_id": binding.id.as_str(),
1603 "binding_key": binding.binding_key(),
1604 "handler_kind": route.kind(),
1605 "target_uri": route.target_uri(),
1606 "replay_of_event_id": replay_of_event_id,
1607 }),
1608 replay_of_event_id.as_ref(),
1609 )
1610 .await?;
1611
1612 let mut dispatch_edges = Vec::new();
1613 if attempt == 1 {
1614 dispatch_edges.push(RunActionGraphEdgeRecord {
1615 from_id: source_node_id.clone(),
1616 to_id: attempt_node_id.clone(),
1617 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1618 label: binding.when.as_ref().map(|_| "true".to_string()),
1619 });
1620 } else if let Some(retry_node_id) = previous_retry_node.take() {
1621 dispatch_edges.push(RunActionGraphEdgeRecord {
1622 from_id: retry_node_id,
1623 to_id: attempt_node_id.clone(),
1624 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1625 label: Some(format!("attempt {attempt}")),
1626 });
1627 }
1628
1629 self.emit_action_graph(
1630 &event,
1631 vec![RunActionGraphNodeRecord {
1632 id: attempt_node_id.clone(),
1633 label: dispatch_node_label(&route),
1634 kind: dispatch_node_kind(&route).to_string(),
1635 status: "running".to_string(),
1636 outcome: format!("attempt_{attempt}"),
1637 trace_id: Some(event.trace_id.0.clone()),
1638 stage_id: None,
1639 node_id: None,
1640 worker_id: None,
1641 run_id: None,
1642 run_path: None,
1643 metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1644 }],
1645 dispatch_edges,
1646 serde_json::json!({
1647 "source": "dispatcher",
1648 "trigger_id": binding.id.as_str(),
1649 "binding_key": binding.binding_key(),
1650 "event_id": event.id.0,
1651 "attempt": attempt,
1652 "handler_kind": route.kind(),
1653 "target_uri": route.target_uri(),
1654 "target_agent": dispatch_target_agent(&route),
1655 "replay_of_event_id": replay_of_event_id,
1656 }),
1657 )
1658 .await?;
1659
1660 let result = self
1661 .dispatch_once(
1662 binding,
1663 &route,
1664 &event,
1665 autonomy_tier,
1666 Some(DispatchWaitLease::new(
1667 self.state.clone(),
1668 acquired_flow.clone(),
1669 )),
1670 &mut self.cancel_tx.subscribe(),
1671 )
1672 .await;
1673 let attempt_runtime = attempt_started_instant.elapsed();
1674 let attempt_status = dispatch_result_status(&result);
1675 if let Some(metrics) = self.metrics.as_ref() {
1676 metrics.record_trigger_dispatch_runtime(
1677 binding.id.as_str(),
1678 &binding_key,
1679 event.provider.as_str(),
1680 tenant_id(&event),
1681 attempt_status,
1682 attempt_runtime,
1683 );
1684 }
1685 tracing::info!(
1686 component = "dispatcher",
1687 lifecycle = "handler_completed",
1688 trigger_id = %binding.id.as_str(),
1689 binding_key = %binding_key,
1690 event_id = %event.id.0,
1691 attempt,
1692 status = attempt_status,
1693 runtime_ms = attempt_runtime.as_millis(),
1694 trace_id = %event.trace_id.0
1695 );
1696 let completed_at = now_rfc3339();
1697
1698 match result {
1699 Ok(result) => {
1700 let attempt_record = DispatchAttemptRecord {
1701 trigger_id: binding.id.as_str().to_string(),
1702 binding_key: binding.binding_key(),
1703 event_id: event.id.0.clone(),
1704 attempt,
1705 handler_kind: route.kind().to_string(),
1706 started_at,
1707 completed_at,
1708 outcome: "success".to_string(),
1709 error_msg: None,
1710 };
1711 attempts.push(attempt_record.clone());
1712 self.append_attempt_record(
1713 &event,
1714 binding,
1715 &attempt_record,
1716 replay_of_event_id.as_ref(),
1717 )
1718 .await?;
1719 self.append_lifecycle_event(
1720 "DispatchSucceeded",
1721 &event,
1722 binding,
1723 serde_json::json!({
1724 "event_id": event.id.0,
1725 "attempt": attempt,
1726 "handler_kind": route.kind(),
1727 "target_uri": route.target_uri(),
1728 "result": result,
1729 "replay_of_event_id": replay_of_event_id,
1730 }),
1731 replay_of_event_id.as_ref(),
1732 )
1733 .await?;
1734 self.append_topic_event(
1735 TRIGGER_OUTBOX_TOPIC,
1736 "dispatch_succeeded",
1737 &event,
1738 Some(binding),
1739 Some(attempt),
1740 serde_json::json!({
1741 "event_id": event.id.0,
1742 "attempt": attempt,
1743 "trigger_id": binding.id.as_str(),
1744 "binding_key": binding.binding_key(),
1745 "handler_kind": route.kind(),
1746 "target_uri": route.target_uri(),
1747 "result": result,
1748 "replay_of_event_id": replay_of_event_id,
1749 }),
1750 replay_of_event_id.as_ref(),
1751 )
1752 .await?;
1753 self.emit_action_graph(
1754 &event,
1755 vec![RunActionGraphNodeRecord {
1756 id: attempt_node_id.clone(),
1757 label: dispatch_node_label(&route),
1758 kind: dispatch_node_kind(&route).to_string(),
1759 status: "completed".to_string(),
1760 outcome: dispatch_success_outcome(&route, &result).to_string(),
1761 trace_id: Some(event.trace_id.0.clone()),
1762 stage_id: None,
1763 node_id: None,
1764 worker_id: None,
1765 run_id: None,
1766 run_path: None,
1767 metadata: dispatch_success_metadata(
1768 &route, binding, &event, attempt, &result,
1769 ),
1770 }],
1771 Vec::new(),
1772 serde_json::json!({
1773 "source": "dispatcher",
1774 "trigger_id": binding.id.as_str(),
1775 "binding_key": binding.binding_key(),
1776 "event_id": event.id.0,
1777 "attempt": attempt,
1778 "handler_kind": route.kind(),
1779 "target_uri": route.target_uri(),
1780 "result": result,
1781 "replay_of_event_id": replay_of_event_id,
1782 }),
1783 )
1784 .await?;
1785 self.state
1786 .destination_circuits
1787 .record_success(&destination_key);
1788 if half_open_probe {
1789 if let Some(metrics) = self.metrics.as_ref() {
1790 metrics.record_backpressure_event("circuit", "closed");
1791 }
1792 }
1793 finish_in_flight(
1794 binding.id.as_str(),
1795 binding.version,
1796 TriggerDispatchOutcome::Dispatched,
1797 )
1798 .await
1799 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1800 self.release_flow_control(&acquired_flow).await?;
1801 decrement_in_flight(&self.state);
1802 self.append_dispatch_trust_record(
1803 binding,
1804 &route,
1805 &event,
1806 replay_of_event_id.as_ref(),
1807 autonomy_tier,
1808 TrustOutcome::Success,
1809 "succeeded",
1810 attempt,
1811 None,
1812 )
1813 .await?;
1814 return Ok(DispatchOutcome {
1815 trigger_id: binding.id.as_str().to_string(),
1816 binding_key: binding.binding_key(),
1817 event_id: event.id.0,
1818 attempt_count: attempt,
1819 status: DispatchStatus::Succeeded,
1820 handler_kind: route.kind().to_string(),
1821 target_uri: route.target_uri(),
1822 replay_of_event_id,
1823 result: Some(result),
1824 error: None,
1825 });
1826 }
1827 Err(error) => {
1828 let attempt_record = DispatchAttemptRecord {
1829 trigger_id: binding.id.as_str().to_string(),
1830 binding_key: binding.binding_key(),
1831 event_id: event.id.0.clone(),
1832 attempt,
1833 handler_kind: route.kind().to_string(),
1834 started_at,
1835 completed_at,
1836 outcome: dispatch_error_label(&error).to_string(),
1837 error_msg: Some(error.to_string()),
1838 };
1839 attempts.push(attempt_record.clone());
1840 self.append_attempt_record(
1841 &event,
1842 binding,
1843 &attempt_record,
1844 replay_of_event_id.as_ref(),
1845 )
1846 .await?;
1847 if let DispatchError::Waiting(message) = &error {
1848 self.append_lifecycle_event(
1849 "DispatchWaiting",
1850 &event,
1851 binding,
1852 serde_json::json!({
1853 "event_id": event.id.0,
1854 "attempt": attempt,
1855 "handler_kind": route.kind(),
1856 "target_uri": route.target_uri(),
1857 "message": message,
1858 "replay_of_event_id": replay_of_event_id,
1859 }),
1860 replay_of_event_id.as_ref(),
1861 )
1862 .await?;
1863 self.append_topic_event(
1864 TRIGGER_OUTBOX_TOPIC,
1865 "dispatch_waiting",
1866 &event,
1867 Some(binding),
1868 Some(attempt),
1869 serde_json::json!({
1870 "event_id": event.id.0,
1871 "attempt": attempt,
1872 "trigger_id": binding.id.as_str(),
1873 "binding_key": binding.binding_key(),
1874 "handler_kind": route.kind(),
1875 "target_uri": route.target_uri(),
1876 "message": message,
1877 "replay_of_event_id": replay_of_event_id,
1878 }),
1879 replay_of_event_id.as_ref(),
1880 )
1881 .await?;
1882 finish_in_flight(
1883 binding.id.as_str(),
1884 binding.version,
1885 TriggerDispatchOutcome::Dispatched,
1886 )
1887 .await
1888 .map_err(|registry_error| {
1889 DispatchError::Registry(registry_error.to_string())
1890 })?;
1891 self.release_flow_control(&acquired_flow).await?;
1892 decrement_in_flight(&self.state);
1893 return Ok(DispatchOutcome {
1894 trigger_id: binding.id.as_str().to_string(),
1895 binding_key: binding.binding_key(),
1896 event_id: event.id.0,
1897 attempt_count: attempt,
1898 status: DispatchStatus::Waiting,
1899 handler_kind: route.kind().to_string(),
1900 target_uri: route.target_uri(),
1901 replay_of_event_id,
1902 result: Some(serde_json::json!({
1903 "waiting": true,
1904 "message": message,
1905 })),
1906 error: None,
1907 });
1908 }
1909
1910 self.append_lifecycle_event(
1911 "DispatchFailed",
1912 &event,
1913 binding,
1914 serde_json::json!({
1915 "event_id": event.id.0,
1916 "attempt": attempt,
1917 "handler_kind": route.kind(),
1918 "target_uri": route.target_uri(),
1919 "error": error.to_string(),
1920 "replay_of_event_id": replay_of_event_id,
1921 }),
1922 replay_of_event_id.as_ref(),
1923 )
1924 .await?;
1925 self.append_topic_event(
1926 TRIGGER_OUTBOX_TOPIC,
1927 "dispatch_failed",
1928 &event,
1929 Some(binding),
1930 Some(attempt),
1931 serde_json::json!({
1932 "event_id": event.id.0,
1933 "attempt": attempt,
1934 "trigger_id": binding.id.as_str(),
1935 "binding_key": binding.binding_key(),
1936 "handler_kind": route.kind(),
1937 "target_uri": route.target_uri(),
1938 "error": error.to_string(),
1939 "replay_of_event_id": replay_of_event_id,
1940 }),
1941 replay_of_event_id.as_ref(),
1942 )
1943 .await?;
1944 self.emit_action_graph(
1945 &event,
1946 vec![RunActionGraphNodeRecord {
1947 id: attempt_node_id.clone(),
1948 label: dispatch_node_label(&route),
1949 kind: dispatch_node_kind(&route).to_string(),
1950 status: if matches!(error, DispatchError::Cancelled(_)) {
1951 "cancelled".to_string()
1952 } else {
1953 "failed".to_string()
1954 },
1955 outcome: dispatch_error_label(&error).to_string(),
1956 trace_id: Some(event.trace_id.0.clone()),
1957 stage_id: None,
1958 node_id: None,
1959 worker_id: None,
1960 run_id: None,
1961 run_path: None,
1962 metadata: dispatch_error_metadata(
1963 &route, binding, &event, attempt, &error,
1964 ),
1965 }],
1966 Vec::new(),
1967 serde_json::json!({
1968 "source": "dispatcher",
1969 "trigger_id": binding.id.as_str(),
1970 "binding_key": binding.binding_key(),
1971 "event_id": event.id.0,
1972 "attempt": attempt,
1973 "handler_kind": route.kind(),
1974 "target_uri": route.target_uri(),
1975 "error": error.to_string(),
1976 "replay_of_event_id": replay_of_event_id,
1977 }),
1978 )
1979 .await?;
1980
1981 let circuit_opened = if error.retryable() {
1982 self.state
1983 .destination_circuits
1984 .record_failure(&destination_key)
1985 } else {
1986 false
1987 };
1988 if circuit_opened {
1989 if let Some(metrics) = self.metrics.as_ref() {
1990 metrics.record_backpressure_event("circuit", "opened");
1991 metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
1992 metrics.record_trigger_accepted_to_dlq(
1993 binding.id.as_str(),
1994 &binding_key,
1995 event.provider.as_str(),
1996 tenant_id(&event),
1997 "circuit_open",
1998 duration_between_ms(
1999 current_unix_ms(),
2000 accepted_at_ms(parent_headers.as_ref(), &event),
2001 ),
2002 );
2003 }
2004 let final_error = format!(
2005 "destination circuit opened for {} after {} consecutive failures: {}",
2006 destination_key, DESTINATION_CIRCUIT_FAILURE_THRESHOLD, error
2007 );
2008 let dlq_entry = DlqEntry {
2009 trigger_id: binding.id.as_str().to_string(),
2010 binding_key: binding.binding_key(),
2011 event: event.clone(),
2012 attempt_count: attempt,
2013 final_error: final_error.clone(),
2014 attempts: attempts.clone(),
2015 };
2016 self.state
2017 .dlq
2018 .lock()
2019 .expect("dispatcher dlq poisoned")
2020 .push(dlq_entry.clone());
2021 self.append_lifecycle_event(
2022 "DlqMoved",
2023 &event,
2024 binding,
2025 serde_json::json!({
2026 "event_id": event.id.0,
2027 "attempt_count": attempt,
2028 "final_error": dlq_entry.final_error,
2029 "reason": "circuit_open",
2030 "destination": destination_key,
2031 "replay_of_event_id": replay_of_event_id,
2032 }),
2033 replay_of_event_id.as_ref(),
2034 )
2035 .await?;
2036 self.append_topic_event(
2037 TRIGGER_DLQ_TOPIC,
2038 "dlq_moved",
2039 &event,
2040 Some(binding),
2041 Some(attempt),
2042 serde_json::to_value(&dlq_entry).map_err(|serde_error| {
2043 DispatchError::Serde(serde_error.to_string())
2044 })?,
2045 replay_of_event_id.as_ref(),
2046 )
2047 .await?;
2048 finish_in_flight(
2049 binding.id.as_str(),
2050 binding.version,
2051 TriggerDispatchOutcome::Dlq,
2052 )
2053 .await
2054 .map_err(|registry_error| {
2055 DispatchError::Registry(registry_error.to_string())
2056 })?;
2057 self.release_flow_control(&acquired_flow).await?;
2058 decrement_in_flight(&self.state);
2059 self.append_dispatch_trust_record(
2060 binding,
2061 &route,
2062 &event,
2063 replay_of_event_id.as_ref(),
2064 autonomy_tier,
2065 TrustOutcome::Failure,
2066 "dlq",
2067 attempt,
2068 Some(final_error.clone()),
2069 )
2070 .await?;
2071 return Ok(DispatchOutcome {
2072 trigger_id: binding.id.as_str().to_string(),
2073 binding_key: binding.binding_key(),
2074 event_id: event.id.0,
2075 attempt_count: attempt,
2076 status: DispatchStatus::Dlq,
2077 handler_kind: route.kind().to_string(),
2078 target_uri: route.target_uri(),
2079 replay_of_event_id,
2080 result: None,
2081 error: Some(final_error),
2082 });
2083 }
2084
2085 if !error.retryable() {
2086 finish_in_flight(
2087 binding.id.as_str(),
2088 binding.version,
2089 TriggerDispatchOutcome::Failed,
2090 )
2091 .await
2092 .map_err(|registry_error| {
2093 DispatchError::Registry(registry_error.to_string())
2094 })?;
2095 self.release_flow_control(&acquired_flow).await?;
2096 decrement_in_flight(&self.state);
2097 let trust_outcome = match error {
2098 DispatchError::Denied(_) => TrustOutcome::Denied,
2099 DispatchError::Timeout(_) => TrustOutcome::Timeout,
2100 _ => TrustOutcome::Failure,
2101 };
2102 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
2103 "cancelled"
2104 } else {
2105 "failed"
2106 };
2107 self.append_dispatch_trust_record(
2108 binding,
2109 &route,
2110 &event,
2111 replay_of_event_id.as_ref(),
2112 autonomy_tier,
2113 trust_outcome,
2114 terminal_status,
2115 attempt,
2116 Some(error.to_string()),
2117 )
2118 .await?;
2119 return Ok(DispatchOutcome {
2120 trigger_id: binding.id.as_str().to_string(),
2121 binding_key: binding.binding_key(),
2122 event_id: event.id.0,
2123 attempt_count: attempt,
2124 status: if matches!(error, DispatchError::Cancelled(_)) {
2125 DispatchStatus::Cancelled
2126 } else {
2127 DispatchStatus::Failed
2128 },
2129 handler_kind: route.kind().to_string(),
2130 target_uri: route.target_uri(),
2131 replay_of_event_id,
2132 result: None,
2133 error: Some(error.to_string()),
2134 });
2135 }
2136
2137 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
2138 if let Some(metrics) = self.metrics.as_ref() {
2139 metrics.record_retry_scheduled();
2140 metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
2141 metrics.record_trigger_retry_delay(
2142 binding.id.as_str(),
2143 &binding_key,
2144 event.provider.as_str(),
2145 tenant_id(&event),
2146 "scheduled",
2147 delay,
2148 );
2149 }
2150 tracing::info!(
2151 component = "dispatcher",
2152 lifecycle = "retry_scheduled",
2153 trigger_id = %binding.id.as_str(),
2154 binding_key = %binding_key,
2155 event_id = %event.id.0,
2156 attempt = attempt + 1,
2157 delay_ms = delay.as_millis(),
2158 trace_id = %event.trace_id.0
2159 );
2160 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
2161 previous_retry_node = Some(retry_node_id.clone());
2162 self.emit_action_graph(
2163 &event,
2164 vec![RunActionGraphNodeRecord {
2165 id: retry_node_id.clone(),
2166 label: format!("retry in {}ms", delay.as_millis()),
2167 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
2168 status: "scheduled".to_string(),
2169 outcome: format!("attempt_{}", attempt + 1),
2170 trace_id: Some(event.trace_id.0.clone()),
2171 stage_id: None,
2172 node_id: None,
2173 worker_id: None,
2174 run_id: None,
2175 run_path: None,
2176 metadata: retry_node_metadata(
2177 binding,
2178 &event,
2179 attempt + 1,
2180 delay,
2181 &error,
2182 ),
2183 }],
2184 vec![RunActionGraphEdgeRecord {
2185 from_id: attempt_node_id,
2186 to_id: retry_node_id.clone(),
2187 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
2188 label: Some(format!("attempt {}", attempt + 1)),
2189 }],
2190 serde_json::json!({
2191 "source": "dispatcher",
2192 "trigger_id": binding.id.as_str(),
2193 "binding_key": binding.binding_key(),
2194 "event_id": event.id.0,
2195 "attempt": attempt + 1,
2196 "delay_ms": delay.as_millis(),
2197 "replay_of_event_id": replay_of_event_id,
2198 }),
2199 )
2200 .await?;
2201 self.append_lifecycle_event(
2202 "RetryScheduled",
2203 &event,
2204 binding,
2205 serde_json::json!({
2206 "event_id": event.id.0,
2207 "attempt": attempt + 1,
2208 "delay_ms": delay.as_millis(),
2209 "error": error.to_string(),
2210 "replay_of_event_id": replay_of_event_id,
2211 }),
2212 replay_of_event_id.as_ref(),
2213 )
2214 .await?;
2215 self.append_topic_event(
2216 TRIGGER_ATTEMPTS_TOPIC,
2217 "retry_scheduled",
2218 &event,
2219 Some(binding),
2220 Some(attempt + 1),
2221 serde_json::json!({
2222 "event_id": event.id.0,
2223 "attempt": attempt + 1,
2224 "trigger_id": binding.id.as_str(),
2225 "binding_key": binding.binding_key(),
2226 "delay_ms": delay.as_millis(),
2227 "error": error.to_string(),
2228 "replay_of_event_id": replay_of_event_id,
2229 }),
2230 replay_of_event_id.as_ref(),
2231 )
2232 .await?;
2233 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
2234 let sleep_result = sleep_or_cancel_or_request(
2235 &self.event_log,
2236 delay,
2237 &binding_key,
2238 &event.id.0,
2239 replay_of_event_id.as_ref(),
2240 &mut self.cancel_tx.subscribe(),
2241 )
2242 .await;
2243 decrement_retry_queue_depth(&self.state);
2244 if sleep_result.is_err() {
2245 finish_in_flight(
2246 binding.id.as_str(),
2247 binding.version,
2248 TriggerDispatchOutcome::Failed,
2249 )
2250 .await
2251 .map_err(|registry_error| {
2252 DispatchError::Registry(registry_error.to_string())
2253 })?;
2254 self.release_flow_control(&acquired_flow).await?;
2255 decrement_in_flight(&self.state);
2256 self.append_dispatch_trust_record(
2257 binding,
2258 &route,
2259 &event,
2260 replay_of_event_id.as_ref(),
2261 autonomy_tier,
2262 TrustOutcome::Failure,
2263 "cancelled",
2264 attempt,
2265 Some("dispatcher shutdown cancelled retry wait".to_string()),
2266 )
2267 .await?;
2268 return Ok(DispatchOutcome {
2269 trigger_id: binding.id.as_str().to_string(),
2270 binding_key: binding.binding_key(),
2271 event_id: event.id.0,
2272 attempt_count: attempt,
2273 status: DispatchStatus::Cancelled,
2274 handler_kind: route.kind().to_string(),
2275 target_uri: route.target_uri(),
2276 replay_of_event_id,
2277 result: None,
2278 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
2279 });
2280 }
2281 continue;
2282 }
2283
2284 let final_error = error.to_string();
2285 let dlq_entry = DlqEntry {
2286 trigger_id: binding.id.as_str().to_string(),
2287 binding_key: binding.binding_key(),
2288 event: event.clone(),
2289 attempt_count: attempt,
2290 final_error: final_error.clone(),
2291 attempts: attempts.clone(),
2292 };
2293 self.state
2294 .dlq
2295 .lock()
2296 .expect("dispatcher dlq poisoned")
2297 .push(dlq_entry.clone());
2298 if let Some(metrics) = self.metrics.as_ref() {
2299 metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
2300 metrics.record_trigger_accepted_to_dlq(
2301 binding.id.as_str(),
2302 &binding_key,
2303 event.provider.as_str(),
2304 tenant_id(&event),
2305 "retry_exhausted",
2306 duration_between_ms(
2307 current_unix_ms(),
2308 accepted_at_ms(parent_headers.as_ref(), &event),
2309 ),
2310 );
2311 }
2312 tracing::info!(
2313 component = "dispatcher",
2314 lifecycle = "dlq_moved",
2315 trigger_id = %binding.id.as_str(),
2316 binding_key = %binding_key,
2317 event_id = %event.id.0,
2318 attempt_count = attempt,
2319 reason = "retry_exhausted",
2320 trace_id = %event.trace_id.0
2321 );
2322 self.emit_action_graph(
2323 &event,
2324 vec![RunActionGraphNodeRecord {
2325 id: format!("dlq:{binding_key}:{}", event.id.0),
2326 label: binding.id.as_str().to_string(),
2327 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
2328 status: "queued".to_string(),
2329 outcome: "retry_exhausted".to_string(),
2330 trace_id: Some(event.trace_id.0.clone()),
2331 stage_id: None,
2332 node_id: None,
2333 worker_id: None,
2334 run_id: None,
2335 run_path: None,
2336 metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
2337 }],
2338 vec![RunActionGraphEdgeRecord {
2339 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
2340 to_id: format!("dlq:{binding_key}:{}", event.id.0),
2341 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
2342 label: Some(format!("{attempt} attempts")),
2343 }],
2344 serde_json::json!({
2345 "source": "dispatcher",
2346 "trigger_id": binding.id.as_str(),
2347 "binding_key": binding.binding_key(),
2348 "event_id": event.id.0,
2349 "attempt_count": attempt,
2350 "final_error": final_error,
2351 "replay_of_event_id": replay_of_event_id,
2352 }),
2353 )
2354 .await?;
2355 self.append_lifecycle_event(
2356 "DlqMoved",
2357 &event,
2358 binding,
2359 serde_json::json!({
2360 "event_id": event.id.0,
2361 "attempt_count": attempt,
2362 "final_error": dlq_entry.final_error,
2363 "replay_of_event_id": replay_of_event_id,
2364 }),
2365 replay_of_event_id.as_ref(),
2366 )
2367 .await?;
2368 self.append_topic_event(
2369 TRIGGER_DLQ_TOPIC,
2370 "dlq_moved",
2371 &event,
2372 Some(binding),
2373 Some(attempt),
2374 serde_json::to_value(&dlq_entry)
2375 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
2376 replay_of_event_id.as_ref(),
2377 )
2378 .await?;
2379 finish_in_flight(
2380 binding.id.as_str(),
2381 binding.version,
2382 TriggerDispatchOutcome::Dlq,
2383 )
2384 .await
2385 .map_err(|registry_error| {
2386 DispatchError::Registry(registry_error.to_string())
2387 })?;
2388 self.release_flow_control(&acquired_flow).await?;
2389 decrement_in_flight(&self.state);
2390 self.append_dispatch_trust_record(
2391 binding,
2392 &route,
2393 &event,
2394 replay_of_event_id.as_ref(),
2395 autonomy_tier,
2396 TrustOutcome::Failure,
2397 "dlq",
2398 attempt,
2399 Some(error.to_string()),
2400 )
2401 .await?;
2402 return Ok(DispatchOutcome {
2403 trigger_id: binding.id.as_str().to_string(),
2404 binding_key: binding.binding_key(),
2405 event_id: event.id.0,
2406 attempt_count: attempt,
2407 status: DispatchStatus::Dlq,
2408 handler_kind: route.kind().to_string(),
2409 target_uri: route.target_uri(),
2410 replay_of_event_id,
2411 result: None,
2412 error: Some(error.to_string()),
2413 });
2414 }
2415 }
2416 }
2417
2418 finish_in_flight(
2419 binding.id.as_str(),
2420 binding.version,
2421 TriggerDispatchOutcome::Failed,
2422 )
2423 .await
2424 .map_err(|error| DispatchError::Registry(error.to_string()))?;
2425 self.release_flow_control(&acquired_flow).await?;
2426 decrement_in_flight(&self.state);
2427 self.append_dispatch_trust_record(
2428 binding,
2429 &route,
2430 &event,
2431 replay_of_event_id.as_ref(),
2432 autonomy_tier,
2433 TrustOutcome::Failure,
2434 "failed",
2435 max_attempts,
2436 Some("dispatch exhausted without terminal outcome".to_string()),
2437 )
2438 .await?;
2439 Ok(DispatchOutcome {
2440 trigger_id: binding.id.as_str().to_string(),
2441 binding_key: binding.binding_key(),
2442 event_id: event.id.0,
2443 attempt_count: max_attempts,
2444 status: DispatchStatus::Failed,
2445 handler_kind: route.kind().to_string(),
2446 target_uri: route.target_uri(),
2447 replay_of_event_id,
2448 result: None,
2449 error: Some("dispatch exhausted without terminal outcome".to_string()),
2450 })
2451 }
2452
2453 async fn dispatch_once(
2454 &self,
2455 binding: &TriggerBinding,
2456 route: &DispatchUri,
2457 event: &TriggerEvent,
2458 autonomy_tier: AutonomyTier,
2459 wait_lease: Option<DispatchWaitLease>,
2460 cancel_rx: &mut broadcast::Receiver<()>,
2461 ) -> Result<serde_json::Value, DispatchError> {
2462 match route {
2463 DispatchUri::Local { .. } => {
2464 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2465 return Err(DispatchError::Local(format!(
2466 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2467 binding.id.as_str()
2468 )));
2469 };
2470 let value = self
2471 .invoke_vm_callable(
2472 closure,
2473 &binding.binding_key(),
2474 event,
2475 None,
2476 binding.id.as_str(),
2477 &format!("{}.{}", event.provider.as_str(), event.kind),
2478 autonomy_tier,
2479 wait_lease,
2480 cancel_rx,
2481 )
2482 .await?;
2483 Ok(vm_value_to_json(&value))
2484 }
2485 DispatchUri::A2a {
2486 target,
2487 allow_cleartext,
2488 } => {
2489 if self.state.shutting_down.load(Ordering::SeqCst) {
2490 return Err(DispatchError::Cancelled(
2491 "dispatcher shutdown cancelled A2A dispatch".to_string(),
2492 ));
2493 }
2494 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2495 target,
2496 *allow_cleartext,
2497 binding.id.as_str(),
2498 &binding.binding_key(),
2499 event,
2500 cancel_rx,
2501 )
2502 .await
2503 .map_err(|error| match error {
2504 crate::a2a::A2aClientError::Cancelled(message) => {
2505 DispatchError::Cancelled(message)
2506 }
2507 other => DispatchError::A2a(other.to_string()),
2508 })?;
2509 match ack {
2510 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2511 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2512 }
2513 }
2514 DispatchUri::Worker { queue } => {
2515 let receipt = crate::WorkerQueue::new(self.event_log.clone())
2516 .enqueue(&crate::WorkerQueueJob {
2517 queue: queue.clone(),
2518 trigger_id: binding.id.as_str().to_string(),
2519 binding_key: binding.binding_key(),
2520 binding_version: binding.version,
2521 event: event.clone(),
2522 replay_of_event_id: current_dispatch_context()
2523 .and_then(|context| context.replay_of_event_id),
2524 priority: worker_queue_priority(binding, event),
2525 })
2526 .await
2527 .map_err(DispatchError::from)?;
2528 Ok(serde_json::to_value(receipt)
2529 .map_err(|error| DispatchError::Serde(error.to_string()))?)
2530 }
2531 }
2532 }
2533
2534 async fn apply_flow_control(
2535 &self,
2536 binding: &TriggerBinding,
2537 event: &TriggerEvent,
2538 replay_of_event_id: Option<&String>,
2539 ) -> Result<FlowControlOutcome, DispatchError> {
2540 let flow = &binding.flow_control;
2541 let mut managed_event = event.clone();
2542
2543 if let Some(batch) = &flow.batch {
2544 let gate = self
2545 .resolve_flow_gate(
2546 &binding.binding_key(),
2547 batch.key.as_ref(),
2548 &managed_event,
2549 replay_of_event_id,
2550 )
2551 .await?;
2552 match self
2553 .state
2554 .flow_control
2555 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2556 .await
2557 .map_err(DispatchError::from)?
2558 {
2559 BatchDecision::Dispatch(events) => {
2560 managed_event = build_batched_event(events)?;
2561 }
2562 BatchDecision::Merged => {
2563 return Ok(FlowControlOutcome::Skip {
2564 reason: "batch_merged".to_string(),
2565 })
2566 }
2567 }
2568 }
2569
2570 if let Some(debounce) = &flow.debounce {
2571 let gate = self
2572 .resolve_flow_gate(
2573 &binding.binding_key(),
2574 Some(&debounce.key),
2575 &managed_event,
2576 replay_of_event_id,
2577 )
2578 .await?;
2579 let latest = self
2580 .state
2581 .flow_control
2582 .debounce(&gate, debounce.period)
2583 .await
2584 .map_err(DispatchError::from)?;
2585 if !latest {
2586 return Ok(FlowControlOutcome::Skip {
2587 reason: "debounced".to_string(),
2588 });
2589 }
2590 }
2591
2592 if let Some(rate_limit) = &flow.rate_limit {
2593 let gate = self
2594 .resolve_flow_gate(
2595 &binding.binding_key(),
2596 rate_limit.key.as_ref(),
2597 &managed_event,
2598 replay_of_event_id,
2599 )
2600 .await?;
2601 let allowed = self
2602 .state
2603 .flow_control
2604 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2605 .await
2606 .map_err(DispatchError::from)?;
2607 if !allowed {
2608 return Ok(FlowControlOutcome::Skip {
2609 reason: "rate_limited".to_string(),
2610 });
2611 }
2612 }
2613
2614 if let Some(throttle) = &flow.throttle {
2615 let gate = self
2616 .resolve_flow_gate(
2617 &binding.binding_key(),
2618 throttle.key.as_ref(),
2619 &managed_event,
2620 replay_of_event_id,
2621 )
2622 .await?;
2623 self.state
2624 .flow_control
2625 .wait_for_throttle(&gate, throttle.period, throttle.max)
2626 .await
2627 .map_err(DispatchError::from)?;
2628 }
2629
2630 let mut acquired = AcquiredFlowControl::default();
2631 if let Some(singleton) = &flow.singleton {
2632 let gate = self
2633 .resolve_flow_gate(
2634 &binding.binding_key(),
2635 singleton.key.as_ref(),
2636 &managed_event,
2637 replay_of_event_id,
2638 )
2639 .await?;
2640 let acquired_singleton = self
2641 .state
2642 .flow_control
2643 .try_acquire_singleton(&gate)
2644 .await
2645 .map_err(DispatchError::from)?;
2646 if !acquired_singleton {
2647 return Ok(FlowControlOutcome::Skip {
2648 reason: "singleton_active".to_string(),
2649 });
2650 }
2651 acquired.singleton = Some(SingletonLease { gate, held: true });
2652 }
2653
2654 if let Some(concurrency) = &flow.concurrency {
2655 let gate = self
2656 .resolve_flow_gate(
2657 &binding.binding_key(),
2658 concurrency.key.as_ref(),
2659 &managed_event,
2660 replay_of_event_id,
2661 )
2662 .await?;
2663 let priority_rank = self
2664 .resolve_priority_rank(
2665 &binding.binding_key(),
2666 flow.priority.as_ref(),
2667 &managed_event,
2668 replay_of_event_id,
2669 )
2670 .await?;
2671 let permit = self
2672 .state
2673 .flow_control
2674 .acquire_concurrency(&gate, concurrency.max, priority_rank)
2675 .await
2676 .map_err(DispatchError::from)?;
2677 acquired.concurrency = Some(ConcurrencyLease {
2678 gate,
2679 max: concurrency.max,
2680 priority_rank,
2681 permit: Some(permit),
2682 });
2683 }
2684
2685 Ok(FlowControlOutcome::Dispatch {
2686 event: Box::new(managed_event),
2687 acquired,
2688 })
2689 }
2690
2691 async fn release_flow_control(
2692 &self,
2693 acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2694 ) -> Result<(), DispatchError> {
2695 let (singleton_gate, concurrency_permit) = {
2696 let mut acquired = acquired.lock().await;
2697 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2698 if lease.held {
2699 lease.held = false;
2700 Some(lease.gate.clone())
2701 } else {
2702 None
2703 }
2704 });
2705 let concurrency_permit = acquired
2706 .concurrency
2707 .as_mut()
2708 .and_then(|lease| lease.permit.take());
2709 (singleton_gate, concurrency_permit)
2710 };
2711 if let Some(gate) = singleton_gate {
2712 self.state
2713 .flow_control
2714 .release_singleton(&gate)
2715 .await
2716 .map_err(DispatchError::from)?;
2717 }
2718 if let Some(permit) = concurrency_permit {
2719 self.state
2720 .flow_control
2721 .release_concurrency(permit)
2722 .await
2723 .map_err(DispatchError::from)?;
2724 }
2725 Ok(())
2726 }
2727
2728 async fn resolve_flow_gate(
2729 &self,
2730 binding_key: &str,
2731 expr: Option<&crate::triggers::TriggerExpressionSpec>,
2732 event: &TriggerEvent,
2733 replay_of_event_id: Option<&String>,
2734 ) -> Result<String, DispatchError> {
2735 let key = match expr {
2736 Some(expr) => {
2737 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2738 .await?
2739 }
2740 None => "_global".to_string(),
2741 };
2742 Ok(format!("{binding_key}:{key}"))
2743 }
2744
2745 async fn resolve_priority_rank(
2746 &self,
2747 binding_key: &str,
2748 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2749 event: &TriggerEvent,
2750 replay_of_event_id: Option<&String>,
2751 ) -> Result<usize, DispatchError> {
2752 let Some(priority) = priority else {
2753 return Ok(0);
2754 };
2755 let value = self
2756 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2757 .await?;
2758 Ok(priority
2759 .order
2760 .iter()
2761 .position(|candidate| candidate == &value)
2762 .unwrap_or(priority.order.len()))
2763 }
2764
2765 async fn evaluate_flow_expression(
2766 &self,
2767 binding_key: &str,
2768 expr: &crate::triggers::TriggerExpressionSpec,
2769 event: &TriggerEvent,
2770 replay_of_event_id: Option<&String>,
2771 ) -> Result<String, DispatchError> {
2772 let value = self
2773 .invoke_vm_callable(
2774 &expr.closure,
2775 binding_key,
2776 event,
2777 replay_of_event_id,
2778 "",
2779 "flow_control",
2780 AutonomyTier::Suggest,
2781 None,
2782 &mut self.cancel_tx.subscribe(),
2783 )
2784 .await?;
2785 Ok(json_value_to_gate(&vm_value_to_json(&value)))
2786 }
2787
2788 #[allow(clippy::too_many_arguments)]
2789 async fn invoke_vm_callable(
2790 &self,
2791 closure: &crate::value::VmClosure,
2792 binding_key: &str,
2793 event: &TriggerEvent,
2794 replay_of_event_id: Option<&String>,
2795 agent_id: &str,
2796 action: &str,
2797 autonomy_tier: AutonomyTier,
2798 wait_lease: Option<DispatchWaitLease>,
2799 cancel_rx: &mut broadcast::Receiver<()>,
2800 ) -> Result<VmValue, DispatchError> {
2801 let mut vm = self.base_vm.child_vm();
2802 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2803 if self.state.shutting_down.load(Ordering::SeqCst) {
2804 cancel_token.store(true, Ordering::SeqCst);
2805 }
2806 self.state
2807 .cancel_tokens
2808 .lock()
2809 .expect("dispatcher cancel tokens poisoned")
2810 .push(cancel_token.clone());
2811 vm.install_cancel_token(cancel_token.clone());
2812 let arg = event_to_handler_value(event)?;
2813 let args = [arg];
2814 let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2815 let effective_policy = match crate::orchestration::current_execution_policy() {
2816 Some(parent) => parent
2817 .intersect(&tier_policy)
2818 .map_err(|error| DispatchError::Local(error.to_string()))?,
2819 None => tier_policy,
2820 };
2821 crate::orchestration::push_execution_policy(effective_policy);
2822 let _policy_guard = DispatchExecutionPolicyGuard;
2823 let future = vm.call_closure_pub(closure, &args);
2824 pin_mut!(future);
2825 let (binding_id, binding_version) = split_binding_key(binding_key);
2826 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2827 slot.borrow_mut().replace(DispatchContext {
2828 trigger_event: event.clone(),
2829 replay_of_event_id: replay_of_event_id.cloned(),
2830 binding_id,
2831 binding_version,
2832 agent_id: agent_id.to_string(),
2833 action: action.to_string(),
2834 autonomy_tier,
2835 })
2836 });
2837 let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2838 .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2839 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2840 crate::stdlib::hitl::reset_hitl_state();
2841 let mut poll = tokio::time::interval(Duration::from_millis(100));
2842 let result = loop {
2843 tokio::select! {
2844 result = &mut future => break result,
2845 _ = recv_cancel(cancel_rx) => {
2846 cancel_token.store(true, Ordering::SeqCst);
2847 }
2848 _ = poll.tick() => {
2849 if dispatch_cancel_requested(
2850 &self.event_log,
2851 binding_key,
2852 &event.id.0,
2853 replay_of_event_id,
2854 )
2855 .await? {
2856 cancel_token.store(true, Ordering::SeqCst);
2857 }
2858 }
2859 }
2860 };
2861 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2862 *slot.borrow_mut() = prior_context;
2863 });
2864 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2865 *slot.borrow_mut() = prior_wait_lease;
2866 });
2867 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2868 {
2869 let mut tokens = self
2870 .state
2871 .cancel_tokens
2872 .lock()
2873 .expect("dispatcher cancel tokens poisoned");
2874 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2875 }
2876
2877 if cancel_token.load(Ordering::SeqCst) {
2878 if dispatch_cancel_requested(
2879 &self.event_log,
2880 binding_key,
2881 &event.id.0,
2882 replay_of_event_id,
2883 )
2884 .await?
2885 {
2886 Err(DispatchError::Cancelled(
2887 "trigger cancel request cancelled local handler".to_string(),
2888 ))
2889 } else {
2890 Err(DispatchError::Cancelled(
2891 "dispatcher shutdown cancelled local handler".to_string(),
2892 ))
2893 }
2894 } else {
2895 result.map_err(dispatch_error_from_vm_error)
2896 }
2897 }
2898
2899 async fn evaluate_predicate(
2900 &self,
2901 binding: &TriggerBinding,
2902 predicate: &super::registry::TriggerPredicateSpec,
2903 event: &TriggerEvent,
2904 replay_of_event_id: Option<&String>,
2905 autonomy_tier: AutonomyTier,
2906 ) -> Result<PredicateEvaluationRecord, DispatchError> {
2907 let event_id = event.id.0.clone();
2908 let trigger_id = binding.id.as_str().to_string();
2909 let now_ms = now_unix_ms();
2910 reset_binding_budget_windows(binding);
2911
2912 let breaker_open_until = {
2913 let state = binding
2914 .predicate_state
2915 .lock()
2916 .expect("trigger predicate state poisoned");
2917 if state
2918 .breaker_open_until_ms
2919 .is_some_and(|until_ms| until_ms > now_ms)
2920 {
2921 state.breaker_open_until_ms
2922 } else {
2923 None
2924 }
2925 };
2926
2927 if breaker_open_until.is_some() {
2928 let mut metadata = BTreeMap::new();
2929 metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2930 metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2931 metadata.insert(
2932 "breaker_open_until_ms".to_string(),
2933 serde_json::json!(breaker_open_until),
2934 );
2935 crate::events::log_warn_meta(
2936 "trigger.predicate.circuit_breaker",
2937 "trigger predicate circuit breaker is open; short-circuiting to false",
2938 metadata,
2939 );
2940 let record = PredicateEvaluationRecord {
2941 result: false,
2942 reason: Some("circuit_open".to_string()),
2943 ..Default::default()
2944 };
2945 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2946 .await?;
2947 return Ok(record);
2948 }
2949
2950 let expected_cost = expected_predicate_cost_usd_micros(binding);
2951 if let Some(reason) = binding_budget_would_exceed(binding, expected_cost)
2952 .or_else(|| orchestrator_budget_would_exceed(expected_cost))
2953 {
2954 self.append_budget_exhausted_event(
2955 binding,
2956 event,
2957 reason,
2958 expected_cost,
2959 None,
2960 replay_of_event_id,
2961 )
2962 .await?;
2963 let record = PredicateEvaluationRecord {
2964 result: binding.on_budget_exhausted == TriggerBudgetExhaustionStrategy::Warn,
2965 reason: Some(reason.to_string()),
2966 exhaustion_strategy: Some(binding.on_budget_exhausted),
2967 ..Default::default()
2968 };
2969 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2970 .await?;
2971 return Ok(record);
2972 }
2973
2974 let replay_cache = self
2975 .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2976 .await?;
2977 let guard = start_predicate_evaluation(
2978 binding.when_budget.clone().unwrap_or_default(),
2979 replay_cache,
2980 );
2981 let started = std::time::Instant::now();
2982 let eval = self
2983 .invoke_vm_callable_with_timeout(
2984 &predicate.closure,
2985 &binding.binding_key(),
2986 event,
2987 replay_of_event_id,
2988 binding.id.as_str(),
2989 &format!("{}.{}", event.provider.as_str(), event.kind),
2990 autonomy_tier,
2991 &mut self.cancel_tx.subscribe(),
2992 binding
2993 .when_budget
2994 .as_ref()
2995 .and_then(|budget| budget.timeout()),
2996 )
2997 .await;
2998 let capture = guard.finish();
2999 let latency_ms = started.elapsed().as_millis() as u64;
3000 if replay_of_event_id.is_none() && !capture.entries.is_empty() {
3001 self.append_predicate_cache_record(binding, event, &capture.entries)
3002 .await?;
3003 }
3004
3005 let mut record = PredicateEvaluationRecord {
3006 result: false,
3007 cost_usd: capture.total_cost_usd,
3008 tokens: capture.total_tokens,
3009 latency_ms,
3010 cached: capture.cached,
3011 reason: None,
3012 exhaustion_strategy: None,
3013 };
3014
3015 let mut count_failure = false;
3016 let mut opened_breaker = false;
3017
3018 match eval {
3019 Ok(value) => match predicate_value_as_bool(value) {
3020 Ok(result) => {
3021 record.result = result;
3022 }
3023 Err(reason) => {
3024 count_failure = true;
3025 record.reason = Some(reason);
3026 }
3027 },
3028 Err(error) => {
3029 count_failure = true;
3030 record.reason = Some(error.to_string());
3031 }
3032 }
3033
3034 let cost_usd_micros = usd_to_micros(record.cost_usd);
3035 if cost_usd_micros > 0 {
3036 binding
3037 .metrics
3038 .cost_total_usd_micros
3039 .fetch_add(cost_usd_micros, Ordering::Relaxed);
3040 binding
3041 .metrics
3042 .cost_today_usd_micros
3043 .fetch_add(cost_usd_micros, Ordering::Relaxed);
3044 binding
3045 .metrics
3046 .cost_hour_usd_micros
3047 .fetch_add(cost_usd_micros, Ordering::Relaxed);
3048 note_orchestrator_budget_cost(cost_usd_micros);
3049 record_predicate_cost_sample(binding, cost_usd_micros);
3050 }
3051
3052 let timed_out = matches!(
3053 record.reason.as_deref(),
3054 Some("predicate evaluation timed out")
3055 );
3056 if capture.budget_exceeded || timed_out {
3057 if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3058 record.result = false;
3059 } else if timed_out {
3060 record.result = true;
3061 }
3062 record.reason = Some("budget_exceeded".to_string());
3063 record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3064 self.append_budget_exhausted_event(
3065 binding,
3066 event,
3067 "budget_exceeded",
3068 cost_usd_micros,
3069 Some(record.tokens),
3070 replay_of_event_id,
3071 )
3072 .await?;
3073 self.append_lifecycle_event(
3074 "predicate.invocation_budget_exceeded",
3075 event,
3076 binding,
3077 serde_json::json!({
3078 "trigger_id": binding.id.as_str(),
3079 "event_id": event.id.0,
3080 "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
3081 "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
3082 "cost_usd": record.cost_usd,
3083 "tokens": record.tokens,
3084 "replay_of_event_id": replay_of_event_id,
3085 }),
3086 replay_of_event_id,
3087 )
3088 .await?;
3089 }
3090
3091 if let Some(reason) =
3092 binding_budget_would_exceed(binding, 0).or_else(|| orchestrator_budget_would_exceed(0))
3093 {
3094 if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
3095 record.result = false;
3096 }
3097 record.reason = Some(reason.to_string());
3098 record.exhaustion_strategy = Some(binding.on_budget_exhausted);
3099 self.append_budget_exhausted_event(binding, event, reason, 0, None, replay_of_event_id)
3100 .await?;
3101 }
3102
3103 {
3104 let mut state = binding
3105 .predicate_state
3106 .lock()
3107 .expect("trigger predicate state poisoned");
3108 if count_failure {
3109 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
3110 if state.consecutive_failures >= 3 {
3111 state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
3112 opened_breaker = true;
3113 }
3114 } else {
3115 state.consecutive_failures = 0;
3116 state.breaker_open_until_ms = None;
3117 }
3118 }
3119
3120 if opened_breaker {
3121 let mut metadata = BTreeMap::new();
3122 metadata.insert(
3123 "trigger_id".to_string(),
3124 serde_json::json!(binding.id.as_str()),
3125 );
3126 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3127 metadata.insert("failure_count".to_string(), serde_json::json!(3));
3128 metadata.insert("reason".to_string(), serde_json::json!(record.reason));
3129 crate::events::log_warn_meta(
3130 "trigger.predicate.circuit_breaker",
3131 "trigger predicate circuit breaker opened for 5 minutes",
3132 metadata,
3133 );
3134 }
3135
3136 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
3137 .await?;
3138 Ok(record)
3139 }
3140
3141 #[allow(clippy::too_many_arguments)]
3142 #[allow(clippy::too_many_arguments)]
3143 async fn invoke_vm_callable_with_timeout(
3144 &self,
3145 closure: &crate::value::VmClosure,
3146 binding_key: &str,
3147 event: &TriggerEvent,
3148 replay_of_event_id: Option<&String>,
3149 agent_id: &str,
3150 action: &str,
3151 autonomy_tier: AutonomyTier,
3152 cancel_rx: &mut broadcast::Receiver<()>,
3153 timeout: Option<Duration>,
3154 ) -> Result<VmValue, DispatchError> {
3155 let future = self.invoke_vm_callable(
3156 closure,
3157 binding_key,
3158 event,
3159 replay_of_event_id,
3160 agent_id,
3161 action,
3162 autonomy_tier,
3163 None,
3164 cancel_rx,
3165 );
3166 pin_mut!(future);
3167 if let Some(timeout) = timeout {
3168 match tokio::time::timeout(timeout, future).await {
3169 Ok(result) => result,
3170 Err(_) => Err(DispatchError::Local(
3171 "predicate evaluation timed out".to_string(),
3172 )),
3173 }
3174 } else {
3175 future.await
3176 }
3177 }
3178
3179 async fn append_predicate_evaluated_event(
3180 &self,
3181 binding: &TriggerBinding,
3182 event: &TriggerEvent,
3183 record: &PredicateEvaluationRecord,
3184 replay_of_event_id: Option<&String>,
3185 ) -> Result<(), DispatchError> {
3186 if let Some(metrics) = self.metrics.as_ref() {
3187 metrics.record_trigger_predicate_evaluation(
3188 binding.id.as_str(),
3189 record.result,
3190 record.cost_usd,
3191 );
3192 metrics.set_trigger_budget_cost_today(
3193 binding.id.as_str(),
3194 current_predicate_daily_cost(binding),
3195 );
3196 if matches!(
3197 record.reason.as_deref(),
3198 Some(
3199 "budget_exceeded"
3200 | "daily_budget_exceeded"
3201 | "hourly_budget_exceeded"
3202 | "orchestrator_daily_budget_exceeded"
3203 | "orchestrator_hourly_budget_exceeded"
3204 )
3205 ) {
3206 metrics.record_trigger_budget_exhausted(
3207 binding.id.as_str(),
3208 record
3209 .exhaustion_strategy
3210 .map(TriggerBudgetExhaustionStrategy::as_str)
3211 .unwrap_or_else(|| record.reason.as_deref().unwrap_or("predicate")),
3212 );
3213 }
3214 }
3215 self.append_lifecycle_event(
3216 "predicate.evaluated",
3217 event,
3218 binding,
3219 serde_json::json!({
3220 "trigger_id": binding.id.as_str(),
3221 "event_id": event.id.0,
3222 "result": record.result,
3223 "cost_usd": record.cost_usd,
3224 "tokens": record.tokens,
3225 "latency_ms": record.latency_ms,
3226 "cached": record.cached,
3227 "reason": record.reason,
3228 "on_budget_exhausted": record.exhaustion_strategy.map(TriggerBudgetExhaustionStrategy::as_str),
3229 "replay_of_event_id": replay_of_event_id,
3230 }),
3231 replay_of_event_id,
3232 )
3233 .await
3234 }
3235
3236 async fn append_predicate_cache_record(
3237 &self,
3238 binding: &TriggerBinding,
3239 event: &TriggerEvent,
3240 entries: &[PredicateCacheEntry],
3241 ) -> Result<(), DispatchError> {
3242 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3243 .expect("static trigger inbox legacy topic name is valid");
3244 let payload = serde_json::to_value(PredicateCacheRecord {
3245 trigger_id: binding.id.as_str().to_string(),
3246 event_id: event.id.0.clone(),
3247 entries: entries.to_vec(),
3248 })
3249 .map_err(|error| DispatchError::Serde(error.to_string()))?;
3250 self.event_log
3251 .append(&topic, LogEvent::new("predicate_llm_cache", payload))
3252 .await
3253 .map_err(DispatchError::from)
3254 .map(|_| ())
3255 }
3256
3257 async fn read_predicate_cache_record(
3258 &self,
3259 event_id: &str,
3260 ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
3261 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
3262 .expect("static trigger inbox legacy topic name is valid");
3263 let records = self
3264 .event_log
3265 .read_range(&topic, None, usize::MAX)
3266 .await
3267 .map_err(DispatchError::from)?;
3268 Ok(records
3269 .into_iter()
3270 .filter(|(_, event)| event.kind == "predicate_llm_cache")
3271 .filter_map(|(_, event)| {
3272 serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
3273 })
3274 .filter(|record| record.event_id == event_id)
3275 .flat_map(|record| record.entries)
3276 .collect())
3277 }
3278
3279 #[allow(clippy::too_many_arguments)]
3280 async fn append_dispatch_trust_record(
3281 &self,
3282 binding: &TriggerBinding,
3283 route: &DispatchUri,
3284 event: &TriggerEvent,
3285 replay_of_event_id: Option<&String>,
3286 autonomy_tier: AutonomyTier,
3287 outcome: TrustOutcome,
3288 terminal_status: &str,
3289 attempt_count: u32,
3290 error: Option<String>,
3291 ) -> Result<(), DispatchError> {
3292 let mut record = TrustRecord::new(
3293 binding.id.as_str().to_string(),
3294 format!("{}.{}", event.provider.as_str(), event.kind),
3295 None,
3296 outcome,
3297 event.trace_id.0.clone(),
3298 autonomy_tier,
3299 );
3300 record.metadata.insert(
3301 "binding_key".to_string(),
3302 serde_json::json!(binding.binding_key()),
3303 );
3304 record.metadata.insert(
3305 "binding_version".to_string(),
3306 serde_json::json!(binding.version),
3307 );
3308 record.metadata.insert(
3309 "provider".to_string(),
3310 serde_json::json!(event.provider.as_str()),
3311 );
3312 record
3313 .metadata
3314 .insert("event_kind".to_string(), serde_json::json!(event.kind));
3315 record
3316 .metadata
3317 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3318 record.metadata.insert(
3319 "target_uri".to_string(),
3320 serde_json::json!(route.target_uri()),
3321 );
3322 record.metadata.insert(
3323 "terminal_status".to_string(),
3324 serde_json::json!(terminal_status),
3325 );
3326 record.metadata.insert(
3327 "attempt_count".to_string(),
3328 serde_json::json!(attempt_count),
3329 );
3330 if let Some(replay_of_event_id) = replay_of_event_id {
3331 record.metadata.insert(
3332 "replay_of_event_id".to_string(),
3333 serde_json::json!(replay_of_event_id),
3334 );
3335 }
3336 if let Some(error) = error {
3337 record
3338 .metadata
3339 .insert("error".to_string(), serde_json::json!(error));
3340 }
3341 append_trust_record(&self.event_log, &record)
3342 .await
3343 .map(|_| ())
3344 .map_err(DispatchError::from)
3345 }
3346
3347 async fn append_attempt_record(
3348 &self,
3349 event: &TriggerEvent,
3350 binding: &TriggerBinding,
3351 attempt: &DispatchAttemptRecord,
3352 replay_of_event_id: Option<&String>,
3353 ) -> Result<(), DispatchError> {
3354 self.append_topic_event(
3355 TRIGGER_ATTEMPTS_TOPIC,
3356 "attempt_recorded",
3357 event,
3358 Some(binding),
3359 Some(attempt.attempt),
3360 serde_json::to_value(attempt)
3361 .map_err(|error| DispatchError::Serde(error.to_string()))?,
3362 replay_of_event_id,
3363 )
3364 .await
3365 }
3366
3367 async fn append_lifecycle_event(
3368 &self,
3369 kind: &str,
3370 event: &TriggerEvent,
3371 binding: &TriggerBinding,
3372 payload: serde_json::Value,
3373 replay_of_event_id: Option<&String>,
3374 ) -> Result<(), DispatchError> {
3375 self.append_topic_event(
3376 TRIGGERS_LIFECYCLE_TOPIC,
3377 kind,
3378 event,
3379 Some(binding),
3380 None,
3381 payload,
3382 replay_of_event_id,
3383 )
3384 .await
3385 }
3386
3387 async fn append_budget_exhausted_event(
3388 &self,
3389 binding: &TriggerBinding,
3390 event: &TriggerEvent,
3391 reason: &str,
3392 expected_cost_usd_micros: u64,
3393 tokens: Option<u64>,
3394 replay_of_event_id: Option<&String>,
3395 ) -> Result<(), DispatchError> {
3396 let payload = serde_json::json!({
3397 "trigger_id": binding.id.as_str(),
3398 "event_id": event.id.0,
3399 "reason": reason,
3400 "strategy": binding.on_budget_exhausted.as_str(),
3401 "expected_cost_usd": micros_to_usd(expected_cost_usd_micros),
3402 "cost_usd": micros_to_usd(expected_cost_usd_micros),
3403 "tokens": tokens,
3404 "daily_limit_usd": binding.daily_cost_usd,
3405 "hourly_limit_usd": binding.hourly_cost_usd,
3406 "cost_today_usd": current_predicate_daily_cost(binding),
3407 "cost_hour_usd": current_predicate_hourly_cost(binding),
3408 "replay_of_event_id": replay_of_event_id,
3409 });
3410 self.append_lifecycle_event(
3411 "predicate.budget_exceeded",
3412 event,
3413 binding,
3414 payload.clone(),
3415 replay_of_event_id,
3416 )
3417 .await?;
3418 let legacy_kind = match reason {
3419 "daily_budget_exceeded" => Some("predicate.daily_budget_exceeded"),
3420 "hourly_budget_exceeded" => Some("predicate.hourly_budget_exceeded"),
3421 "orchestrator_daily_budget_exceeded" => {
3422 Some("predicate.orchestrator_daily_budget_exceeded")
3423 }
3424 "orchestrator_hourly_budget_exceeded" => {
3425 Some("predicate.orchestrator_hourly_budget_exceeded")
3426 }
3427 _ => None,
3428 };
3429 if let Some(kind) = legacy_kind {
3430 self.append_lifecycle_event(kind, event, binding, payload, replay_of_event_id)
3431 .await?;
3432 }
3433 Ok(())
3434 }
3435
3436 async fn append_autonomy_budget_approval_request(
3437 &self,
3438 binding: &TriggerBinding,
3439 route: &DispatchUri,
3440 event: &TriggerEvent,
3441 replay_of_event_id: Option<&String>,
3442 reason: &str,
3443 ) -> Result<String, DispatchError> {
3444 let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
3445 let detail = serde_json::json!({
3446 "trigger_id": binding.id.as_str(),
3447 "binding_key": binding.binding_key(),
3448 "event_id": event.id.0,
3449 "reason": reason,
3450 "from_tier": AutonomyTier::ActAuto.as_str(),
3451 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3452 "handler_kind": route.kind(),
3453 "target_uri": route.target_uri(),
3454 "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
3455 "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
3456 "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
3457 "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
3458 "replay_of_event_id": replay_of_event_id,
3459 });
3460 let request_id = crate::stdlib::hitl::append_approval_request_on(
3461 &self.event_log,
3462 binding.id.as_str().to_string(),
3463 event.trace_id.0.clone(),
3464 format!(
3465 "approve autonomous dispatch for trigger '{}' after {}",
3466 binding.id.as_str(),
3467 reason
3468 ),
3469 detail.clone(),
3470 reviewers.clone(),
3471 )
3472 .await
3473 .map_err(dispatch_error_from_vm_error)?;
3474 self.append_lifecycle_event(
3475 "autonomy.budget_exceeded",
3476 event,
3477 binding,
3478 serde_json::json!({
3479 "trigger_id": binding.id.as_str(),
3480 "event_id": event.id.0,
3481 "reason": reason,
3482 "request_id": request_id,
3483 "reviewers": reviewers,
3484 "from_tier": AutonomyTier::ActAuto.as_str(),
3485 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3486 "replay_of_event_id": replay_of_event_id,
3487 }),
3488 replay_of_event_id,
3489 )
3490 .await?;
3491 Ok(request_id)
3492 }
3493
3494 #[allow(clippy::too_many_arguments)]
3495 async fn emit_autonomy_budget_approval_action_graph(
3496 &self,
3497 binding: &TriggerBinding,
3498 route: &DispatchUri,
3499 event: &TriggerEvent,
3500 source_node_id: &str,
3501 replay_of_event_id: Option<&String>,
3502 reason: &str,
3503 request_id: &str,
3504 ) -> Result<(), DispatchError> {
3505 let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3506 self.emit_action_graph(
3507 event,
3508 vec![RunActionGraphNodeRecord {
3509 id: approval_node_id.clone(),
3510 label: format!("approval required: {reason}"),
3511 kind: "approval".to_string(),
3512 status: "waiting".to_string(),
3513 outcome: "request_approval".to_string(),
3514 trace_id: Some(event.trace_id.0.clone()),
3515 stage_id: None,
3516 node_id: None,
3517 worker_id: None,
3518 run_id: None,
3519 run_path: None,
3520 metadata: BTreeMap::from([
3521 (
3522 "trigger_id".to_string(),
3523 serde_json::json!(binding.id.as_str()),
3524 ),
3525 (
3526 "binding_key".to_string(),
3527 serde_json::json!(binding.binding_key()),
3528 ),
3529 ("event_id".to_string(), serde_json::json!(event.id.0)),
3530 ("reason".to_string(), serde_json::json!(reason)),
3531 ("request_id".to_string(), serde_json::json!(request_id)),
3532 (
3533 "reviewers".to_string(),
3534 serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3535 ),
3536 ("handler_kind".to_string(), serde_json::json!(route.kind())),
3537 (
3538 "target_uri".to_string(),
3539 serde_json::json!(route.target_uri()),
3540 ),
3541 (
3542 "from_tier".to_string(),
3543 serde_json::json!(AutonomyTier::ActAuto.as_str()),
3544 ),
3545 (
3546 "requested_tier".to_string(),
3547 serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3548 ),
3549 (
3550 "replay_of_event_id".to_string(),
3551 serde_json::json!(replay_of_event_id),
3552 ),
3553 ]),
3554 }],
3555 vec![RunActionGraphEdgeRecord {
3556 from_id: source_node_id.to_string(),
3557 to_id: approval_node_id,
3558 kind: "approval_gate".to_string(),
3559 label: Some("autonomy budget".to_string()),
3560 }],
3561 serde_json::json!({
3562 "source": "dispatcher",
3563 "trigger_id": binding.id.as_str(),
3564 "binding_key": binding.binding_key(),
3565 "event_id": event.id.0,
3566 "reason": reason,
3567 "request_id": request_id,
3568 "replay_of_event_id": replay_of_event_id,
3569 }),
3570 )
3571 .await
3572 }
3573
3574 #[allow(clippy::too_many_arguments)]
3575 async fn append_tier_transition_trust_record(
3576 &self,
3577 binding: &TriggerBinding,
3578 event: &TriggerEvent,
3579 replay_of_event_id: Option<&String>,
3580 from_tier: AutonomyTier,
3581 to_tier: AutonomyTier,
3582 reason: &str,
3583 request_id: &str,
3584 ) -> Result<(), DispatchError> {
3585 let mut record = TrustRecord::new(
3586 binding.id.as_str().to_string(),
3587 "autonomy.tier_transition",
3588 Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3589 TrustOutcome::Denied,
3590 event.trace_id.0.clone(),
3591 to_tier,
3592 );
3593 record.metadata.insert(
3594 "binding_key".to_string(),
3595 serde_json::json!(binding.binding_key()),
3596 );
3597 record
3598 .metadata
3599 .insert("event_id".to_string(), serde_json::json!(event.id.0));
3600 record.metadata.insert(
3601 "from_tier".to_string(),
3602 serde_json::json!(from_tier.as_str()),
3603 );
3604 record
3605 .metadata
3606 .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3607 record
3608 .metadata
3609 .insert("reason".to_string(), serde_json::json!(reason));
3610 record
3611 .metadata
3612 .insert("request_id".to_string(), serde_json::json!(request_id));
3613 if let Some(replay_of_event_id) = replay_of_event_id {
3614 record.metadata.insert(
3615 "replay_of_event_id".to_string(),
3616 serde_json::json!(replay_of_event_id),
3617 );
3618 }
3619 append_trust_record(&self.event_log, &record)
3620 .await
3621 .map(|_| ())
3622 .map_err(DispatchError::from)
3623 }
3624
3625 async fn append_budget_deferred_event(
3626 &self,
3627 binding: &TriggerBinding,
3628 route: &DispatchUri,
3629 event: &TriggerEvent,
3630 replay_of_event_id: Option<&String>,
3631 reason: &str,
3632 ) -> Result<(), DispatchError> {
3633 self.append_topic_event(
3634 TRIGGER_ATTEMPTS_TOPIC,
3635 "budget_deferred",
3636 event,
3637 Some(binding),
3638 None,
3639 serde_json::json!({
3640 "event_id": event.id.0,
3641 "trigger_id": binding.id.as_str(),
3642 "binding_key": binding.binding_key(),
3643 "handler_kind": route.kind(),
3644 "target_uri": route.target_uri(),
3645 "reason": reason,
3646 "retry_at": next_budget_reset_rfc3339(binding),
3647 "replay_of_event_id": replay_of_event_id,
3648 }),
3649 replay_of_event_id,
3650 )
3651 .await?;
3652 self.append_skipped_outbox_event(
3653 binding,
3654 route,
3655 event,
3656 replay_of_event_id,
3657 DispatchSkipStage::Predicate,
3658 serde_json::json!({
3659 "deferred": true,
3660 "reason": reason,
3661 "retry_at": next_budget_reset_rfc3339(binding),
3662 }),
3663 )
3664 .await
3665 }
3666
3667 async fn move_budget_exhausted_to_dlq(
3668 &self,
3669 binding: &TriggerBinding,
3670 route: &DispatchUri,
3671 event: &TriggerEvent,
3672 replay_of_event_id: Option<&String>,
3673 final_error: &str,
3674 ) -> Result<(), DispatchError> {
3675 let dlq_entry = DlqEntry {
3676 trigger_id: binding.id.as_str().to_string(),
3677 binding_key: binding.binding_key(),
3678 event: event.clone(),
3679 attempt_count: 0,
3680 final_error: final_error.to_string(),
3681 attempts: Vec::new(),
3682 };
3683 self.state
3684 .dlq
3685 .lock()
3686 .expect("dispatcher dlq poisoned")
3687 .push(dlq_entry.clone());
3688 if let Some(metrics) = self.metrics.as_ref() {
3689 metrics.record_trigger_dlq(binding.id.as_str(), "budget_exhausted");
3690 metrics.record_trigger_accepted_to_dlq(
3691 binding.id.as_str(),
3692 &binding.binding_key(),
3693 event.provider.as_str(),
3694 tenant_id(event),
3695 "budget_exhausted",
3696 duration_between_ms(current_unix_ms(), accepted_at_ms(None, event)),
3697 );
3698 }
3699 tracing::info!(
3700 component = "dispatcher",
3701 lifecycle = "dlq_moved",
3702 trigger_id = %binding.id.as_str(),
3703 binding_key = %binding.binding_key(),
3704 event_id = %event.id.0,
3705 reason = "budget_exhausted",
3706 trace_id = %event.trace_id.0
3707 );
3708 self.emit_action_graph(
3709 event,
3710 vec![RunActionGraphNodeRecord {
3711 id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3712 label: binding.id.as_str().to_string(),
3713 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3714 status: "queued".to_string(),
3715 outcome: "budget_exhausted".to_string(),
3716 trace_id: Some(event.trace_id.0.clone()),
3717 stage_id: None,
3718 node_id: None,
3719 worker_id: None,
3720 run_id: None,
3721 run_path: None,
3722 metadata: dlq_node_metadata(binding, event, 0, final_error),
3723 }],
3724 vec![RunActionGraphEdgeRecord {
3725 from_id: format!("predicate:{}:{}", binding.binding_key(), event.id.0),
3726 to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3727 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3728 label: Some("budget exhausted".to_string()),
3729 }],
3730 serde_json::json!({
3731 "source": "dispatcher",
3732 "trigger_id": binding.id.as_str(),
3733 "binding_key": binding.binding_key(),
3734 "event_id": event.id.0,
3735 "handler_kind": route.kind(),
3736 "target_uri": route.target_uri(),
3737 "final_error": final_error,
3738 "replay_of_event_id": replay_of_event_id,
3739 }),
3740 )
3741 .await?;
3742 self.append_lifecycle_event(
3743 "DlqMoved",
3744 event,
3745 binding,
3746 serde_json::json!({
3747 "event_id": event.id.0,
3748 "attempt_count": 0,
3749 "final_error": final_error,
3750 "reason": "budget_exhausted",
3751 "replay_of_event_id": replay_of_event_id,
3752 }),
3753 replay_of_event_id,
3754 )
3755 .await?;
3756 self.append_topic_event(
3757 TRIGGER_DLQ_TOPIC,
3758 "dlq_moved",
3759 event,
3760 Some(binding),
3761 None,
3762 serde_json::to_value(&dlq_entry)
3763 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3764 replay_of_event_id,
3765 )
3766 .await
3767 }
3768
3769 async fn move_circuit_open_to_dlq(
3770 &self,
3771 binding: &TriggerBinding,
3772 route: &DispatchUri,
3773 event: &TriggerEvent,
3774 replay_of_event_id: Option<&String>,
3775 final_error: &str,
3776 destination: &str,
3777 ) -> Result<(), DispatchError> {
3778 let dlq_entry = DlqEntry {
3779 trigger_id: binding.id.as_str().to_string(),
3780 binding_key: binding.binding_key(),
3781 event: event.clone(),
3782 attempt_count: 0,
3783 final_error: final_error.to_string(),
3784 attempts: Vec::new(),
3785 };
3786 self.state
3787 .dlq
3788 .lock()
3789 .expect("dispatcher dlq poisoned")
3790 .push(dlq_entry.clone());
3791 if let Some(metrics) = self.metrics.as_ref() {
3792 metrics.record_trigger_dlq(binding.id.as_str(), "circuit_open");
3793 metrics.record_trigger_accepted_to_dlq(
3794 binding.id.as_str(),
3795 &binding.binding_key(),
3796 event.provider.as_str(),
3797 tenant_id(event),
3798 "circuit_open",
3799 Duration::ZERO,
3800 );
3801 }
3802 tracing::info!(
3803 component = "dispatcher",
3804 lifecycle = "dlq_moved",
3805 trigger_id = %binding.id.as_str(),
3806 binding_key = %binding.binding_key(),
3807 event_id = %event.id.0,
3808 reason = "circuit_open",
3809 destination,
3810 trace_id = %event.trace_id.0
3811 );
3812 self.emit_action_graph(
3813 event,
3814 vec![RunActionGraphNodeRecord {
3815 id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3816 label: binding.id.as_str().to_string(),
3817 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3818 status: "queued".to_string(),
3819 outcome: "circuit_open".to_string(),
3820 trace_id: Some(event.trace_id.0.clone()),
3821 stage_id: None,
3822 node_id: None,
3823 worker_id: None,
3824 run_id: None,
3825 run_path: None,
3826 metadata: dlq_node_metadata(binding, event, 0, final_error),
3827 }],
3828 vec![RunActionGraphEdgeRecord {
3829 from_id: format!("trigger:{}", event.id.0),
3830 to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3831 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3832 label: Some("circuit open".to_string()),
3833 }],
3834 serde_json::json!({
3835 "source": "dispatcher",
3836 "trigger_id": binding.id.as_str(),
3837 "binding_key": binding.binding_key(),
3838 "event_id": event.id.0,
3839 "handler_kind": route.kind(),
3840 "target_uri": route.target_uri(),
3841 "destination": destination,
3842 "final_error": final_error,
3843 "replay_of_event_id": replay_of_event_id,
3844 }),
3845 )
3846 .await?;
3847 self.append_lifecycle_event(
3848 "DlqMoved",
3849 event,
3850 binding,
3851 serde_json::json!({
3852 "event_id": event.id.0,
3853 "attempt_count": 0,
3854 "final_error": final_error,
3855 "reason": "circuit_open",
3856 "destination": destination,
3857 "replay_of_event_id": replay_of_event_id,
3858 }),
3859 replay_of_event_id,
3860 )
3861 .await?;
3862 self.append_topic_event(
3863 TRIGGER_DLQ_TOPIC,
3864 "dlq_moved",
3865 event,
3866 Some(binding),
3867 None,
3868 serde_json::to_value(&dlq_entry)
3869 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3870 replay_of_event_id,
3871 )
3872 .await
3873 }
3874
3875 async fn append_skipped_outbox_event(
3876 &self,
3877 binding: &TriggerBinding,
3878 route: &DispatchUri,
3879 event: &TriggerEvent,
3880 replay_of_event_id: Option<&String>,
3881 stage: DispatchSkipStage,
3882 detail: serde_json::Value,
3883 ) -> Result<(), DispatchError> {
3884 self.append_topic_event(
3885 TRIGGER_OUTBOX_TOPIC,
3886 "dispatch_skipped",
3887 event,
3888 Some(binding),
3889 None,
3890 serde_json::json!({
3891 "event_id": event.id.0,
3892 "trigger_id": binding.id.as_str(),
3893 "binding_key": binding.binding_key(),
3894 "handler_kind": route.kind(),
3895 "target_uri": route.target_uri(),
3896 "skip_stage": stage.as_str(),
3897 "detail": detail,
3898 "replay_of_event_id": replay_of_event_id,
3899 }),
3900 replay_of_event_id,
3901 )
3902 .await
3903 }
3904
3905 async fn append_topic_event(
3906 &self,
3907 topic_name: &str,
3908 kind: &str,
3909 event: &TriggerEvent,
3910 binding: Option<&TriggerBinding>,
3911 attempt: Option<u32>,
3912 payload: serde_json::Value,
3913 replay_of_event_id: Option<&String>,
3914 ) -> Result<(), DispatchError> {
3915 let topic = Topic::new(topic_name)
3916 .expect("static trigger dispatcher topic names should always be valid");
3917 let headers = event_headers(event, binding, attempt, replay_of_event_id);
3918 self.event_log
3919 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3920 .await
3921 .map_err(DispatchError::from)
3922 .map(|_| ())
3923 }
3924
3925 async fn emit_action_graph(
3926 &self,
3927 event: &TriggerEvent,
3928 nodes: Vec<RunActionGraphNodeRecord>,
3929 edges: Vec<RunActionGraphEdgeRecord>,
3930 extra: serde_json::Value,
3931 ) -> Result<(), DispatchError> {
3932 let mut headers = BTreeMap::new();
3933 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3934 headers.insert("event_id".to_string(), event.id.0.clone());
3935 let observability = RunObservabilityRecord {
3936 schema_version: 1,
3937 action_graph_nodes: nodes,
3938 action_graph_edges: edges,
3939 ..Default::default()
3940 };
3941 append_action_graph_update(
3942 headers,
3943 serde_json::json!({
3944 "source": "dispatcher",
3945 "trace_id": event.trace_id.0,
3946 "event_id": event.id.0,
3947 "observability": observability,
3948 "context": extra,
3949 }),
3950 )
3951 .await
3952 .map_err(DispatchError::from)
3953 }
3954}
3955
3956async fn dispatch_cancel_requested(
3957 event_log: &Arc<AnyEventLog>,
3958 binding_key: &str,
3959 event_id: &str,
3960 replay_of_event_id: Option<&String>,
3961) -> Result<bool, DispatchError> {
3962 if replay_of_event_id.is_some() {
3963 return Ok(false);
3964 }
3965 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3966 .expect("static trigger cancel topic should always be valid");
3967 let events = event_log.read_range(&topic, None, usize::MAX).await?;
3968 let requested = events
3969 .into_iter()
3970 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3971 .filter_map(|(_, event)| {
3972 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3973 })
3974 .collect::<BTreeSet<_>>();
3975 Ok(requested
3976 .iter()
3977 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3978}
3979
3980async fn sleep_or_cancel_or_request(
3981 event_log: &Arc<AnyEventLog>,
3982 delay: Duration,
3983 binding_key: &str,
3984 event_id: &str,
3985 replay_of_event_id: Option<&String>,
3986 cancel_rx: &mut broadcast::Receiver<()>,
3987) -> Result<(), DispatchError> {
3988 let sleep = tokio::time::sleep(delay);
3989 pin_mut!(sleep);
3990 let mut poll = tokio::time::interval(Duration::from_millis(100));
3991 loop {
3992 tokio::select! {
3993 _ = &mut sleep => return Ok(()),
3994 _ = recv_cancel(cancel_rx) => {
3995 return Err(DispatchError::Cancelled(
3996 "dispatcher shutdown cancelled retry wait".to_string(),
3997 ));
3998 }
3999 _ = poll.tick() => {
4000 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
4001 return Err(DispatchError::Cancelled(
4002 "trigger cancel request cancelled retry wait".to_string(),
4003 ));
4004 }
4005 }
4006 }
4007 }
4008}
4009
4010fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
4011 let mut iter = events.into_iter();
4012 let Some(mut root) = iter.next() else {
4013 return Err(DispatchError::Registry(
4014 "batch dispatch produced an empty event list".to_string(),
4015 ));
4016 };
4017 let mut batch = Vec::new();
4018 batch.push(
4019 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
4020 );
4021 for event in iter {
4022 batch.push(
4023 serde_json::to_value(&event)
4024 .map_err(|error| DispatchError::Serde(error.to_string()))?,
4025 );
4026 }
4027 root.batch = Some(batch);
4028 Ok(root)
4029}
4030
4031fn json_value_to_gate(value: &serde_json::Value) -> String {
4032 match value {
4033 serde_json::Value::Null => "null".to_string(),
4034 serde_json::Value::String(text) => text.clone(),
4035 serde_json::Value::Bool(value) => value.to_string(),
4036 serde_json::Value::Number(value) => value.to_string(),
4037 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
4038 }
4039}
4040
4041fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
4042 let json =
4043 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4044 let value = json_to_vm_value(&json);
4045 match (&event.raw_body, value) {
4046 (Some(raw_body), VmValue::Dict(dict)) => {
4047 let mut map = (*dict).clone();
4048 map.insert(
4049 "raw_body".to_string(),
4050 VmValue::Bytes(Rc::new(raw_body.clone())),
4051 );
4052 Ok(VmValue::Dict(Rc::new(map)))
4053 }
4054 (_, other) => Ok(other),
4055 }
4056}
4057
4058fn decrement_in_flight(state: &DispatcherRuntimeState) {
4059 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
4060 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
4061 state.idle_notify.notify_waiters();
4062 }
4063}
4064
4065fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
4066 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
4067 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
4068 state.idle_notify.notify_waiters();
4069 }
4070}
4071
/// Test-only: arms a one-shot sender in this thread's inbox-dequeued slot.
///
/// The sender is fired later by `notify_test_inbox_dequeued`. Any previously
/// armed sender is replaced and dropped.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| drop(slot.replace(Some(tx))));
}
4078
/// Non-test builds: no-op. Only the test build has an inbox-dequeued hook to fire.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {
    // Intentionally empty.
}
4081
/// Test-only: takes the sender armed by `install_test_inbox_dequeued_signal`
/// on this thread, if any, and fires it once.
///
/// Nothing happens if no sender is armed, and a dropped receiver is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let armed = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(sender) = armed {
        let _ = sender.send(());
    }
}
4090
4091pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
4092 event_log: &L,
4093 event: &TriggerEvent,
4094) -> Result<u64, DispatchError> {
4095 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
4096 .expect("static trigger.inbox.envelopes topic is valid");
4097 let headers = event_headers(event, None, None, None);
4098 let payload =
4099 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
4100 event_log
4101 .append(
4102 &topic,
4103 LogEvent::new("event_ingested", payload).with_headers(headers),
4104 )
4105 .await
4106 .map_err(DispatchError::from)
4107}
4108
4109pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
4110 ACTIVE_DISPATCHER_STATE.with(|slot| {
4111 slot.borrow()
4112 .as_ref()
4113 .map(|state| DispatcherStatsSnapshot {
4114 in_flight: state.in_flight.load(Ordering::Relaxed),
4115 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
4116 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
4117 })
4118 .unwrap_or_default()
4119 })
4120}
4121
4122pub fn clear_dispatcher_state() {
4123 ACTIVE_DISPATCHER_STATE.with(|slot| {
4124 *slot.borrow_mut() = None;
4125 });
4126 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
4127 *slot.borrow_mut() = None;
4128 });
4129}
4130
4131fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
4132 if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
4133 return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
4134 }
4135 if is_cancelled_vm_error(&error) {
4136 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
4137 }
4138 if let VmError::Thrown(VmValue::String(message)) = &error {
4139 return DispatchError::Local(message.to_string());
4140 }
4141 match error_to_category(&error) {
4142 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
4143 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
4144 ErrorCategory::Cancelled => {
4145 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
4146 }
4147 _ => DispatchError::Local(error.to_string()),
4148 }
4149}
4150
4151fn dispatch_error_label(error: &DispatchError) -> &'static str {
4152 match error {
4153 DispatchError::Denied(_) => "denied",
4154 DispatchError::Timeout(_) => "timeout",
4155 DispatchError::Waiting(_) => "waiting",
4156 DispatchError::Cancelled(_) => "cancelled",
4157 _ => "failed",
4158 }
4159}
4160
4161fn destination_circuit_key(route: &DispatchUri) -> String {
4162 format!("{}:{}", route.kind(), route.target_uri())
4163}
4164
4165fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
4166 match route {
4167 DispatchUri::Worker { .. } => "enqueued",
4168 DispatchUri::A2a { .. }
4169 if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
4170 {
4171 "pending"
4172 }
4173 DispatchUri::A2a { .. } => "completed",
4174 DispatchUri::Local { .. } => "success",
4175 }
4176}
4177
4178fn dispatch_node_id(
4179 route: &DispatchUri,
4180 binding_key: &str,
4181 event_id: &str,
4182 attempt: u32,
4183) -> String {
4184 let prefix = match route {
4185 DispatchUri::A2a { .. } => "a2a",
4186 _ => "dispatch",
4187 };
4188 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
4189}
4190
4191fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
4192 match route {
4193 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
4194 DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
4195 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
4196 }
4197}
4198
4199fn dispatch_node_label(route: &DispatchUri) -> String {
4200 match route {
4201 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
4202 _ => route.target_uri(),
4203 }
4204}
4205
4206fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
4207 match route {
4208 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
4209 _ => None,
4210 }
4211}
4212
4213fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
4214 match route {
4215 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
4216 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
4217 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
4218 }
4219}
4220
4221fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
4222 match status {
4223 crate::triggers::SignatureStatus::Verified => "verified",
4224 crate::triggers::SignatureStatus::Unsigned => "unsigned",
4225 crate::triggers::SignatureStatus::Failed { .. } => "failed",
4226 }
4227}
4228
4229fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
4230 let mut metadata = BTreeMap::new();
4231 metadata.insert(
4232 "provider".to_string(),
4233 serde_json::json!(event.provider.as_str()),
4234 );
4235 metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
4236 metadata.insert(
4237 "dedupe_key".to_string(),
4238 serde_json::json!(event.dedupe_key),
4239 );
4240 metadata.insert(
4241 "signature_status".to_string(),
4242 serde_json::json!(signature_status_label(&event.signature_status)),
4243 );
4244 metadata
4245}
4246
4247fn predicate_node_metadata(
4248 binding: &TriggerBinding,
4249 predicate: &super::registry::TriggerPredicateSpec,
4250 event: &TriggerEvent,
4251 evaluation: &PredicateEvaluationRecord,
4252) -> BTreeMap<String, serde_json::Value> {
4253 let mut metadata = BTreeMap::new();
4254 metadata.insert(
4255 "trigger_id".to_string(),
4256 serde_json::json!(binding.id.as_str()),
4257 );
4258 metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
4259 metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
4260 metadata.insert(
4261 "cost_usd".to_string(),
4262 serde_json::json!(evaluation.cost_usd),
4263 );
4264 metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
4265 metadata.insert(
4266 "latency_ms".to_string(),
4267 serde_json::json!(evaluation.latency_ms),
4268 );
4269 metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
4270 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4271 if let Some(reason) = evaluation.reason.as_ref() {
4272 metadata.insert("reason".to_string(), serde_json::json!(reason));
4273 }
4274 metadata
4275}
4276
4277fn dispatch_node_metadata(
4278 route: &DispatchUri,
4279 binding: &TriggerBinding,
4280 event: &TriggerEvent,
4281 attempt: u32,
4282) -> BTreeMap<String, serde_json::Value> {
4283 let mut metadata = BTreeMap::new();
4284 metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
4285 metadata.insert(
4286 "target_uri".to_string(),
4287 serde_json::json!(route.target_uri()),
4288 );
4289 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4290 metadata.insert(
4291 "trigger_id".to_string(),
4292 serde_json::json!(binding.id.as_str()),
4293 );
4294 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4295 if let Some(target_agent) = dispatch_target_agent(route) {
4296 metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
4297 }
4298 if let DispatchUri::Worker { queue } = route {
4299 metadata.insert("queue_name".to_string(), serde_json::json!(queue));
4300 }
4301 metadata
4302}
4303
4304fn dispatch_success_metadata(
4305 route: &DispatchUri,
4306 binding: &TriggerBinding,
4307 event: &TriggerEvent,
4308 attempt: u32,
4309 result: &serde_json::Value,
4310) -> BTreeMap<String, serde_json::Value> {
4311 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4312 match route {
4313 DispatchUri::A2a { .. } => {
4314 if let Some(task_id) = result
4315 .get("task_id")
4316 .or_else(|| result.get("id"))
4317 .and_then(|value| value.as_str())
4318 {
4319 metadata.insert("task_id".to_string(), serde_json::json!(task_id));
4320 }
4321 if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
4322 metadata.insert("state".to_string(), serde_json::json!(state));
4323 }
4324 }
4325 DispatchUri::Worker { .. } => {
4326 if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
4327 {
4328 metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
4329 }
4330 if let Some(response_topic) = result
4331 .get("response_topic")
4332 .and_then(|value| value.as_str())
4333 {
4334 metadata.insert(
4335 "response_topic".to_string(),
4336 serde_json::json!(response_topic),
4337 );
4338 }
4339 }
4340 DispatchUri::Local { .. } => {}
4341 }
4342 metadata
4343}
4344
4345fn dispatch_error_metadata(
4346 route: &DispatchUri,
4347 binding: &TriggerBinding,
4348 event: &TriggerEvent,
4349 attempt: u32,
4350 error: &DispatchError,
4351) -> BTreeMap<String, serde_json::Value> {
4352 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
4353 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4354 metadata
4355}
4356
4357fn retry_node_metadata(
4358 binding: &TriggerBinding,
4359 event: &TriggerEvent,
4360 attempt: u32,
4361 delay: Duration,
4362 error: &DispatchError,
4363) -> BTreeMap<String, serde_json::Value> {
4364 let mut metadata = BTreeMap::new();
4365 metadata.insert(
4366 "trigger_id".to_string(),
4367 serde_json::json!(binding.id.as_str()),
4368 );
4369 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4370 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
4371 metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
4372 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
4373 metadata
4374}
4375
4376fn dlq_node_metadata(
4377 binding: &TriggerBinding,
4378 event: &TriggerEvent,
4379 attempt_count: u32,
4380 final_error: &str,
4381) -> BTreeMap<String, serde_json::Value> {
4382 let mut metadata = BTreeMap::new();
4383 metadata.insert(
4384 "trigger_id".to_string(),
4385 serde_json::json!(binding.id.as_str()),
4386 );
4387 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
4388 metadata.insert(
4389 "attempt_count".to_string(),
4390 serde_json::json!(attempt_count),
4391 );
4392 metadata.insert("final_error".to_string(), serde_json::json!(final_error));
4393 metadata
4394}
4395
4396fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
4397 match value {
4398 VmValue::Bool(result) => Ok(result),
4399 VmValue::EnumVariant {
4400 enum_name,
4401 variant,
4402 fields,
4403 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Ok" => match fields.first() {
4404 Some(VmValue::Bool(result)) => Ok(*result),
4405 Some(other) => Err(format!(
4406 "predicate Result.Ok payload must be bool, got {}",
4407 other.type_name()
4408 )),
4409 None => Err("predicate Result.Ok payload is missing".to_string()),
4410 },
4411 VmValue::EnumVariant {
4412 enum_name,
4413 variant,
4414 fields,
4415 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => Err(fields
4416 .first()
4417 .map(VmValue::display)
4418 .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
4419 other => Err(format!(
4420 "predicate must return bool or Result<bool, _>, got {}",
4421 other.type_name()
4422 )),
4423 }
4424}
4425
4426fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
4427 micros_to_usd(
4428 binding
4429 .metrics
4430 .cost_today_usd_micros
4431 .load(Ordering::Relaxed),
4432 )
4433}
4434
4435fn current_predicate_hourly_cost(binding: &TriggerBinding) -> f64 {
4436 micros_to_usd(binding.metrics.cost_hour_usd_micros.load(Ordering::Relaxed))
4437}
4438
/// Splits a binding key of the form `"{trigger_id}@v{version}"` into its
/// trigger id and version.
///
/// This is the inverse of `binding_key_from_parts`. The suffix after the last
/// `@v` is treated as a version only when it is a non-empty run of ASCII
/// digits that fits in a `u32`. Otherwise the whole key is returned as the
/// trigger id with version `0`. This keeps unversioned ids that merely contain
/// `@v` (e.g. `"deploy@vercel"`) intact instead of truncating them to
/// `"deploy"`.
fn split_binding_key(binding_key: &str) -> (String, u32) {
    binding_key
        .rsplit_once("@v")
        .filter(|(_, suffix)| !suffix.is_empty() && suffix.bytes().all(|b| b.is_ascii_digit()))
        .and_then(|(binding_id, suffix)| {
            // Digits-only but too large for u32 still falls back to the whole key.
            suffix
                .parse::<u32>()
                .ok()
                .map(|version| (binding_id.to_string(), version))
        })
        .unwrap_or_else(|| (binding_key.to_string(), 0))
}
4446
/// Builds a binding key: `"{trigger_id}@v{version}"` when a version is given,
/// otherwise just the trigger id.
fn binding_key_from_parts(trigger_id: &str, binding_version: Option<u32>) -> String {
    binding_version.map_or_else(
        || trigger_id.to_string(),
        |version| format!("{trigger_id}@v{version}"),
    )
}
4453
4454fn tenant_id(event: &TriggerEvent) -> Option<&str> {
4455 event.tenant_id.as_ref().map(|tenant| tenant.0.as_str())
4456}
4457
4458fn current_unix_ms() -> i64 {
4459 unix_ms(time::OffsetDateTime::now_utc())
4460}
4461
4462fn unix_ms(timestamp: time::OffsetDateTime) -> i64 {
4463 (timestamp.unix_timestamp_nanos() / 1_000_000) as i64
4464}
4465
4466fn accepted_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4467 lifecycle_header_ms(headers, TRIGGER_ACCEPTED_AT_MS_HEADER)
4468 .unwrap_or_else(|| unix_ms(event.received_at))
4469}
4470
4471fn queue_appended_at_ms(headers: Option<&BTreeMap<String, String>>, event: &TriggerEvent) -> i64 {
4472 lifecycle_header_ms(headers, TRIGGER_QUEUE_APPENDED_AT_MS_HEADER)
4473 .unwrap_or_else(|| accepted_at_ms(headers, event))
4474}
4475
/// Reads header `name` and parses it as an `i64` millisecond timestamp.
///
/// Returns `None` when there are no headers, the header is missing, or its
/// value is not a valid `i64` (no trimming is applied).
fn lifecycle_header_ms(headers: Option<&BTreeMap<String, String>>, name: &str) -> Option<i64> {
    let raw_value = headers?.get(name)?;
    raw_value.parse().ok()
}
4481
/// Elapsed time from `earlier_ms` to `later_ms` (both in milliseconds).
///
/// The subtraction saturates at the `i64` bounds, and a negative difference
/// (clock skew or reordered timestamps) clamps to a zero duration.
fn duration_between_ms(later_ms: i64, earlier_ms: i64) -> Duration {
    let delta_ms = later_ms.saturating_sub(earlier_ms);
    let non_negative_ms = u64::try_from(delta_ms).unwrap_or(0);
    Duration::from_millis(non_negative_ms)
}
4485
4486fn dispatch_result_status(result: &Result<serde_json::Value, DispatchError>) -> &'static str {
4487 match result {
4488 Ok(_) => "succeeded",
4489 Err(DispatchError::Waiting(_)) => "waiting",
4490 Err(DispatchError::Cancelled(_)) => "cancelled",
4491 Err(DispatchError::Denied(_)) => "denied",
4492 Err(DispatchError::Timeout(_)) => "timeout",
4493 Err(_) => "failed",
4494 }
4495}
4496
4497fn is_cancelled_vm_error(error: &VmError) -> bool {
4498 matches!(
4499 error,
4500 VmError::Thrown(VmValue::String(message))
4501 if message.starts_with("kind:cancelled:")
4502 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
4503}
4504
4505fn event_headers(
4506 event: &TriggerEvent,
4507 binding: Option<&TriggerBinding>,
4508 attempt: Option<u32>,
4509 replay_of_event_id: Option<&String>,
4510) -> BTreeMap<String, String> {
4511 let mut headers = BTreeMap::new();
4512 headers.insert("event_id".to_string(), event.id.0.clone());
4513 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
4514 headers.insert("provider".to_string(), event.provider.as_str().to_string());
4515 headers.insert("kind".to_string(), event.kind.clone());
4516 if let Some(replay_of_event_id) = replay_of_event_id {
4517 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
4518 }
4519 if let Some(binding) = binding {
4520 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
4521 headers.insert("binding_key".to_string(), binding.binding_key());
4522 headers.insert(
4523 "handler_kind".to_string(),
4524 DispatchUri::from(&binding.handler).kind().to_string(),
4525 );
4526 }
4527 if let Some(attempt) = attempt {
4528 headers.insert("attempt".to_string(), attempt.to_string());
4529 }
4530 headers
4531}
4532
4533fn worker_queue_priority(
4534 binding: &super::registry::TriggerBinding,
4535 event: &TriggerEvent,
4536) -> crate::WorkerQueuePriority {
4537 match event
4538 .headers
4539 .get("priority")
4540 .map(|value| value.trim().to_ascii_lowercase())
4541 .as_deref()
4542 {
4543 Some("high") => crate::WorkerQueuePriority::High,
4544 Some("low") => crate::WorkerQueuePriority::Low,
4545 _ => binding.dispatch_priority,
4546 }
4547}
4548
/// Environment variable that, when present (any value), makes
/// `maybe_fail_before_outbox` terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Test hook: exits the process with status 86 if
/// `TEST_FAIL_BEFORE_OUTBOX_ENV` is set, and otherwise does nothing.
///
/// The exit skips destructors and any remaining work in the caller; judging by
/// the name, this is presumably used to simulate a crash before the outbox
/// write (confirm with the tests that set the variable).
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_none() {
        return;
    }
    std::process::exit(86);
}
4556
4557fn now_rfc3339() -> String {
4558 time::OffsetDateTime::now_utc()
4559 .format(&time::format_description::well_known::Rfc3339)
4560 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4561}
4562
4563fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
4564 let now = time::OffsetDateTime::now_utc();
4565 let reset = if binding.hourly_cost_usd.is_some() {
4566 let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
4567 time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
4568 } else {
4569 let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
4570 time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
4571 };
4572 reset
4573 .format(&time::format_description::well_known::Rfc3339)
4574 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
4575}
4576
4577fn now_unix_ms() -> i64 {
4578 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
4579}
4580
4581fn cancelled_dispatch_outcome(
4582 binding: &TriggerBinding,
4583 route: &DispatchUri,
4584 event: &TriggerEvent,
4585 replay_of_event_id: Option<String>,
4586 attempt_count: u32,
4587 error: String,
4588) -> DispatchOutcome {
4589 DispatchOutcome {
4590 trigger_id: binding.id.as_str().to_string(),
4591 binding_key: binding.binding_key(),
4592 event_id: event.id.0.clone(),
4593 attempt_count,
4594 status: DispatchStatus::Cancelled,
4595 handler_kind: route.kind().to_string(),
4596 target_uri: route.target_uri(),
4597 replay_of_event_id,
4598 result: None,
4599 error: Some(error),
4600 }
4601}
4602
4603async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
4604 let _ = cancel_rx.recv().await;
4605}
4606
4607#[cfg(test)]
4608mod tests;