1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::{Mutex as AsyncMutex, Notify};
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25 ACTION_GRAPH_NODE_KIND_PREDICATE, ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26 ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{
30 append_trust_record, policy_for_autonomy_tier, AutonomyTier, TrustOutcome, TrustRecord,
31};
32use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
33use crate::vm::Vm;
34
35use self::uri::DispatchUri;
36use super::registry::{
37 binding_autonomy_budget_would_exceed, binding_budget_would_exceed,
38 expected_predicate_cost_usd_micros, matching_bindings, micros_to_usd, note_autonomous_decision,
39 note_orchestrator_budget_cost, orchestrator_budget_would_exceed, record_predicate_cost_sample,
40 reset_binding_budget_windows, usd_to_micros, TriggerBinding, TriggerBudgetExhaustionStrategy,
41 TriggerHandlerSpec,
42};
43use super::{
44 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
45 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
46 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
47 TRIGGER_OUTBOX_TOPIC,
48};
49use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
50
51mod flow_control;
52pub mod retry;
53pub mod uri;
54
55pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
56
// Per-thread dispatcher slots. Every slot starts out empty (`None`).
thread_local! {
    // Runtime state of the dispatcher most recently constructed on this thread;
    // written by `Dispatcher::with_event_log_and_metrics`.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Value handed out (cloned) by `current_dispatch_context`. The code that
    // installs it is not in this part of the file.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Value handed out (cloned) by `current_dispatch_wait_lease`. The code that
    // installs it is not in this part of the file.
    static ACTIVE_DISPATCH_WAIT_LEASE: RefCell<Option<DispatchWaitLease>> = const { RefCell::new(None) };
    // Test-only one-shot sender. NOTE(review): presumably fired by
    // `notify_test_inbox_dequeued`, which `run` calls after decoding an
    // envelope — confirm against that helper.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
64
tokio::task_local! {
    // Set by `dispatch_with_replay` (via `.scope(..)`) to
    // `replay_of_event_id.is_some()` for the duration of the inner dispatch
    // future; read through `current_dispatch_is_replay`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
68
/// Description of a dispatch, as handed out by [`current_dispatch_context`].
///
/// The code that builds and installs this value lives outside this part of the
/// file, so the per-field notes below are inferred from names and types only.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// Presumably the event being dispatched — TODO confirm at the construction site.
    pub trigger_event: TriggerEvent,
    /// Presumably the id of the original event when this dispatch is a replay,
    /// `None` otherwise — TODO confirm.
    pub replay_of_event_id: Option<String>,
    /// Presumably the id of the binding handling the event — TODO confirm.
    pub binding_id: String,
    /// Presumably the version of that binding — TODO confirm.
    pub binding_version: u32,
    /// Agent identifier; its source is not visible here.
    pub agent_id: String,
    /// Action name; its source is not visible here.
    pub action: String,
    /// Autonomy tier associated with the dispatch.
    pub autonomy_tier: AutonomyTier,
}
79
/// Guard that calls `crate::orchestration::pop_execution_policy()` when it is
/// dropped.
///
/// NOTE(review): presumably created right after a matching policy push so the
/// pop happens on every exit path; the push site is not in this part of the file.
struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    /// Pops the execution policy.
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
87
/// Serializable record tying a list of predicate cache entries to one trigger
/// and one event.
///
/// NOTE(review): where this record is persisted and read back is not visible
/// in this part of the file.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    /// Trigger the cached entries belong to.
    trigger_id: String,
    /// Event the cached entries belong to.
    event_id: String,
    /// The cached predicate entries themselves.
    entries: Vec<PredicateCacheEntry>,
}
94
/// Result of evaluating a binding's `when` predicate for one event.
///
/// The dispatcher reads `result` to decide whether to continue, reports the
/// remaining fields in the action-graph update for the predicate node, and
/// inspects `exhaustion_strategy` when the predicate did not pass.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed.
    result: bool,
    /// Cost attributed to the evaluation, in USD.
    cost_usd: f64,
    /// Token count attributed to the evaluation.
    tokens: u64,
    /// Evaluation latency in milliseconds.
    latency_ms: u64,
    /// Whether the result was served from a cache.
    cached: bool,
    /// Optional explanation; used as the budget-exhaustion reason when present.
    reason: Option<String>,
    /// When set and the predicate did not pass: `Fail` moves the event to the
    /// DLQ and `RetryLater` defers it (outcome status `Waiting`).
    exhaustion_strategy: Option<TriggerBudgetExhaustionStrategy>,
}
105
/// Default reviewer name for autonomy-budget handling.
/// NOTE(review): the use site is not in this part of the file — confirm usage.
const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
107
108pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
109 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
110}
111
112pub(crate) fn current_dispatch_is_replay() -> bool {
113 ACTIVE_DISPATCH_IS_REPLAY
114 .try_with(|is_replay| *is_replay)
115 .unwrap_or(false)
116}
117
118pub(crate) fn current_dispatch_wait_lease() -> Option<DispatchWaitLease> {
119 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| slot.borrow().clone())
120}
121
/// Trigger dispatcher: appends events to the inbox topic, consumes them in
/// [`Dispatcher::run`], and routes them to matching bindings.
///
/// Cloning is cheap: every field is a reference-counted handle or a channel
/// sender. The type holds an `Rc`, so it is not `Send`.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM supplied at construction, shared between clones.
    base_vm: Rc<Vm>,
    /// Event log used for the inbox topic and for dispatch bookkeeping.
    event_log: Arc<AnyEventLog>,
    /// Broadcast sender fired by `shutdown`; `run` subscribes to it to stop its loop.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ and flow-control state shared between clones.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink; when present, `dispatch_with_replay` records
    /// per-dispatch results into it.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
130
131#[derive(Debug)]
132struct DispatcherRuntimeState {
133 in_flight: AtomicU64,
134 retry_queue_depth: AtomicU64,
135 dlq: Mutex<Vec<DlqEntry>>,
136 cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
137 shutting_down: std::sync::atomic::AtomicBool,
138 idle_notify: Notify,
139 flow_control: FlowControlManager,
140}
141
142impl DispatcherRuntimeState {
143 fn new(event_log: Arc<AnyEventLog>) -> Self {
144 Self {
145 in_flight: AtomicU64::new(0),
146 retry_queue_depth: AtomicU64::new(0),
147 dlq: Mutex::new(Vec::new()),
148 cancel_tokens: Mutex::new(Vec::new()),
149 shutting_down: std::sync::atomic::AtomicBool::new(false),
150 idle_notify: Notify::new(),
151 flow_control: FlowControlManager::new(event_log),
152 }
153 }
154}
155
/// Point-in-time view of the dispatcher counters, produced by
/// `Dispatcher::snapshot`.
///
/// With `#[serde(default)]`, fields missing from serialized input fall back to
/// their `Default` value (zero).
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches in progress when the snapshot was taken.
    pub in_flight: u64,
    /// Retry queue depth when the snapshot was taken.
    pub retry_queue_depth: u64,
    /// Number of in-memory DLQ entries when the snapshot was taken.
    pub dlq_depth: u64,
}
163
/// Final status of one dispatch. Serialized in `snake_case`; the same names
/// are returned by [`DispatchStatus::as_str`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// Counted as a success by the metrics recording in `dispatch_with_replay`.
    Succeeded,
    /// Counted as a failure by the metrics recording; also the `Default` status
    /// of [`DispatchOutcome`].
    Failed,
    /// The event was moved to the dead-letter queue.
    Dlq,
    /// The dispatch was skipped (for example, the predicate did not pass).
    /// Counted as a success by the metrics recording.
    Skipped,
    /// The dispatch was deferred (for example, budget retry-later or a pending
    /// approval). Counted as neither success nor failure.
    Waiting,
    /// The dispatch was cancelled.
    Cancelled,
}
174
175impl DispatchStatus {
176 pub fn as_str(&self) -> &'static str {
177 match self {
178 Self::Succeeded => "succeeded",
179 Self::Failed => "failed",
180 Self::Dlq => "dlq",
181 Self::Skipped => "skipped",
182 Self::Waiting => "waiting",
183 Self::Cancelled => "cancelled",
184 }
185 }
186}
187
/// Result of dispatching one event to one binding.
///
/// With `#[serde(default)]`, fields missing from serialized input are taken
/// from this type's `Default` implementation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger binding that handled the event.
    pub trigger_id: String,
    /// Key of the binding, as returned by `TriggerBinding::binding_key`.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Number of handler attempts; `0` when the dispatch ended before any attempt.
    pub attempt_count: u32,
    /// Final status of the dispatch.
    pub status: DispatchStatus,
    /// Handler kind of the dispatch route.
    pub handler_kind: String,
    /// Target URI of the dispatch route.
    pub target_uri: String,
    /// Id of the original event when this dispatch is a replay.
    pub replay_of_event_id: Option<String>,
    /// Optional JSON result payload (for example skip / deferral details).
    pub result: Option<serde_json::Value>,
    /// Optional error message.
    pub error: Option<String>,
}
202
/// Payload of an `event_ingested` record on the inbox envelopes topic.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to this trigger's live binding.
    pub trigger_id: Option<String>,
    /// Optional binding version passed along when resolving the live binding.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
209
/// Report returned by `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when both `in_flight` and `retry_queue_depth` were observed at zero.
    pub drained: bool,
    /// Dispatches in progress at the time of the report.
    pub in_flight: u64,
    /// Retry queue depth at the time of the report.
    pub retry_queue_depth: u64,
    /// Number of in-memory DLQ entries at the time of the report.
    pub dlq_depth: u64,
}
218
219impl Default for DispatchOutcome {
220 fn default() -> Self {
221 Self {
222 trigger_id: String::new(),
223 binding_key: String::new(),
224 event_id: String::new(),
225 attempt_count: 0,
226 status: DispatchStatus::Failed,
227 handler_kind: String::new(),
228 target_uri: String::new(),
229 replay_of_event_id: None,
230 result: None,
231 error: None,
232 }
233 }
234}
235
/// Serializable record of a single handler attempt.
///
/// NOTE(review): the code that fills this in is outside this part of the file;
/// the timestamp format of `started_at` / `completed_at` is not visible here.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Key of the binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Attempt number (presumably 1-based — TODO confirm).
    pub attempt: u32,
    /// Handler kind used for the attempt.
    pub handler_kind: String,
    /// When the attempt started, as a string.
    pub started_at: String,
    /// When the attempt completed, as a string.
    pub completed_at: String,
    /// Outcome label of the attempt.
    pub outcome: String,
    /// Error message, if the attempt produced one.
    pub error_msg: Option<String>,
}
249
/// Request to cancel the dispatch of one event for one binding. Appended to the
/// cancel-requests topic by [`append_dispatch_cancel_request`].
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Key of the binding whose dispatch should be cancelled.
    pub binding_key: String,
    /// Id of the event whose dispatch should be cancelled.
    pub event_id: String,
    /// Time of the request; (de)serialized as an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who asked for the cancellation; defaults to `None` when absent.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit identifier; defaults to `None` when absent.
    #[serde(default)]
    pub audit_id: Option<String>,
}
261
/// One dead-letter-queue entry, as kept in the dispatcher's in-memory DLQ and
/// returned by `Dispatcher::dlq_entries`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Key of the binding.
    pub binding_key: String,
    /// The event that ended up in the DLQ.
    pub event: TriggerEvent,
    /// Number of attempts made before the event was dead-lettered.
    pub attempt_count: u32,
    /// Error message of the final failure.
    pub final_error: String,
    /// Records of the individual attempts.
    pub attempts: Vec<DispatchAttemptRecord>,
}
271
/// Bookkeeping for a singleton gate tied to one dispatch.
#[derive(Clone, Debug)]
struct SingletonLease {
    /// Name of the singleton gate.
    gate: String,
    /// Whether the gate is currently held; cleared by `DispatchWaitLease::suspend`
    /// and set again by `DispatchWaitLease::resume`.
    held: bool,
}
277
/// Bookkeeping for a concurrency gate tied to one dispatch.
#[derive(Clone, Debug)]
struct ConcurrencyLease {
    /// Name of the concurrency gate.
    gate: String,
    /// `max` argument passed to `acquire_concurrency` when the permit is re-acquired.
    max: u32,
    /// `priority_rank` argument passed to `acquire_concurrency` when the permit
    /// is re-acquired.
    priority_rank: usize,
    /// The permit while it is held; taken out by `DispatchWaitLease::suspend`
    /// and put back by `DispatchWaitLease::resume`.
    permit: Option<ConcurrencyPermit>,
}
285
/// Flow-control leases acquired for one dispatch. Both are optional; the
/// default value holds neither.
#[derive(Default, Debug)]
struct AcquiredFlowControl {
    /// Singleton gate lease, if any.
    singleton: Option<SingletonLease>,
    /// Concurrency gate lease, if any.
    concurrency: Option<ConcurrencyLease>,
}
291
/// Handle that lets a dispatch release its flow-control leases (`suspend`) and
/// take them back (`resume`). Clones share the same state.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state that owns the flow-control manager.
    state: Arc<DispatcherRuntimeState>,
    /// Leases acquired for the dispatch, shared behind an async mutex.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// `true` between a `suspend` and the next `resume`.
    suspended: Arc<AtomicBool>,
}
298
299impl DispatchWaitLease {
300 fn new(
301 state: Arc<DispatcherRuntimeState>,
302 acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
303 ) -> Self {
304 Self {
305 state,
306 acquired,
307 suspended: Arc::new(AtomicBool::new(false)),
308 }
309 }
310
311 pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
312 if self.suspended.swap(true, Ordering::SeqCst) {
313 return Ok(());
314 }
315 let (singleton_gate, concurrency_permit) = {
316 let mut acquired = self.acquired.lock().await;
317 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
318 if lease.held {
319 lease.held = false;
320 Some(lease.gate.clone())
321 } else {
322 None
323 }
324 });
325 let concurrency_permit = acquired
326 .concurrency
327 .as_mut()
328 .and_then(|lease| lease.permit.take());
329 (singleton_gate, concurrency_permit)
330 };
331
332 if let Some(gate) = singleton_gate {
333 self.state
334 .flow_control
335 .release_singleton(&gate)
336 .await
337 .map_err(DispatchError::from)?;
338 }
339 if let Some(permit) = concurrency_permit {
340 self.state
341 .flow_control
342 .release_concurrency(permit)
343 .await
344 .map_err(DispatchError::from)?;
345 }
346 Ok(())
347 }
348
349 pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
350 if !self.suspended.swap(false, Ordering::SeqCst) {
351 return Ok(());
352 }
353
354 let singleton_gate = {
355 let acquired = self.acquired.lock().await;
356 acquired.singleton.as_ref().and_then(|lease| {
357 if lease.held {
358 None
359 } else {
360 Some(lease.gate.clone())
361 }
362 })
363 };
364 if let Some(gate) = singleton_gate {
365 self.state
366 .flow_control
367 .acquire_singleton(&gate)
368 .await
369 .map_err(DispatchError::from)?;
370 let mut acquired = self.acquired.lock().await;
371 if let Some(lease) = acquired.singleton.as_mut() {
372 lease.held = true;
373 }
374 }
375
376 let concurrency_spec = {
377 let acquired = self.acquired.lock().await;
378 acquired.concurrency.as_ref().and_then(|lease| {
379 if lease.permit.is_some() {
380 None
381 } else {
382 Some((lease.gate.clone(), lease.max, lease.priority_rank))
383 }
384 })
385 };
386 if let Some((gate, max, priority_rank)) = concurrency_spec {
387 let permit = self
388 .state
389 .flow_control
390 .acquire_concurrency(&gate, max, priority_rank)
391 .await
392 .map_err(DispatchError::from)?;
393 let mut acquired = self.acquired.lock().await;
394 if let Some(lease) = acquired.concurrency.as_mut() {
395 lease.permit = Some(permit);
396 }
397 }
398 Ok(())
399 }
400}
401
/// Decision produced by the flow-control stage for one event.
///
/// NOTE(review): the producer and consumer of this type are outside this part
/// of the file.
enum FlowControlOutcome {
    /// Proceed with the dispatch.
    Dispatch {
        /// The event to dispatch (boxed).
        event: Box<TriggerEvent>,
        /// Leases acquired for the dispatch.
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch.
    Skip {
        /// Why the dispatch is skipped.
        reason: String,
    },
}
411
/// Stage at which a dispatch was skipped. See `as_str` for the string labels.
#[derive(Clone, Debug)]
enum DispatchSkipStage {
    /// Skipped at the predicate stage.
    Predicate,
    /// Skipped at the flow-control stage.
    FlowControl,
}
417
/// Errors produced by the dispatcher. Every variant carries a human-readable
/// message, which is also the `Display` output.
#[derive(Debug)]
pub enum DispatchError {
    /// Event log failure.
    EventLog(String),
    /// Trigger registry failure.
    Registry(String),
    /// (De)serialization failure.
    Serde(String),
    /// Local handler failure.
    Local(String),
    /// A2A handler failure.
    A2a(String),
    /// The dispatch was denied. Not retryable.
    Denied(String),
    /// The dispatch timed out.
    Timeout(String),
    /// The dispatch is waiting. Not retryable.
    Waiting(String),
    /// The dispatch was cancelled. Not retryable.
    Cancelled(String),
    /// The requested behavior is not implemented. Not retryable.
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    /// Writes the message carried by the variant, with no prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let (Self::EventLog(text)
        | Self::Registry(text)
        | Self::Serde(text)
        | Self::Local(text)
        | Self::A2a(text)
        | Self::Denied(text)
        | Self::Timeout(text)
        | Self::Waiting(text)
        | Self::Cancelled(text)
        | Self::NotImplemented(text)) = self;
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Whether an attempt that failed with this error may be retried.
    ///
    /// `Cancelled`, `Denied`, `NotImplemented` and `Waiting` are final; every
    /// other variant is retryable.
    fn retryable(&self) -> bool {
        match self {
            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_) => {
                false
            }
            Self::EventLog(_)
            | Self::Registry(_)
            | Self::Serde(_)
            | Self::Local(_)
            | Self::A2a(_)
            | Self::Timeout(_) => true,
        }
    }
}
459
460impl DispatchSkipStage {
461 fn as_str(&self) -> &'static str {
462 match self {
463 Self::Predicate => "predicate",
464 Self::FlowControl => "flow_control",
465 }
466 }
467}
468
469impl From<LogError> for DispatchError {
470 fn from(value: LogError) -> Self {
471 Self::EventLog(value.to_string())
472 }
473}
474
475pub async fn append_dispatch_cancel_request(
476 event_log: &Arc<AnyEventLog>,
477 request: &DispatchCancelRequest,
478) -> Result<u64, DispatchError> {
479 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
480 .expect("static trigger cancel topic should always be valid");
481 event_log
482 .append(
483 &topic,
484 LogEvent::new(
485 "dispatch_cancel_requested",
486 serde_json::to_value(request)
487 .map_err(|error| DispatchError::Serde(error.to_string()))?,
488 ),
489 )
490 .await
491 .map_err(DispatchError::from)
492}
493
494impl Dispatcher {
495 pub fn event_log_handle(&self) -> Arc<AnyEventLog> {
496 self.event_log.clone()
497 }
498
499 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
500 let event_log = active_event_log().ok_or_else(|| {
501 DispatchError::EventLog("dispatcher requires an active event log".to_string())
502 })?;
503 Ok(Self::with_event_log(base_vm, event_log))
504 }
505
506 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
507 Self::with_event_log_and_metrics(base_vm, event_log, None)
508 }
509
510 pub fn with_event_log_and_metrics(
511 base_vm: Vm,
512 event_log: Arc<AnyEventLog>,
513 metrics: Option<Arc<crate::MetricsRegistry>>,
514 ) -> Self {
515 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
516 ACTIVE_DISPATCHER_STATE.with(|slot| {
517 *slot.borrow_mut() = Some(state.clone());
518 });
519 let (cancel_tx, _) = broadcast::channel(32);
520 Self {
521 base_vm: Rc::new(base_vm),
522 event_log,
523 cancel_tx,
524 state,
525 metrics,
526 }
527 }
528
529 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
530 DispatcherStatsSnapshot {
531 in_flight: self.state.in_flight.load(Ordering::Relaxed),
532 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
533 dlq_depth: self
534 .state
535 .dlq
536 .lock()
537 .expect("dispatcher dlq poisoned")
538 .len() as u64,
539 }
540 }
541
542 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
543 self.state
544 .dlq
545 .lock()
546 .expect("dispatcher dlq poisoned")
547 .clone()
548 }
549
550 pub fn shutdown(&self) {
551 self.state.shutting_down.store(true, Ordering::SeqCst);
552 for token in self
553 .state
554 .cancel_tokens
555 .lock()
556 .expect("dispatcher cancel tokens poisoned")
557 .iter()
558 {
559 token.store(true, Ordering::SeqCst);
560 }
561 let _ = self.cancel_tx.send(());
562 }
563
564 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
565 self.enqueue_targeted(None, None, event).await
566 }
567
568 pub async fn enqueue_targeted(
569 &self,
570 trigger_id: Option<String>,
571 binding_version: Option<u32>,
572 event: TriggerEvent,
573 ) -> Result<u64, DispatchError> {
574 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
575 .expect("static trigger inbox envelopes topic is valid");
576 let headers = event_headers(&event, None, None, None);
577 let payload = serde_json::to_value(InboxEnvelope {
578 trigger_id,
579 binding_version,
580 event,
581 })
582 .map_err(|error| DispatchError::Serde(error.to_string()))?;
583 self.event_log
584 .append(
585 &topic,
586 LogEvent::new("event_ingested", payload).with_headers(headers),
587 )
588 .await
589 .map_err(DispatchError::from)
590 }
591
/// Consumes the inbox envelopes topic and dispatches each envelope until the
/// stream ends or a cancel signal arrives.
///
/// # Errors
/// Returns early with a `DispatchError` when the log cannot be queried or
/// subscribed to, when the stream yields an error, or when an
/// `event_ingested` payload cannot be decoded into an [`InboxEnvelope`].
/// Errors from the dispatch itself are discarded and do not stop the loop.
///
/// # Panics
/// Panics if the static topic name is rejected by `Topic::new`.
pub async fn run(&self) -> Result<(), DispatchError> {
    let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
        .expect("static trigger inbox envelopes topic is valid");
    // Subscribe from whatever `latest` reports for the topic.
    // NOTE(review): presumably this means only records appended after this
    // point are seen — confirm against the event log's `subscribe` contract.
    let start_from = self.event_log.latest(&topic).await?;
    let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
    pin_mut!(stream);
    let mut cancel_rx = self.cancel_tx.subscribe();

    loop {
        tokio::select! {
            received = stream.next() => {
                // `None` means the subscription ended.
                let Some(received) = received else {
                    break;
                };
                let (_, event) = received.map_err(DispatchError::from)?;
                // Records of any other kind on this topic are skipped.
                if event.kind != "event_ingested" {
                    continue;
                }
                let parent_headers = event.headers.clone();
                // A malformed payload aborts the whole loop with a Serde error.
                let envelope: InboxEnvelope = serde_json::from_value(event.payload)
                    .map_err(|error| DispatchError::Serde(error.to_string()))?;
                notify_test_inbox_dequeued();
                // The dispatch result (including errors) is intentionally dropped
                // here; the loop moves on to the next envelope.
                let _ = self
                    .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
                    .await;
            }
            // Stop on the cancel signal (`shutdown` broadcasts on this channel).
            _ = recv_cancel(&mut cancel_rx) => break,
        }
    }

    Ok(())
}
624
625 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
626 let deadline = tokio::time::Instant::now() + timeout;
627 loop {
628 let snapshot = self.snapshot();
629 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
630 return Ok(DispatcherDrainReport {
631 drained: true,
632 in_flight: snapshot.in_flight,
633 retry_queue_depth: snapshot.retry_queue_depth,
634 dlq_depth: snapshot.dlq_depth,
635 });
636 }
637
638 let notified = self.state.idle_notify.notified();
639 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
640 if remaining.is_zero() {
641 return Ok(DispatcherDrainReport {
642 drained: false,
643 in_flight: snapshot.in_flight,
644 retry_queue_depth: snapshot.retry_queue_depth,
645 dlq_depth: snapshot.dlq_depth,
646 });
647 }
648 if tokio::time::timeout(remaining, notified).await.is_err() {
649 let snapshot = self.snapshot();
650 return Ok(DispatcherDrainReport {
651 drained: false,
652 in_flight: snapshot.in_flight,
653 retry_queue_depth: snapshot.retry_queue_depth,
654 dlq_depth: snapshot.dlq_depth,
655 });
656 }
657 }
658 }
659
660 pub async fn dispatch_inbox_envelope(
661 &self,
662 envelope: InboxEnvelope,
663 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
664 self.dispatch_inbox_envelope_with_headers(envelope, None)
665 .await
666 }
667
668 async fn dispatch_inbox_envelope_with_headers(
669 &self,
670 envelope: InboxEnvelope,
671 parent_headers: Option<&BTreeMap<String, String>>,
672 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
673 if let Some(trigger_id) = envelope.trigger_id {
674 let binding = super::registry::resolve_live_trigger_binding(
675 &trigger_id,
676 envelope.binding_version,
677 )
678 .map_err(|error| DispatchError::Registry(error.to_string()))?;
679 return Ok(vec![
680 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
681 .await?,
682 ]);
683 }
684
685 let cron_target = match &envelope.event.provider_payload {
686 crate::triggers::ProviderPayload::Known(
687 crate::triggers::event::KnownProviderPayload::Cron(payload),
688 ) => payload.cron_id.clone(),
689 _ => None,
690 };
691 if let Some(trigger_id) = cron_target {
692 let binding = super::registry::resolve_live_trigger_binding(
693 &trigger_id,
694 envelope.binding_version,
695 )
696 .map_err(|error| DispatchError::Registry(error.to_string()))?;
697 return Ok(vec![
698 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
699 .await?,
700 ]);
701 }
702
703 self.dispatch_event(envelope.event).await
704 }
705
706 pub async fn dispatch_event(
707 &self,
708 event: TriggerEvent,
709 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
710 let bindings = matching_bindings(&event);
711 let mut outcomes = Vec::new();
712 for binding in bindings {
713 outcomes.push(self.dispatch(&binding, event.clone()).await?);
714 }
715 Ok(outcomes)
716 }
717
718 pub async fn dispatch(
719 &self,
720 binding: &TriggerBinding,
721 event: TriggerEvent,
722 ) -> Result<DispatchOutcome, DispatchError> {
723 self.dispatch_with_replay(binding, event, None, None, None)
724 .await
725 }
726
727 pub async fn dispatch_replay(
728 &self,
729 binding: &TriggerBinding,
730 event: TriggerEvent,
731 replay_of_event_id: String,
732 ) -> Result<DispatchOutcome, DispatchError> {
733 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
734 .await
735 }
736
737 pub async fn dispatch_with_parent_span_id(
738 &self,
739 binding: &TriggerBinding,
740 event: TriggerEvent,
741 parent_span_id: Option<String>,
742 ) -> Result<DispatchOutcome, DispatchError> {
743 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
744 .await
745 }
746
747 async fn dispatch_with_replay(
748 &self,
749 binding: &TriggerBinding,
750 event: TriggerEvent,
751 replay_of_event_id: Option<String>,
752 parent_span_id: Option<String>,
753 parent_headers: Option<&BTreeMap<String, String>>,
754 ) -> Result<DispatchOutcome, DispatchError> {
755 let span = tracing::info_span!(
756 "dispatch",
757 trigger_id = %binding.id.as_str(),
758 binding_version = binding.version,
759 trace_id = %event.trace_id.0
760 );
761 #[cfg(feature = "otel")]
762 let span_for_otel = span.clone();
763 let _ = if let Some(headers) = parent_headers {
764 crate::observability::otel::set_span_parent_from_headers(
765 &span,
766 headers,
767 &event.trace_id,
768 parent_span_id.as_deref(),
769 )
770 } else {
771 crate::observability::otel::set_span_parent(
772 &span,
773 &event.trace_id,
774 parent_span_id.as_deref(),
775 )
776 };
777 #[cfg(feature = "otel")]
778 let started_at = Instant::now();
779 let metrics = self.metrics.clone();
780 let outcome = ACTIVE_DISPATCH_IS_REPLAY
781 .scope(
782 replay_of_event_id.is_some(),
783 self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
784 .instrument(span),
785 )
786 .await;
787 if let Some(metrics) = metrics.as_ref() {
788 match &outcome {
789 Ok(dispatch_outcome) => match dispatch_outcome.status {
790 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
791 metrics.record_dispatch_succeeded();
792 }
793 DispatchStatus::Waiting => {}
794 _ => metrics.record_dispatch_failed(),
795 },
796 Err(_) => metrics.record_dispatch_failed(),
797 }
798 let outcome_label = match &outcome {
799 Ok(dispatch_outcome) => dispatch_outcome.status.as_str(),
800 Err(DispatchError::Cancelled(_)) => "cancelled",
801 Err(_) => "failed",
802 };
803 metrics.record_trigger_dispatched(
804 binding.id.as_str(),
805 binding.handler.kind(),
806 outcome_label,
807 );
808 metrics.set_trigger_inflight(binding.id.as_str(), binding.metrics_snapshot().in_flight);
809 }
810 #[cfg(feature = "otel")]
811 {
812 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
813
814 let duration_ms = started_at.elapsed().as_millis() as i64;
815 let status = match &outcome {
816 Ok(dispatch_outcome) => match dispatch_outcome.status {
817 DispatchStatus::Succeeded => "succeeded",
818 DispatchStatus::Skipped => "skipped",
819 DispatchStatus::Waiting => "waiting",
820 DispatchStatus::Cancelled => "cancelled",
821 DispatchStatus::Failed => "failed",
822 DispatchStatus::Dlq => "dlq",
823 },
824 Err(DispatchError::Cancelled(_)) => "cancelled",
825 Err(_) => "failed",
826 };
827 span_for_otel.set_attribute("result.status", status);
828 span_for_otel.set_attribute("result.duration_ms", duration_ms);
829 }
830 outcome
831 }
832
833 async fn dispatch_with_replay_inner(
834 &self,
835 binding: &TriggerBinding,
836 event: TriggerEvent,
837 replay_of_event_id: Option<String>,
838 ) -> Result<DispatchOutcome, DispatchError> {
839 let autonomy_tier = crate::resolve_agent_autonomy_tier(
840 &self.event_log,
841 binding.id.as_str(),
842 binding.autonomy_tier,
843 )
844 .await
845 .unwrap_or(binding.autonomy_tier);
846 let binding_key = binding.binding_key();
847 let route = DispatchUri::from(&binding.handler);
848 let trigger_id = binding.id.as_str().to_string();
849 let event_id = event.id.0.clone();
850 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
851 let begin = if replay_of_event_id.is_some() {
852 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
853 } else {
854 begin_in_flight(binding.id.as_str(), binding.version)
855 };
856 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
857
858 let mut attempts = Vec::new();
859 let mut source_node_id = format!("trigger:{}", event.id.0);
860 let mut initial_nodes = Vec::new();
861 let mut initial_edges = Vec::new();
862 if let Some(original_event_id) = replay_of_event_id.as_ref() {
863 let original_node_id = format!("trigger:{original_event_id}");
864 initial_nodes.push(RunActionGraphNodeRecord {
865 id: original_node_id.clone(),
866 label: format!(
867 "{}:{} (original {})",
868 event.provider.as_str(),
869 event.kind,
870 original_event_id
871 ),
872 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
873 status: "historical".to_string(),
874 outcome: "replayed_from".to_string(),
875 trace_id: Some(event.trace_id.0.clone()),
876 stage_id: None,
877 node_id: None,
878 worker_id: None,
879 run_id: None,
880 run_path: None,
881 metadata: trigger_node_metadata(&event),
882 });
883 initial_edges.push(RunActionGraphEdgeRecord {
884 from_id: original_node_id,
885 to_id: source_node_id.clone(),
886 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
887 label: Some("replay chain".to_string()),
888 });
889 }
890 initial_nodes.push(RunActionGraphNodeRecord {
891 id: source_node_id.clone(),
892 label: format!("{}:{}", event.provider.as_str(), event.kind),
893 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
894 status: "received".to_string(),
895 outcome: "received".to_string(),
896 trace_id: Some(event.trace_id.0.clone()),
897 stage_id: None,
898 node_id: None,
899 worker_id: None,
900 run_id: None,
901 run_path: None,
902 metadata: trigger_node_metadata(&event),
903 });
904 self.emit_action_graph(
905 &event,
906 initial_nodes,
907 initial_edges,
908 serde_json::json!({
909 "source": "dispatcher",
910 "trigger_id": trigger_id,
911 "binding_key": binding_key,
912 "event_id": event_id,
913 "replay_of_event_id": replay_of_event_id,
914 }),
915 )
916 .await?;
917
918 if dispatch_cancel_requested(
919 &self.event_log,
920 &binding_key,
921 &event.id.0,
922 replay_of_event_id.as_ref(),
923 )
924 .await?
925 {
926 finish_in_flight(
927 binding.id.as_str(),
928 binding.version,
929 TriggerDispatchOutcome::Failed,
930 )
931 .await
932 .map_err(|error| DispatchError::Registry(error.to_string()))?;
933 decrement_in_flight(&self.state);
934 return Ok(cancelled_dispatch_outcome(
935 binding,
936 &route,
937 &event,
938 replay_of_event_id,
939 0,
940 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
941 ));
942 }
943
944 if let Some(predicate) = binding.when.as_ref() {
945 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
946 let evaluation = self
947 .evaluate_predicate(
948 binding,
949 predicate,
950 &event,
951 replay_of_event_id.as_ref(),
952 autonomy_tier,
953 )
954 .await?;
955 let passed = evaluation.result;
956 self.emit_action_graph(
957 &event,
958 vec![RunActionGraphNodeRecord {
959 id: predicate_node_id.clone(),
960 label: predicate.raw.clone(),
961 kind: ACTION_GRAPH_NODE_KIND_PREDICATE.to_string(),
962 status: "completed".to_string(),
963 outcome: passed.to_string(),
964 trace_id: Some(event.trace_id.0.clone()),
965 stage_id: None,
966 node_id: None,
967 worker_id: None,
968 run_id: None,
969 run_path: None,
970 metadata: predicate_node_metadata(binding, predicate, &event, &evaluation),
971 }],
972 vec![RunActionGraphEdgeRecord {
973 from_id: source_node_id.clone(),
974 to_id: predicate_node_id.clone(),
975 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
976 label: None,
977 }],
978 serde_json::json!({
979 "source": "dispatcher",
980 "trigger_id": binding.id.as_str(),
981 "binding_key": binding.binding_key(),
982 "event_id": event.id.0,
983 "predicate": predicate.raw,
984 "reason": evaluation.reason,
985 "cached": evaluation.cached,
986 "cost_usd": evaluation.cost_usd,
987 "tokens": evaluation.tokens,
988 "latency_ms": evaluation.latency_ms,
989 "replay_of_event_id": replay_of_event_id,
990 }),
991 )
992 .await?;
993
994 if !passed {
995 if evaluation.exhaustion_strategy == Some(TriggerBudgetExhaustionStrategy::Fail) {
996 let final_error = format!(
997 "trigger budget exhausted: {}",
998 evaluation.reason.as_deref().unwrap_or("budget_exhausted")
999 );
1000 self.move_budget_exhausted_to_dlq(
1001 binding,
1002 &route,
1003 &event,
1004 replay_of_event_id.as_ref(),
1005 &final_error,
1006 )
1007 .await?;
1008 finish_in_flight(
1009 binding.id.as_str(),
1010 binding.version,
1011 TriggerDispatchOutcome::Dlq,
1012 )
1013 .await
1014 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1015 decrement_in_flight(&self.state);
1016 self.append_dispatch_trust_record(
1017 binding,
1018 &route,
1019 &event,
1020 replay_of_event_id.as_ref(),
1021 autonomy_tier,
1022 TrustOutcome::Failure,
1023 "dlq",
1024 0,
1025 Some(final_error.clone()),
1026 )
1027 .await?;
1028 return Ok(DispatchOutcome {
1029 trigger_id: binding.id.as_str().to_string(),
1030 binding_key: binding.binding_key(),
1031 event_id: event.id.0,
1032 attempt_count: 0,
1033 status: DispatchStatus::Dlq,
1034 handler_kind: route.kind().to_string(),
1035 target_uri: route.target_uri(),
1036 replay_of_event_id,
1037 result: None,
1038 error: Some(final_error),
1039 });
1040 }
1041
1042 if evaluation.exhaustion_strategy
1043 == Some(TriggerBudgetExhaustionStrategy::RetryLater)
1044 {
1045 self.append_budget_deferred_event(
1046 binding,
1047 &route,
1048 &event,
1049 replay_of_event_id.as_ref(),
1050 evaluation.reason.as_deref().unwrap_or("budget_exhausted"),
1051 )
1052 .await?;
1053 finish_in_flight(
1054 binding.id.as_str(),
1055 binding.version,
1056 TriggerDispatchOutcome::Dispatched,
1057 )
1058 .await
1059 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1060 decrement_in_flight(&self.state);
1061 self.append_dispatch_trust_record(
1062 binding,
1063 &route,
1064 &event,
1065 replay_of_event_id.as_ref(),
1066 autonomy_tier,
1067 TrustOutcome::Denied,
1068 "waiting",
1069 0,
1070 evaluation.reason.clone(),
1071 )
1072 .await?;
1073 return Ok(DispatchOutcome {
1074 trigger_id: binding.id.as_str().to_string(),
1075 binding_key: binding.binding_key(),
1076 event_id: event.id.0,
1077 attempt_count: 0,
1078 status: DispatchStatus::Waiting,
1079 handler_kind: route.kind().to_string(),
1080 target_uri: route.target_uri(),
1081 replay_of_event_id,
1082 result: Some(serde_json::json!({
1083 "deferred": true,
1084 "predicate": predicate.raw,
1085 "reason": evaluation.reason,
1086 })),
1087 error: None,
1088 });
1089 }
1090
1091 self.append_skipped_outbox_event(
1092 binding,
1093 &route,
1094 &event,
1095 replay_of_event_id.as_ref(),
1096 DispatchSkipStage::Predicate,
1097 serde_json::json!({
1098 "predicate": predicate.raw,
1099 "reason": evaluation.reason,
1100 }),
1101 )
1102 .await?;
1103 finish_in_flight(
1104 binding.id.as_str(),
1105 binding.version,
1106 TriggerDispatchOutcome::Dispatched,
1107 )
1108 .await
1109 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1110 decrement_in_flight(&self.state);
1111 self.append_dispatch_trust_record(
1112 binding,
1113 &route,
1114 &event,
1115 replay_of_event_id.as_ref(),
1116 autonomy_tier,
1117 TrustOutcome::Denied,
1118 "skipped",
1119 0,
1120 None,
1121 )
1122 .await?;
1123 return Ok(DispatchOutcome {
1124 trigger_id: binding.id.as_str().to_string(),
1125 binding_key: binding.binding_key(),
1126 event_id: event.id.0,
1127 attempt_count: 0,
1128 status: DispatchStatus::Skipped,
1129 handler_kind: route.kind().to_string(),
1130 target_uri: route.target_uri(),
1131 replay_of_event_id,
1132 result: Some(serde_json::json!({
1133 "skipped": true,
1134 "predicate": predicate.raw,
1135 "reason": evaluation.reason,
1136 })),
1137 error: None,
1138 });
1139 }
1140
1141 source_node_id = predicate_node_id;
1142 }
1143
1144 if autonomy_tier == AutonomyTier::ActAuto {
1145 if let Some(reason) = binding_autonomy_budget_would_exceed(binding) {
1146 let request_id = self
1147 .append_autonomy_budget_approval_request(
1148 binding,
1149 &route,
1150 &event,
1151 replay_of_event_id.as_ref(),
1152 reason,
1153 )
1154 .await?;
1155 self.emit_autonomy_budget_approval_action_graph(
1156 binding,
1157 &route,
1158 &event,
1159 &source_node_id,
1160 replay_of_event_id.as_ref(),
1161 reason,
1162 &request_id,
1163 )
1164 .await?;
1165 finish_in_flight(
1166 binding.id.as_str(),
1167 binding.version,
1168 TriggerDispatchOutcome::Dispatched,
1169 )
1170 .await
1171 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1172 decrement_in_flight(&self.state);
1173 self.append_tier_transition_trust_record(
1174 binding,
1175 &event,
1176 replay_of_event_id.as_ref(),
1177 autonomy_tier,
1178 AutonomyTier::ActWithApproval,
1179 reason,
1180 &request_id,
1181 )
1182 .await?;
1183 self.append_dispatch_trust_record(
1184 binding,
1185 &route,
1186 &event,
1187 replay_of_event_id.as_ref(),
1188 autonomy_tier,
1189 TrustOutcome::Denied,
1190 "waiting",
1191 0,
1192 Some(reason.to_string()),
1193 )
1194 .await?;
1195 return Ok(DispatchOutcome {
1196 trigger_id: binding.id.as_str().to_string(),
1197 binding_key: binding.binding_key(),
1198 event_id: event.id.0,
1199 attempt_count: 0,
1200 status: DispatchStatus::Waiting,
1201 handler_kind: route.kind().to_string(),
1202 target_uri: route.target_uri(),
1203 replay_of_event_id,
1204 result: Some(serde_json::json!({
1205 "approval_required": true,
1206 "request_id": request_id,
1207 "reason": reason,
1208 "reviewers": [DEFAULT_AUTONOMY_BUDGET_REVIEWER],
1209 })),
1210 error: None,
1211 });
1212 }
1213 note_autonomous_decision(binding);
1214 }
1215
1216 let (event, acquired_flow) = match self
1217 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
1218 .await?
1219 {
1220 FlowControlOutcome::Dispatch { event, acquired } => {
1221 (*event, Arc::new(AsyncMutex::new(acquired)))
1222 }
1223 FlowControlOutcome::Skip { reason } => {
1224 self.append_skipped_outbox_event(
1225 binding,
1226 &route,
1227 &event,
1228 replay_of_event_id.as_ref(),
1229 DispatchSkipStage::FlowControl,
1230 serde_json::json!({
1231 "flow_control": reason,
1232 }),
1233 )
1234 .await?;
1235 finish_in_flight(
1236 binding.id.as_str(),
1237 binding.version,
1238 TriggerDispatchOutcome::Dispatched,
1239 )
1240 .await
1241 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1242 decrement_in_flight(&self.state);
1243 return Ok(DispatchOutcome {
1244 trigger_id: binding.id.as_str().to_string(),
1245 binding_key: binding.binding_key(),
1246 event_id: event.id.0,
1247 attempt_count: 0,
1248 status: DispatchStatus::Skipped,
1249 handler_kind: route.kind().to_string(),
1250 target_uri: route.target_uri(),
1251 replay_of_event_id,
1252 result: Some(serde_json::json!({
1253 "skipped": true,
1254 "flow_control": reason,
1255 })),
1256 error: None,
1257 });
1258 }
1259 };
1260
1261 let mut previous_retry_node = None;
1262 let max_attempts = binding.retry.max_attempts();
1263 for attempt in 1..=max_attempts {
1264 if dispatch_cancel_requested(
1265 &self.event_log,
1266 &binding_key,
1267 &event.id.0,
1268 replay_of_event_id.as_ref(),
1269 )
1270 .await?
1271 {
1272 finish_in_flight(
1273 binding.id.as_str(),
1274 binding.version,
1275 TriggerDispatchOutcome::Failed,
1276 )
1277 .await
1278 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1279 decrement_in_flight(&self.state);
1280 return Ok(cancelled_dispatch_outcome(
1281 binding,
1282 &route,
1283 &event,
1284 replay_of_event_id,
1285 attempt.saturating_sub(1),
1286 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
1287 ));
1288 }
1289 maybe_fail_before_outbox();
1290 let started_at = now_rfc3339();
1291 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
1292 self.append_lifecycle_event(
1293 "DispatchStarted",
1294 &event,
1295 binding,
1296 serde_json::json!({
1297 "event_id": event.id.0,
1298 "attempt": attempt,
1299 "handler_kind": route.kind(),
1300 "target_uri": route.target_uri(),
1301 "replay_of_event_id": replay_of_event_id,
1302 }),
1303 replay_of_event_id.as_ref(),
1304 )
1305 .await?;
1306 self.append_topic_event(
1307 TRIGGER_OUTBOX_TOPIC,
1308 "dispatch_started",
1309 &event,
1310 Some(binding),
1311 Some(attempt),
1312 serde_json::json!({
1313 "event_id": event.id.0,
1314 "attempt": attempt,
1315 "trigger_id": binding.id.as_str(),
1316 "binding_key": binding.binding_key(),
1317 "handler_kind": route.kind(),
1318 "target_uri": route.target_uri(),
1319 "replay_of_event_id": replay_of_event_id,
1320 }),
1321 replay_of_event_id.as_ref(),
1322 )
1323 .await?;
1324
1325 let mut dispatch_edges = Vec::new();
1326 if attempt == 1 {
1327 dispatch_edges.push(RunActionGraphEdgeRecord {
1328 from_id: source_node_id.clone(),
1329 to_id: attempt_node_id.clone(),
1330 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
1331 label: binding.when.as_ref().map(|_| "true".to_string()),
1332 });
1333 } else if let Some(retry_node_id) = previous_retry_node.take() {
1334 dispatch_edges.push(RunActionGraphEdgeRecord {
1335 from_id: retry_node_id,
1336 to_id: attempt_node_id.clone(),
1337 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1338 label: Some(format!("attempt {attempt}")),
1339 });
1340 }
1341
1342 self.emit_action_graph(
1343 &event,
1344 vec![RunActionGraphNodeRecord {
1345 id: attempt_node_id.clone(),
1346 label: dispatch_node_label(&route),
1347 kind: dispatch_node_kind(&route).to_string(),
1348 status: "running".to_string(),
1349 outcome: format!("attempt_{attempt}"),
1350 trace_id: Some(event.trace_id.0.clone()),
1351 stage_id: None,
1352 node_id: None,
1353 worker_id: None,
1354 run_id: None,
1355 run_path: None,
1356 metadata: dispatch_node_metadata(&route, binding, &event, attempt),
1357 }],
1358 dispatch_edges,
1359 serde_json::json!({
1360 "source": "dispatcher",
1361 "trigger_id": binding.id.as_str(),
1362 "binding_key": binding.binding_key(),
1363 "event_id": event.id.0,
1364 "attempt": attempt,
1365 "handler_kind": route.kind(),
1366 "target_uri": route.target_uri(),
1367 "target_agent": dispatch_target_agent(&route),
1368 "replay_of_event_id": replay_of_event_id,
1369 }),
1370 )
1371 .await?;
1372
1373 let result = self
1374 .dispatch_once(
1375 binding,
1376 &route,
1377 &event,
1378 autonomy_tier,
1379 Some(DispatchWaitLease::new(
1380 self.state.clone(),
1381 acquired_flow.clone(),
1382 )),
1383 &mut self.cancel_tx.subscribe(),
1384 )
1385 .await;
1386 let completed_at = now_rfc3339();
1387
1388 match result {
1389 Ok(result) => {
1390 let attempt_record = DispatchAttemptRecord {
1391 trigger_id: binding.id.as_str().to_string(),
1392 binding_key: binding.binding_key(),
1393 event_id: event.id.0.clone(),
1394 attempt,
1395 handler_kind: route.kind().to_string(),
1396 started_at,
1397 completed_at,
1398 outcome: "success".to_string(),
1399 error_msg: None,
1400 };
1401 attempts.push(attempt_record.clone());
1402 self.append_attempt_record(
1403 &event,
1404 binding,
1405 &attempt_record,
1406 replay_of_event_id.as_ref(),
1407 )
1408 .await?;
1409 self.append_lifecycle_event(
1410 "DispatchSucceeded",
1411 &event,
1412 binding,
1413 serde_json::json!({
1414 "event_id": event.id.0,
1415 "attempt": attempt,
1416 "handler_kind": route.kind(),
1417 "target_uri": route.target_uri(),
1418 "result": result,
1419 "replay_of_event_id": replay_of_event_id,
1420 }),
1421 replay_of_event_id.as_ref(),
1422 )
1423 .await?;
1424 self.append_topic_event(
1425 TRIGGER_OUTBOX_TOPIC,
1426 "dispatch_succeeded",
1427 &event,
1428 Some(binding),
1429 Some(attempt),
1430 serde_json::json!({
1431 "event_id": event.id.0,
1432 "attempt": attempt,
1433 "trigger_id": binding.id.as_str(),
1434 "binding_key": binding.binding_key(),
1435 "handler_kind": route.kind(),
1436 "target_uri": route.target_uri(),
1437 "result": result,
1438 "replay_of_event_id": replay_of_event_id,
1439 }),
1440 replay_of_event_id.as_ref(),
1441 )
1442 .await?;
1443 self.emit_action_graph(
1444 &event,
1445 vec![RunActionGraphNodeRecord {
1446 id: attempt_node_id.clone(),
1447 label: dispatch_node_label(&route),
1448 kind: dispatch_node_kind(&route).to_string(),
1449 status: "completed".to_string(),
1450 outcome: dispatch_success_outcome(&route, &result).to_string(),
1451 trace_id: Some(event.trace_id.0.clone()),
1452 stage_id: None,
1453 node_id: None,
1454 worker_id: None,
1455 run_id: None,
1456 run_path: None,
1457 metadata: dispatch_success_metadata(
1458 &route, binding, &event, attempt, &result,
1459 ),
1460 }],
1461 Vec::new(),
1462 serde_json::json!({
1463 "source": "dispatcher",
1464 "trigger_id": binding.id.as_str(),
1465 "binding_key": binding.binding_key(),
1466 "event_id": event.id.0,
1467 "attempt": attempt,
1468 "handler_kind": route.kind(),
1469 "target_uri": route.target_uri(),
1470 "result": result,
1471 "replay_of_event_id": replay_of_event_id,
1472 }),
1473 )
1474 .await?;
1475 finish_in_flight(
1476 binding.id.as_str(),
1477 binding.version,
1478 TriggerDispatchOutcome::Dispatched,
1479 )
1480 .await
1481 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1482 self.release_flow_control(&acquired_flow).await?;
1483 decrement_in_flight(&self.state);
1484 self.append_dispatch_trust_record(
1485 binding,
1486 &route,
1487 &event,
1488 replay_of_event_id.as_ref(),
1489 autonomy_tier,
1490 TrustOutcome::Success,
1491 "succeeded",
1492 attempt,
1493 None,
1494 )
1495 .await?;
1496 return Ok(DispatchOutcome {
1497 trigger_id: binding.id.as_str().to_string(),
1498 binding_key: binding.binding_key(),
1499 event_id: event.id.0,
1500 attempt_count: attempt,
1501 status: DispatchStatus::Succeeded,
1502 handler_kind: route.kind().to_string(),
1503 target_uri: route.target_uri(),
1504 replay_of_event_id,
1505 result: Some(result),
1506 error: None,
1507 });
1508 }
1509 Err(error) => {
1510 let attempt_record = DispatchAttemptRecord {
1511 trigger_id: binding.id.as_str().to_string(),
1512 binding_key: binding.binding_key(),
1513 event_id: event.id.0.clone(),
1514 attempt,
1515 handler_kind: route.kind().to_string(),
1516 started_at,
1517 completed_at,
1518 outcome: dispatch_error_label(&error).to_string(),
1519 error_msg: Some(error.to_string()),
1520 };
1521 attempts.push(attempt_record.clone());
1522 self.append_attempt_record(
1523 &event,
1524 binding,
1525 &attempt_record,
1526 replay_of_event_id.as_ref(),
1527 )
1528 .await?;
1529 if let DispatchError::Waiting(message) = &error {
1530 self.append_lifecycle_event(
1531 "DispatchWaiting",
1532 &event,
1533 binding,
1534 serde_json::json!({
1535 "event_id": event.id.0,
1536 "attempt": attempt,
1537 "handler_kind": route.kind(),
1538 "target_uri": route.target_uri(),
1539 "message": message,
1540 "replay_of_event_id": replay_of_event_id,
1541 }),
1542 replay_of_event_id.as_ref(),
1543 )
1544 .await?;
1545 self.append_topic_event(
1546 TRIGGER_OUTBOX_TOPIC,
1547 "dispatch_waiting",
1548 &event,
1549 Some(binding),
1550 Some(attempt),
1551 serde_json::json!({
1552 "event_id": event.id.0,
1553 "attempt": attempt,
1554 "trigger_id": binding.id.as_str(),
1555 "binding_key": binding.binding_key(),
1556 "handler_kind": route.kind(),
1557 "target_uri": route.target_uri(),
1558 "message": message,
1559 "replay_of_event_id": replay_of_event_id,
1560 }),
1561 replay_of_event_id.as_ref(),
1562 )
1563 .await?;
1564 finish_in_flight(
1565 binding.id.as_str(),
1566 binding.version,
1567 TriggerDispatchOutcome::Dispatched,
1568 )
1569 .await
1570 .map_err(|registry_error| {
1571 DispatchError::Registry(registry_error.to_string())
1572 })?;
1573 self.release_flow_control(&acquired_flow).await?;
1574 decrement_in_flight(&self.state);
1575 return Ok(DispatchOutcome {
1576 trigger_id: binding.id.as_str().to_string(),
1577 binding_key: binding.binding_key(),
1578 event_id: event.id.0,
1579 attempt_count: attempt,
1580 status: DispatchStatus::Waiting,
1581 handler_kind: route.kind().to_string(),
1582 target_uri: route.target_uri(),
1583 replay_of_event_id,
1584 result: Some(serde_json::json!({
1585 "waiting": true,
1586 "message": message,
1587 })),
1588 error: None,
1589 });
1590 }
1591
1592 self.append_lifecycle_event(
1593 "DispatchFailed",
1594 &event,
1595 binding,
1596 serde_json::json!({
1597 "event_id": event.id.0,
1598 "attempt": attempt,
1599 "handler_kind": route.kind(),
1600 "target_uri": route.target_uri(),
1601 "error": error.to_string(),
1602 "replay_of_event_id": replay_of_event_id,
1603 }),
1604 replay_of_event_id.as_ref(),
1605 )
1606 .await?;
1607 self.append_topic_event(
1608 TRIGGER_OUTBOX_TOPIC,
1609 "dispatch_failed",
1610 &event,
1611 Some(binding),
1612 Some(attempt),
1613 serde_json::json!({
1614 "event_id": event.id.0,
1615 "attempt": attempt,
1616 "trigger_id": binding.id.as_str(),
1617 "binding_key": binding.binding_key(),
1618 "handler_kind": route.kind(),
1619 "target_uri": route.target_uri(),
1620 "error": error.to_string(),
1621 "replay_of_event_id": replay_of_event_id,
1622 }),
1623 replay_of_event_id.as_ref(),
1624 )
1625 .await?;
1626 self.emit_action_graph(
1627 &event,
1628 vec![RunActionGraphNodeRecord {
1629 id: attempt_node_id.clone(),
1630 label: dispatch_node_label(&route),
1631 kind: dispatch_node_kind(&route).to_string(),
1632 status: if matches!(error, DispatchError::Cancelled(_)) {
1633 "cancelled".to_string()
1634 } else {
1635 "failed".to_string()
1636 },
1637 outcome: dispatch_error_label(&error).to_string(),
1638 trace_id: Some(event.trace_id.0.clone()),
1639 stage_id: None,
1640 node_id: None,
1641 worker_id: None,
1642 run_id: None,
1643 run_path: None,
1644 metadata: dispatch_error_metadata(
1645 &route, binding, &event, attempt, &error,
1646 ),
1647 }],
1648 Vec::new(),
1649 serde_json::json!({
1650 "source": "dispatcher",
1651 "trigger_id": binding.id.as_str(),
1652 "binding_key": binding.binding_key(),
1653 "event_id": event.id.0,
1654 "attempt": attempt,
1655 "handler_kind": route.kind(),
1656 "target_uri": route.target_uri(),
1657 "error": error.to_string(),
1658 "replay_of_event_id": replay_of_event_id,
1659 }),
1660 )
1661 .await?;
1662
1663 if !error.retryable() {
1664 finish_in_flight(
1665 binding.id.as_str(),
1666 binding.version,
1667 TriggerDispatchOutcome::Failed,
1668 )
1669 .await
1670 .map_err(|registry_error| {
1671 DispatchError::Registry(registry_error.to_string())
1672 })?;
1673 self.release_flow_control(&acquired_flow).await?;
1674 decrement_in_flight(&self.state);
1675 let trust_outcome = match error {
1676 DispatchError::Denied(_) => TrustOutcome::Denied,
1677 DispatchError::Timeout(_) => TrustOutcome::Timeout,
1678 _ => TrustOutcome::Failure,
1679 };
1680 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1681 "cancelled"
1682 } else {
1683 "failed"
1684 };
1685 self.append_dispatch_trust_record(
1686 binding,
1687 &route,
1688 &event,
1689 replay_of_event_id.as_ref(),
1690 autonomy_tier,
1691 trust_outcome,
1692 terminal_status,
1693 attempt,
1694 Some(error.to_string()),
1695 )
1696 .await?;
1697 return Ok(DispatchOutcome {
1698 trigger_id: binding.id.as_str().to_string(),
1699 binding_key: binding.binding_key(),
1700 event_id: event.id.0,
1701 attempt_count: attempt,
1702 status: if matches!(error, DispatchError::Cancelled(_)) {
1703 DispatchStatus::Cancelled
1704 } else {
1705 DispatchStatus::Failed
1706 },
1707 handler_kind: route.kind().to_string(),
1708 target_uri: route.target_uri(),
1709 replay_of_event_id,
1710 result: None,
1711 error: Some(error.to_string()),
1712 });
1713 }
1714
1715 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1716 if let Some(metrics) = self.metrics.as_ref() {
1717 metrics.record_retry_scheduled();
1718 metrics.record_trigger_retry(binding.id.as_str(), attempt + 1);
1719 }
1720 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1721 previous_retry_node = Some(retry_node_id.clone());
1722 self.emit_action_graph(
1723 &event,
1724 vec![RunActionGraphNodeRecord {
1725 id: retry_node_id.clone(),
1726 label: format!("retry in {}ms", delay.as_millis()),
1727 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1728 status: "scheduled".to_string(),
1729 outcome: format!("attempt_{}", attempt + 1),
1730 trace_id: Some(event.trace_id.0.clone()),
1731 stage_id: None,
1732 node_id: None,
1733 worker_id: None,
1734 run_id: None,
1735 run_path: None,
1736 metadata: retry_node_metadata(
1737 binding,
1738 &event,
1739 attempt + 1,
1740 delay,
1741 &error,
1742 ),
1743 }],
1744 vec![RunActionGraphEdgeRecord {
1745 from_id: attempt_node_id,
1746 to_id: retry_node_id.clone(),
1747 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1748 label: Some(format!("attempt {}", attempt + 1)),
1749 }],
1750 serde_json::json!({
1751 "source": "dispatcher",
1752 "trigger_id": binding.id.as_str(),
1753 "binding_key": binding.binding_key(),
1754 "event_id": event.id.0,
1755 "attempt": attempt + 1,
1756 "delay_ms": delay.as_millis(),
1757 "replay_of_event_id": replay_of_event_id,
1758 }),
1759 )
1760 .await?;
1761 self.append_lifecycle_event(
1762 "RetryScheduled",
1763 &event,
1764 binding,
1765 serde_json::json!({
1766 "event_id": event.id.0,
1767 "attempt": attempt + 1,
1768 "delay_ms": delay.as_millis(),
1769 "error": error.to_string(),
1770 "replay_of_event_id": replay_of_event_id,
1771 }),
1772 replay_of_event_id.as_ref(),
1773 )
1774 .await?;
1775 self.append_topic_event(
1776 TRIGGER_ATTEMPTS_TOPIC,
1777 "retry_scheduled",
1778 &event,
1779 Some(binding),
1780 Some(attempt + 1),
1781 serde_json::json!({
1782 "event_id": event.id.0,
1783 "attempt": attempt + 1,
1784 "trigger_id": binding.id.as_str(),
1785 "binding_key": binding.binding_key(),
1786 "delay_ms": delay.as_millis(),
1787 "error": error.to_string(),
1788 "replay_of_event_id": replay_of_event_id,
1789 }),
1790 replay_of_event_id.as_ref(),
1791 )
1792 .await?;
1793 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1794 let sleep_result = sleep_or_cancel_or_request(
1795 &self.event_log,
1796 delay,
1797 &binding_key,
1798 &event.id.0,
1799 replay_of_event_id.as_ref(),
1800 &mut self.cancel_tx.subscribe(),
1801 )
1802 .await;
1803 decrement_retry_queue_depth(&self.state);
1804 if sleep_result.is_err() {
1805 finish_in_flight(
1806 binding.id.as_str(),
1807 binding.version,
1808 TriggerDispatchOutcome::Failed,
1809 )
1810 .await
1811 .map_err(|registry_error| {
1812 DispatchError::Registry(registry_error.to_string())
1813 })?;
1814 self.release_flow_control(&acquired_flow).await?;
1815 decrement_in_flight(&self.state);
1816 self.append_dispatch_trust_record(
1817 binding,
1818 &route,
1819 &event,
1820 replay_of_event_id.as_ref(),
1821 autonomy_tier,
1822 TrustOutcome::Failure,
1823 "cancelled",
1824 attempt,
1825 Some("dispatcher shutdown cancelled retry wait".to_string()),
1826 )
1827 .await?;
1828 return Ok(DispatchOutcome {
1829 trigger_id: binding.id.as_str().to_string(),
1830 binding_key: binding.binding_key(),
1831 event_id: event.id.0,
1832 attempt_count: attempt,
1833 status: DispatchStatus::Cancelled,
1834 handler_kind: route.kind().to_string(),
1835 target_uri: route.target_uri(),
1836 replay_of_event_id,
1837 result: None,
1838 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1839 });
1840 }
1841 continue;
1842 }
1843
1844 let final_error = error.to_string();
1845 let dlq_entry = DlqEntry {
1846 trigger_id: binding.id.as_str().to_string(),
1847 binding_key: binding.binding_key(),
1848 event: event.clone(),
1849 attempt_count: attempt,
1850 final_error: final_error.clone(),
1851 attempts: attempts.clone(),
1852 };
1853 self.state
1854 .dlq
1855 .lock()
1856 .expect("dispatcher dlq poisoned")
1857 .push(dlq_entry.clone());
1858 if let Some(metrics) = self.metrics.as_ref() {
1859 metrics.record_trigger_dlq(binding.id.as_str(), "retry_exhausted");
1860 }
1861 self.emit_action_graph(
1862 &event,
1863 vec![RunActionGraphNodeRecord {
1864 id: format!("dlq:{binding_key}:{}", event.id.0),
1865 label: binding.id.as_str().to_string(),
1866 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1867 status: "queued".to_string(),
1868 outcome: "retry_exhausted".to_string(),
1869 trace_id: Some(event.trace_id.0.clone()),
1870 stage_id: None,
1871 node_id: None,
1872 worker_id: None,
1873 run_id: None,
1874 run_path: None,
1875 metadata: dlq_node_metadata(binding, &event, attempt, &final_error),
1876 }],
1877 vec![RunActionGraphEdgeRecord {
1878 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1879 to_id: format!("dlq:{binding_key}:{}", event.id.0),
1880 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1881 label: Some(format!("{attempt} attempts")),
1882 }],
1883 serde_json::json!({
1884 "source": "dispatcher",
1885 "trigger_id": binding.id.as_str(),
1886 "binding_key": binding.binding_key(),
1887 "event_id": event.id.0,
1888 "attempt_count": attempt,
1889 "final_error": final_error,
1890 "replay_of_event_id": replay_of_event_id,
1891 }),
1892 )
1893 .await?;
1894 self.append_lifecycle_event(
1895 "DlqMoved",
1896 &event,
1897 binding,
1898 serde_json::json!({
1899 "event_id": event.id.0,
1900 "attempt_count": attempt,
1901 "final_error": dlq_entry.final_error,
1902 "replay_of_event_id": replay_of_event_id,
1903 }),
1904 replay_of_event_id.as_ref(),
1905 )
1906 .await?;
1907 self.append_topic_event(
1908 TRIGGER_DLQ_TOPIC,
1909 "dlq_moved",
1910 &event,
1911 Some(binding),
1912 Some(attempt),
1913 serde_json::to_value(&dlq_entry)
1914 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1915 replay_of_event_id.as_ref(),
1916 )
1917 .await?;
1918 finish_in_flight(
1919 binding.id.as_str(),
1920 binding.version,
1921 TriggerDispatchOutcome::Dlq,
1922 )
1923 .await
1924 .map_err(|registry_error| {
1925 DispatchError::Registry(registry_error.to_string())
1926 })?;
1927 self.release_flow_control(&acquired_flow).await?;
1928 decrement_in_flight(&self.state);
1929 self.append_dispatch_trust_record(
1930 binding,
1931 &route,
1932 &event,
1933 replay_of_event_id.as_ref(),
1934 autonomy_tier,
1935 TrustOutcome::Failure,
1936 "dlq",
1937 attempt,
1938 Some(error.to_string()),
1939 )
1940 .await?;
1941 return Ok(DispatchOutcome {
1942 trigger_id: binding.id.as_str().to_string(),
1943 binding_key: binding.binding_key(),
1944 event_id: event.id.0,
1945 attempt_count: attempt,
1946 status: DispatchStatus::Dlq,
1947 handler_kind: route.kind().to_string(),
1948 target_uri: route.target_uri(),
1949 replay_of_event_id,
1950 result: None,
1951 error: Some(error.to_string()),
1952 });
1953 }
1954 }
1955 }
1956
1957 finish_in_flight(
1958 binding.id.as_str(),
1959 binding.version,
1960 TriggerDispatchOutcome::Failed,
1961 )
1962 .await
1963 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1964 self.release_flow_control(&acquired_flow).await?;
1965 decrement_in_flight(&self.state);
1966 self.append_dispatch_trust_record(
1967 binding,
1968 &route,
1969 &event,
1970 replay_of_event_id.as_ref(),
1971 autonomy_tier,
1972 TrustOutcome::Failure,
1973 "failed",
1974 max_attempts,
1975 Some("dispatch exhausted without terminal outcome".to_string()),
1976 )
1977 .await?;
1978 Ok(DispatchOutcome {
1979 trigger_id: binding.id.as_str().to_string(),
1980 binding_key: binding.binding_key(),
1981 event_id: event.id.0,
1982 attempt_count: max_attempts,
1983 status: DispatchStatus::Failed,
1984 handler_kind: route.kind().to_string(),
1985 target_uri: route.target_uri(),
1986 replay_of_event_id,
1987 result: None,
1988 error: Some("dispatch exhausted without terminal outcome".to_string()),
1989 })
1990 }
1991
1992 async fn dispatch_once(
1993 &self,
1994 binding: &TriggerBinding,
1995 route: &DispatchUri,
1996 event: &TriggerEvent,
1997 autonomy_tier: AutonomyTier,
1998 wait_lease: Option<DispatchWaitLease>,
1999 cancel_rx: &mut broadcast::Receiver<()>,
2000 ) -> Result<serde_json::Value, DispatchError> {
2001 match route {
2002 DispatchUri::Local { .. } => {
2003 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
2004 return Err(DispatchError::Local(format!(
2005 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
2006 binding.id.as_str()
2007 )));
2008 };
2009 let value = self
2010 .invoke_vm_callable(
2011 closure,
2012 &binding.binding_key(),
2013 event,
2014 None,
2015 binding.id.as_str(),
2016 &format!("{}.{}", event.provider.as_str(), event.kind),
2017 autonomy_tier,
2018 wait_lease,
2019 cancel_rx,
2020 )
2021 .await?;
2022 Ok(vm_value_to_json(&value))
2023 }
2024 DispatchUri::A2a {
2025 target,
2026 allow_cleartext,
2027 } => {
2028 if self.state.shutting_down.load(Ordering::SeqCst) {
2029 return Err(DispatchError::Cancelled(
2030 "dispatcher shutdown cancelled A2A dispatch".to_string(),
2031 ));
2032 }
2033 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
2034 target,
2035 *allow_cleartext,
2036 binding.id.as_str(),
2037 &binding.binding_key(),
2038 event,
2039 cancel_rx,
2040 )
2041 .await
2042 .map_err(|error| match error {
2043 crate::a2a::A2aClientError::Cancelled(message) => {
2044 DispatchError::Cancelled(message)
2045 }
2046 other => DispatchError::A2a(other.to_string()),
2047 })?;
2048 match ack {
2049 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
2050 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
2051 }
2052 }
2053 DispatchUri::Worker { queue } => {
2054 let receipt = crate::WorkerQueue::new(self.event_log.clone())
2055 .enqueue(&crate::WorkerQueueJob {
2056 queue: queue.clone(),
2057 trigger_id: binding.id.as_str().to_string(),
2058 binding_key: binding.binding_key(),
2059 binding_version: binding.version,
2060 event: event.clone(),
2061 replay_of_event_id: current_dispatch_context()
2062 .and_then(|context| context.replay_of_event_id),
2063 priority: worker_queue_priority(binding, event),
2064 })
2065 .await
2066 .map_err(DispatchError::from)?;
2067 Ok(serde_json::to_value(receipt)
2068 .map_err(|error| DispatchError::Serde(error.to_string()))?)
2069 }
2070 }
2071 }
2072
2073 async fn apply_flow_control(
2074 &self,
2075 binding: &TriggerBinding,
2076 event: &TriggerEvent,
2077 replay_of_event_id: Option<&String>,
2078 ) -> Result<FlowControlOutcome, DispatchError> {
2079 let flow = &binding.flow_control;
2080 let mut managed_event = event.clone();
2081
2082 if let Some(batch) = &flow.batch {
2083 let gate = self
2084 .resolve_flow_gate(
2085 &binding.binding_key(),
2086 batch.key.as_ref(),
2087 &managed_event,
2088 replay_of_event_id,
2089 )
2090 .await?;
2091 match self
2092 .state
2093 .flow_control
2094 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
2095 .await
2096 .map_err(DispatchError::from)?
2097 {
2098 BatchDecision::Dispatch(events) => {
2099 managed_event = build_batched_event(events)?;
2100 }
2101 BatchDecision::Merged => {
2102 return Ok(FlowControlOutcome::Skip {
2103 reason: "batch_merged".to_string(),
2104 })
2105 }
2106 }
2107 }
2108
2109 if let Some(debounce) = &flow.debounce {
2110 let gate = self
2111 .resolve_flow_gate(
2112 &binding.binding_key(),
2113 Some(&debounce.key),
2114 &managed_event,
2115 replay_of_event_id,
2116 )
2117 .await?;
2118 let latest = self
2119 .state
2120 .flow_control
2121 .debounce(&gate, debounce.period)
2122 .await
2123 .map_err(DispatchError::from)?;
2124 if !latest {
2125 return Ok(FlowControlOutcome::Skip {
2126 reason: "debounced".to_string(),
2127 });
2128 }
2129 }
2130
2131 if let Some(rate_limit) = &flow.rate_limit {
2132 let gate = self
2133 .resolve_flow_gate(
2134 &binding.binding_key(),
2135 rate_limit.key.as_ref(),
2136 &managed_event,
2137 replay_of_event_id,
2138 )
2139 .await?;
2140 let allowed = self
2141 .state
2142 .flow_control
2143 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
2144 .await
2145 .map_err(DispatchError::from)?;
2146 if !allowed {
2147 return Ok(FlowControlOutcome::Skip {
2148 reason: "rate_limited".to_string(),
2149 });
2150 }
2151 }
2152
2153 if let Some(throttle) = &flow.throttle {
2154 let gate = self
2155 .resolve_flow_gate(
2156 &binding.binding_key(),
2157 throttle.key.as_ref(),
2158 &managed_event,
2159 replay_of_event_id,
2160 )
2161 .await?;
2162 self.state
2163 .flow_control
2164 .wait_for_throttle(&gate, throttle.period, throttle.max)
2165 .await
2166 .map_err(DispatchError::from)?;
2167 }
2168
2169 let mut acquired = AcquiredFlowControl::default();
2170 if let Some(singleton) = &flow.singleton {
2171 let gate = self
2172 .resolve_flow_gate(
2173 &binding.binding_key(),
2174 singleton.key.as_ref(),
2175 &managed_event,
2176 replay_of_event_id,
2177 )
2178 .await?;
2179 let acquired_singleton = self
2180 .state
2181 .flow_control
2182 .try_acquire_singleton(&gate)
2183 .await
2184 .map_err(DispatchError::from)?;
2185 if !acquired_singleton {
2186 return Ok(FlowControlOutcome::Skip {
2187 reason: "singleton_active".to_string(),
2188 });
2189 }
2190 acquired.singleton = Some(SingletonLease { gate, held: true });
2191 }
2192
2193 if let Some(concurrency) = &flow.concurrency {
2194 let gate = self
2195 .resolve_flow_gate(
2196 &binding.binding_key(),
2197 concurrency.key.as_ref(),
2198 &managed_event,
2199 replay_of_event_id,
2200 )
2201 .await?;
2202 let priority_rank = self
2203 .resolve_priority_rank(
2204 &binding.binding_key(),
2205 flow.priority.as_ref(),
2206 &managed_event,
2207 replay_of_event_id,
2208 )
2209 .await?;
2210 let permit = self
2211 .state
2212 .flow_control
2213 .acquire_concurrency(&gate, concurrency.max, priority_rank)
2214 .await
2215 .map_err(DispatchError::from)?;
2216 acquired.concurrency = Some(ConcurrencyLease {
2217 gate,
2218 max: concurrency.max,
2219 priority_rank,
2220 permit: Some(permit),
2221 });
2222 }
2223
2224 Ok(FlowControlOutcome::Dispatch {
2225 event: Box::new(managed_event),
2226 acquired,
2227 })
2228 }
2229
2230 async fn release_flow_control(
2231 &self,
2232 acquired: &Arc<AsyncMutex<AcquiredFlowControl>>,
2233 ) -> Result<(), DispatchError> {
2234 let (singleton_gate, concurrency_permit) = {
2235 let mut acquired = acquired.lock().await;
2236 let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
2237 if lease.held {
2238 lease.held = false;
2239 Some(lease.gate.clone())
2240 } else {
2241 None
2242 }
2243 });
2244 let concurrency_permit = acquired
2245 .concurrency
2246 .as_mut()
2247 .and_then(|lease| lease.permit.take());
2248 (singleton_gate, concurrency_permit)
2249 };
2250 if let Some(gate) = singleton_gate {
2251 self.state
2252 .flow_control
2253 .release_singleton(&gate)
2254 .await
2255 .map_err(DispatchError::from)?;
2256 }
2257 if let Some(permit) = concurrency_permit {
2258 self.state
2259 .flow_control
2260 .release_concurrency(permit)
2261 .await
2262 .map_err(DispatchError::from)?;
2263 }
2264 Ok(())
2265 }
2266
2267 async fn resolve_flow_gate(
2268 &self,
2269 binding_key: &str,
2270 expr: Option<&crate::triggers::TriggerExpressionSpec>,
2271 event: &TriggerEvent,
2272 replay_of_event_id: Option<&String>,
2273 ) -> Result<String, DispatchError> {
2274 let key = match expr {
2275 Some(expr) => {
2276 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
2277 .await?
2278 }
2279 None => "_global".to_string(),
2280 };
2281 Ok(format!("{binding_key}:{key}"))
2282 }
2283
2284 async fn resolve_priority_rank(
2285 &self,
2286 binding_key: &str,
2287 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
2288 event: &TriggerEvent,
2289 replay_of_event_id: Option<&String>,
2290 ) -> Result<usize, DispatchError> {
2291 let Some(priority) = priority else {
2292 return Ok(0);
2293 };
2294 let value = self
2295 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
2296 .await?;
2297 Ok(priority
2298 .order
2299 .iter()
2300 .position(|candidate| candidate == &value)
2301 .unwrap_or(priority.order.len()))
2302 }
2303
2304 async fn evaluate_flow_expression(
2305 &self,
2306 binding_key: &str,
2307 expr: &crate::triggers::TriggerExpressionSpec,
2308 event: &TriggerEvent,
2309 replay_of_event_id: Option<&String>,
2310 ) -> Result<String, DispatchError> {
2311 let value = self
2312 .invoke_vm_callable(
2313 &expr.closure,
2314 binding_key,
2315 event,
2316 replay_of_event_id,
2317 "",
2318 "flow_control",
2319 AutonomyTier::Suggest,
2320 None,
2321 &mut self.cancel_tx.subscribe(),
2322 )
2323 .await?;
2324 Ok(json_value_to_gate(&vm_value_to_json(&value)))
2325 }
2326
2327 #[allow(clippy::too_many_arguments)]
2328 async fn invoke_vm_callable(
2329 &self,
2330 closure: &crate::value::VmClosure,
2331 binding_key: &str,
2332 event: &TriggerEvent,
2333 replay_of_event_id: Option<&String>,
2334 agent_id: &str,
2335 action: &str,
2336 autonomy_tier: AutonomyTier,
2337 wait_lease: Option<DispatchWaitLease>,
2338 cancel_rx: &mut broadcast::Receiver<()>,
2339 ) -> Result<VmValue, DispatchError> {
2340 let mut vm = self.base_vm.child_vm();
2341 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
2342 if self.state.shutting_down.load(Ordering::SeqCst) {
2343 cancel_token.store(true, Ordering::SeqCst);
2344 }
2345 self.state
2346 .cancel_tokens
2347 .lock()
2348 .expect("dispatcher cancel tokens poisoned")
2349 .push(cancel_token.clone());
2350 vm.install_cancel_token(cancel_token.clone());
2351 let arg = event_to_handler_value(event)?;
2352 let args = [arg];
2353 let tier_policy = policy_for_autonomy_tier(autonomy_tier);
2354 let effective_policy = match crate::orchestration::current_execution_policy() {
2355 Some(parent) => parent
2356 .intersect(&tier_policy)
2357 .map_err(|error| DispatchError::Local(error.to_string()))?,
2358 None => tier_policy,
2359 };
2360 crate::orchestration::push_execution_policy(effective_policy);
2361 let _policy_guard = DispatchExecutionPolicyGuard;
2362 let future = vm.call_closure_pub(closure, &args);
2363 pin_mut!(future);
2364 let (binding_id, binding_version) = split_binding_key(binding_key);
2365 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2366 slot.borrow_mut().replace(DispatchContext {
2367 trigger_event: event.clone(),
2368 replay_of_event_id: replay_of_event_id.cloned(),
2369 binding_id,
2370 binding_version,
2371 agent_id: agent_id.to_string(),
2372 action: action.to_string(),
2373 autonomy_tier,
2374 })
2375 });
2376 let prior_wait_lease = ACTIVE_DISPATCH_WAIT_LEASE
2377 .with(|slot| std::mem::replace(&mut *slot.borrow_mut(), wait_lease));
2378 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
2379 crate::stdlib::hitl::reset_hitl_state();
2380 let mut poll = tokio::time::interval(Duration::from_millis(100));
2381 let result = loop {
2382 tokio::select! {
2383 result = &mut future => break result,
2384 _ = recv_cancel(cancel_rx) => {
2385 cancel_token.store(true, Ordering::SeqCst);
2386 }
2387 _ = poll.tick() => {
2388 if dispatch_cancel_requested(
2389 &self.event_log,
2390 binding_key,
2391 &event.id.0,
2392 replay_of_event_id,
2393 )
2394 .await? {
2395 cancel_token.store(true, Ordering::SeqCst);
2396 }
2397 }
2398 }
2399 };
2400 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
2401 *slot.borrow_mut() = prior_context;
2402 });
2403 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
2404 *slot.borrow_mut() = prior_wait_lease;
2405 });
2406 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
2407 {
2408 let mut tokens = self
2409 .state
2410 .cancel_tokens
2411 .lock()
2412 .expect("dispatcher cancel tokens poisoned");
2413 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
2414 }
2415
2416 if cancel_token.load(Ordering::SeqCst) {
2417 if dispatch_cancel_requested(
2418 &self.event_log,
2419 binding_key,
2420 &event.id.0,
2421 replay_of_event_id,
2422 )
2423 .await?
2424 {
2425 Err(DispatchError::Cancelled(
2426 "trigger cancel request cancelled local handler".to_string(),
2427 ))
2428 } else {
2429 Err(DispatchError::Cancelled(
2430 "dispatcher shutdown cancelled local handler".to_string(),
2431 ))
2432 }
2433 } else {
2434 result.map_err(dispatch_error_from_vm_error)
2435 }
2436 }
2437
2438 async fn evaluate_predicate(
2439 &self,
2440 binding: &TriggerBinding,
2441 predicate: &super::registry::TriggerPredicateSpec,
2442 event: &TriggerEvent,
2443 replay_of_event_id: Option<&String>,
2444 autonomy_tier: AutonomyTier,
2445 ) -> Result<PredicateEvaluationRecord, DispatchError> {
2446 let event_id = event.id.0.clone();
2447 let trigger_id = binding.id.as_str().to_string();
2448 let now_ms = now_unix_ms();
2449 reset_binding_budget_windows(binding);
2450
2451 let breaker_open_until = {
2452 let state = binding
2453 .predicate_state
2454 .lock()
2455 .expect("trigger predicate state poisoned");
2456 if state
2457 .breaker_open_until_ms
2458 .is_some_and(|until_ms| until_ms > now_ms)
2459 {
2460 state.breaker_open_until_ms
2461 } else {
2462 None
2463 }
2464 };
2465
2466 if breaker_open_until.is_some() {
2467 let mut metadata = BTreeMap::new();
2468 metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
2469 metadata.insert("event_id".to_string(), serde_json::json!(event_id));
2470 metadata.insert(
2471 "breaker_open_until_ms".to_string(),
2472 serde_json::json!(breaker_open_until),
2473 );
2474 crate::events::log_warn_meta(
2475 "trigger.predicate.circuit_breaker",
2476 "trigger predicate circuit breaker is open; short-circuiting to false",
2477 metadata,
2478 );
2479 let record = PredicateEvaluationRecord {
2480 result: false,
2481 reason: Some("circuit_open".to_string()),
2482 ..Default::default()
2483 };
2484 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2485 .await?;
2486 return Ok(record);
2487 }
2488
2489 let expected_cost = expected_predicate_cost_usd_micros(binding);
2490 if let Some(reason) = binding_budget_would_exceed(binding, expected_cost)
2491 .or_else(|| orchestrator_budget_would_exceed(expected_cost))
2492 {
2493 self.append_budget_exhausted_event(
2494 binding,
2495 event,
2496 reason,
2497 expected_cost,
2498 None,
2499 replay_of_event_id,
2500 )
2501 .await?;
2502 let record = PredicateEvaluationRecord {
2503 result: binding.on_budget_exhausted == TriggerBudgetExhaustionStrategy::Warn,
2504 reason: Some(reason.to_string()),
2505 exhaustion_strategy: Some(binding.on_budget_exhausted),
2506 ..Default::default()
2507 };
2508 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2509 .await?;
2510 return Ok(record);
2511 }
2512
2513 let replay_cache = self
2514 .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
2515 .await?;
2516 let guard = start_predicate_evaluation(
2517 binding.when_budget.clone().unwrap_or_default(),
2518 replay_cache,
2519 );
2520 let started = std::time::Instant::now();
2521 let eval = self
2522 .invoke_vm_callable_with_timeout(
2523 &predicate.closure,
2524 &binding.binding_key(),
2525 event,
2526 replay_of_event_id,
2527 binding.id.as_str(),
2528 &format!("{}.{}", event.provider.as_str(), event.kind),
2529 autonomy_tier,
2530 &mut self.cancel_tx.subscribe(),
2531 binding
2532 .when_budget
2533 .as_ref()
2534 .and_then(|budget| budget.timeout()),
2535 )
2536 .await;
2537 let capture = guard.finish();
2538 let latency_ms = started.elapsed().as_millis() as u64;
2539 if replay_of_event_id.is_none() && !capture.entries.is_empty() {
2540 self.append_predicate_cache_record(binding, event, &capture.entries)
2541 .await?;
2542 }
2543
2544 let mut record = PredicateEvaluationRecord {
2545 result: false,
2546 cost_usd: capture.total_cost_usd,
2547 tokens: capture.total_tokens,
2548 latency_ms,
2549 cached: capture.cached,
2550 reason: None,
2551 exhaustion_strategy: None,
2552 };
2553
2554 let mut count_failure = false;
2555 let mut opened_breaker = false;
2556
2557 match eval {
2558 Ok(value) => match predicate_value_as_bool(value) {
2559 Ok(result) => {
2560 record.result = result;
2561 }
2562 Err(reason) => {
2563 count_failure = true;
2564 record.reason = Some(reason);
2565 }
2566 },
2567 Err(error) => {
2568 count_failure = true;
2569 record.reason = Some(error.to_string());
2570 }
2571 }
2572
2573 let cost_usd_micros = usd_to_micros(record.cost_usd);
2574 if cost_usd_micros > 0 {
2575 binding
2576 .metrics
2577 .cost_total_usd_micros
2578 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2579 binding
2580 .metrics
2581 .cost_today_usd_micros
2582 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2583 binding
2584 .metrics
2585 .cost_hour_usd_micros
2586 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2587 note_orchestrator_budget_cost(cost_usd_micros);
2588 record_predicate_cost_sample(binding, cost_usd_micros);
2589 }
2590
2591 let timed_out = matches!(
2592 record.reason.as_deref(),
2593 Some("predicate evaluation timed out")
2594 );
2595 if capture.budget_exceeded || timed_out {
2596 if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
2597 record.result = false;
2598 } else if timed_out {
2599 record.result = true;
2600 }
2601 record.reason = Some("budget_exceeded".to_string());
2602 record.exhaustion_strategy = Some(binding.on_budget_exhausted);
2603 self.append_budget_exhausted_event(
2604 binding,
2605 event,
2606 "budget_exceeded",
2607 cost_usd_micros,
2608 Some(record.tokens),
2609 replay_of_event_id,
2610 )
2611 .await?;
2612 self.append_lifecycle_event(
2613 "predicate.invocation_budget_exceeded",
2614 event,
2615 binding,
2616 serde_json::json!({
2617 "trigger_id": binding.id.as_str(),
2618 "event_id": event.id.0,
2619 "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2620 "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2621 "cost_usd": record.cost_usd,
2622 "tokens": record.tokens,
2623 "replay_of_event_id": replay_of_event_id,
2624 }),
2625 replay_of_event_id,
2626 )
2627 .await?;
2628 }
2629
2630 if let Some(reason) =
2631 binding_budget_would_exceed(binding, 0).or_else(|| orchestrator_budget_would_exceed(0))
2632 {
2633 if binding.on_budget_exhausted != TriggerBudgetExhaustionStrategy::Warn {
2634 record.result = false;
2635 }
2636 record.reason = Some(reason.to_string());
2637 record.exhaustion_strategy = Some(binding.on_budget_exhausted);
2638 self.append_budget_exhausted_event(binding, event, reason, 0, None, replay_of_event_id)
2639 .await?;
2640 }
2641
2642 {
2643 let mut state = binding
2644 .predicate_state
2645 .lock()
2646 .expect("trigger predicate state poisoned");
2647 if count_failure {
2648 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2649 if state.consecutive_failures >= 3 {
2650 state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2651 opened_breaker = true;
2652 }
2653 } else {
2654 state.consecutive_failures = 0;
2655 state.breaker_open_until_ms = None;
2656 }
2657 }
2658
2659 if opened_breaker {
2660 let mut metadata = BTreeMap::new();
2661 metadata.insert(
2662 "trigger_id".to_string(),
2663 serde_json::json!(binding.id.as_str()),
2664 );
2665 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2666 metadata.insert("failure_count".to_string(), serde_json::json!(3));
2667 metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2668 crate::events::log_warn_meta(
2669 "trigger.predicate.circuit_breaker",
2670 "trigger predicate circuit breaker opened for 5 minutes",
2671 metadata,
2672 );
2673 }
2674
2675 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2676 .await?;
2677 Ok(record)
2678 }
2679
2680 #[allow(clippy::too_many_arguments)]
2681 #[allow(clippy::too_many_arguments)]
2682 async fn invoke_vm_callable_with_timeout(
2683 &self,
2684 closure: &crate::value::VmClosure,
2685 binding_key: &str,
2686 event: &TriggerEvent,
2687 replay_of_event_id: Option<&String>,
2688 agent_id: &str,
2689 action: &str,
2690 autonomy_tier: AutonomyTier,
2691 cancel_rx: &mut broadcast::Receiver<()>,
2692 timeout: Option<Duration>,
2693 ) -> Result<VmValue, DispatchError> {
2694 let future = self.invoke_vm_callable(
2695 closure,
2696 binding_key,
2697 event,
2698 replay_of_event_id,
2699 agent_id,
2700 action,
2701 autonomy_tier,
2702 None,
2703 cancel_rx,
2704 );
2705 pin_mut!(future);
2706 if let Some(timeout) = timeout {
2707 match tokio::time::timeout(timeout, future).await {
2708 Ok(result) => result,
2709 Err(_) => Err(DispatchError::Local(
2710 "predicate evaluation timed out".to_string(),
2711 )),
2712 }
2713 } else {
2714 future.await
2715 }
2716 }
2717
2718 async fn append_predicate_evaluated_event(
2719 &self,
2720 binding: &TriggerBinding,
2721 event: &TriggerEvent,
2722 record: &PredicateEvaluationRecord,
2723 replay_of_event_id: Option<&String>,
2724 ) -> Result<(), DispatchError> {
2725 if let Some(metrics) = self.metrics.as_ref() {
2726 metrics.record_trigger_predicate_evaluation(
2727 binding.id.as_str(),
2728 record.result,
2729 record.cost_usd,
2730 );
2731 metrics.set_trigger_budget_cost_today(
2732 binding.id.as_str(),
2733 current_predicate_daily_cost(binding),
2734 );
2735 if matches!(
2736 record.reason.as_deref(),
2737 Some(
2738 "budget_exceeded"
2739 | "daily_budget_exceeded"
2740 | "hourly_budget_exceeded"
2741 | "orchestrator_daily_budget_exceeded"
2742 | "orchestrator_hourly_budget_exceeded"
2743 )
2744 ) {
2745 metrics.record_trigger_budget_exhausted(
2746 binding.id.as_str(),
2747 record
2748 .exhaustion_strategy
2749 .map(TriggerBudgetExhaustionStrategy::as_str)
2750 .unwrap_or_else(|| record.reason.as_deref().unwrap_or("predicate")),
2751 );
2752 }
2753 }
2754 self.append_lifecycle_event(
2755 "predicate.evaluated",
2756 event,
2757 binding,
2758 serde_json::json!({
2759 "trigger_id": binding.id.as_str(),
2760 "event_id": event.id.0,
2761 "result": record.result,
2762 "cost_usd": record.cost_usd,
2763 "tokens": record.tokens,
2764 "latency_ms": record.latency_ms,
2765 "cached": record.cached,
2766 "reason": record.reason,
2767 "on_budget_exhausted": record.exhaustion_strategy.map(TriggerBudgetExhaustionStrategy::as_str),
2768 "replay_of_event_id": replay_of_event_id,
2769 }),
2770 replay_of_event_id,
2771 )
2772 .await
2773 }
2774
2775 async fn append_predicate_cache_record(
2776 &self,
2777 binding: &TriggerBinding,
2778 event: &TriggerEvent,
2779 entries: &[PredicateCacheEntry],
2780 ) -> Result<(), DispatchError> {
2781 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2782 .expect("static trigger inbox legacy topic name is valid");
2783 let payload = serde_json::to_value(PredicateCacheRecord {
2784 trigger_id: binding.id.as_str().to_string(),
2785 event_id: event.id.0.clone(),
2786 entries: entries.to_vec(),
2787 })
2788 .map_err(|error| DispatchError::Serde(error.to_string()))?;
2789 self.event_log
2790 .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2791 .await
2792 .map_err(DispatchError::from)
2793 .map(|_| ())
2794 }
2795
2796 async fn read_predicate_cache_record(
2797 &self,
2798 event_id: &str,
2799 ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2800 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2801 .expect("static trigger inbox legacy topic name is valid");
2802 let records = self
2803 .event_log
2804 .read_range(&topic, None, usize::MAX)
2805 .await
2806 .map_err(DispatchError::from)?;
2807 Ok(records
2808 .into_iter()
2809 .filter(|(_, event)| event.kind == "predicate_llm_cache")
2810 .filter_map(|(_, event)| {
2811 serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2812 })
2813 .filter(|record| record.event_id == event_id)
2814 .flat_map(|record| record.entries)
2815 .collect())
2816 }
2817
2818 #[allow(clippy::too_many_arguments)]
2819 async fn append_dispatch_trust_record(
2820 &self,
2821 binding: &TriggerBinding,
2822 route: &DispatchUri,
2823 event: &TriggerEvent,
2824 replay_of_event_id: Option<&String>,
2825 autonomy_tier: AutonomyTier,
2826 outcome: TrustOutcome,
2827 terminal_status: &str,
2828 attempt_count: u32,
2829 error: Option<String>,
2830 ) -> Result<(), DispatchError> {
2831 let mut record = TrustRecord::new(
2832 binding.id.as_str().to_string(),
2833 format!("{}.{}", event.provider.as_str(), event.kind),
2834 None,
2835 outcome,
2836 event.trace_id.0.clone(),
2837 autonomy_tier,
2838 );
2839 record.metadata.insert(
2840 "binding_key".to_string(),
2841 serde_json::json!(binding.binding_key()),
2842 );
2843 record.metadata.insert(
2844 "binding_version".to_string(),
2845 serde_json::json!(binding.version),
2846 );
2847 record.metadata.insert(
2848 "provider".to_string(),
2849 serde_json::json!(event.provider.as_str()),
2850 );
2851 record
2852 .metadata
2853 .insert("event_kind".to_string(), serde_json::json!(event.kind));
2854 record
2855 .metadata
2856 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2857 record.metadata.insert(
2858 "target_uri".to_string(),
2859 serde_json::json!(route.target_uri()),
2860 );
2861 record.metadata.insert(
2862 "terminal_status".to_string(),
2863 serde_json::json!(terminal_status),
2864 );
2865 record.metadata.insert(
2866 "attempt_count".to_string(),
2867 serde_json::json!(attempt_count),
2868 );
2869 if let Some(replay_of_event_id) = replay_of_event_id {
2870 record.metadata.insert(
2871 "replay_of_event_id".to_string(),
2872 serde_json::json!(replay_of_event_id),
2873 );
2874 }
2875 if let Some(error) = error {
2876 record
2877 .metadata
2878 .insert("error".to_string(), serde_json::json!(error));
2879 }
2880 append_trust_record(&self.event_log, &record)
2881 .await
2882 .map(|_| ())
2883 .map_err(DispatchError::from)
2884 }
2885
2886 async fn append_attempt_record(
2887 &self,
2888 event: &TriggerEvent,
2889 binding: &TriggerBinding,
2890 attempt: &DispatchAttemptRecord,
2891 replay_of_event_id: Option<&String>,
2892 ) -> Result<(), DispatchError> {
2893 self.append_topic_event(
2894 TRIGGER_ATTEMPTS_TOPIC,
2895 "attempt_recorded",
2896 event,
2897 Some(binding),
2898 Some(attempt.attempt),
2899 serde_json::to_value(attempt)
2900 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2901 replay_of_event_id,
2902 )
2903 .await
2904 }
2905
2906 async fn append_lifecycle_event(
2907 &self,
2908 kind: &str,
2909 event: &TriggerEvent,
2910 binding: &TriggerBinding,
2911 payload: serde_json::Value,
2912 replay_of_event_id: Option<&String>,
2913 ) -> Result<(), DispatchError> {
2914 self.append_topic_event(
2915 TRIGGERS_LIFECYCLE_TOPIC,
2916 kind,
2917 event,
2918 Some(binding),
2919 None,
2920 payload,
2921 replay_of_event_id,
2922 )
2923 .await
2924 }
2925
2926 async fn append_budget_exhausted_event(
2927 &self,
2928 binding: &TriggerBinding,
2929 event: &TriggerEvent,
2930 reason: &str,
2931 expected_cost_usd_micros: u64,
2932 tokens: Option<u64>,
2933 replay_of_event_id: Option<&String>,
2934 ) -> Result<(), DispatchError> {
2935 let payload = serde_json::json!({
2936 "trigger_id": binding.id.as_str(),
2937 "event_id": event.id.0,
2938 "reason": reason,
2939 "strategy": binding.on_budget_exhausted.as_str(),
2940 "expected_cost_usd": micros_to_usd(expected_cost_usd_micros),
2941 "cost_usd": micros_to_usd(expected_cost_usd_micros),
2942 "tokens": tokens,
2943 "daily_limit_usd": binding.daily_cost_usd,
2944 "hourly_limit_usd": binding.hourly_cost_usd,
2945 "cost_today_usd": current_predicate_daily_cost(binding),
2946 "cost_hour_usd": current_predicate_hourly_cost(binding),
2947 "replay_of_event_id": replay_of_event_id,
2948 });
2949 self.append_lifecycle_event(
2950 "predicate.budget_exceeded",
2951 event,
2952 binding,
2953 payload.clone(),
2954 replay_of_event_id,
2955 )
2956 .await?;
2957 let legacy_kind = match reason {
2958 "daily_budget_exceeded" => Some("predicate.daily_budget_exceeded"),
2959 "hourly_budget_exceeded" => Some("predicate.hourly_budget_exceeded"),
2960 "orchestrator_daily_budget_exceeded" => {
2961 Some("predicate.orchestrator_daily_budget_exceeded")
2962 }
2963 "orchestrator_hourly_budget_exceeded" => {
2964 Some("predicate.orchestrator_hourly_budget_exceeded")
2965 }
2966 _ => None,
2967 };
2968 if let Some(kind) = legacy_kind {
2969 self.append_lifecycle_event(kind, event, binding, payload, replay_of_event_id)
2970 .await?;
2971 }
2972 Ok(())
2973 }
2974
2975 async fn append_autonomy_budget_approval_request(
2976 &self,
2977 binding: &TriggerBinding,
2978 route: &DispatchUri,
2979 event: &TriggerEvent,
2980 replay_of_event_id: Option<&String>,
2981 reason: &str,
2982 ) -> Result<String, DispatchError> {
2983 let reviewers = vec![DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()];
2984 let detail = serde_json::json!({
2985 "trigger_id": binding.id.as_str(),
2986 "binding_key": binding.binding_key(),
2987 "event_id": event.id.0,
2988 "reason": reason,
2989 "from_tier": AutonomyTier::ActAuto.as_str(),
2990 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
2991 "handler_kind": route.kind(),
2992 "target_uri": route.target_uri(),
2993 "max_autonomous_decisions_per_hour": binding.max_autonomous_decisions_per_hour,
2994 "max_autonomous_decisions_per_day": binding.max_autonomous_decisions_per_day,
2995 "autonomous_decisions_hour": binding.metrics.autonomous_decisions_hour.load(Ordering::Relaxed),
2996 "autonomous_decisions_today": binding.metrics.autonomous_decisions_today.load(Ordering::Relaxed),
2997 "replay_of_event_id": replay_of_event_id,
2998 });
2999 let request_id = crate::stdlib::hitl::append_approval_request_on(
3000 &self.event_log,
3001 binding.id.as_str().to_string(),
3002 event.trace_id.0.clone(),
3003 format!(
3004 "approve autonomous dispatch for trigger '{}' after {}",
3005 binding.id.as_str(),
3006 reason
3007 ),
3008 detail.clone(),
3009 reviewers.clone(),
3010 )
3011 .await
3012 .map_err(dispatch_error_from_vm_error)?;
3013 self.append_lifecycle_event(
3014 "autonomy.budget_exceeded",
3015 event,
3016 binding,
3017 serde_json::json!({
3018 "trigger_id": binding.id.as_str(),
3019 "event_id": event.id.0,
3020 "reason": reason,
3021 "request_id": request_id,
3022 "reviewers": reviewers,
3023 "from_tier": AutonomyTier::ActAuto.as_str(),
3024 "requested_tier": AutonomyTier::ActWithApproval.as_str(),
3025 "replay_of_event_id": replay_of_event_id,
3026 }),
3027 replay_of_event_id,
3028 )
3029 .await?;
3030 Ok(request_id)
3031 }
3032
3033 #[allow(clippy::too_many_arguments)]
3034 async fn emit_autonomy_budget_approval_action_graph(
3035 &self,
3036 binding: &TriggerBinding,
3037 route: &DispatchUri,
3038 event: &TriggerEvent,
3039 source_node_id: &str,
3040 replay_of_event_id: Option<&String>,
3041 reason: &str,
3042 request_id: &str,
3043 ) -> Result<(), DispatchError> {
3044 let approval_node_id = format!("approval:{}:{}", binding.binding_key(), event.id.0);
3045 self.emit_action_graph(
3046 event,
3047 vec![RunActionGraphNodeRecord {
3048 id: approval_node_id.clone(),
3049 label: format!("approval required: {reason}"),
3050 kind: "approval".to_string(),
3051 status: "waiting".to_string(),
3052 outcome: "request_approval".to_string(),
3053 trace_id: Some(event.trace_id.0.clone()),
3054 stage_id: None,
3055 node_id: None,
3056 worker_id: None,
3057 run_id: None,
3058 run_path: None,
3059 metadata: BTreeMap::from([
3060 (
3061 "trigger_id".to_string(),
3062 serde_json::json!(binding.id.as_str()),
3063 ),
3064 (
3065 "binding_key".to_string(),
3066 serde_json::json!(binding.binding_key()),
3067 ),
3068 ("event_id".to_string(), serde_json::json!(event.id.0)),
3069 ("reason".to_string(), serde_json::json!(reason)),
3070 ("request_id".to_string(), serde_json::json!(request_id)),
3071 (
3072 "reviewers".to_string(),
3073 serde_json::json!([DEFAULT_AUTONOMY_BUDGET_REVIEWER]),
3074 ),
3075 ("handler_kind".to_string(), serde_json::json!(route.kind())),
3076 (
3077 "target_uri".to_string(),
3078 serde_json::json!(route.target_uri()),
3079 ),
3080 (
3081 "from_tier".to_string(),
3082 serde_json::json!(AutonomyTier::ActAuto.as_str()),
3083 ),
3084 (
3085 "requested_tier".to_string(),
3086 serde_json::json!(AutonomyTier::ActWithApproval.as_str()),
3087 ),
3088 (
3089 "replay_of_event_id".to_string(),
3090 serde_json::json!(replay_of_event_id),
3091 ),
3092 ]),
3093 }],
3094 vec![RunActionGraphEdgeRecord {
3095 from_id: source_node_id.to_string(),
3096 to_id: approval_node_id,
3097 kind: "approval_gate".to_string(),
3098 label: Some("autonomy budget".to_string()),
3099 }],
3100 serde_json::json!({
3101 "source": "dispatcher",
3102 "trigger_id": binding.id.as_str(),
3103 "binding_key": binding.binding_key(),
3104 "event_id": event.id.0,
3105 "reason": reason,
3106 "request_id": request_id,
3107 "replay_of_event_id": replay_of_event_id,
3108 }),
3109 )
3110 .await
3111 }
3112
3113 #[allow(clippy::too_many_arguments)]
3114 async fn append_tier_transition_trust_record(
3115 &self,
3116 binding: &TriggerBinding,
3117 event: &TriggerEvent,
3118 replay_of_event_id: Option<&String>,
3119 from_tier: AutonomyTier,
3120 to_tier: AutonomyTier,
3121 reason: &str,
3122 request_id: &str,
3123 ) -> Result<(), DispatchError> {
3124 let mut record = TrustRecord::new(
3125 binding.id.as_str().to_string(),
3126 "autonomy.tier_transition",
3127 Some(DEFAULT_AUTONOMY_BUDGET_REVIEWER.to_string()),
3128 TrustOutcome::Denied,
3129 event.trace_id.0.clone(),
3130 to_tier,
3131 );
3132 record.metadata.insert(
3133 "binding_key".to_string(),
3134 serde_json::json!(binding.binding_key()),
3135 );
3136 record
3137 .metadata
3138 .insert("event_id".to_string(), serde_json::json!(event.id.0));
3139 record.metadata.insert(
3140 "from_tier".to_string(),
3141 serde_json::json!(from_tier.as_str()),
3142 );
3143 record
3144 .metadata
3145 .insert("to_tier".to_string(), serde_json::json!(to_tier.as_str()));
3146 record
3147 .metadata
3148 .insert("reason".to_string(), serde_json::json!(reason));
3149 record
3150 .metadata
3151 .insert("request_id".to_string(), serde_json::json!(request_id));
3152 if let Some(replay_of_event_id) = replay_of_event_id {
3153 record.metadata.insert(
3154 "replay_of_event_id".to_string(),
3155 serde_json::json!(replay_of_event_id),
3156 );
3157 }
3158 append_trust_record(&self.event_log, &record)
3159 .await
3160 .map(|_| ())
3161 .map_err(DispatchError::from)
3162 }
3163
3164 async fn append_budget_deferred_event(
3165 &self,
3166 binding: &TriggerBinding,
3167 route: &DispatchUri,
3168 event: &TriggerEvent,
3169 replay_of_event_id: Option<&String>,
3170 reason: &str,
3171 ) -> Result<(), DispatchError> {
3172 self.append_topic_event(
3173 TRIGGER_ATTEMPTS_TOPIC,
3174 "budget_deferred",
3175 event,
3176 Some(binding),
3177 None,
3178 serde_json::json!({
3179 "event_id": event.id.0,
3180 "trigger_id": binding.id.as_str(),
3181 "binding_key": binding.binding_key(),
3182 "handler_kind": route.kind(),
3183 "target_uri": route.target_uri(),
3184 "reason": reason,
3185 "retry_at": next_budget_reset_rfc3339(binding),
3186 "replay_of_event_id": replay_of_event_id,
3187 }),
3188 replay_of_event_id,
3189 )
3190 .await?;
3191 self.append_skipped_outbox_event(
3192 binding,
3193 route,
3194 event,
3195 replay_of_event_id,
3196 DispatchSkipStage::Predicate,
3197 serde_json::json!({
3198 "deferred": true,
3199 "reason": reason,
3200 "retry_at": next_budget_reset_rfc3339(binding),
3201 }),
3202 )
3203 .await
3204 }
3205
3206 async fn move_budget_exhausted_to_dlq(
3207 &self,
3208 binding: &TriggerBinding,
3209 route: &DispatchUri,
3210 event: &TriggerEvent,
3211 replay_of_event_id: Option<&String>,
3212 final_error: &str,
3213 ) -> Result<(), DispatchError> {
3214 let dlq_entry = DlqEntry {
3215 trigger_id: binding.id.as_str().to_string(),
3216 binding_key: binding.binding_key(),
3217 event: event.clone(),
3218 attempt_count: 0,
3219 final_error: final_error.to_string(),
3220 attempts: Vec::new(),
3221 };
3222 self.state
3223 .dlq
3224 .lock()
3225 .expect("dispatcher dlq poisoned")
3226 .push(dlq_entry.clone());
3227 if let Some(metrics) = self.metrics.as_ref() {
3228 metrics.record_trigger_dlq(binding.id.as_str(), "budget_exhausted");
3229 }
3230 self.emit_action_graph(
3231 event,
3232 vec![RunActionGraphNodeRecord {
3233 id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3234 label: binding.id.as_str().to_string(),
3235 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
3236 status: "queued".to_string(),
3237 outcome: "budget_exhausted".to_string(),
3238 trace_id: Some(event.trace_id.0.clone()),
3239 stage_id: None,
3240 node_id: None,
3241 worker_id: None,
3242 run_id: None,
3243 run_path: None,
3244 metadata: dlq_node_metadata(binding, event, 0, final_error),
3245 }],
3246 vec![RunActionGraphEdgeRecord {
3247 from_id: format!("predicate:{}:{}", binding.binding_key(), event.id.0),
3248 to_id: format!("dlq:{}:{}", binding.binding_key(), event.id.0),
3249 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
3250 label: Some("budget exhausted".to_string()),
3251 }],
3252 serde_json::json!({
3253 "source": "dispatcher",
3254 "trigger_id": binding.id.as_str(),
3255 "binding_key": binding.binding_key(),
3256 "event_id": event.id.0,
3257 "handler_kind": route.kind(),
3258 "target_uri": route.target_uri(),
3259 "final_error": final_error,
3260 "replay_of_event_id": replay_of_event_id,
3261 }),
3262 )
3263 .await?;
3264 self.append_lifecycle_event(
3265 "DlqMoved",
3266 event,
3267 binding,
3268 serde_json::json!({
3269 "event_id": event.id.0,
3270 "attempt_count": 0,
3271 "final_error": final_error,
3272 "reason": "budget_exhausted",
3273 "replay_of_event_id": replay_of_event_id,
3274 }),
3275 replay_of_event_id,
3276 )
3277 .await?;
3278 self.append_topic_event(
3279 TRIGGER_DLQ_TOPIC,
3280 "dlq_moved",
3281 event,
3282 Some(binding),
3283 None,
3284 serde_json::to_value(&dlq_entry)
3285 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
3286 replay_of_event_id,
3287 )
3288 .await
3289 }
3290
3291 async fn append_skipped_outbox_event(
3292 &self,
3293 binding: &TriggerBinding,
3294 route: &DispatchUri,
3295 event: &TriggerEvent,
3296 replay_of_event_id: Option<&String>,
3297 stage: DispatchSkipStage,
3298 detail: serde_json::Value,
3299 ) -> Result<(), DispatchError> {
3300 self.append_topic_event(
3301 TRIGGER_OUTBOX_TOPIC,
3302 "dispatch_skipped",
3303 event,
3304 Some(binding),
3305 None,
3306 serde_json::json!({
3307 "event_id": event.id.0,
3308 "trigger_id": binding.id.as_str(),
3309 "binding_key": binding.binding_key(),
3310 "handler_kind": route.kind(),
3311 "target_uri": route.target_uri(),
3312 "skip_stage": stage.as_str(),
3313 "detail": detail,
3314 "replay_of_event_id": replay_of_event_id,
3315 }),
3316 replay_of_event_id,
3317 )
3318 .await
3319 }
3320
3321 async fn append_topic_event(
3322 &self,
3323 topic_name: &str,
3324 kind: &str,
3325 event: &TriggerEvent,
3326 binding: Option<&TriggerBinding>,
3327 attempt: Option<u32>,
3328 payload: serde_json::Value,
3329 replay_of_event_id: Option<&String>,
3330 ) -> Result<(), DispatchError> {
3331 let topic = Topic::new(topic_name)
3332 .expect("static trigger dispatcher topic names should always be valid");
3333 let headers = event_headers(event, binding, attempt, replay_of_event_id);
3334 self.event_log
3335 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
3336 .await
3337 .map_err(DispatchError::from)
3338 .map(|_| ())
3339 }
3340
3341 async fn emit_action_graph(
3342 &self,
3343 event: &TriggerEvent,
3344 nodes: Vec<RunActionGraphNodeRecord>,
3345 edges: Vec<RunActionGraphEdgeRecord>,
3346 extra: serde_json::Value,
3347 ) -> Result<(), DispatchError> {
3348 let mut headers = BTreeMap::new();
3349 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3350 headers.insert("event_id".to_string(), event.id.0.clone());
3351 let observability = RunObservabilityRecord {
3352 schema_version: 1,
3353 action_graph_nodes: nodes,
3354 action_graph_edges: edges,
3355 ..Default::default()
3356 };
3357 append_action_graph_update(
3358 headers,
3359 serde_json::json!({
3360 "source": "dispatcher",
3361 "trace_id": event.trace_id.0,
3362 "event_id": event.id.0,
3363 "observability": observability,
3364 "context": extra,
3365 }),
3366 )
3367 .await
3368 .map_err(DispatchError::from)
3369 }
3370}
3371
3372async fn dispatch_cancel_requested(
3373 event_log: &Arc<AnyEventLog>,
3374 binding_key: &str,
3375 event_id: &str,
3376 replay_of_event_id: Option<&String>,
3377) -> Result<bool, DispatchError> {
3378 if replay_of_event_id.is_some() {
3379 return Ok(false);
3380 }
3381 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
3382 .expect("static trigger cancel topic should always be valid");
3383 let events = event_log.read_range(&topic, None, usize::MAX).await?;
3384 let requested = events
3385 .into_iter()
3386 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
3387 .filter_map(|(_, event)| {
3388 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
3389 })
3390 .collect::<BTreeSet<_>>();
3391 Ok(requested
3392 .iter()
3393 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
3394}
3395
3396async fn sleep_or_cancel_or_request(
3397 event_log: &Arc<AnyEventLog>,
3398 delay: Duration,
3399 binding_key: &str,
3400 event_id: &str,
3401 replay_of_event_id: Option<&String>,
3402 cancel_rx: &mut broadcast::Receiver<()>,
3403) -> Result<(), DispatchError> {
3404 let sleep = tokio::time::sleep(delay);
3405 pin_mut!(sleep);
3406 let mut poll = tokio::time::interval(Duration::from_millis(100));
3407 loop {
3408 tokio::select! {
3409 _ = &mut sleep => return Ok(()),
3410 _ = recv_cancel(cancel_rx) => {
3411 return Err(DispatchError::Cancelled(
3412 "dispatcher shutdown cancelled retry wait".to_string(),
3413 ));
3414 }
3415 _ = poll.tick() => {
3416 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
3417 return Err(DispatchError::Cancelled(
3418 "trigger cancel request cancelled retry wait".to_string(),
3419 ));
3420 }
3421 }
3422 }
3423 }
3424}
3425
3426fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
3427 let mut iter = events.into_iter();
3428 let Some(mut root) = iter.next() else {
3429 return Err(DispatchError::Registry(
3430 "batch dispatch produced an empty event list".to_string(),
3431 ));
3432 };
3433 let mut batch = Vec::new();
3434 batch.push(
3435 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
3436 );
3437 for event in iter {
3438 batch.push(
3439 serde_json::to_value(&event)
3440 .map_err(|error| DispatchError::Serde(error.to_string()))?,
3441 );
3442 }
3443 root.batch = Some(batch);
3444 Ok(root)
3445}
3446
3447fn json_value_to_gate(value: &serde_json::Value) -> String {
3448 match value {
3449 serde_json::Value::Null => "null".to_string(),
3450 serde_json::Value::String(text) => text.clone(),
3451 serde_json::Value::Bool(value) => value.to_string(),
3452 serde_json::Value::Number(value) => value.to_string(),
3453 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
3454 }
3455}
3456
3457fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
3458 let json =
3459 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3460 let value = json_to_vm_value(&json);
3461 match (&event.raw_body, value) {
3462 (Some(raw_body), VmValue::Dict(dict)) => {
3463 let mut map = (*dict).clone();
3464 map.insert(
3465 "raw_body".to_string(),
3466 VmValue::Bytes(Rc::new(raw_body.clone())),
3467 );
3468 Ok(VmValue::Dict(Rc::new(map)))
3469 }
3470 (_, other) => Ok(other),
3471 }
3472}
3473
3474fn decrement_in_flight(state: &DispatcherRuntimeState) {
3475 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
3476 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
3477 state.idle_notify.notify_waiters();
3478 }
3479}
3480
3481fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
3482 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
3483 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
3484 state.idle_notify.notify_waiters();
3485 }
3486}
3487
/// Test-only: stores `tx` as the sender that `notify_test_inbox_dequeued`
/// will fire, replacing any sender installed earlier.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        // A previously installed sender, if any, is dropped here.
        drop(slot.borrow_mut().replace(tx));
    });
}
3494
/// Outside test builds this hook does nothing.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test builds: fires the installed one-shot signal, if any, and clears it
/// so it can only fire once. A closed receiver is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let pending = slot.borrow_mut().take();
        match pending {
            Some(tx) => {
                let _ = tx.send(());
            }
            None => {}
        }
    });
}
3506
3507pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
3508 event_log: &L,
3509 event: &TriggerEvent,
3510) -> Result<u64, DispatchError> {
3511 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
3512 .expect("static trigger.inbox.envelopes topic is valid");
3513 let headers = event_headers(event, None, None, None);
3514 let payload =
3515 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
3516 event_log
3517 .append(
3518 &topic,
3519 LogEvent::new("event_ingested", payload).with_headers(headers),
3520 )
3521 .await
3522 .map_err(DispatchError::from)
3523}
3524
3525pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
3526 ACTIVE_DISPATCHER_STATE.with(|slot| {
3527 slot.borrow()
3528 .as_ref()
3529 .map(|state| DispatcherStatsSnapshot {
3530 in_flight: state.in_flight.load(Ordering::Relaxed),
3531 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
3532 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
3533 })
3534 .unwrap_or_default()
3535 })
3536}
3537
3538pub fn clear_dispatcher_state() {
3539 ACTIVE_DISPATCHER_STATE.with(|slot| {
3540 *slot.borrow_mut() = None;
3541 });
3542 ACTIVE_DISPATCH_WAIT_LEASE.with(|slot| {
3543 *slot.borrow_mut() = None;
3544 });
3545}
3546
3547fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
3548 if let Some(wait_id) = crate::stdlib::waitpoint::is_waitpoint_suspension(&error) {
3549 return DispatchError::Waiting(format!("waitpoint suspended: {wait_id}"));
3550 }
3551 if is_cancelled_vm_error(&error) {
3552 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
3553 }
3554 if let VmError::Thrown(VmValue::String(message)) = &error {
3555 return DispatchError::Local(message.to_string());
3556 }
3557 match error_to_category(&error) {
3558 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
3559 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
3560 ErrorCategory::Cancelled => {
3561 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
3562 }
3563 _ => DispatchError::Local(error.to_string()),
3564 }
3565}
3566
3567fn dispatch_error_label(error: &DispatchError) -> &'static str {
3568 match error {
3569 DispatchError::Denied(_) => "denied",
3570 DispatchError::Timeout(_) => "timeout",
3571 DispatchError::Waiting(_) => "waiting",
3572 DispatchError::Cancelled(_) => "cancelled",
3573 _ => "failed",
3574 }
3575}
3576
3577fn dispatch_success_outcome(route: &DispatchUri, result: &serde_json::Value) -> &'static str {
3578 match route {
3579 DispatchUri::Worker { .. } => "enqueued",
3580 DispatchUri::A2a { .. }
3581 if result.get("kind").and_then(|value| value.as_str()) == Some("a2a_task_handle") =>
3582 {
3583 "pending"
3584 }
3585 DispatchUri::A2a { .. } => "completed",
3586 DispatchUri::Local { .. } => "success",
3587 }
3588}
3589
3590fn dispatch_node_id(
3591 route: &DispatchUri,
3592 binding_key: &str,
3593 event_id: &str,
3594 attempt: u32,
3595) -> String {
3596 let prefix = match route {
3597 DispatchUri::A2a { .. } => "a2a",
3598 _ => "dispatch",
3599 };
3600 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
3601}
3602
3603fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
3604 match route {
3605 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
3606 DispatchUri::Worker { .. } => ACTION_GRAPH_NODE_KIND_WORKER_ENQUEUE,
3607 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
3608 }
3609}
3610
3611fn dispatch_node_label(route: &DispatchUri) -> String {
3612 match route {
3613 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
3614 _ => route.target_uri(),
3615 }
3616}
3617
3618fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
3619 match route {
3620 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
3621 _ => None,
3622 }
3623}
3624
3625fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
3626 match route {
3627 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
3628 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
3629 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
3630 }
3631}
3632
3633fn signature_status_label(status: &crate::triggers::SignatureStatus) -> &'static str {
3634 match status {
3635 crate::triggers::SignatureStatus::Verified => "verified",
3636 crate::triggers::SignatureStatus::Unsigned => "unsigned",
3637 crate::triggers::SignatureStatus::Failed { .. } => "failed",
3638 }
3639}
3640
3641fn trigger_node_metadata(event: &TriggerEvent) -> BTreeMap<String, serde_json::Value> {
3642 let mut metadata = BTreeMap::new();
3643 metadata.insert(
3644 "provider".to_string(),
3645 serde_json::json!(event.provider.as_str()),
3646 );
3647 metadata.insert("event_kind".to_string(), serde_json::json!(event.kind));
3648 metadata.insert(
3649 "dedupe_key".to_string(),
3650 serde_json::json!(event.dedupe_key),
3651 );
3652 metadata.insert(
3653 "signature_status".to_string(),
3654 serde_json::json!(signature_status_label(&event.signature_status)),
3655 );
3656 metadata
3657}
3658
3659fn predicate_node_metadata(
3660 binding: &TriggerBinding,
3661 predicate: &super::registry::TriggerPredicateSpec,
3662 event: &TriggerEvent,
3663 evaluation: &PredicateEvaluationRecord,
3664) -> BTreeMap<String, serde_json::Value> {
3665 let mut metadata = BTreeMap::new();
3666 metadata.insert(
3667 "trigger_id".to_string(),
3668 serde_json::json!(binding.id.as_str()),
3669 );
3670 metadata.insert("predicate".to_string(), serde_json::json!(predicate.raw));
3671 metadata.insert("result".to_string(), serde_json::json!(evaluation.result));
3672 metadata.insert(
3673 "cost_usd".to_string(),
3674 serde_json::json!(evaluation.cost_usd),
3675 );
3676 metadata.insert("tokens".to_string(), serde_json::json!(evaluation.tokens));
3677 metadata.insert(
3678 "latency_ms".to_string(),
3679 serde_json::json!(evaluation.latency_ms),
3680 );
3681 metadata.insert("cached".to_string(), serde_json::json!(evaluation.cached));
3682 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3683 if let Some(reason) = evaluation.reason.as_ref() {
3684 metadata.insert("reason".to_string(), serde_json::json!(reason));
3685 }
3686 metadata
3687}
3688
3689fn dispatch_node_metadata(
3690 route: &DispatchUri,
3691 binding: &TriggerBinding,
3692 event: &TriggerEvent,
3693 attempt: u32,
3694) -> BTreeMap<String, serde_json::Value> {
3695 let mut metadata = BTreeMap::new();
3696 metadata.insert("handler_kind".to_string(), serde_json::json!(route.kind()));
3697 metadata.insert(
3698 "target_uri".to_string(),
3699 serde_json::json!(route.target_uri()),
3700 );
3701 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3702 metadata.insert(
3703 "trigger_id".to_string(),
3704 serde_json::json!(binding.id.as_str()),
3705 );
3706 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3707 if let Some(target_agent) = dispatch_target_agent(route) {
3708 metadata.insert("target_agent".to_string(), serde_json::json!(target_agent));
3709 }
3710 if let DispatchUri::Worker { queue } = route {
3711 metadata.insert("queue_name".to_string(), serde_json::json!(queue));
3712 }
3713 metadata
3714}
3715
3716fn dispatch_success_metadata(
3717 route: &DispatchUri,
3718 binding: &TriggerBinding,
3719 event: &TriggerEvent,
3720 attempt: u32,
3721 result: &serde_json::Value,
3722) -> BTreeMap<String, serde_json::Value> {
3723 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3724 match route {
3725 DispatchUri::A2a { .. } => {
3726 if let Some(task_id) = result
3727 .get("task_id")
3728 .or_else(|| result.get("id"))
3729 .and_then(|value| value.as_str())
3730 {
3731 metadata.insert("task_id".to_string(), serde_json::json!(task_id));
3732 }
3733 if let Some(state) = result.get("state").and_then(|value| value.as_str()) {
3734 metadata.insert("state".to_string(), serde_json::json!(state));
3735 }
3736 }
3737 DispatchUri::Worker { .. } => {
3738 if let Some(job_event_id) = result.get("job_event_id").and_then(|value| value.as_u64())
3739 {
3740 metadata.insert("job_event_id".to_string(), serde_json::json!(job_event_id));
3741 }
3742 if let Some(response_topic) = result
3743 .get("response_topic")
3744 .and_then(|value| value.as_str())
3745 {
3746 metadata.insert(
3747 "response_topic".to_string(),
3748 serde_json::json!(response_topic),
3749 );
3750 }
3751 }
3752 DispatchUri::Local { .. } => {}
3753 }
3754 metadata
3755}
3756
3757fn dispatch_error_metadata(
3758 route: &DispatchUri,
3759 binding: &TriggerBinding,
3760 event: &TriggerEvent,
3761 attempt: u32,
3762 error: &DispatchError,
3763) -> BTreeMap<String, serde_json::Value> {
3764 let mut metadata = dispatch_node_metadata(route, binding, event, attempt);
3765 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3766 metadata
3767}
3768
3769fn retry_node_metadata(
3770 binding: &TriggerBinding,
3771 event: &TriggerEvent,
3772 attempt: u32,
3773 delay: Duration,
3774 error: &DispatchError,
3775) -> BTreeMap<String, serde_json::Value> {
3776 let mut metadata = BTreeMap::new();
3777 metadata.insert(
3778 "trigger_id".to_string(),
3779 serde_json::json!(binding.id.as_str()),
3780 );
3781 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3782 metadata.insert("attempt".to_string(), serde_json::json!(attempt));
3783 metadata.insert("delay_ms".to_string(), serde_json::json!(delay.as_millis()));
3784 metadata.insert("error".to_string(), serde_json::json!(error.to_string()));
3785 metadata
3786}
3787
3788fn dlq_node_metadata(
3789 binding: &TriggerBinding,
3790 event: &TriggerEvent,
3791 attempt_count: u32,
3792 final_error: &str,
3793) -> BTreeMap<String, serde_json::Value> {
3794 let mut metadata = BTreeMap::new();
3795 metadata.insert(
3796 "trigger_id".to_string(),
3797 serde_json::json!(binding.id.as_str()),
3798 );
3799 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
3800 metadata.insert(
3801 "attempt_count".to_string(),
3802 serde_json::json!(attempt_count),
3803 );
3804 metadata.insert("final_error".to_string(), serde_json::json!(final_error));
3805 metadata
3806}
3807
3808fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
3809 match value {
3810 VmValue::Bool(result) => Ok(result),
3811 VmValue::EnumVariant {
3812 enum_name,
3813 variant,
3814 fields,
3815 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Ok" => match fields.first() {
3816 Some(VmValue::Bool(result)) => Ok(*result),
3817 Some(other) => Err(format!(
3818 "predicate Result.Ok payload must be bool, got {}",
3819 other.type_name()
3820 )),
3821 None => Err("predicate Result.Ok payload is missing".to_string()),
3822 },
3823 VmValue::EnumVariant {
3824 enum_name,
3825 variant,
3826 fields,
3827 } if enum_name.as_ref() == "Result" && variant.as_ref() == "Err" => Err(fields
3828 .first()
3829 .map(VmValue::display)
3830 .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
3831 other => Err(format!(
3832 "predicate must return bool or Result<bool, _>, got {}",
3833 other.type_name()
3834 )),
3835 }
3836}
3837
3838fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
3839 micros_to_usd(
3840 binding
3841 .metrics
3842 .cost_today_usd_micros
3843 .load(Ordering::Relaxed),
3844 )
3845}
3846
3847fn current_predicate_hourly_cost(binding: &TriggerBinding) -> f64 {
3848 micros_to_usd(binding.metrics.cost_hour_usd_micros.load(Ordering::Relaxed))
3849}
3850
/// Splits a binding key of the form `<id>@v<version>` at its last `@v`.
///
/// Returns the id and the parsed version. A key with no `@v` is returned
/// whole with version 0, and a suffix that does not parse as `u32` also
/// yields version 0 (the id is still the part before the last `@v`).
fn split_binding_key(binding_key: &str) -> (String, u32) {
    match binding_key.rsplit_once("@v") {
        Some((binding_id, version_text)) => {
            let version = version_text.parse::<u32>().unwrap_or_default();
            (binding_id.to_owned(), version)
        }
        None => (binding_key.to_owned(), 0),
    }
}
3858
3859fn is_cancelled_vm_error(error: &VmError) -> bool {
3860 matches!(
3861 error,
3862 VmError::Thrown(VmValue::String(message))
3863 if message.starts_with("kind:cancelled:")
3864 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
3865}
3866
3867fn event_headers(
3868 event: &TriggerEvent,
3869 binding: Option<&TriggerBinding>,
3870 attempt: Option<u32>,
3871 replay_of_event_id: Option<&String>,
3872) -> BTreeMap<String, String> {
3873 let mut headers = BTreeMap::new();
3874 headers.insert("event_id".to_string(), event.id.0.clone());
3875 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
3876 headers.insert("provider".to_string(), event.provider.as_str().to_string());
3877 headers.insert("kind".to_string(), event.kind.clone());
3878 if let Some(replay_of_event_id) = replay_of_event_id {
3879 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
3880 }
3881 if let Some(binding) = binding {
3882 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
3883 headers.insert("binding_key".to_string(), binding.binding_key());
3884 headers.insert(
3885 "handler_kind".to_string(),
3886 DispatchUri::from(&binding.handler).kind().to_string(),
3887 );
3888 }
3889 if let Some(attempt) = attempt {
3890 headers.insert("attempt".to_string(), attempt.to_string());
3891 }
3892 headers
3893}
3894
3895fn worker_queue_priority(
3896 binding: &super::registry::TriggerBinding,
3897 event: &TriggerEvent,
3898) -> crate::WorkerQueuePriority {
3899 match event
3900 .headers
3901 .get("priority")
3902 .map(|value| value.trim().to_ascii_lowercase())
3903 .as_deref()
3904 {
3905 Some("high") => crate::WorkerQueuePriority::High,
3906 Some("low") => crate::WorkerQueuePriority::Low,
3907 _ => binding.dispatch_priority,
3908 }
3909}
3910
/// Environment variable that arms the fault-injection hook in
/// `maybe_fail_before_outbox`.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Exits the process with status 86 when `TEST_FAIL_BEFORE_OUTBOX_ENV` is
/// set to any value; otherwise does nothing.
fn maybe_fail_before_outbox() {
    let armed = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if !armed {
        return;
    }
    std::process::exit(86);
}
3918
3919fn now_rfc3339() -> String {
3920 time::OffsetDateTime::now_utc()
3921 .format(&time::format_description::well_known::Rfc3339)
3922 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3923}
3924
3925fn next_budget_reset_rfc3339(binding: &TriggerBinding) -> String {
3926 let now = time::OffsetDateTime::now_utc();
3927 let reset = if binding.hourly_cost_usd.is_some() {
3928 let next_hour = (now.unix_timestamp() / 3_600 + 1) * 3_600;
3929 time::OffsetDateTime::from_unix_timestamp(next_hour).unwrap_or(now)
3930 } else {
3931 let next_day = ((now.unix_timestamp() / 86_400) + 1) * 86_400;
3932 time::OffsetDateTime::from_unix_timestamp(next_day).unwrap_or(now)
3933 };
3934 reset
3935 .format(&time::format_description::well_known::Rfc3339)
3936 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
3937}
3938
3939fn now_unix_ms() -> i64 {
3940 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
3941}
3942
3943fn cancelled_dispatch_outcome(
3944 binding: &TriggerBinding,
3945 route: &DispatchUri,
3946 event: &TriggerEvent,
3947 replay_of_event_id: Option<String>,
3948 attempt_count: u32,
3949 error: String,
3950) -> DispatchOutcome {
3951 DispatchOutcome {
3952 trigger_id: binding.id.as_str().to_string(),
3953 binding_key: binding.binding_key(),
3954 event_id: event.id.0.clone(),
3955 attempt_count,
3956 status: DispatchStatus::Cancelled,
3957 handler_kind: route.kind().to_string(),
3958 target_uri: route.target_uri(),
3959 replay_of_event_id,
3960 result: None,
3961 error: Some(error),
3962 }
3963}
3964
3965async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
3966 let _ = cancel_rx.recv().await;
3967}
3968
3969#[cfg(test)]
3970mod tests;