1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::Notify;
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25 ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26 ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{append_trust_record, AutonomyTier, TrustOutcome, TrustRecord};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::matching_bindings;
35use super::registry::{TriggerBinding, TriggerHandlerSpec};
36use super::{
37 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
38 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
39 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
40 TRIGGER_OUTBOX_TOPIC,
41};
42use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
43
44mod flow_control;
45pub mod retry;
46pub mod uri;
47
48pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
49
thread_local! {
    // Runtime state of the dispatcher most recently constructed on this thread;
    // overwritten by `Dispatcher::with_event_log_and_metrics`.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Per-thread dispatch context handed out (cloned) by `current_dispatch_context`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Test-only one-shot sender.
    // NOTE(review): presumably fired by `notify_test_inbox_dequeued` — confirm.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
56
tokio::task_local! {
    // Task-local flag scoped around each dispatch by `dispatch_with_replay`;
    // `true` when the dispatch carries a `replay_of_event_id`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
60
/// Description of a dispatch, as returned by [`current_dispatch_context`].
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The trigger event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay.
    pub replay_of_event_id: Option<String>,
    /// Agent identifier. NOTE(review): who populates this is not visible here.
    pub agent_id: String,
    /// Action name. NOTE(review): exact vocabulary is defined by the code that fills it in.
    pub action: String,
    /// Autonomy tier associated with this dispatch.
    pub autonomy_tier: AutonomyTier,
}
69
/// Serializable record grouping predicate cache entries under one
/// trigger id / event id pair.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    /// Trigger the cached entries belong to.
    trigger_id: String,
    /// Event the cached entries belong to.
    event_id: String,
    /// The cached predicate entries themselves.
    entries: Vec<PredicateCacheEntry>,
}
76
/// Result of evaluating a trigger predicate together with its cost accounting.
///
/// NOTE(review): presumably the value produced by `evaluate_predicate` — confirm.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed.
    result: bool,
    /// Cost of the evaluation in US dollars.
    cost_usd: f64,
    /// Token count attributed to the evaluation.
    tokens: u64,
    /// Evaluation latency in milliseconds.
    latency_ms: u64,
    /// Whether the result came from a cache.
    cached: bool,
    /// Optional human-readable explanation of the result.
    reason: Option<String>,
}
86
87pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
88 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
89}
90
91pub(crate) fn current_dispatch_is_replay() -> bool {
92 ACTIVE_DISPATCH_IS_REPLAY
93 .try_with(|is_replay| *is_replay)
94 .unwrap_or(false)
95}
96
/// Dispatches trigger events to the handlers of their bindings.
///
/// Cloning is shallow: the VM, event log, runtime state and metrics registry
/// are all held behind reference-counted pointers.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM the dispatcher was constructed with.
    base_vm: Rc<Vm>,
    /// Event log used for the inbox, lifecycle and outbox topics.
    event_log: Arc<AnyEventLog>,
    /// Broadcast channel signalled by `shutdown`; `run` subscribes to it.
    cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ and flow-control state shared between clones.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics registry; when present, dispatch success/failure is recorded.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
105
/// Mutable runtime bookkeeping shared by all clones of a [`Dispatcher`].
#[derive(Debug)]
struct DispatcherRuntimeState {
    /// Number of dispatches currently in progress (reported by `snapshot`).
    in_flight: AtomicU64,
    /// Retry queue depth (reported by `snapshot`).
    retry_queue_depth: AtomicU64,
    /// Dead-letter entries held in memory.
    dlq: Mutex<Vec<DlqEntry>>,
    /// Cancellation flags; `Dispatcher::shutdown` sets every one of them to `true`.
    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
    /// Set to `true` by `Dispatcher::shutdown`.
    shutting_down: std::sync::atomic::AtomicBool,
    /// Notification awaited by `Dispatcher::drain` while work is outstanding.
    idle_notify: Notify,
    /// Flow-control manager built from the dispatcher's event log.
    flow_control: FlowControlManager,
}
116
117impl DispatcherRuntimeState {
118 fn new(event_log: Arc<AnyEventLog>) -> Self {
119 Self {
120 in_flight: AtomicU64::new(0),
121 retry_queue_depth: AtomicU64::new(0),
122 dlq: Mutex::new(Vec::new()),
123 cancel_tokens: Mutex::new(Vec::new()),
124 shutting_down: std::sync::atomic::AtomicBool::new(false),
125 idle_notify: Notify::new(),
126 flow_control: FlowControlManager::new(event_log),
127 }
128 }
129}
130
/// Point-in-time copy of the dispatcher counters, produced by `Dispatcher::snapshot`.
///
/// With `#[serde(default)]`, fields missing from serialized input take their
/// `Default` value.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches in progress when the snapshot was taken.
    pub in_flight: u64,
    /// Retry queue depth when the snapshot was taken.
    pub retry_queue_depth: u64,
    /// Number of entries in the in-memory DLQ when the snapshot was taken.
    pub dlq_depth: u64,
}
138
/// Terminal status of a dispatch; serialized in `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch ended in failure.
    Failed,
    /// The dispatch ended in the dead-letter queue.
    Dlq,
    /// The dispatch was skipped (e.g. predicate false or flow control).
    Skipped,
    /// The dispatch was cancelled.
    Cancelled,
}
148
149impl DispatchStatus {
150 pub fn as_str(&self) -> &'static str {
151 match self {
152 Self::Succeeded => "succeeded",
153 Self::Failed => "failed",
154 Self::Dlq => "dlq",
155 Self::Skipped => "skipped",
156 Self::Cancelled => "cancelled",
157 }
158 }
159}
160
/// Final result of dispatching one event to one binding.
///
/// With `#[serde(default)]`, fields missing from serialized input fall back to
/// the values of the hand-written `Default` impl.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger binding that handled the event.
    pub trigger_id: String,
    /// Key of the binding, as returned by `TriggerBinding::binding_key`.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Number of handler attempts made; `0` when the dispatch never reached a handler.
    pub attempt_count: u32,
    /// Terminal status of the dispatch.
    pub status: DispatchStatus,
    /// Kind of the handler route.
    pub handler_kind: String,
    /// Target URI of the handler route.
    pub target_uri: String,
    /// Id of the original event when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result on success, or a `{"skipped": true, ...}` object when skipped.
    pub result: Option<serde_json::Value>,
    /// Error message for failed or cancelled dispatches.
    pub error: Option<String>,
}
175
/// Payload written to the trigger inbox by `Dispatcher::enqueue_targeted` and
/// read back by `Dispatcher::run`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to this trigger binding.
    pub trigger_id: Option<String>,
    /// Binding version passed to the registry when resolving the target binding.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
182
/// Result of `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when both `in_flight` and `retry_queue_depth` reached zero before the timeout.
    pub drained: bool,
    /// In-flight dispatch count at the time of the report.
    pub in_flight: u64,
    /// Retry queue depth at the time of the report.
    pub retry_queue_depth: u64,
    /// DLQ depth at the time of the report.
    pub dlq_depth: u64,
}
191
192impl Default for DispatchOutcome {
193 fn default() -> Self {
194 Self {
195 trigger_id: String::new(),
196 binding_key: String::new(),
197 event_id: String::new(),
198 attempt_count: 0,
199 status: DispatchStatus::Failed,
200 handler_kind: String::new(),
201 target_uri: String::new(),
202 replay_of_event_id: None,
203 result: None,
204 error: None,
205 }
206 }
207}
208
/// Record of a single handler attempt within a dispatch.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Key of the binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Attempt number, starting at 1.
    pub attempt: u32,
    /// Kind of the handler route.
    pub handler_kind: String,
    /// Timestamp string captured just before the handler was invoked.
    pub started_at: String,
    /// Timestamp string captured right after the handler returned.
    pub completed_at: String,
    /// `"success"` on success, otherwise a label derived from the error.
    pub outcome: String,
    /// Error message for failed attempts.
    pub error_msg: Option<String>,
}
222
/// Request to cancel the dispatch of one event for one binding; appended to the
/// event log by [`append_dispatch_cancel_request`].
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Key of the binding whose dispatch should be cancelled.
    pub binding_key: String,
    /// Id of the event whose dispatch should be cancelled.
    pub event_id: String,
    /// When the cancellation was requested; serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Optional identity of the requester; defaults to `None` when absent.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit identifier; defaults to `None` when absent.
    #[serde(default)]
    pub audit_id: Option<String>,
}
234
/// Dead-letter queue entry, as returned by `Dispatcher::dlq_entries`.
///
/// NOTE(review): the code that creates entries is outside this view; field
/// descriptions follow the field names and types.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Key of the binding.
    pub binding_key: String,
    /// The event stored with the entry.
    pub event: TriggerEvent,
    /// Number of attempts recorded for the entry.
    pub attempt_count: u32,
    /// Message of the final error.
    pub final_error: String,
    /// Per-attempt records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
244
/// Flow-control resources held by a dispatch; carried by
/// `FlowControlOutcome::Dispatch` and later handed to `release_flow_control`.
#[derive(Default)]
struct AcquiredFlowControl {
    /// NOTE(review): presumably the key of a held singleton gate — confirm in `flow_control`.
    singleton_gate: Option<String>,
    /// Concurrency permit, when one was acquired.
    concurrency: Option<ConcurrencyPermit>,
}
250
/// Decision returned by `apply_flow_control` for one event.
enum FlowControlOutcome {
    /// Proceed with the dispatch.
    Dispatch {
        /// The event to dispatch (boxed to keep the enum small).
        event: Box<TriggerEvent>,
        /// Flow-control resources that must be released when the dispatch ends.
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch.
    Skip {
        /// Why the event was skipped; reported under `"flow_control"` in the skipped outcome.
        reason: String,
    },
}
260
/// Errors produced while enqueueing or dispatching trigger events.
///
/// Every variant carries a human-readable message; `Display` prints that
/// message verbatim, without any variant prefix.
#[derive(Debug)]
pub enum DispatchError {
    /// Event log failure.
    EventLog(String),
    /// Trigger registry failure.
    Registry(String),
    /// (De)serialization failure.
    Serde(String),
    /// Local handler failure.
    Local(String),
    /// A2A handler failure.
    A2a(String),
    /// The dispatch was denied.
    Denied(String),
    /// The dispatch timed out.
    Timeout(String),
    /// The dispatch was cancelled.
    Cancelled(String),
    /// The requested handler route is not implemented.
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    /// Writes the variant's inner message as-is.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = match self {
            DispatchError::EventLog(text)
            | DispatchError::Registry(text)
            | DispatchError::Serde(text)
            | DispatchError::Local(text)
            | DispatchError::A2a(text)
            | DispatchError::Denied(text)
            | DispatchError::Timeout(text)
            | DispatchError::Cancelled(text)
            | DispatchError::NotImplemented(text) => text.as_str(),
        };
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}

impl DispatchError {
    /// Returns `true` when another attempt may be made after this error.
    ///
    /// `Cancelled`, `Denied` and `NotImplemented` are terminal; every other
    /// variant is retryable.
    fn retryable(&self) -> bool {
        match self {
            DispatchError::Cancelled(_)
            | DispatchError::Denied(_)
            | DispatchError::NotImplemented(_) => false,
            DispatchError::EventLog(_)
            | DispatchError::Registry(_)
            | DispatchError::Serde(_)
            | DispatchError::Local(_)
            | DispatchError::A2a(_)
            | DispatchError::Timeout(_) => true,
        }
    }
}
300
301impl From<LogError> for DispatchError {
302 fn from(value: LogError) -> Self {
303 Self::EventLog(value.to_string())
304 }
305}
306
307pub async fn append_dispatch_cancel_request(
308 event_log: &Arc<AnyEventLog>,
309 request: &DispatchCancelRequest,
310) -> Result<u64, DispatchError> {
311 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
312 .expect("static trigger cancel topic should always be valid");
313 event_log
314 .append(
315 &topic,
316 LogEvent::new(
317 "dispatch_cancel_requested",
318 serde_json::to_value(request)
319 .map_err(|error| DispatchError::Serde(error.to_string()))?,
320 ),
321 )
322 .await
323 .map_err(DispatchError::from)
324}
325
326impl Dispatcher {
327 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
328 let event_log = active_event_log().ok_or_else(|| {
329 DispatchError::EventLog("dispatcher requires an active event log".to_string())
330 })?;
331 Ok(Self::with_event_log(base_vm, event_log))
332 }
333
334 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
335 Self::with_event_log_and_metrics(base_vm, event_log, None)
336 }
337
338 pub fn with_event_log_and_metrics(
339 base_vm: Vm,
340 event_log: Arc<AnyEventLog>,
341 metrics: Option<Arc<crate::MetricsRegistry>>,
342 ) -> Self {
343 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
344 ACTIVE_DISPATCHER_STATE.with(|slot| {
345 *slot.borrow_mut() = Some(state.clone());
346 });
347 let (cancel_tx, _) = broadcast::channel(32);
348 Self {
349 base_vm: Rc::new(base_vm),
350 event_log,
351 cancel_tx,
352 state,
353 metrics,
354 }
355 }
356
357 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
358 DispatcherStatsSnapshot {
359 in_flight: self.state.in_flight.load(Ordering::Relaxed),
360 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
361 dlq_depth: self
362 .state
363 .dlq
364 .lock()
365 .expect("dispatcher dlq poisoned")
366 .len() as u64,
367 }
368 }
369
370 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
371 self.state
372 .dlq
373 .lock()
374 .expect("dispatcher dlq poisoned")
375 .clone()
376 }
377
378 pub fn shutdown(&self) {
379 self.state.shutting_down.store(true, Ordering::SeqCst);
380 for token in self
381 .state
382 .cancel_tokens
383 .lock()
384 .expect("dispatcher cancel tokens poisoned")
385 .iter()
386 {
387 token.store(true, Ordering::SeqCst);
388 }
389 let _ = self.cancel_tx.send(());
390 }
391
392 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
393 self.enqueue_targeted(None, None, event).await
394 }
395
396 pub async fn enqueue_targeted(
397 &self,
398 trigger_id: Option<String>,
399 binding_version: Option<u32>,
400 event: TriggerEvent,
401 ) -> Result<u64, DispatchError> {
402 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
403 .expect("static trigger inbox envelopes topic is valid");
404 let headers = event_headers(&event, None, None, None);
405 let payload = serde_json::to_value(InboxEnvelope {
406 trigger_id,
407 binding_version,
408 event,
409 })
410 .map_err(|error| DispatchError::Serde(error.to_string()))?;
411 self.event_log
412 .append(
413 &topic,
414 LogEvent::new("event_ingested", payload).with_headers(headers),
415 )
416 .await
417 .map_err(DispatchError::from)
418 }
419
420 pub async fn run(&self) -> Result<(), DispatchError> {
421 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
422 .expect("static trigger inbox envelopes topic is valid");
423 let start_from = self.event_log.latest(&topic).await?;
424 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
425 pin_mut!(stream);
426 let mut cancel_rx = self.cancel_tx.subscribe();
427
428 loop {
429 tokio::select! {
430 received = stream.next() => {
431 let Some(received) = received else {
432 break;
433 };
434 let (_, event) = received.map_err(DispatchError::from)?;
435 if event.kind != "event_ingested" {
436 continue;
437 }
438 let parent_headers = event.headers.clone();
439 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
440 .map_err(|error| DispatchError::Serde(error.to_string()))?;
441 notify_test_inbox_dequeued();
442 let _ = self
443 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
444 .await;
445 }
446 _ = recv_cancel(&mut cancel_rx) => break,
447 }
448 }
449
450 Ok(())
451 }
452
453 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
454 let deadline = tokio::time::Instant::now() + timeout;
455 loop {
456 let snapshot = self.snapshot();
457 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
458 return Ok(DispatcherDrainReport {
459 drained: true,
460 in_flight: snapshot.in_flight,
461 retry_queue_depth: snapshot.retry_queue_depth,
462 dlq_depth: snapshot.dlq_depth,
463 });
464 }
465
466 let notified = self.state.idle_notify.notified();
467 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
468 if remaining.is_zero() {
469 return Ok(DispatcherDrainReport {
470 drained: false,
471 in_flight: snapshot.in_flight,
472 retry_queue_depth: snapshot.retry_queue_depth,
473 dlq_depth: snapshot.dlq_depth,
474 });
475 }
476 if tokio::time::timeout(remaining, notified).await.is_err() {
477 let snapshot = self.snapshot();
478 return Ok(DispatcherDrainReport {
479 drained: false,
480 in_flight: snapshot.in_flight,
481 retry_queue_depth: snapshot.retry_queue_depth,
482 dlq_depth: snapshot.dlq_depth,
483 });
484 }
485 }
486 }
487
488 pub async fn dispatch_inbox_envelope(
489 &self,
490 envelope: InboxEnvelope,
491 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
492 self.dispatch_inbox_envelope_with_headers(envelope, None)
493 .await
494 }
495
496 async fn dispatch_inbox_envelope_with_headers(
497 &self,
498 envelope: InboxEnvelope,
499 parent_headers: Option<&BTreeMap<String, String>>,
500 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
501 if let Some(trigger_id) = envelope.trigger_id {
502 let binding = super::registry::resolve_live_trigger_binding(
503 &trigger_id,
504 envelope.binding_version,
505 )
506 .map_err(|error| DispatchError::Registry(error.to_string()))?;
507 return Ok(vec![
508 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
509 .await?,
510 ]);
511 }
512
513 let cron_target = match &envelope.event.provider_payload {
514 crate::triggers::ProviderPayload::Known(
515 crate::triggers::event::KnownProviderPayload::Cron(payload),
516 ) => payload.cron_id.clone(),
517 _ => None,
518 };
519 if let Some(trigger_id) = cron_target {
520 let binding = super::registry::resolve_live_trigger_binding(
521 &trigger_id,
522 envelope.binding_version,
523 )
524 .map_err(|error| DispatchError::Registry(error.to_string()))?;
525 return Ok(vec![
526 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
527 .await?,
528 ]);
529 }
530
531 self.dispatch_event(envelope.event).await
532 }
533
534 pub async fn dispatch_event(
535 &self,
536 event: TriggerEvent,
537 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
538 let bindings = matching_bindings(&event);
539 let mut outcomes = Vec::new();
540 for binding in bindings {
541 outcomes.push(self.dispatch(&binding, event.clone()).await?);
542 }
543 Ok(outcomes)
544 }
545
546 pub async fn dispatch(
547 &self,
548 binding: &TriggerBinding,
549 event: TriggerEvent,
550 ) -> Result<DispatchOutcome, DispatchError> {
551 self.dispatch_with_replay(binding, event, None, None, None)
552 .await
553 }
554
555 pub async fn dispatch_replay(
556 &self,
557 binding: &TriggerBinding,
558 event: TriggerEvent,
559 replay_of_event_id: String,
560 ) -> Result<DispatchOutcome, DispatchError> {
561 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
562 .await
563 }
564
565 pub async fn dispatch_with_parent_span_id(
566 &self,
567 binding: &TriggerBinding,
568 event: TriggerEvent,
569 parent_span_id: Option<String>,
570 ) -> Result<DispatchOutcome, DispatchError> {
571 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
572 .await
573 }
574
575 async fn dispatch_with_replay(
576 &self,
577 binding: &TriggerBinding,
578 event: TriggerEvent,
579 replay_of_event_id: Option<String>,
580 parent_span_id: Option<String>,
581 parent_headers: Option<&BTreeMap<String, String>>,
582 ) -> Result<DispatchOutcome, DispatchError> {
583 let span = tracing::info_span!(
584 "dispatch",
585 trigger_id = %binding.id.as_str(),
586 binding_version = binding.version,
587 trace_id = %event.trace_id.0
588 );
589 #[cfg(feature = "otel")]
590 let span_for_otel = span.clone();
591 let _ = if let Some(headers) = parent_headers {
592 crate::observability::otel::set_span_parent_from_headers(
593 &span,
594 headers,
595 &event.trace_id,
596 parent_span_id.as_deref(),
597 )
598 } else {
599 crate::observability::otel::set_span_parent(
600 &span,
601 &event.trace_id,
602 parent_span_id.as_deref(),
603 )
604 };
605 #[cfg(feature = "otel")]
606 let started_at = Instant::now();
607 let metrics = self.metrics.clone();
608 let outcome = ACTIVE_DISPATCH_IS_REPLAY
609 .scope(
610 replay_of_event_id.is_some(),
611 self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
612 .instrument(span),
613 )
614 .await;
615 if let Some(metrics) = metrics.as_ref() {
616 match &outcome {
617 Ok(dispatch_outcome) => match dispatch_outcome.status {
618 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
619 metrics.record_dispatch_succeeded();
620 }
621 _ => metrics.record_dispatch_failed(),
622 },
623 Err(_) => metrics.record_dispatch_failed(),
624 }
625 }
626 #[cfg(feature = "otel")]
627 {
628 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
629
630 let duration_ms = started_at.elapsed().as_millis() as i64;
631 let status = match &outcome {
632 Ok(dispatch_outcome) => match dispatch_outcome.status {
633 DispatchStatus::Succeeded => "succeeded",
634 DispatchStatus::Skipped => "skipped",
635 DispatchStatus::Cancelled => "cancelled",
636 DispatchStatus::Failed => "failed",
637 DispatchStatus::Dlq => "dlq",
638 },
639 Err(DispatchError::Cancelled(_)) => "cancelled",
640 Err(_) => "failed",
641 };
642 span_for_otel.set_attribute("result.status", status);
643 span_for_otel.set_attribute("result.duration_ms", duration_ms);
644 }
645 outcome
646 }
647
648 async fn dispatch_with_replay_inner(
649 &self,
650 binding: &TriggerBinding,
651 event: TriggerEvent,
652 replay_of_event_id: Option<String>,
653 ) -> Result<DispatchOutcome, DispatchError> {
654 let autonomy_tier = crate::resolve_agent_autonomy_tier(
655 &self.event_log,
656 binding.id.as_str(),
657 binding.autonomy_tier,
658 )
659 .await
660 .unwrap_or(binding.autonomy_tier);
661 let binding_key = binding.binding_key();
662 let route = DispatchUri::from(&binding.handler);
663 let trigger_id = binding.id.as_str().to_string();
664 let event_id = event.id.0.clone();
665 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
666 let begin = if replay_of_event_id.is_some() {
667 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
668 } else {
669 begin_in_flight(binding.id.as_str(), binding.version)
670 };
671 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
672
673 let mut attempts = Vec::new();
674 let mut source_node_id = format!("trigger:{}", event.id.0);
675 let mut initial_nodes = Vec::new();
676 let mut initial_edges = Vec::new();
677 if let Some(original_event_id) = replay_of_event_id.as_ref() {
678 let original_node_id = format!("trigger:{original_event_id}");
679 initial_nodes.push(RunActionGraphNodeRecord {
680 id: original_node_id.clone(),
681 label: format!(
682 "{}:{} (original {})",
683 event.provider.as_str(),
684 event.kind,
685 original_event_id
686 ),
687 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
688 status: "historical".to_string(),
689 outcome: "replayed_from".to_string(),
690 trace_id: Some(event.trace_id.0.clone()),
691 stage_id: None,
692 node_id: None,
693 worker_id: None,
694 run_id: None,
695 run_path: None,
696 });
697 initial_edges.push(RunActionGraphEdgeRecord {
698 from_id: original_node_id,
699 to_id: source_node_id.clone(),
700 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
701 label: Some("replay chain".to_string()),
702 });
703 }
704 initial_nodes.push(RunActionGraphNodeRecord {
705 id: source_node_id.clone(),
706 label: format!("{}:{}", event.provider.as_str(), event.kind),
707 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
708 status: "received".to_string(),
709 outcome: "received".to_string(),
710 trace_id: Some(event.trace_id.0.clone()),
711 stage_id: None,
712 node_id: None,
713 worker_id: None,
714 run_id: None,
715 run_path: None,
716 });
717 self.emit_action_graph(
718 &event,
719 initial_nodes,
720 initial_edges,
721 serde_json::json!({
722 "source": "dispatcher",
723 "trigger_id": trigger_id,
724 "binding_key": binding_key,
725 "event_id": event_id,
726 "replay_of_event_id": replay_of_event_id,
727 }),
728 )
729 .await?;
730
731 if dispatch_cancel_requested(
732 &self.event_log,
733 &binding_key,
734 &event.id.0,
735 replay_of_event_id.as_ref(),
736 )
737 .await?
738 {
739 finish_in_flight(
740 binding.id.as_str(),
741 binding.version,
742 TriggerDispatchOutcome::Failed,
743 )
744 .await
745 .map_err(|error| DispatchError::Registry(error.to_string()))?;
746 decrement_in_flight(&self.state);
747 return Ok(cancelled_dispatch_outcome(
748 binding,
749 &route,
750 &event,
751 replay_of_event_id,
752 0,
753 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
754 ));
755 }
756
757 if let Some(predicate) = binding.when.as_ref() {
758 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
759 let evaluation = self
760 .evaluate_predicate(
761 binding,
762 predicate,
763 &event,
764 replay_of_event_id.as_ref(),
765 autonomy_tier,
766 )
767 .await?;
768 let passed = evaluation.result;
769 self.emit_action_graph(
770 &event,
771 vec![RunActionGraphNodeRecord {
772 id: predicate_node_id.clone(),
773 label: predicate.raw.clone(),
774 kind: ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE.to_string(),
775 status: "completed".to_string(),
776 outcome: passed.to_string(),
777 trace_id: Some(event.trace_id.0.clone()),
778 stage_id: None,
779 node_id: None,
780 worker_id: None,
781 run_id: None,
782 run_path: None,
783 }],
784 vec![RunActionGraphEdgeRecord {
785 from_id: source_node_id.clone(),
786 to_id: predicate_node_id.clone(),
787 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
788 label: None,
789 }],
790 serde_json::json!({
791 "source": "dispatcher",
792 "trigger_id": binding.id.as_str(),
793 "binding_key": binding.binding_key(),
794 "event_id": event.id.0,
795 "predicate": predicate.raw,
796 "reason": evaluation.reason,
797 "cached": evaluation.cached,
798 "cost_usd": evaluation.cost_usd,
799 "tokens": evaluation.tokens,
800 "latency_ms": evaluation.latency_ms,
801 "replay_of_event_id": replay_of_event_id,
802 }),
803 )
804 .await?;
805
806 if !passed {
807 finish_in_flight(
808 binding.id.as_str(),
809 binding.version,
810 TriggerDispatchOutcome::Dispatched,
811 )
812 .await
813 .map_err(|error| DispatchError::Registry(error.to_string()))?;
814 decrement_in_flight(&self.state);
815 self.append_dispatch_trust_record(
816 binding,
817 &route,
818 &event,
819 replay_of_event_id.as_ref(),
820 autonomy_tier,
821 TrustOutcome::Denied,
822 "skipped",
823 0,
824 None,
825 )
826 .await?;
827 return Ok(DispatchOutcome {
828 trigger_id: binding.id.as_str().to_string(),
829 binding_key: binding.binding_key(),
830 event_id: event.id.0,
831 attempt_count: 0,
832 status: DispatchStatus::Skipped,
833 handler_kind: route.kind().to_string(),
834 target_uri: route.target_uri(),
835 replay_of_event_id,
836 result: Some(serde_json::json!({
837 "skipped": true,
838 "predicate": predicate.raw,
839 "reason": evaluation.reason,
840 })),
841 error: None,
842 });
843 }
844
845 source_node_id = predicate_node_id;
846 }
847
848 let (event, mut acquired_flow) = match self
849 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
850 .await?
851 {
852 FlowControlOutcome::Dispatch { event, acquired } => (*event, acquired),
853 FlowControlOutcome::Skip { reason } => {
854 finish_in_flight(
855 binding.id.as_str(),
856 binding.version,
857 TriggerDispatchOutcome::Dispatched,
858 )
859 .await
860 .map_err(|error| DispatchError::Registry(error.to_string()))?;
861 decrement_in_flight(&self.state);
862 return Ok(DispatchOutcome {
863 trigger_id: binding.id.as_str().to_string(),
864 binding_key: binding.binding_key(),
865 event_id: event.id.0,
866 attempt_count: 0,
867 status: DispatchStatus::Skipped,
868 handler_kind: route.kind().to_string(),
869 target_uri: route.target_uri(),
870 replay_of_event_id,
871 result: Some(serde_json::json!({
872 "skipped": true,
873 "flow_control": reason,
874 })),
875 error: None,
876 });
877 }
878 };
879
880 let mut previous_retry_node = None;
881 let max_attempts = binding.retry.max_attempts();
882 for attempt in 1..=max_attempts {
883 if dispatch_cancel_requested(
884 &self.event_log,
885 &binding_key,
886 &event.id.0,
887 replay_of_event_id.as_ref(),
888 )
889 .await?
890 {
891 finish_in_flight(
892 binding.id.as_str(),
893 binding.version,
894 TriggerDispatchOutcome::Failed,
895 )
896 .await
897 .map_err(|error| DispatchError::Registry(error.to_string()))?;
898 decrement_in_flight(&self.state);
899 return Ok(cancelled_dispatch_outcome(
900 binding,
901 &route,
902 &event,
903 replay_of_event_id,
904 attempt.saturating_sub(1),
905 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
906 ));
907 }
908 maybe_fail_before_outbox();
909 let started_at = now_rfc3339();
910 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
911 self.append_lifecycle_event(
912 "DispatchStarted",
913 &event,
914 binding,
915 serde_json::json!({
916 "event_id": event.id.0,
917 "attempt": attempt,
918 "handler_kind": route.kind(),
919 "target_uri": route.target_uri(),
920 "replay_of_event_id": replay_of_event_id,
921 }),
922 replay_of_event_id.as_ref(),
923 )
924 .await?;
925 self.append_topic_event(
926 TRIGGER_OUTBOX_TOPIC,
927 "dispatch_started",
928 &event,
929 Some(binding),
930 Some(attempt),
931 serde_json::json!({
932 "event_id": event.id.0,
933 "attempt": attempt,
934 "trigger_id": binding.id.as_str(),
935 "binding_key": binding.binding_key(),
936 "handler_kind": route.kind(),
937 "target_uri": route.target_uri(),
938 "replay_of_event_id": replay_of_event_id,
939 }),
940 replay_of_event_id.as_ref(),
941 )
942 .await?;
943
944 let mut dispatch_edges = Vec::new();
945 if attempt == 1 {
946 dispatch_edges.push(RunActionGraphEdgeRecord {
947 from_id: source_node_id.clone(),
948 to_id: attempt_node_id.clone(),
949 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
950 label: binding.when.as_ref().map(|_| "true".to_string()),
951 });
952 } else if let Some(retry_node_id) = previous_retry_node.take() {
953 dispatch_edges.push(RunActionGraphEdgeRecord {
954 from_id: retry_node_id,
955 to_id: attempt_node_id.clone(),
956 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
957 label: Some(format!("attempt {attempt}")),
958 });
959 }
960
961 self.emit_action_graph(
962 &event,
963 vec![RunActionGraphNodeRecord {
964 id: attempt_node_id.clone(),
965 label: dispatch_node_label(&route),
966 kind: dispatch_node_kind(&route).to_string(),
967 status: "running".to_string(),
968 outcome: format!("attempt_{attempt}"),
969 trace_id: Some(event.trace_id.0.clone()),
970 stage_id: None,
971 node_id: None,
972 worker_id: None,
973 run_id: None,
974 run_path: None,
975 }],
976 dispatch_edges,
977 serde_json::json!({
978 "source": "dispatcher",
979 "trigger_id": binding.id.as_str(),
980 "binding_key": binding.binding_key(),
981 "event_id": event.id.0,
982 "attempt": attempt,
983 "handler_kind": route.kind(),
984 "target_uri": route.target_uri(),
985 "target_agent": dispatch_target_agent(&route),
986 "replay_of_event_id": replay_of_event_id,
987 }),
988 )
989 .await?;
990
991 let result = self
992 .dispatch_once(
993 binding,
994 &route,
995 &event,
996 autonomy_tier,
997 &mut self.cancel_tx.subscribe(),
998 )
999 .await;
1000 let completed_at = now_rfc3339();
1001
1002 match result {
1003 Ok(result) => {
1004 let attempt_record = DispatchAttemptRecord {
1005 trigger_id: binding.id.as_str().to_string(),
1006 binding_key: binding.binding_key(),
1007 event_id: event.id.0.clone(),
1008 attempt,
1009 handler_kind: route.kind().to_string(),
1010 started_at,
1011 completed_at,
1012 outcome: "success".to_string(),
1013 error_msg: None,
1014 };
1015 attempts.push(attempt_record.clone());
1016 self.append_attempt_record(
1017 &event,
1018 binding,
1019 &attempt_record,
1020 replay_of_event_id.as_ref(),
1021 )
1022 .await?;
1023 self.append_lifecycle_event(
1024 "DispatchSucceeded",
1025 &event,
1026 binding,
1027 serde_json::json!({
1028 "event_id": event.id.0,
1029 "attempt": attempt,
1030 "handler_kind": route.kind(),
1031 "target_uri": route.target_uri(),
1032 "result": result,
1033 "replay_of_event_id": replay_of_event_id,
1034 }),
1035 replay_of_event_id.as_ref(),
1036 )
1037 .await?;
1038 self.append_topic_event(
1039 TRIGGER_OUTBOX_TOPIC,
1040 "dispatch_succeeded",
1041 &event,
1042 Some(binding),
1043 Some(attempt),
1044 serde_json::json!({
1045 "event_id": event.id.0,
1046 "attempt": attempt,
1047 "trigger_id": binding.id.as_str(),
1048 "binding_key": binding.binding_key(),
1049 "handler_kind": route.kind(),
1050 "target_uri": route.target_uri(),
1051 "result": result,
1052 "replay_of_event_id": replay_of_event_id,
1053 }),
1054 replay_of_event_id.as_ref(),
1055 )
1056 .await?;
1057 finish_in_flight(
1058 binding.id.as_str(),
1059 binding.version,
1060 TriggerDispatchOutcome::Dispatched,
1061 )
1062 .await
1063 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1064 self.release_flow_control(&mut acquired_flow).await?;
1065 decrement_in_flight(&self.state);
1066 self.append_dispatch_trust_record(
1067 binding,
1068 &route,
1069 &event,
1070 replay_of_event_id.as_ref(),
1071 autonomy_tier,
1072 TrustOutcome::Success,
1073 "succeeded",
1074 attempt,
1075 None,
1076 )
1077 .await?;
1078 return Ok(DispatchOutcome {
1079 trigger_id: binding.id.as_str().to_string(),
1080 binding_key: binding.binding_key(),
1081 event_id: event.id.0,
1082 attempt_count: attempt,
1083 status: DispatchStatus::Succeeded,
1084 handler_kind: route.kind().to_string(),
1085 target_uri: route.target_uri(),
1086 replay_of_event_id,
1087 result: Some(result),
1088 error: None,
1089 });
1090 }
1091 Err(error) => {
1092 let attempt_record = DispatchAttemptRecord {
1093 trigger_id: binding.id.as_str().to_string(),
1094 binding_key: binding.binding_key(),
1095 event_id: event.id.0.clone(),
1096 attempt,
1097 handler_kind: route.kind().to_string(),
1098 started_at,
1099 completed_at,
1100 outcome: dispatch_error_label(&error).to_string(),
1101 error_msg: Some(error.to_string()),
1102 };
1103 attempts.push(attempt_record.clone());
1104 self.append_attempt_record(
1105 &event,
1106 binding,
1107 &attempt_record,
1108 replay_of_event_id.as_ref(),
1109 )
1110 .await?;
1111 self.append_lifecycle_event(
1112 "DispatchFailed",
1113 &event,
1114 binding,
1115 serde_json::json!({
1116 "event_id": event.id.0,
1117 "attempt": attempt,
1118 "handler_kind": route.kind(),
1119 "target_uri": route.target_uri(),
1120 "error": error.to_string(),
1121 "replay_of_event_id": replay_of_event_id,
1122 }),
1123 replay_of_event_id.as_ref(),
1124 )
1125 .await?;
1126 self.append_topic_event(
1127 TRIGGER_OUTBOX_TOPIC,
1128 "dispatch_failed",
1129 &event,
1130 Some(binding),
1131 Some(attempt),
1132 serde_json::json!({
1133 "event_id": event.id.0,
1134 "attempt": attempt,
1135 "trigger_id": binding.id.as_str(),
1136 "binding_key": binding.binding_key(),
1137 "handler_kind": route.kind(),
1138 "target_uri": route.target_uri(),
1139 "error": error.to_string(),
1140 "replay_of_event_id": replay_of_event_id,
1141 }),
1142 replay_of_event_id.as_ref(),
1143 )
1144 .await?;
1145
1146 if !error.retryable() {
1147 finish_in_flight(
1148 binding.id.as_str(),
1149 binding.version,
1150 TriggerDispatchOutcome::Failed,
1151 )
1152 .await
1153 .map_err(|registry_error| {
1154 DispatchError::Registry(registry_error.to_string())
1155 })?;
1156 self.release_flow_control(&mut acquired_flow).await?;
1157 decrement_in_flight(&self.state);
1158 let trust_outcome = match error {
1159 DispatchError::Denied(_) => TrustOutcome::Denied,
1160 DispatchError::Timeout(_) => TrustOutcome::Timeout,
1161 _ => TrustOutcome::Failure,
1162 };
1163 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1164 "cancelled"
1165 } else {
1166 "failed"
1167 };
1168 self.append_dispatch_trust_record(
1169 binding,
1170 &route,
1171 &event,
1172 replay_of_event_id.as_ref(),
1173 autonomy_tier,
1174 trust_outcome,
1175 terminal_status,
1176 attempt,
1177 Some(error.to_string()),
1178 )
1179 .await?;
1180 return Ok(DispatchOutcome {
1181 trigger_id: binding.id.as_str().to_string(),
1182 binding_key: binding.binding_key(),
1183 event_id: event.id.0,
1184 attempt_count: attempt,
1185 status: if matches!(error, DispatchError::Cancelled(_)) {
1186 DispatchStatus::Cancelled
1187 } else {
1188 DispatchStatus::Failed
1189 },
1190 handler_kind: route.kind().to_string(),
1191 target_uri: route.target_uri(),
1192 replay_of_event_id,
1193 result: None,
1194 error: Some(error.to_string()),
1195 });
1196 }
1197
1198 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1199 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1200 previous_retry_node = Some(retry_node_id.clone());
1201 self.emit_action_graph(
1202 &event,
1203 vec![RunActionGraphNodeRecord {
1204 id: retry_node_id.clone(),
1205 label: format!("retry in {}ms", delay.as_millis()),
1206 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1207 status: "scheduled".to_string(),
1208 outcome: format!("attempt_{}", attempt + 1),
1209 trace_id: Some(event.trace_id.0.clone()),
1210 stage_id: None,
1211 node_id: None,
1212 worker_id: None,
1213 run_id: None,
1214 run_path: None,
1215 }],
1216 vec![RunActionGraphEdgeRecord {
1217 from_id: attempt_node_id,
1218 to_id: retry_node_id.clone(),
1219 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1220 label: Some(format!("attempt {}", attempt + 1)),
1221 }],
1222 serde_json::json!({
1223 "source": "dispatcher",
1224 "trigger_id": binding.id.as_str(),
1225 "binding_key": binding.binding_key(),
1226 "event_id": event.id.0,
1227 "attempt": attempt + 1,
1228 "delay_ms": delay.as_millis(),
1229 "replay_of_event_id": replay_of_event_id,
1230 }),
1231 )
1232 .await?;
1233 self.append_lifecycle_event(
1234 "RetryScheduled",
1235 &event,
1236 binding,
1237 serde_json::json!({
1238 "event_id": event.id.0,
1239 "attempt": attempt + 1,
1240 "delay_ms": delay.as_millis(),
1241 "error": error.to_string(),
1242 "replay_of_event_id": replay_of_event_id,
1243 }),
1244 replay_of_event_id.as_ref(),
1245 )
1246 .await?;
1247 self.append_topic_event(
1248 TRIGGER_ATTEMPTS_TOPIC,
1249 "retry_scheduled",
1250 &event,
1251 Some(binding),
1252 Some(attempt + 1),
1253 serde_json::json!({
1254 "event_id": event.id.0,
1255 "attempt": attempt + 1,
1256 "trigger_id": binding.id.as_str(),
1257 "binding_key": binding.binding_key(),
1258 "delay_ms": delay.as_millis(),
1259 "error": error.to_string(),
1260 "replay_of_event_id": replay_of_event_id,
1261 }),
1262 replay_of_event_id.as_ref(),
1263 )
1264 .await?;
1265 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1266 let sleep_result = sleep_or_cancel_or_request(
1267 &self.event_log,
1268 delay,
1269 &binding_key,
1270 &event.id.0,
1271 replay_of_event_id.as_ref(),
1272 &mut self.cancel_tx.subscribe(),
1273 )
1274 .await;
1275 decrement_retry_queue_depth(&self.state);
1276 if sleep_result.is_err() {
1277 finish_in_flight(
1278 binding.id.as_str(),
1279 binding.version,
1280 TriggerDispatchOutcome::Failed,
1281 )
1282 .await
1283 .map_err(|registry_error| {
1284 DispatchError::Registry(registry_error.to_string())
1285 })?;
1286 self.release_flow_control(&mut acquired_flow).await?;
1287 decrement_in_flight(&self.state);
1288 self.append_dispatch_trust_record(
1289 binding,
1290 &route,
1291 &event,
1292 replay_of_event_id.as_ref(),
1293 autonomy_tier,
1294 TrustOutcome::Failure,
1295 "cancelled",
1296 attempt,
1297 Some("dispatcher shutdown cancelled retry wait".to_string()),
1298 )
1299 .await?;
1300 return Ok(DispatchOutcome {
1301 trigger_id: binding.id.as_str().to_string(),
1302 binding_key: binding.binding_key(),
1303 event_id: event.id.0,
1304 attempt_count: attempt,
1305 status: DispatchStatus::Cancelled,
1306 handler_kind: route.kind().to_string(),
1307 target_uri: route.target_uri(),
1308 replay_of_event_id,
1309 result: None,
1310 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1311 });
1312 }
1313 continue;
1314 }
1315
1316 let final_error = error.to_string();
1317 let dlq_entry = DlqEntry {
1318 trigger_id: binding.id.as_str().to_string(),
1319 binding_key: binding.binding_key(),
1320 event: event.clone(),
1321 attempt_count: attempt,
1322 final_error: final_error.clone(),
1323 attempts: attempts.clone(),
1324 };
1325 self.state
1326 .dlq
1327 .lock()
1328 .expect("dispatcher dlq poisoned")
1329 .push(dlq_entry.clone());
1330 self.emit_action_graph(
1331 &event,
1332 vec![RunActionGraphNodeRecord {
1333 id: format!("dlq:{binding_key}:{}", event.id.0),
1334 label: binding.id.as_str().to_string(),
1335 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1336 status: "queued".to_string(),
1337 outcome: "retry_exhausted".to_string(),
1338 trace_id: Some(event.trace_id.0.clone()),
1339 stage_id: None,
1340 node_id: None,
1341 worker_id: None,
1342 run_id: None,
1343 run_path: None,
1344 }],
1345 vec![RunActionGraphEdgeRecord {
1346 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1347 to_id: format!("dlq:{binding_key}:{}", event.id.0),
1348 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1349 label: Some(format!("{attempt} attempts")),
1350 }],
1351 serde_json::json!({
1352 "source": "dispatcher",
1353 "trigger_id": binding.id.as_str(),
1354 "binding_key": binding.binding_key(),
1355 "event_id": event.id.0,
1356 "attempt_count": attempt,
1357 "final_error": final_error,
1358 "replay_of_event_id": replay_of_event_id,
1359 }),
1360 )
1361 .await?;
1362 self.append_lifecycle_event(
1363 "DlqMoved",
1364 &event,
1365 binding,
1366 serde_json::json!({
1367 "event_id": event.id.0,
1368 "attempt_count": attempt,
1369 "final_error": dlq_entry.final_error,
1370 "replay_of_event_id": replay_of_event_id,
1371 }),
1372 replay_of_event_id.as_ref(),
1373 )
1374 .await?;
1375 self.append_topic_event(
1376 TRIGGER_DLQ_TOPIC,
1377 "dlq_moved",
1378 &event,
1379 Some(binding),
1380 Some(attempt),
1381 serde_json::to_value(&dlq_entry)
1382 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1383 replay_of_event_id.as_ref(),
1384 )
1385 .await?;
1386 finish_in_flight(
1387 binding.id.as_str(),
1388 binding.version,
1389 TriggerDispatchOutcome::Dlq,
1390 )
1391 .await
1392 .map_err(|registry_error| {
1393 DispatchError::Registry(registry_error.to_string())
1394 })?;
1395 self.release_flow_control(&mut acquired_flow).await?;
1396 decrement_in_flight(&self.state);
1397 self.append_dispatch_trust_record(
1398 binding,
1399 &route,
1400 &event,
1401 replay_of_event_id.as_ref(),
1402 autonomy_tier,
1403 TrustOutcome::Failure,
1404 "dlq",
1405 attempt,
1406 Some(error.to_string()),
1407 )
1408 .await?;
1409 return Ok(DispatchOutcome {
1410 trigger_id: binding.id.as_str().to_string(),
1411 binding_key: binding.binding_key(),
1412 event_id: event.id.0,
1413 attempt_count: attempt,
1414 status: DispatchStatus::Dlq,
1415 handler_kind: route.kind().to_string(),
1416 target_uri: route.target_uri(),
1417 replay_of_event_id,
1418 result: None,
1419 error: Some(error.to_string()),
1420 });
1421 }
1422 }
1423 }
1424
1425 finish_in_flight(
1426 binding.id.as_str(),
1427 binding.version,
1428 TriggerDispatchOutcome::Failed,
1429 )
1430 .await
1431 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1432 self.release_flow_control(&mut acquired_flow).await?;
1433 decrement_in_flight(&self.state);
1434 self.append_dispatch_trust_record(
1435 binding,
1436 &route,
1437 &event,
1438 replay_of_event_id.as_ref(),
1439 autonomy_tier,
1440 TrustOutcome::Failure,
1441 "failed",
1442 max_attempts,
1443 Some("dispatch exhausted without terminal outcome".to_string()),
1444 )
1445 .await?;
1446 Ok(DispatchOutcome {
1447 trigger_id: binding.id.as_str().to_string(),
1448 binding_key: binding.binding_key(),
1449 event_id: event.id.0,
1450 attempt_count: max_attempts,
1451 status: DispatchStatus::Failed,
1452 handler_kind: route.kind().to_string(),
1453 target_uri: route.target_uri(),
1454 replay_of_event_id,
1455 result: None,
1456 error: Some("dispatch exhausted without terminal outcome".to_string()),
1457 })
1458 }
1459
1460 async fn dispatch_once(
1461 &self,
1462 binding: &TriggerBinding,
1463 route: &DispatchUri,
1464 event: &TriggerEvent,
1465 autonomy_tier: AutonomyTier,
1466 cancel_rx: &mut broadcast::Receiver<()>,
1467 ) -> Result<serde_json::Value, DispatchError> {
1468 match route {
1469 DispatchUri::Local { .. } => {
1470 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
1471 return Err(DispatchError::Local(format!(
1472 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
1473 binding.id.as_str()
1474 )));
1475 };
1476 let value = self
1477 .invoke_vm_callable(
1478 closure,
1479 &binding.binding_key(),
1480 event,
1481 None,
1482 binding.id.as_str(),
1483 &format!("{}.{}", event.provider.as_str(), event.kind),
1484 autonomy_tier,
1485 cancel_rx,
1486 )
1487 .await?;
1488 Ok(vm_value_to_json(&value))
1489 }
1490 DispatchUri::A2a {
1491 target,
1492 allow_cleartext,
1493 } => {
1494 if self.state.shutting_down.load(Ordering::SeqCst) {
1495 return Err(DispatchError::Cancelled(
1496 "dispatcher shutdown cancelled A2A dispatch".to_string(),
1497 ));
1498 }
1499 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
1500 target,
1501 *allow_cleartext,
1502 binding.id.as_str(),
1503 &binding.binding_key(),
1504 event,
1505 cancel_rx,
1506 )
1507 .await
1508 .map_err(|error| match error {
1509 crate::a2a::A2aClientError::Cancelled(message) => {
1510 DispatchError::Cancelled(message)
1511 }
1512 other => DispatchError::A2a(other.to_string()),
1513 })?;
1514 match ack {
1515 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
1516 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
1517 }
1518 }
1519 DispatchUri::Worker { queue } => {
1520 let receipt = crate::WorkerQueue::new(self.event_log.clone())
1521 .enqueue(&crate::WorkerQueueJob {
1522 queue: queue.clone(),
1523 trigger_id: binding.id.as_str().to_string(),
1524 binding_key: binding.binding_key(),
1525 binding_version: binding.version,
1526 event: event.clone(),
1527 replay_of_event_id: current_dispatch_context()
1528 .and_then(|context| context.replay_of_event_id),
1529 priority: worker_queue_priority(binding, event),
1530 })
1531 .await
1532 .map_err(DispatchError::from)?;
1533 Ok(serde_json::to_value(receipt)
1534 .map_err(|error| DispatchError::Serde(error.to_string()))?)
1535 }
1536 }
1537 }
1538
1539 async fn apply_flow_control(
1540 &self,
1541 binding: &TriggerBinding,
1542 event: &TriggerEvent,
1543 replay_of_event_id: Option<&String>,
1544 ) -> Result<FlowControlOutcome, DispatchError> {
1545 let flow = &binding.flow_control;
1546 let mut managed_event = event.clone();
1547
1548 if let Some(batch) = &flow.batch {
1549 let gate = self
1550 .resolve_flow_gate(
1551 &binding.binding_key(),
1552 batch.key.as_ref(),
1553 &managed_event,
1554 replay_of_event_id,
1555 )
1556 .await?;
1557 match self
1558 .state
1559 .flow_control
1560 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
1561 .await
1562 .map_err(DispatchError::from)?
1563 {
1564 BatchDecision::Dispatch(events) => {
1565 managed_event = build_batched_event(events)?;
1566 }
1567 BatchDecision::Merged => {
1568 return Ok(FlowControlOutcome::Skip {
1569 reason: "batch_merged".to_string(),
1570 })
1571 }
1572 }
1573 }
1574
1575 if let Some(debounce) = &flow.debounce {
1576 let gate = self
1577 .resolve_flow_gate(
1578 &binding.binding_key(),
1579 Some(&debounce.key),
1580 &managed_event,
1581 replay_of_event_id,
1582 )
1583 .await?;
1584 let latest = self
1585 .state
1586 .flow_control
1587 .debounce(&gate, debounce.period)
1588 .await
1589 .map_err(DispatchError::from)?;
1590 if !latest {
1591 return Ok(FlowControlOutcome::Skip {
1592 reason: "debounced".to_string(),
1593 });
1594 }
1595 }
1596
1597 if let Some(rate_limit) = &flow.rate_limit {
1598 let gate = self
1599 .resolve_flow_gate(
1600 &binding.binding_key(),
1601 rate_limit.key.as_ref(),
1602 &managed_event,
1603 replay_of_event_id,
1604 )
1605 .await?;
1606 let allowed = self
1607 .state
1608 .flow_control
1609 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
1610 .await
1611 .map_err(DispatchError::from)?;
1612 if !allowed {
1613 return Ok(FlowControlOutcome::Skip {
1614 reason: "rate_limited".to_string(),
1615 });
1616 }
1617 }
1618
1619 if let Some(throttle) = &flow.throttle {
1620 let gate = self
1621 .resolve_flow_gate(
1622 &binding.binding_key(),
1623 throttle.key.as_ref(),
1624 &managed_event,
1625 replay_of_event_id,
1626 )
1627 .await?;
1628 self.state
1629 .flow_control
1630 .wait_for_throttle(&gate, throttle.period, throttle.max)
1631 .await
1632 .map_err(DispatchError::from)?;
1633 }
1634
1635 let mut acquired = AcquiredFlowControl::default();
1636 if let Some(singleton) = &flow.singleton {
1637 let gate = self
1638 .resolve_flow_gate(
1639 &binding.binding_key(),
1640 singleton.key.as_ref(),
1641 &managed_event,
1642 replay_of_event_id,
1643 )
1644 .await?;
1645 let acquired_singleton = self
1646 .state
1647 .flow_control
1648 .try_acquire_singleton(&gate)
1649 .await
1650 .map_err(DispatchError::from)?;
1651 if !acquired_singleton {
1652 return Ok(FlowControlOutcome::Skip {
1653 reason: "singleton_active".to_string(),
1654 });
1655 }
1656 acquired.singleton_gate = Some(gate);
1657 }
1658
1659 if let Some(concurrency) = &flow.concurrency {
1660 let gate = self
1661 .resolve_flow_gate(
1662 &binding.binding_key(),
1663 concurrency.key.as_ref(),
1664 &managed_event,
1665 replay_of_event_id,
1666 )
1667 .await?;
1668 let priority_rank = self
1669 .resolve_priority_rank(
1670 &binding.binding_key(),
1671 flow.priority.as_ref(),
1672 &managed_event,
1673 replay_of_event_id,
1674 )
1675 .await?;
1676 acquired.concurrency = Some(
1677 self.state
1678 .flow_control
1679 .acquire_concurrency(&gate, concurrency.max, priority_rank)
1680 .await
1681 .map_err(DispatchError::from)?,
1682 );
1683 }
1684
1685 Ok(FlowControlOutcome::Dispatch {
1686 event: Box::new(managed_event),
1687 acquired,
1688 })
1689 }
1690
1691 async fn release_flow_control(
1692 &self,
1693 acquired: &mut AcquiredFlowControl,
1694 ) -> Result<(), DispatchError> {
1695 if let Some(gate) = acquired.singleton_gate.take() {
1696 self.state
1697 .flow_control
1698 .release_singleton(&gate)
1699 .await
1700 .map_err(DispatchError::from)?;
1701 }
1702 if let Some(permit) = acquired.concurrency.take() {
1703 self.state
1704 .flow_control
1705 .release_concurrency(permit)
1706 .await
1707 .map_err(DispatchError::from)?;
1708 }
1709 Ok(())
1710 }
1711
1712 async fn resolve_flow_gate(
1713 &self,
1714 binding_key: &str,
1715 expr: Option<&crate::triggers::TriggerExpressionSpec>,
1716 event: &TriggerEvent,
1717 replay_of_event_id: Option<&String>,
1718 ) -> Result<String, DispatchError> {
1719 let key = match expr {
1720 Some(expr) => {
1721 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
1722 .await?
1723 }
1724 None => "_global".to_string(),
1725 };
1726 Ok(format!("{binding_key}:{key}"))
1727 }
1728
1729 async fn resolve_priority_rank(
1730 &self,
1731 binding_key: &str,
1732 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
1733 event: &TriggerEvent,
1734 replay_of_event_id: Option<&String>,
1735 ) -> Result<usize, DispatchError> {
1736 let Some(priority) = priority else {
1737 return Ok(0);
1738 };
1739 let value = self
1740 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
1741 .await?;
1742 Ok(priority
1743 .order
1744 .iter()
1745 .position(|candidate| candidate == &value)
1746 .unwrap_or(priority.order.len()))
1747 }
1748
1749 async fn evaluate_flow_expression(
1750 &self,
1751 binding_key: &str,
1752 expr: &crate::triggers::TriggerExpressionSpec,
1753 event: &TriggerEvent,
1754 replay_of_event_id: Option<&String>,
1755 ) -> Result<String, DispatchError> {
1756 let value = self
1757 .invoke_vm_callable(
1758 &expr.closure,
1759 binding_key,
1760 event,
1761 replay_of_event_id,
1762 "",
1763 "flow_control",
1764 AutonomyTier::Suggest,
1765 &mut self.cancel_tx.subscribe(),
1766 )
1767 .await?;
1768 Ok(json_value_to_gate(&vm_value_to_json(&value)))
1769 }
1770
1771 #[allow(clippy::too_many_arguments)]
1772 async fn invoke_vm_callable(
1773 &self,
1774 closure: &crate::value::VmClosure,
1775 binding_key: &str,
1776 event: &TriggerEvent,
1777 replay_of_event_id: Option<&String>,
1778 agent_id: &str,
1779 action: &str,
1780 autonomy_tier: AutonomyTier,
1781 cancel_rx: &mut broadcast::Receiver<()>,
1782 ) -> Result<VmValue, DispatchError> {
1783 let mut vm = self.base_vm.child_vm();
1784 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
1785 if self.state.shutting_down.load(Ordering::SeqCst) {
1786 cancel_token.store(true, Ordering::SeqCst);
1787 }
1788 self.state
1789 .cancel_tokens
1790 .lock()
1791 .expect("dispatcher cancel tokens poisoned")
1792 .push(cancel_token.clone());
1793 vm.install_cancel_token(cancel_token.clone());
1794 let arg = json_to_vm_value(
1795 &serde_json::to_value(event)
1796 .map_err(|error| DispatchError::Serde(error.to_string()))?,
1797 );
1798 let args = [arg];
1799 let future = vm.call_closure_pub(closure, &args, &[]);
1800 pin_mut!(future);
1801 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1802 slot.borrow_mut().replace(DispatchContext {
1803 trigger_event: event.clone(),
1804 replay_of_event_id: replay_of_event_id.cloned(),
1805 agent_id: agent_id.to_string(),
1806 action: action.to_string(),
1807 autonomy_tier,
1808 })
1809 });
1810 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
1811 crate::stdlib::hitl::reset_hitl_state();
1812 let mut poll = tokio::time::interval(Duration::from_millis(100));
1813 let result = loop {
1814 tokio::select! {
1815 result = &mut future => break result,
1816 _ = recv_cancel(cancel_rx) => {
1817 cancel_token.store(true, Ordering::SeqCst);
1818 }
1819 _ = poll.tick() => {
1820 if dispatch_cancel_requested(
1821 &self.event_log,
1822 binding_key,
1823 &event.id.0,
1824 replay_of_event_id,
1825 )
1826 .await? {
1827 cancel_token.store(true, Ordering::SeqCst);
1828 }
1829 }
1830 }
1831 };
1832 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1833 *slot.borrow_mut() = prior_context;
1834 });
1835 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
1836 {
1837 let mut tokens = self
1838 .state
1839 .cancel_tokens
1840 .lock()
1841 .expect("dispatcher cancel tokens poisoned");
1842 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
1843 }
1844
1845 if cancel_token.load(Ordering::SeqCst) {
1846 if dispatch_cancel_requested(
1847 &self.event_log,
1848 binding_key,
1849 &event.id.0,
1850 replay_of_event_id,
1851 )
1852 .await?
1853 {
1854 Err(DispatchError::Cancelled(
1855 "trigger cancel request cancelled local handler".to_string(),
1856 ))
1857 } else {
1858 Err(DispatchError::Cancelled(
1859 "dispatcher shutdown cancelled local handler".to_string(),
1860 ))
1861 }
1862 } else {
1863 result.map_err(dispatch_error_from_vm_error)
1864 }
1865 }
1866
1867 async fn evaluate_predicate(
1868 &self,
1869 binding: &TriggerBinding,
1870 predicate: &super::registry::TriggerPredicateSpec,
1871 event: &TriggerEvent,
1872 replay_of_event_id: Option<&String>,
1873 autonomy_tier: AutonomyTier,
1874 ) -> Result<PredicateEvaluationRecord, DispatchError> {
1875 let event_id = event.id.0.clone();
1876 let trigger_id = binding.id.as_str().to_string();
1877 let now_ms = now_unix_ms();
1878 let today = utc_day_key();
1879
1880 let breaker_open_until = {
1881 let mut state = binding
1882 .predicate_state
1883 .lock()
1884 .expect("trigger predicate state poisoned");
1885 if state.budget_day_utc != Some(today) {
1886 state.budget_day_utc = Some(today);
1887 binding
1888 .metrics
1889 .cost_today_usd_micros
1890 .store(0, Ordering::Relaxed);
1891 }
1892 if state
1893 .breaker_open_until_ms
1894 .is_some_and(|until_ms| until_ms > now_ms)
1895 {
1896 state.breaker_open_until_ms
1897 } else {
1898 None
1899 }
1900 };
1901
1902 if breaker_open_until.is_some() {
1903 let mut metadata = BTreeMap::new();
1904 metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
1905 metadata.insert("event_id".to_string(), serde_json::json!(event_id));
1906 metadata.insert(
1907 "breaker_open_until_ms".to_string(),
1908 serde_json::json!(breaker_open_until),
1909 );
1910 crate::events::log_warn_meta(
1911 "trigger.predicate.circuit_breaker",
1912 "trigger predicate circuit breaker is open; short-circuiting to false",
1913 metadata,
1914 );
1915 let record = PredicateEvaluationRecord {
1916 result: false,
1917 reason: Some("circuit_open".to_string()),
1918 ..Default::default()
1919 };
1920 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1921 .await?;
1922 return Ok(record);
1923 }
1924
1925 if binding
1926 .daily_cost_usd
1927 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
1928 {
1929 self.append_lifecycle_event(
1930 "predicate.daily_budget_exceeded",
1931 event,
1932 binding,
1933 serde_json::json!({
1934 "trigger_id": binding.id.as_str(),
1935 "event_id": event.id.0,
1936 "limit_usd": binding.daily_cost_usd,
1937 "cost_today_usd": current_predicate_daily_cost(binding),
1938 "replay_of_event_id": replay_of_event_id,
1939 }),
1940 replay_of_event_id,
1941 )
1942 .await?;
1943 let record = PredicateEvaluationRecord {
1944 result: false,
1945 reason: Some("daily_budget_exceeded".to_string()),
1946 ..Default::default()
1947 };
1948 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1949 .await?;
1950 return Ok(record);
1951 }
1952
1953 let replay_cache = self
1954 .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
1955 .await?;
1956 let guard = start_predicate_evaluation(
1957 binding.when_budget.clone().unwrap_or_default(),
1958 replay_cache,
1959 );
1960 let started = std::time::Instant::now();
1961 let eval = self
1962 .invoke_vm_callable_with_timeout(
1963 &predicate.closure,
1964 &binding.binding_key(),
1965 event,
1966 replay_of_event_id,
1967 binding.id.as_str(),
1968 &format!("{}.{}", event.provider.as_str(), event.kind),
1969 autonomy_tier,
1970 &mut self.cancel_tx.subscribe(),
1971 binding
1972 .when_budget
1973 .as_ref()
1974 .and_then(|budget| budget.timeout()),
1975 )
1976 .await;
1977 let capture = guard.finish();
1978 let latency_ms = started.elapsed().as_millis() as u64;
1979 if replay_of_event_id.is_none() && !capture.entries.is_empty() {
1980 self.append_predicate_cache_record(binding, event, &capture.entries)
1981 .await?;
1982 }
1983
1984 let mut record = PredicateEvaluationRecord {
1985 result: false,
1986 cost_usd: capture.total_cost_usd,
1987 tokens: capture.total_tokens,
1988 latency_ms,
1989 cached: capture.cached,
1990 reason: None,
1991 };
1992
1993 let mut count_failure = false;
1994 let mut opened_breaker = false;
1995
1996 match eval {
1997 Ok(value) => match predicate_value_as_bool(value) {
1998 Ok(result) => {
1999 record.result = result;
2000 }
2001 Err(reason) => {
2002 count_failure = true;
2003 record.reason = Some(reason);
2004 }
2005 },
2006 Err(error) => {
2007 count_failure = true;
2008 record.reason = Some(error.to_string());
2009 }
2010 }
2011
2012 let cost_usd_micros = usd_to_micros(record.cost_usd);
2013 if cost_usd_micros > 0 {
2014 binding
2015 .metrics
2016 .cost_total_usd_micros
2017 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2018 binding
2019 .metrics
2020 .cost_today_usd_micros
2021 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2022 }
2023
2024 let timed_out = matches!(
2025 record.reason.as_deref(),
2026 Some("predicate evaluation timed out")
2027 );
2028 if capture.budget_exceeded || timed_out {
2029 record.result = false;
2030 record.reason = Some("budget_exceeded".to_string());
2031 self.append_lifecycle_event(
2032 "predicate.budget_exceeded",
2033 event,
2034 binding,
2035 serde_json::json!({
2036 "trigger_id": binding.id.as_str(),
2037 "event_id": event.id.0,
2038 "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2039 "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2040 "cost_usd": record.cost_usd,
2041 "tokens": record.tokens,
2042 "replay_of_event_id": replay_of_event_id,
2043 }),
2044 replay_of_event_id,
2045 )
2046 .await?;
2047 }
2048
2049 if binding
2050 .daily_cost_usd
2051 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2052 {
2053 record.result = false;
2054 record.reason = Some("daily_budget_exceeded".to_string());
2055 self.append_lifecycle_event(
2056 "predicate.daily_budget_exceeded",
2057 event,
2058 binding,
2059 serde_json::json!({
2060 "trigger_id": binding.id.as_str(),
2061 "event_id": event.id.0,
2062 "limit_usd": binding.daily_cost_usd,
2063 "cost_today_usd": current_predicate_daily_cost(binding),
2064 "replay_of_event_id": replay_of_event_id,
2065 }),
2066 replay_of_event_id,
2067 )
2068 .await?;
2069 }
2070
2071 {
2072 let mut state = binding
2073 .predicate_state
2074 .lock()
2075 .expect("trigger predicate state poisoned");
2076 if state.budget_day_utc != Some(today) {
2077 state.budget_day_utc = Some(today);
2078 binding
2079 .metrics
2080 .cost_today_usd_micros
2081 .store(cost_usd_micros, Ordering::Relaxed);
2082 }
2083 if count_failure {
2084 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2085 if state.consecutive_failures >= 3 {
2086 state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2087 opened_breaker = true;
2088 }
2089 } else {
2090 state.consecutive_failures = 0;
2091 state.breaker_open_until_ms = None;
2092 }
2093 }
2094
2095 if opened_breaker {
2096 let mut metadata = BTreeMap::new();
2097 metadata.insert(
2098 "trigger_id".to_string(),
2099 serde_json::json!(binding.id.as_str()),
2100 );
2101 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2102 metadata.insert("failure_count".to_string(), serde_json::json!(3));
2103 metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2104 crate::events::log_warn_meta(
2105 "trigger.predicate.circuit_breaker",
2106 "trigger predicate circuit breaker opened for 5 minutes",
2107 metadata,
2108 );
2109 }
2110
2111 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2112 .await?;
2113 Ok(record)
2114 }
2115
2116 #[allow(clippy::too_many_arguments)]
2117 #[allow(clippy::too_many_arguments)]
2118 async fn invoke_vm_callable_with_timeout(
2119 &self,
2120 closure: &crate::value::VmClosure,
2121 binding_key: &str,
2122 event: &TriggerEvent,
2123 replay_of_event_id: Option<&String>,
2124 agent_id: &str,
2125 action: &str,
2126 autonomy_tier: AutonomyTier,
2127 cancel_rx: &mut broadcast::Receiver<()>,
2128 timeout: Option<Duration>,
2129 ) -> Result<VmValue, DispatchError> {
2130 let future = self.invoke_vm_callable(
2131 closure,
2132 binding_key,
2133 event,
2134 replay_of_event_id,
2135 agent_id,
2136 action,
2137 autonomy_tier,
2138 cancel_rx,
2139 );
2140 pin_mut!(future);
2141 if let Some(timeout) = timeout {
2142 match tokio::time::timeout(timeout, future).await {
2143 Ok(result) => result,
2144 Err(_) => Err(DispatchError::Local(
2145 "predicate evaluation timed out".to_string(),
2146 )),
2147 }
2148 } else {
2149 future.await
2150 }
2151 }
2152
2153 async fn append_predicate_evaluated_event(
2154 &self,
2155 binding: &TriggerBinding,
2156 event: &TriggerEvent,
2157 record: &PredicateEvaluationRecord,
2158 replay_of_event_id: Option<&String>,
2159 ) -> Result<(), DispatchError> {
2160 self.append_lifecycle_event(
2161 "predicate.evaluated",
2162 event,
2163 binding,
2164 serde_json::json!({
2165 "trigger_id": binding.id.as_str(),
2166 "event_id": event.id.0,
2167 "result": record.result,
2168 "cost_usd": record.cost_usd,
2169 "tokens": record.tokens,
2170 "latency_ms": record.latency_ms,
2171 "cached": record.cached,
2172 "reason": record.reason,
2173 "replay_of_event_id": replay_of_event_id,
2174 }),
2175 replay_of_event_id,
2176 )
2177 .await
2178 }
2179
2180 async fn append_predicate_cache_record(
2181 &self,
2182 binding: &TriggerBinding,
2183 event: &TriggerEvent,
2184 entries: &[PredicateCacheEntry],
2185 ) -> Result<(), DispatchError> {
2186 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2187 .expect("static trigger inbox legacy topic name is valid");
2188 let payload = serde_json::to_value(PredicateCacheRecord {
2189 trigger_id: binding.id.as_str().to_string(),
2190 event_id: event.id.0.clone(),
2191 entries: entries.to_vec(),
2192 })
2193 .map_err(|error| DispatchError::Serde(error.to_string()))?;
2194 self.event_log
2195 .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2196 .await
2197 .map_err(DispatchError::from)
2198 .map(|_| ())
2199 }
2200
2201 async fn read_predicate_cache_record(
2202 &self,
2203 event_id: &str,
2204 ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2205 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2206 .expect("static trigger inbox legacy topic name is valid");
2207 let records = self
2208 .event_log
2209 .read_range(&topic, None, usize::MAX)
2210 .await
2211 .map_err(DispatchError::from)?;
2212 Ok(records
2213 .into_iter()
2214 .filter(|(_, event)| event.kind == "predicate_llm_cache")
2215 .filter_map(|(_, event)| {
2216 serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2217 })
2218 .filter(|record| record.event_id == event_id)
2219 .flat_map(|record| record.entries)
2220 .collect())
2221 }
2222
2223 #[allow(clippy::too_many_arguments)]
2224 async fn append_dispatch_trust_record(
2225 &self,
2226 binding: &TriggerBinding,
2227 route: &DispatchUri,
2228 event: &TriggerEvent,
2229 replay_of_event_id: Option<&String>,
2230 autonomy_tier: AutonomyTier,
2231 outcome: TrustOutcome,
2232 terminal_status: &str,
2233 attempt_count: u32,
2234 error: Option<String>,
2235 ) -> Result<(), DispatchError> {
2236 let mut record = TrustRecord::new(
2237 binding.id.as_str().to_string(),
2238 format!("{}.{}", event.provider.as_str(), event.kind),
2239 None,
2240 outcome,
2241 event.trace_id.0.clone(),
2242 autonomy_tier,
2243 );
2244 record.metadata.insert(
2245 "binding_key".to_string(),
2246 serde_json::json!(binding.binding_key()),
2247 );
2248 record.metadata.insert(
2249 "binding_version".to_string(),
2250 serde_json::json!(binding.version),
2251 );
2252 record.metadata.insert(
2253 "provider".to_string(),
2254 serde_json::json!(event.provider.as_str()),
2255 );
2256 record
2257 .metadata
2258 .insert("event_kind".to_string(), serde_json::json!(event.kind));
2259 record
2260 .metadata
2261 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2262 record.metadata.insert(
2263 "target_uri".to_string(),
2264 serde_json::json!(route.target_uri()),
2265 );
2266 record.metadata.insert(
2267 "terminal_status".to_string(),
2268 serde_json::json!(terminal_status),
2269 );
2270 record.metadata.insert(
2271 "attempt_count".to_string(),
2272 serde_json::json!(attempt_count),
2273 );
2274 if let Some(replay_of_event_id) = replay_of_event_id {
2275 record.metadata.insert(
2276 "replay_of_event_id".to_string(),
2277 serde_json::json!(replay_of_event_id),
2278 );
2279 }
2280 if let Some(error) = error {
2281 record
2282 .metadata
2283 .insert("error".to_string(), serde_json::json!(error));
2284 }
2285 append_trust_record(&self.event_log, &record)
2286 .await
2287 .map_err(DispatchError::from)
2288 }
2289
2290 async fn append_attempt_record(
2291 &self,
2292 event: &TriggerEvent,
2293 binding: &TriggerBinding,
2294 attempt: &DispatchAttemptRecord,
2295 replay_of_event_id: Option<&String>,
2296 ) -> Result<(), DispatchError> {
2297 self.append_topic_event(
2298 TRIGGER_ATTEMPTS_TOPIC,
2299 "attempt_recorded",
2300 event,
2301 Some(binding),
2302 Some(attempt.attempt),
2303 serde_json::to_value(attempt)
2304 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2305 replay_of_event_id,
2306 )
2307 .await
2308 }
2309
2310 async fn append_lifecycle_event(
2311 &self,
2312 kind: &str,
2313 event: &TriggerEvent,
2314 binding: &TriggerBinding,
2315 payload: serde_json::Value,
2316 replay_of_event_id: Option<&String>,
2317 ) -> Result<(), DispatchError> {
2318 self.append_topic_event(
2319 TRIGGERS_LIFECYCLE_TOPIC,
2320 kind,
2321 event,
2322 Some(binding),
2323 None,
2324 payload,
2325 replay_of_event_id,
2326 )
2327 .await
2328 }
2329
2330 async fn append_topic_event(
2331 &self,
2332 topic_name: &str,
2333 kind: &str,
2334 event: &TriggerEvent,
2335 binding: Option<&TriggerBinding>,
2336 attempt: Option<u32>,
2337 payload: serde_json::Value,
2338 replay_of_event_id: Option<&String>,
2339 ) -> Result<(), DispatchError> {
2340 let topic = Topic::new(topic_name)
2341 .expect("static trigger dispatcher topic names should always be valid");
2342 let headers = event_headers(event, binding, attempt, replay_of_event_id);
2343 self.event_log
2344 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
2345 .await
2346 .map_err(DispatchError::from)
2347 .map(|_| ())
2348 }
2349
2350 async fn emit_action_graph(
2351 &self,
2352 event: &TriggerEvent,
2353 nodes: Vec<RunActionGraphNodeRecord>,
2354 edges: Vec<RunActionGraphEdgeRecord>,
2355 extra: serde_json::Value,
2356 ) -> Result<(), DispatchError> {
2357 let mut headers = BTreeMap::new();
2358 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2359 headers.insert("event_id".to_string(), event.id.0.clone());
2360 let observability = RunObservabilityRecord {
2361 schema_version: 1,
2362 action_graph_nodes: nodes,
2363 action_graph_edges: edges,
2364 ..Default::default()
2365 };
2366 append_action_graph_update(
2367 headers,
2368 serde_json::json!({
2369 "source": "dispatcher",
2370 "trace_id": event.trace_id.0,
2371 "event_id": event.id.0,
2372 "observability": observability,
2373 "context": extra,
2374 }),
2375 )
2376 .await
2377 .map_err(DispatchError::from)
2378 }
2379}
2380
2381async fn dispatch_cancel_requested(
2382 event_log: &Arc<AnyEventLog>,
2383 binding_key: &str,
2384 event_id: &str,
2385 replay_of_event_id: Option<&String>,
2386) -> Result<bool, DispatchError> {
2387 if replay_of_event_id.is_some() {
2388 return Ok(false);
2389 }
2390 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
2391 .expect("static trigger cancel topic should always be valid");
2392 let events = event_log.read_range(&topic, None, usize::MAX).await?;
2393 let requested = events
2394 .into_iter()
2395 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
2396 .filter_map(|(_, event)| {
2397 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
2398 })
2399 .collect::<BTreeSet<_>>();
2400 Ok(requested
2401 .iter()
2402 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
2403}
2404
2405async fn sleep_or_cancel_or_request(
2406 event_log: &Arc<AnyEventLog>,
2407 delay: Duration,
2408 binding_key: &str,
2409 event_id: &str,
2410 replay_of_event_id: Option<&String>,
2411 cancel_rx: &mut broadcast::Receiver<()>,
2412) -> Result<(), DispatchError> {
2413 let sleep = tokio::time::sleep(delay);
2414 pin_mut!(sleep);
2415 let mut poll = tokio::time::interval(Duration::from_millis(100));
2416 loop {
2417 tokio::select! {
2418 _ = &mut sleep => return Ok(()),
2419 _ = recv_cancel(cancel_rx) => {
2420 return Err(DispatchError::Cancelled(
2421 "dispatcher shutdown cancelled retry wait".to_string(),
2422 ));
2423 }
2424 _ = poll.tick() => {
2425 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
2426 return Err(DispatchError::Cancelled(
2427 "trigger cancel request cancelled retry wait".to_string(),
2428 ));
2429 }
2430 }
2431 }
2432 }
2433}
2434
2435fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
2436 let mut iter = events.into_iter();
2437 let Some(mut root) = iter.next() else {
2438 return Err(DispatchError::Registry(
2439 "batch dispatch produced an empty event list".to_string(),
2440 ));
2441 };
2442 let mut batch = Vec::new();
2443 batch.push(
2444 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
2445 );
2446 for event in iter {
2447 batch.push(
2448 serde_json::to_value(&event)
2449 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2450 );
2451 }
2452 root.batch = Some(batch);
2453 Ok(root)
2454}
2455
2456fn json_value_to_gate(value: &serde_json::Value) -> String {
2457 match value {
2458 serde_json::Value::Null => "null".to_string(),
2459 serde_json::Value::String(text) => text.clone(),
2460 serde_json::Value::Bool(value) => value.to_string(),
2461 serde_json::Value::Number(value) => value.to_string(),
2462 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
2463 }
2464}
2465
2466fn decrement_in_flight(state: &DispatcherRuntimeState) {
2467 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
2468 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
2469 state.idle_notify.notify_waiters();
2470 }
2471}
2472
2473fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
2474 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
2475 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
2476 state.idle_notify.notify_waiters();
2477 }
2478}
2479
/// Test-only: stores `tx` in the thread-local dequeue-signal slot, replacing
/// any sender that was installed before.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut pending = slot.borrow_mut();
        *pending = Some(tx);
    });
}
2486
/// No-op outside of test builds.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test-only: fires the installed dequeue signal, if any, exactly once.
///
/// The sender is taken out of the thread-local slot, so later calls do
/// nothing until a new sender is installed. A send failure is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let pending = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(tx) = pending {
        let _ = tx.send(());
    }
}
2498
2499pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
2500 event_log: &L,
2501 event: &TriggerEvent,
2502) -> Result<u64, DispatchError> {
2503 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
2504 .expect("static trigger.inbox.envelopes topic is valid");
2505 let headers = event_headers(event, None, None, None);
2506 let payload =
2507 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2508 event_log
2509 .append(
2510 &topic,
2511 LogEvent::new("event_ingested", payload).with_headers(headers),
2512 )
2513 .await
2514 .map_err(DispatchError::from)
2515}
2516
2517pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
2518 ACTIVE_DISPATCHER_STATE.with(|slot| {
2519 slot.borrow()
2520 .as_ref()
2521 .map(|state| DispatcherStatsSnapshot {
2522 in_flight: state.in_flight.load(Ordering::Relaxed),
2523 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
2524 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
2525 })
2526 .unwrap_or_default()
2527 })
2528}
2529
2530pub fn clear_dispatcher_state() {
2531 ACTIVE_DISPATCHER_STATE.with(|slot| {
2532 *slot.borrow_mut() = None;
2533 });
2534}
2535
2536fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
2537 if is_cancelled_vm_error(&error) {
2538 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
2539 }
2540 if let VmError::Thrown(VmValue::String(message)) = &error {
2541 return DispatchError::Local(message.to_string());
2542 }
2543 match error_to_category(&error) {
2544 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
2545 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
2546 ErrorCategory::Cancelled => {
2547 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
2548 }
2549 _ => DispatchError::Local(error.to_string()),
2550 }
2551}
2552
2553fn dispatch_error_label(error: &DispatchError) -> &'static str {
2554 match error {
2555 DispatchError::Denied(_) => "denied",
2556 DispatchError::Timeout(_) => "timeout",
2557 DispatchError::Cancelled(_) => "cancelled",
2558 _ => "failed",
2559 }
2560}
2561
2562fn dispatch_node_id(
2563 route: &DispatchUri,
2564 binding_key: &str,
2565 event_id: &str,
2566 attempt: u32,
2567) -> String {
2568 let prefix = match route {
2569 DispatchUri::A2a { .. } => "a2a",
2570 _ => "dispatch",
2571 };
2572 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
2573}
2574
2575fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
2576 match route {
2577 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
2578 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
2579 }
2580}
2581
2582fn dispatch_node_label(route: &DispatchUri) -> String {
2583 match route {
2584 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
2585 _ => route.target_uri(),
2586 }
2587}
2588
2589fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
2590 match route {
2591 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
2592 _ => None,
2593 }
2594}
2595
2596fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
2597 match route {
2598 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
2599 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
2600 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
2601 }
2602}
2603
2604fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
2605 match value {
2606 VmValue::Bool(result) => Ok(result),
2607 VmValue::EnumVariant {
2608 enum_name,
2609 variant,
2610 mut fields,
2611 } if enum_name == "Result" && variant == "Ok" => match fields.pop() {
2612 Some(VmValue::Bool(result)) => Ok(result),
2613 Some(other) => Err(format!(
2614 "predicate Result.Ok payload must be bool, got {}",
2615 other.type_name()
2616 )),
2617 None => Err("predicate Result.Ok payload is missing".to_string()),
2618 },
2619 VmValue::EnumVariant {
2620 enum_name,
2621 variant,
2622 fields,
2623 } if enum_name == "Result" && variant == "Err" => Err(fields
2624 .first()
2625 .map(VmValue::display)
2626 .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
2627 other => Err(format!(
2628 "predicate must return bool or Result<bool, _>, got {}",
2629 other.type_name()
2630 )),
2631 }
2632}
2633
/// Converts a USD amount to whole micro-dollars, rounding to the nearest
/// micro.
///
/// Non-finite, zero, and negative inputs all map to `0`. Values too large
/// for `u64` saturate at `u64::MAX` (float-to-int `as` casts saturate).
fn usd_to_micros(value: f64) -> u64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    if value.is_finite() && value > 0.0 {
        (value * MICROS_PER_USD).round() as u64
    } else {
        0
    }
}
2640
2641fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
2642 binding
2643 .metrics
2644 .cost_today_usd_micros
2645 .load(Ordering::Relaxed) as f64
2646 / 1_000_000.0
2647}
2648
2649fn is_cancelled_vm_error(error: &VmError) -> bool {
2650 matches!(
2651 error,
2652 VmError::Thrown(VmValue::String(message))
2653 if message.starts_with("kind:cancelled:")
2654 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
2655}
2656
2657fn event_headers(
2658 event: &TriggerEvent,
2659 binding: Option<&TriggerBinding>,
2660 attempt: Option<u32>,
2661 replay_of_event_id: Option<&String>,
2662) -> BTreeMap<String, String> {
2663 let mut headers = BTreeMap::new();
2664 headers.insert("event_id".to_string(), event.id.0.clone());
2665 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2666 headers.insert("provider".to_string(), event.provider.as_str().to_string());
2667 headers.insert("kind".to_string(), event.kind.clone());
2668 if let Some(replay_of_event_id) = replay_of_event_id {
2669 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
2670 }
2671 if let Some(binding) = binding {
2672 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
2673 headers.insert("binding_key".to_string(), binding.binding_key());
2674 headers.insert(
2675 "handler_kind".to_string(),
2676 DispatchUri::from(&binding.handler).kind().to_string(),
2677 );
2678 }
2679 if let Some(attempt) = attempt {
2680 headers.insert("attempt".to_string(), attempt.to_string());
2681 }
2682 headers
2683}
2684
2685fn worker_queue_priority(
2686 binding: &super::registry::TriggerBinding,
2687 event: &TriggerEvent,
2688) -> crate::WorkerQueuePriority {
2689 match event
2690 .headers
2691 .get("priority")
2692 .map(|value| value.trim().to_ascii_lowercase())
2693 .as_deref()
2694 {
2695 Some("high") => crate::WorkerQueuePriority::High,
2696 Some("low") => crate::WorkerQueuePriority::Low,
2697 _ => binding.dispatch_priority,
2698 }
2699}
2700
/// Environment variable that, when set to any value, makes
/// `maybe_fail_before_outbox` terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Exits the process with status 86 when `TEST_FAIL_BEFORE_OUTBOX_ENV` is
/// set; otherwise does nothing.
fn maybe_fail_before_outbox() {
    match std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV) {
        Some(_) => std::process::exit(86),
        None => {}
    }
}
2708
2709fn now_rfc3339() -> String {
2710 time::OffsetDateTime::now_utc()
2711 .format(&time::format_description::well_known::Rfc3339)
2712 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2713}
2714
2715fn now_unix_ms() -> i64 {
2716 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
2717}
2718
2719fn utc_day_key() -> i32 {
2720 time::OffsetDateTime::now_utc().date().to_julian_day()
2721}
2722
2723fn cancelled_dispatch_outcome(
2724 binding: &TriggerBinding,
2725 route: &DispatchUri,
2726 event: &TriggerEvent,
2727 replay_of_event_id: Option<String>,
2728 attempt_count: u32,
2729 error: String,
2730) -> DispatchOutcome {
2731 DispatchOutcome {
2732 trigger_id: binding.id.as_str().to_string(),
2733 binding_key: binding.binding_key(),
2734 event_id: event.id.0.clone(),
2735 attempt_count,
2736 status: DispatchStatus::Cancelled,
2737 handler_kind: route.kind().to_string(),
2738 target_uri: route.target_uri(),
2739 replay_of_event_id,
2740 result: None,
2741 error: Some(error),
2742 }
2743}
2744
2745async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
2746 let _ = cancel_rx.recv().await;
2747}
2748
2749#[cfg(test)]
2750mod tests;