1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::Notify;
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25 ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26 ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{append_trust_record, AutonomyTier, TrustOutcome, TrustRecord};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::matching_bindings;
35use super::registry::{TriggerBinding, TriggerHandlerSpec};
36use super::{
37 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
38 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
39 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
40 TRIGGER_OUTBOX_TOPIC,
41};
42use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
43
44mod flow_control;
45pub mod retry;
46pub mod uri;
47
48pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
49
thread_local! {
    // Runtime state of the most recently constructed `Dispatcher` on this thread;
    // overwritten by every call to `Dispatcher::with_event_log_and_metrics`.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Context of the dispatch currently running on this thread, read through
    // `current_dispatch_context`. NOTE(review): the writer is outside this view.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Test-only hook. NOTE(review): presumably fired by `notify_test_inbox_dequeued`
    // (called from `run` after an envelope is decoded) — body not visible here.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
56
tokio::task_local! {
    // `true` while the current task runs a replay dispatch. The value is scoped by
    // `Dispatcher::dispatch_with_replay` and read via `current_dispatch_is_replay`,
    // which falls back to `false` outside any scope.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
60
/// Describes a dispatch in progress. The active one is stored thread-locally and
/// exposed through `current_dispatch_context`.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The trigger event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay; `None` otherwise.
    pub replay_of_event_id: Option<String>,
    // NOTE(review): populated outside this view — presumably the agent identity
    // the dispatch runs as; confirm at the writer.
    pub agent_id: String,
    // NOTE(review): semantics not visible here — presumably the action name used
    // for trust/audit records; confirm.
    pub action: String,
    /// Autonomy tier the dispatch runs under.
    pub autonomy_tier: AutonomyTier,
}
69
// Serializable set of cached predicate-evaluation entries for one
// (trigger, event) pair. Field order determines the serialized JSON key order.
// NOTE(review): where this is persisted/read is outside this view — presumably
// by the predicate evaluation path so replays can reuse earlier results; confirm.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    // Trigger whose `when` predicate produced the entries.
    trigger_id: String,
    // Event the predicate was evaluated against.
    event_id: String,
    // Cached evaluation entries.
    entries: Vec<PredicateCacheEntry>,
}
76
// Outcome of evaluating a binding's `when` predicate for one event.
// `result` gates the dispatch. The remaining fields are copied into the
// predicate node's action-graph metadata, and `reason` also goes into the skip result.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    // Whether the predicate passed; `false` makes the dispatch `Skipped`.
    result: bool,
    // Cost attributed to the evaluation (USD, per the field name).
    cost_usd: f64,
    // Token count reported for the evaluation (presumably LLM tokens).
    tokens: u64,
    // Evaluation latency in milliseconds.
    latency_ms: u64,
    // Whether the result came from a cache rather than a fresh evaluation.
    cached: bool,
    // Optional human-readable explanation of the result.
    reason: Option<String>,
}
86
87pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
88 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
89}
90
91pub(crate) fn current_dispatch_is_replay() -> bool {
92 ACTIVE_DISPATCH_IS_REPLAY
93 .try_with(|is_replay| *is_replay)
94 .unwrap_or(false)
95}
96
/// Routes trigger events to their bindings' handlers. Lifecycle, outbox,
/// attempt and action-graph records go to the event log.
///
/// Clones share the same runtime state (`Arc`) and base VM (`Rc`). Because of the
/// `Rc`, a `Dispatcher` is not `Send`.
#[derive(Clone)]
pub struct Dispatcher {
    // Base VM shared by all clones of this dispatcher.
    base_vm: Rc<Vm>,
    // Log receiving inbox, outbox, lifecycle and attempt records.
    event_log: Arc<AnyEventLog>,
    // Fired by `shutdown`; `run` and each dispatch attempt subscribe to it.
    cancel_tx: broadcast::Sender<()>,
    // Counters, DLQ and flow-control state shared by all clones.
    state: Arc<DispatcherRuntimeState>,
    // Optional registry receiving per-dispatch success/failure counts.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
105
// Mutable bookkeeping shared (via `Arc`) by every clone of a `Dispatcher`.
#[derive(Debug)]
struct DispatcherRuntimeState {
    // Dispatches currently executing; reported by `snapshot` and awaited by `drain`.
    in_flight: AtomicU64,
    // Presumably dispatches waiting for a retry; `drain` waits for it to reach 0.
    retry_queue_depth: AtomicU64,
    // Dead-lettered dispatches, returned by `Dispatcher::dlq_entries`.
    dlq: Mutex<Vec<DlqEntry>>,
    // Cancellation flags; `shutdown` sets every one of them to `true`.
    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
    // Set by `shutdown`; never cleared in the code visible here.
    shutting_down: std::sync::atomic::AtomicBool,
    // Awaited by `drain`. NOTE(review): the notifier is outside this view —
    // presumably signalled when the in-flight/retry counters drop.
    idle_notify: Notify,
    // Flow-control gates (singleton / concurrency, see `AcquiredFlowControl`).
    flow_control: FlowControlManager,
}
116
117impl DispatcherRuntimeState {
118 fn new(event_log: Arc<AnyEventLog>) -> Self {
119 Self {
120 in_flight: AtomicU64::new(0),
121 retry_queue_depth: AtomicU64::new(0),
122 dlq: Mutex::new(Vec::new()),
123 cancel_tokens: Mutex::new(Vec::new()),
124 shutting_down: std::sync::atomic::AtomicBool::new(false),
125 idle_notify: Notify::new(),
126 flow_control: FlowControlManager::new(event_log),
127 }
128 }
129}
130
/// Point-in-time dispatcher counters returned by `Dispatcher::snapshot`.
///
/// The atomic counters are read with `Relaxed` ordering, so the fields need not
/// form one consistent cut while dispatches are running.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches currently executing.
    pub in_flight: u64,
    /// Dispatches waiting for a retry.
    pub retry_queue_depth: u64,
    /// Number of entries in the dead-letter queue.
    pub dlq_depth: u64,
}
138
/// Final status of dispatching one event to one binding; serialized in snake_case.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// A handler attempt returned successfully.
    Succeeded,
    /// The dispatch ended with a handler error.
    Failed,
    /// NOTE(review): presumably the dispatch was moved to the dead-letter queue
    /// after exhausting retries — that path is outside this view.
    Dlq,
    /// Not dispatched: the `when` predicate was false or flow control skipped it.
    Skipped,
    /// Stopped by a cancel request or a cancellation error.
    Cancelled,
}
148
149impl DispatchStatus {
150 pub fn as_str(&self) -> &'static str {
151 match self {
152 Self::Succeeded => "succeeded",
153 Self::Failed => "failed",
154 Self::Dlq => "dlq",
155 Self::Skipped => "skipped",
156 Self::Cancelled => "cancelled",
157 }
158 }
159}
160
/// Result of dispatching one event to one binding.
///
/// With `#[serde(default)]`, missing fields deserialize to the values of the
/// manual `Default` impl (note: status `Failed`).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger whose binding handled the event.
    pub trigger_id: String,
    /// Key of the binding (from `TriggerBinding::binding_key`).
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Handler attempts made; `0` when skipped or cancelled before the first attempt.
    pub attempt_count: u32,
    /// Final status of the dispatch.
    pub status: DispatchStatus,
    /// Kind of the handler route (`DispatchUri::kind`).
    pub handler_kind: String,
    /// Target URI of the handler route.
    pub target_uri: String,
    /// Original event id when this dispatch is a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result on success, or skip details when skipped.
    pub result: Option<serde_json::Value>,
    /// Error message for failed or cancelled dispatches.
    pub error: Option<String>,
}
175
/// Payload of an `event_ingested` record on the trigger inbox topic.
///
/// When `trigger_id` is set, the event goes only to that trigger's live binding
/// (optionally pinned to `binding_version`). Otherwise a cron payload's own
/// `cron_id` is used if present, and failing that the event fans out to every
/// matching binding.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// Trigger the event is targeted at, if any.
    pub trigger_id: Option<String>,
    /// Binding version to resolve; `None` lets the registry choose.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
182
/// Result of `Dispatcher::drain`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` if in-flight and retry counts both reached zero before the timeout.
    pub drained: bool,
    /// Last observed in-flight count.
    pub in_flight: u64,
    /// Last observed retry-queue depth.
    pub retry_queue_depth: u64,
    /// Last observed dead-letter queue depth.
    pub dlq_depth: u64,
}
191
192impl Default for DispatchOutcome {
193 fn default() -> Self {
194 Self {
195 trigger_id: String::new(),
196 binding_key: String::new(),
197 event_id: String::new(),
198 attempt_count: 0,
199 status: DispatchStatus::Failed,
200 handler_kind: String::new(),
201 target_uri: String::new(),
202 replay_of_event_id: None,
203 result: None,
204 error: None,
205 }
206 }
207}
208
/// Audit record for a single handler invocation attempt.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Trigger whose binding was invoked.
    pub trigger_id: String,
    /// Key of the invoked binding.
    pub binding_key: String,
    /// Event being dispatched.
    pub event_id: String,
    /// 1-based attempt number.
    pub attempt: u32,
    /// Kind of the handler route.
    pub handler_kind: String,
    /// Attempt start timestamp (from `now_rfc3339`; presumably RFC 3339).
    pub started_at: String,
    /// Attempt completion timestamp (same format as `started_at`).
    pub completed_at: String,
    /// `"success"`, or the label produced by `dispatch_error_label` on failure.
    pub outcome: String,
    /// Error message when the attempt failed.
    pub error_msg: Option<String>,
}
222
/// Request to cancel the dispatch of one event to one binding, appended to the
/// cancel-request topic by `append_dispatch_cancel_request`.
///
/// The derived `Ord` compares fields in declaration order, so do not reorder them.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Binding whose dispatch should be cancelled.
    pub binding_key: String,
    /// Event whose dispatch should be cancelled.
    pub event_id: String,
    /// When the cancellation was requested (serialized as RFC 3339).
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Optional identity of the requester.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit correlation id.
    #[serde(default)]
    pub audit_id: Option<String>,
}
234
/// Dead-letter queue entry for a dispatch that could not be completed.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Trigger whose binding failed.
    pub trigger_id: String,
    /// Key of the failing binding.
    pub binding_key: String,
    /// The event that could not be dispatched.
    pub event: TriggerEvent,
    /// Number of attempts made.
    pub attempt_count: u32,
    /// Error message of the last attempt.
    pub final_error: String,
    /// Per-attempt audit records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
244
// Flow-control resources held for the duration of a dispatch; handed back via
// `release_flow_control` once the dispatch finishes.
#[derive(Default)]
struct AcquiredFlowControl {
    // Key of the singleton gate held, if the binding is singleton-gated.
    singleton_gate: Option<String>,
    // Concurrency permit held, if the binding has a concurrency limit.
    concurrency: Option<ConcurrencyPermit>,
}
250
// Decision returned by flow control before a dispatch runs.
enum FlowControlOutcome {
    // Proceed with `event` (possibly rewritten, e.g. batched), holding `acquired`.
    // The event is boxed to keep the enum small.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    // Do not dispatch; `reason` is reported in the `Skipped` outcome.
    Skip {
        reason: String,
    },
}
260
/// Errors raised while dispatching trigger events. Each variant carries a
/// human-readable message; `Display` prints only that message.
///
/// `Cancelled`, `Denied` and `NotImplemented` are not retried. All other
/// variants are eligible for another attempt.
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log failure (also used when no active event log exists).
    EventLog(String),
    /// Trigger registry failure (binding resolution, in-flight bookkeeping).
    Registry(String),
    /// JSON (de)serialization failure.
    Serde(String),
    /// NOTE(review): presumably a failure in a local handler — confirm.
    Local(String),
    /// NOTE(review): presumably a failure in an A2A (agent-to-agent) handler — confirm.
    A2a(String),
    /// Dispatch refused by policy; recorded as `TrustOutcome::Denied`.
    Denied(String),
    /// Handler timed out; recorded as `TrustOutcome::Timeout`.
    Timeout(String),
    /// Dispatch was cancelled; yields `DispatchStatus::Cancelled`.
    Cancelled(String),
    /// The handler kind is not supported.
    NotImplemented(String),
}
273
274impl std::fmt::Display for DispatchError {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 Self::EventLog(message)
278 | Self::Registry(message)
279 | Self::Serde(message)
280 | Self::Local(message)
281 | Self::A2a(message)
282 | Self::Denied(message)
283 | Self::Timeout(message)
284 | Self::Cancelled(message)
285 | Self::NotImplemented(message) => f.write_str(message),
286 }
287 }
288}
289
290impl std::error::Error for DispatchError {}
291
292impl DispatchError {
293 fn retryable(&self) -> bool {
294 !matches!(
295 self,
296 Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_)
297 )
298 }
299}
300
301impl From<LogError> for DispatchError {
302 fn from(value: LogError) -> Self {
303 Self::EventLog(value.to_string())
304 }
305}
306
307pub async fn append_dispatch_cancel_request(
308 event_log: &Arc<AnyEventLog>,
309 request: &DispatchCancelRequest,
310) -> Result<u64, DispatchError> {
311 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
312 .expect("static trigger cancel topic should always be valid");
313 event_log
314 .append(
315 &topic,
316 LogEvent::new(
317 "dispatch_cancel_requested",
318 serde_json::to_value(request)
319 .map_err(|error| DispatchError::Serde(error.to_string()))?,
320 ),
321 )
322 .await
323 .map_err(DispatchError::from)
324}
325
326impl Dispatcher {
327 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
328 let event_log = active_event_log().ok_or_else(|| {
329 DispatchError::EventLog("dispatcher requires an active event log".to_string())
330 })?;
331 Ok(Self::with_event_log(base_vm, event_log))
332 }
333
334 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
335 Self::with_event_log_and_metrics(base_vm, event_log, None)
336 }
337
338 pub fn with_event_log_and_metrics(
339 base_vm: Vm,
340 event_log: Arc<AnyEventLog>,
341 metrics: Option<Arc<crate::MetricsRegistry>>,
342 ) -> Self {
343 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
344 ACTIVE_DISPATCHER_STATE.with(|slot| {
345 *slot.borrow_mut() = Some(state.clone());
346 });
347 let (cancel_tx, _) = broadcast::channel(32);
348 Self {
349 base_vm: Rc::new(base_vm),
350 event_log,
351 cancel_tx,
352 state,
353 metrics,
354 }
355 }
356
357 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
358 DispatcherStatsSnapshot {
359 in_flight: self.state.in_flight.load(Ordering::Relaxed),
360 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
361 dlq_depth: self
362 .state
363 .dlq
364 .lock()
365 .expect("dispatcher dlq poisoned")
366 .len() as u64,
367 }
368 }
369
370 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
371 self.state
372 .dlq
373 .lock()
374 .expect("dispatcher dlq poisoned")
375 .clone()
376 }
377
378 pub fn shutdown(&self) {
379 self.state.shutting_down.store(true, Ordering::SeqCst);
380 for token in self
381 .state
382 .cancel_tokens
383 .lock()
384 .expect("dispatcher cancel tokens poisoned")
385 .iter()
386 {
387 token.store(true, Ordering::SeqCst);
388 }
389 let _ = self.cancel_tx.send(());
390 }
391
392 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
393 self.enqueue_targeted(None, None, event).await
394 }
395
396 pub async fn enqueue_targeted(
397 &self,
398 trigger_id: Option<String>,
399 binding_version: Option<u32>,
400 event: TriggerEvent,
401 ) -> Result<u64, DispatchError> {
402 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
403 .expect("static trigger inbox envelopes topic is valid");
404 let headers = event_headers(&event, None, None, None);
405 let payload = serde_json::to_value(InboxEnvelope {
406 trigger_id,
407 binding_version,
408 event,
409 })
410 .map_err(|error| DispatchError::Serde(error.to_string()))?;
411 self.event_log
412 .append(
413 &topic,
414 LogEvent::new("event_ingested", payload).with_headers(headers),
415 )
416 .await
417 .map_err(DispatchError::from)
418 }
419
420 pub async fn run(&self) -> Result<(), DispatchError> {
421 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
422 .expect("static trigger inbox envelopes topic is valid");
423 let start_from = self.event_log.latest(&topic).await?;
424 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
425 pin_mut!(stream);
426 let mut cancel_rx = self.cancel_tx.subscribe();
427
428 loop {
429 tokio::select! {
430 received = stream.next() => {
431 let Some(received) = received else {
432 break;
433 };
434 let (_, event) = received.map_err(DispatchError::from)?;
435 if event.kind != "event_ingested" {
436 continue;
437 }
438 let parent_headers = event.headers.clone();
439 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
440 .map_err(|error| DispatchError::Serde(error.to_string()))?;
441 notify_test_inbox_dequeued();
442 let _ = self
443 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
444 .await;
445 }
446 _ = recv_cancel(&mut cancel_rx) => break,
447 }
448 }
449
450 Ok(())
451 }
452
453 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
454 let deadline = tokio::time::Instant::now() + timeout;
455 loop {
456 let snapshot = self.snapshot();
457 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
458 return Ok(DispatcherDrainReport {
459 drained: true,
460 in_flight: snapshot.in_flight,
461 retry_queue_depth: snapshot.retry_queue_depth,
462 dlq_depth: snapshot.dlq_depth,
463 });
464 }
465
466 let notified = self.state.idle_notify.notified();
467 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
468 if remaining.is_zero() {
469 return Ok(DispatcherDrainReport {
470 drained: false,
471 in_flight: snapshot.in_flight,
472 retry_queue_depth: snapshot.retry_queue_depth,
473 dlq_depth: snapshot.dlq_depth,
474 });
475 }
476 if tokio::time::timeout(remaining, notified).await.is_err() {
477 let snapshot = self.snapshot();
478 return Ok(DispatcherDrainReport {
479 drained: false,
480 in_flight: snapshot.in_flight,
481 retry_queue_depth: snapshot.retry_queue_depth,
482 dlq_depth: snapshot.dlq_depth,
483 });
484 }
485 }
486 }
487
488 pub async fn dispatch_inbox_envelope(
489 &self,
490 envelope: InboxEnvelope,
491 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
492 self.dispatch_inbox_envelope_with_headers(envelope, None)
493 .await
494 }
495
496 async fn dispatch_inbox_envelope_with_headers(
497 &self,
498 envelope: InboxEnvelope,
499 parent_headers: Option<&BTreeMap<String, String>>,
500 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
501 if let Some(trigger_id) = envelope.trigger_id {
502 let binding = super::registry::resolve_live_trigger_binding(
503 &trigger_id,
504 envelope.binding_version,
505 )
506 .map_err(|error| DispatchError::Registry(error.to_string()))?;
507 return Ok(vec![
508 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
509 .await?,
510 ]);
511 }
512
513 let cron_target = match &envelope.event.provider_payload {
514 crate::triggers::ProviderPayload::Known(
515 crate::triggers::event::KnownProviderPayload::Cron(payload),
516 ) => payload.cron_id.clone(),
517 _ => None,
518 };
519 if let Some(trigger_id) = cron_target {
520 let binding = super::registry::resolve_live_trigger_binding(
521 &trigger_id,
522 envelope.binding_version,
523 )
524 .map_err(|error| DispatchError::Registry(error.to_string()))?;
525 return Ok(vec![
526 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
527 .await?,
528 ]);
529 }
530
531 self.dispatch_event(envelope.event).await
532 }
533
534 pub async fn dispatch_event(
535 &self,
536 event: TriggerEvent,
537 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
538 let bindings = matching_bindings(&event);
539 let mut outcomes = Vec::new();
540 for binding in bindings {
541 outcomes.push(self.dispatch(&binding, event.clone()).await?);
542 }
543 Ok(outcomes)
544 }
545
546 pub async fn dispatch(
547 &self,
548 binding: &TriggerBinding,
549 event: TriggerEvent,
550 ) -> Result<DispatchOutcome, DispatchError> {
551 self.dispatch_with_replay(binding, event, None, None, None)
552 .await
553 }
554
555 pub async fn dispatch_replay(
556 &self,
557 binding: &TriggerBinding,
558 event: TriggerEvent,
559 replay_of_event_id: String,
560 ) -> Result<DispatchOutcome, DispatchError> {
561 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
562 .await
563 }
564
565 pub async fn dispatch_with_parent_span_id(
566 &self,
567 binding: &TriggerBinding,
568 event: TriggerEvent,
569 parent_span_id: Option<String>,
570 ) -> Result<DispatchOutcome, DispatchError> {
571 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
572 .await
573 }
574
575 async fn dispatch_with_replay(
576 &self,
577 binding: &TriggerBinding,
578 event: TriggerEvent,
579 replay_of_event_id: Option<String>,
580 parent_span_id: Option<String>,
581 parent_headers: Option<&BTreeMap<String, String>>,
582 ) -> Result<DispatchOutcome, DispatchError> {
583 let span = tracing::info_span!(
584 "dispatch",
585 trigger_id = %binding.id.as_str(),
586 binding_version = binding.version,
587 trace_id = %event.trace_id.0
588 );
589 #[cfg(feature = "otel")]
590 let span_for_otel = span.clone();
591 let _ = if let Some(headers) = parent_headers {
592 crate::observability::otel::set_span_parent_from_headers(
593 &span,
594 headers,
595 &event.trace_id,
596 parent_span_id.as_deref(),
597 )
598 } else {
599 crate::observability::otel::set_span_parent(
600 &span,
601 &event.trace_id,
602 parent_span_id.as_deref(),
603 )
604 };
605 #[cfg(feature = "otel")]
606 let started_at = Instant::now();
607 let metrics = self.metrics.clone();
608 let outcome = ACTIVE_DISPATCH_IS_REPLAY
609 .scope(
610 replay_of_event_id.is_some(),
611 self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
612 .instrument(span),
613 )
614 .await;
615 if let Some(metrics) = metrics.as_ref() {
616 match &outcome {
617 Ok(dispatch_outcome) => match dispatch_outcome.status {
618 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
619 metrics.record_dispatch_succeeded();
620 }
621 _ => metrics.record_dispatch_failed(),
622 },
623 Err(_) => metrics.record_dispatch_failed(),
624 }
625 }
626 #[cfg(feature = "otel")]
627 {
628 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
629
630 let duration_ms = started_at.elapsed().as_millis() as i64;
631 let status = match &outcome {
632 Ok(dispatch_outcome) => match dispatch_outcome.status {
633 DispatchStatus::Succeeded => "succeeded",
634 DispatchStatus::Skipped => "skipped",
635 DispatchStatus::Cancelled => "cancelled",
636 DispatchStatus::Failed => "failed",
637 DispatchStatus::Dlq => "dlq",
638 },
639 Err(DispatchError::Cancelled(_)) => "cancelled",
640 Err(_) => "failed",
641 };
642 span_for_otel.set_attribute("result.status", status);
643 span_for_otel.set_attribute("result.duration_ms", duration_ms);
644 }
645 outcome
646 }
647
648 async fn dispatch_with_replay_inner(
649 &self,
650 binding: &TriggerBinding,
651 event: TriggerEvent,
652 replay_of_event_id: Option<String>,
653 ) -> Result<DispatchOutcome, DispatchError> {
654 let autonomy_tier = crate::resolve_agent_autonomy_tier(
655 &self.event_log,
656 binding.id.as_str(),
657 binding.autonomy_tier,
658 )
659 .await
660 .unwrap_or(binding.autonomy_tier);
661 let binding_key = binding.binding_key();
662 let route = DispatchUri::from(&binding.handler);
663 let trigger_id = binding.id.as_str().to_string();
664 let event_id = event.id.0.clone();
665 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
666 let begin = if replay_of_event_id.is_some() {
667 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
668 } else {
669 begin_in_flight(binding.id.as_str(), binding.version)
670 };
671 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
672
673 let mut attempts = Vec::new();
674 let mut source_node_id = format!("trigger:{}", event.id.0);
675 let mut initial_nodes = Vec::new();
676 let mut initial_edges = Vec::new();
677 if let Some(original_event_id) = replay_of_event_id.as_ref() {
678 let original_node_id = format!("trigger:{original_event_id}");
679 initial_nodes.push(RunActionGraphNodeRecord {
680 id: original_node_id.clone(),
681 label: format!(
682 "{}:{} (original {})",
683 event.provider.as_str(),
684 event.kind,
685 original_event_id
686 ),
687 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
688 status: "historical".to_string(),
689 outcome: "replayed_from".to_string(),
690 trace_id: Some(event.trace_id.0.clone()),
691 stage_id: None,
692 node_id: None,
693 worker_id: None,
694 run_id: None,
695 run_path: None,
696 });
697 initial_edges.push(RunActionGraphEdgeRecord {
698 from_id: original_node_id,
699 to_id: source_node_id.clone(),
700 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
701 label: Some("replay chain".to_string()),
702 });
703 }
704 initial_nodes.push(RunActionGraphNodeRecord {
705 id: source_node_id.clone(),
706 label: format!("{}:{}", event.provider.as_str(), event.kind),
707 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
708 status: "received".to_string(),
709 outcome: "received".to_string(),
710 trace_id: Some(event.trace_id.0.clone()),
711 stage_id: None,
712 node_id: None,
713 worker_id: None,
714 run_id: None,
715 run_path: None,
716 });
717 self.emit_action_graph(
718 &event,
719 initial_nodes,
720 initial_edges,
721 serde_json::json!({
722 "source": "dispatcher",
723 "trigger_id": trigger_id,
724 "binding_key": binding_key,
725 "event_id": event_id,
726 "replay_of_event_id": replay_of_event_id,
727 }),
728 )
729 .await?;
730
731 if dispatch_cancel_requested(
732 &self.event_log,
733 &binding_key,
734 &event.id.0,
735 replay_of_event_id.as_ref(),
736 )
737 .await?
738 {
739 finish_in_flight(
740 binding.id.as_str(),
741 binding.version,
742 TriggerDispatchOutcome::Failed,
743 )
744 .await
745 .map_err(|error| DispatchError::Registry(error.to_string()))?;
746 decrement_in_flight(&self.state);
747 return Ok(cancelled_dispatch_outcome(
748 binding,
749 &route,
750 &event,
751 replay_of_event_id,
752 0,
753 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
754 ));
755 }
756
757 if let Some(predicate) = binding.when.as_ref() {
758 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
759 let evaluation = self
760 .evaluate_predicate(
761 binding,
762 predicate,
763 &event,
764 replay_of_event_id.as_ref(),
765 autonomy_tier,
766 )
767 .await?;
768 let passed = evaluation.result;
769 self.emit_action_graph(
770 &event,
771 vec![RunActionGraphNodeRecord {
772 id: predicate_node_id.clone(),
773 label: predicate.raw.clone(),
774 kind: ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE.to_string(),
775 status: "completed".to_string(),
776 outcome: passed.to_string(),
777 trace_id: Some(event.trace_id.0.clone()),
778 stage_id: None,
779 node_id: None,
780 worker_id: None,
781 run_id: None,
782 run_path: None,
783 }],
784 vec![RunActionGraphEdgeRecord {
785 from_id: source_node_id.clone(),
786 to_id: predicate_node_id.clone(),
787 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
788 label: None,
789 }],
790 serde_json::json!({
791 "source": "dispatcher",
792 "trigger_id": binding.id.as_str(),
793 "binding_key": binding.binding_key(),
794 "event_id": event.id.0,
795 "predicate": predicate.raw,
796 "reason": evaluation.reason,
797 "cached": evaluation.cached,
798 "cost_usd": evaluation.cost_usd,
799 "tokens": evaluation.tokens,
800 "latency_ms": evaluation.latency_ms,
801 "replay_of_event_id": replay_of_event_id,
802 }),
803 )
804 .await?;
805
806 if !passed {
807 finish_in_flight(
808 binding.id.as_str(),
809 binding.version,
810 TriggerDispatchOutcome::Dispatched,
811 )
812 .await
813 .map_err(|error| DispatchError::Registry(error.to_string()))?;
814 decrement_in_flight(&self.state);
815 self.append_dispatch_trust_record(
816 binding,
817 &route,
818 &event,
819 replay_of_event_id.as_ref(),
820 autonomy_tier,
821 TrustOutcome::Denied,
822 "skipped",
823 0,
824 None,
825 )
826 .await?;
827 return Ok(DispatchOutcome {
828 trigger_id: binding.id.as_str().to_string(),
829 binding_key: binding.binding_key(),
830 event_id: event.id.0,
831 attempt_count: 0,
832 status: DispatchStatus::Skipped,
833 handler_kind: route.kind().to_string(),
834 target_uri: route.target_uri(),
835 replay_of_event_id,
836 result: Some(serde_json::json!({
837 "skipped": true,
838 "predicate": predicate.raw,
839 "reason": evaluation.reason,
840 })),
841 error: None,
842 });
843 }
844
845 source_node_id = predicate_node_id;
846 }
847
848 let (event, mut acquired_flow) = match self
849 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
850 .await?
851 {
852 FlowControlOutcome::Dispatch { event, acquired } => (*event, acquired),
853 FlowControlOutcome::Skip { reason } => {
854 finish_in_flight(
855 binding.id.as_str(),
856 binding.version,
857 TriggerDispatchOutcome::Dispatched,
858 )
859 .await
860 .map_err(|error| DispatchError::Registry(error.to_string()))?;
861 decrement_in_flight(&self.state);
862 return Ok(DispatchOutcome {
863 trigger_id: binding.id.as_str().to_string(),
864 binding_key: binding.binding_key(),
865 event_id: event.id.0,
866 attempt_count: 0,
867 status: DispatchStatus::Skipped,
868 handler_kind: route.kind().to_string(),
869 target_uri: route.target_uri(),
870 replay_of_event_id,
871 result: Some(serde_json::json!({
872 "skipped": true,
873 "flow_control": reason,
874 })),
875 error: None,
876 });
877 }
878 };
879
880 let mut previous_retry_node = None;
881 let max_attempts = binding.retry.max_attempts();
882 for attempt in 1..=max_attempts {
883 if dispatch_cancel_requested(
884 &self.event_log,
885 &binding_key,
886 &event.id.0,
887 replay_of_event_id.as_ref(),
888 )
889 .await?
890 {
891 finish_in_flight(
892 binding.id.as_str(),
893 binding.version,
894 TriggerDispatchOutcome::Failed,
895 )
896 .await
897 .map_err(|error| DispatchError::Registry(error.to_string()))?;
898 decrement_in_flight(&self.state);
899 return Ok(cancelled_dispatch_outcome(
900 binding,
901 &route,
902 &event,
903 replay_of_event_id,
904 attempt.saturating_sub(1),
905 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
906 ));
907 }
908 maybe_fail_before_outbox();
909 let started_at = now_rfc3339();
910 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
911 self.append_lifecycle_event(
912 "DispatchStarted",
913 &event,
914 binding,
915 serde_json::json!({
916 "event_id": event.id.0,
917 "attempt": attempt,
918 "handler_kind": route.kind(),
919 "target_uri": route.target_uri(),
920 "replay_of_event_id": replay_of_event_id,
921 }),
922 replay_of_event_id.as_ref(),
923 )
924 .await?;
925 self.append_topic_event(
926 TRIGGER_OUTBOX_TOPIC,
927 "dispatch_started",
928 &event,
929 Some(binding),
930 Some(attempt),
931 serde_json::json!({
932 "event_id": event.id.0,
933 "attempt": attempt,
934 "trigger_id": binding.id.as_str(),
935 "binding_key": binding.binding_key(),
936 "handler_kind": route.kind(),
937 "target_uri": route.target_uri(),
938 "replay_of_event_id": replay_of_event_id,
939 }),
940 replay_of_event_id.as_ref(),
941 )
942 .await?;
943
944 let mut dispatch_edges = Vec::new();
945 if attempt == 1 {
946 dispatch_edges.push(RunActionGraphEdgeRecord {
947 from_id: source_node_id.clone(),
948 to_id: attempt_node_id.clone(),
949 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
950 label: binding.when.as_ref().map(|_| "true".to_string()),
951 });
952 } else if let Some(retry_node_id) = previous_retry_node.take() {
953 dispatch_edges.push(RunActionGraphEdgeRecord {
954 from_id: retry_node_id,
955 to_id: attempt_node_id.clone(),
956 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
957 label: Some(format!("attempt {attempt}")),
958 });
959 }
960
961 self.emit_action_graph(
962 &event,
963 vec![RunActionGraphNodeRecord {
964 id: attempt_node_id.clone(),
965 label: dispatch_node_label(&route),
966 kind: dispatch_node_kind(&route).to_string(),
967 status: "running".to_string(),
968 outcome: format!("attempt_{attempt}"),
969 trace_id: Some(event.trace_id.0.clone()),
970 stage_id: None,
971 node_id: None,
972 worker_id: None,
973 run_id: None,
974 run_path: None,
975 }],
976 dispatch_edges,
977 serde_json::json!({
978 "source": "dispatcher",
979 "trigger_id": binding.id.as_str(),
980 "binding_key": binding.binding_key(),
981 "event_id": event.id.0,
982 "attempt": attempt,
983 "handler_kind": route.kind(),
984 "target_uri": route.target_uri(),
985 "target_agent": dispatch_target_agent(&route),
986 "replay_of_event_id": replay_of_event_id,
987 }),
988 )
989 .await?;
990
991 let result = self
992 .dispatch_once(
993 binding,
994 &route,
995 &event,
996 autonomy_tier,
997 &mut self.cancel_tx.subscribe(),
998 )
999 .await;
1000 let completed_at = now_rfc3339();
1001
1002 match result {
1003 Ok(result) => {
1004 let attempt_record = DispatchAttemptRecord {
1005 trigger_id: binding.id.as_str().to_string(),
1006 binding_key: binding.binding_key(),
1007 event_id: event.id.0.clone(),
1008 attempt,
1009 handler_kind: route.kind().to_string(),
1010 started_at,
1011 completed_at,
1012 outcome: "success".to_string(),
1013 error_msg: None,
1014 };
1015 attempts.push(attempt_record.clone());
1016 self.append_attempt_record(
1017 &event,
1018 binding,
1019 &attempt_record,
1020 replay_of_event_id.as_ref(),
1021 )
1022 .await?;
1023 self.append_lifecycle_event(
1024 "DispatchSucceeded",
1025 &event,
1026 binding,
1027 serde_json::json!({
1028 "event_id": event.id.0,
1029 "attempt": attempt,
1030 "handler_kind": route.kind(),
1031 "target_uri": route.target_uri(),
1032 "result": result,
1033 "replay_of_event_id": replay_of_event_id,
1034 }),
1035 replay_of_event_id.as_ref(),
1036 )
1037 .await?;
1038 self.append_topic_event(
1039 TRIGGER_OUTBOX_TOPIC,
1040 "dispatch_succeeded",
1041 &event,
1042 Some(binding),
1043 Some(attempt),
1044 serde_json::json!({
1045 "event_id": event.id.0,
1046 "attempt": attempt,
1047 "trigger_id": binding.id.as_str(),
1048 "binding_key": binding.binding_key(),
1049 "handler_kind": route.kind(),
1050 "target_uri": route.target_uri(),
1051 "result": result,
1052 "replay_of_event_id": replay_of_event_id,
1053 }),
1054 replay_of_event_id.as_ref(),
1055 )
1056 .await?;
1057 finish_in_flight(
1058 binding.id.as_str(),
1059 binding.version,
1060 TriggerDispatchOutcome::Dispatched,
1061 )
1062 .await
1063 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1064 self.release_flow_control(&mut acquired_flow).await?;
1065 decrement_in_flight(&self.state);
1066 self.append_dispatch_trust_record(
1067 binding,
1068 &route,
1069 &event,
1070 replay_of_event_id.as_ref(),
1071 autonomy_tier,
1072 TrustOutcome::Success,
1073 "succeeded",
1074 attempt,
1075 None,
1076 )
1077 .await?;
1078 return Ok(DispatchOutcome {
1079 trigger_id: binding.id.as_str().to_string(),
1080 binding_key: binding.binding_key(),
1081 event_id: event.id.0,
1082 attempt_count: attempt,
1083 status: DispatchStatus::Succeeded,
1084 handler_kind: route.kind().to_string(),
1085 target_uri: route.target_uri(),
1086 replay_of_event_id,
1087 result: Some(result),
1088 error: None,
1089 });
1090 }
1091 Err(error) => {
1092 let attempt_record = DispatchAttemptRecord {
1093 trigger_id: binding.id.as_str().to_string(),
1094 binding_key: binding.binding_key(),
1095 event_id: event.id.0.clone(),
1096 attempt,
1097 handler_kind: route.kind().to_string(),
1098 started_at,
1099 completed_at,
1100 outcome: dispatch_error_label(&error).to_string(),
1101 error_msg: Some(error.to_string()),
1102 };
1103 attempts.push(attempt_record.clone());
1104 self.append_attempt_record(
1105 &event,
1106 binding,
1107 &attempt_record,
1108 replay_of_event_id.as_ref(),
1109 )
1110 .await?;
1111 self.append_lifecycle_event(
1112 "DispatchFailed",
1113 &event,
1114 binding,
1115 serde_json::json!({
1116 "event_id": event.id.0,
1117 "attempt": attempt,
1118 "handler_kind": route.kind(),
1119 "target_uri": route.target_uri(),
1120 "error": error.to_string(),
1121 "replay_of_event_id": replay_of_event_id,
1122 }),
1123 replay_of_event_id.as_ref(),
1124 )
1125 .await?;
1126 self.append_topic_event(
1127 TRIGGER_OUTBOX_TOPIC,
1128 "dispatch_failed",
1129 &event,
1130 Some(binding),
1131 Some(attempt),
1132 serde_json::json!({
1133 "event_id": event.id.0,
1134 "attempt": attempt,
1135 "trigger_id": binding.id.as_str(),
1136 "binding_key": binding.binding_key(),
1137 "handler_kind": route.kind(),
1138 "target_uri": route.target_uri(),
1139 "error": error.to_string(),
1140 "replay_of_event_id": replay_of_event_id,
1141 }),
1142 replay_of_event_id.as_ref(),
1143 )
1144 .await?;
1145
1146 if !error.retryable() {
1147 finish_in_flight(
1148 binding.id.as_str(),
1149 binding.version,
1150 TriggerDispatchOutcome::Failed,
1151 )
1152 .await
1153 .map_err(|registry_error| {
1154 DispatchError::Registry(registry_error.to_string())
1155 })?;
1156 self.release_flow_control(&mut acquired_flow).await?;
1157 decrement_in_flight(&self.state);
1158 let trust_outcome = match error {
1159 DispatchError::Denied(_) => TrustOutcome::Denied,
1160 DispatchError::Timeout(_) => TrustOutcome::Timeout,
1161 _ => TrustOutcome::Failure,
1162 };
1163 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1164 "cancelled"
1165 } else {
1166 "failed"
1167 };
1168 self.append_dispatch_trust_record(
1169 binding,
1170 &route,
1171 &event,
1172 replay_of_event_id.as_ref(),
1173 autonomy_tier,
1174 trust_outcome,
1175 terminal_status,
1176 attempt,
1177 Some(error.to_string()),
1178 )
1179 .await?;
1180 return Ok(DispatchOutcome {
1181 trigger_id: binding.id.as_str().to_string(),
1182 binding_key: binding.binding_key(),
1183 event_id: event.id.0,
1184 attempt_count: attempt,
1185 status: if matches!(error, DispatchError::Cancelled(_)) {
1186 DispatchStatus::Cancelled
1187 } else {
1188 DispatchStatus::Failed
1189 },
1190 handler_kind: route.kind().to_string(),
1191 target_uri: route.target_uri(),
1192 replay_of_event_id,
1193 result: None,
1194 error: Some(error.to_string()),
1195 });
1196 }
1197
1198 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1199 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1200 previous_retry_node = Some(retry_node_id.clone());
1201 self.emit_action_graph(
1202 &event,
1203 vec![RunActionGraphNodeRecord {
1204 id: retry_node_id.clone(),
1205 label: format!("retry in {}ms", delay.as_millis()),
1206 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1207 status: "scheduled".to_string(),
1208 outcome: format!("attempt_{}", attempt + 1),
1209 trace_id: Some(event.trace_id.0.clone()),
1210 stage_id: None,
1211 node_id: None,
1212 worker_id: None,
1213 run_id: None,
1214 run_path: None,
1215 }],
1216 vec![RunActionGraphEdgeRecord {
1217 from_id: attempt_node_id,
1218 to_id: retry_node_id.clone(),
1219 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1220 label: Some(format!("attempt {}", attempt + 1)),
1221 }],
1222 serde_json::json!({
1223 "source": "dispatcher",
1224 "trigger_id": binding.id.as_str(),
1225 "binding_key": binding.binding_key(),
1226 "event_id": event.id.0,
1227 "attempt": attempt + 1,
1228 "delay_ms": delay.as_millis(),
1229 "replay_of_event_id": replay_of_event_id,
1230 }),
1231 )
1232 .await?;
1233 self.append_lifecycle_event(
1234 "RetryScheduled",
1235 &event,
1236 binding,
1237 serde_json::json!({
1238 "event_id": event.id.0,
1239 "attempt": attempt + 1,
1240 "delay_ms": delay.as_millis(),
1241 "error": error.to_string(),
1242 "replay_of_event_id": replay_of_event_id,
1243 }),
1244 replay_of_event_id.as_ref(),
1245 )
1246 .await?;
1247 self.append_topic_event(
1248 TRIGGER_ATTEMPTS_TOPIC,
1249 "retry_scheduled",
1250 &event,
1251 Some(binding),
1252 Some(attempt + 1),
1253 serde_json::json!({
1254 "event_id": event.id.0,
1255 "attempt": attempt + 1,
1256 "trigger_id": binding.id.as_str(),
1257 "binding_key": binding.binding_key(),
1258 "delay_ms": delay.as_millis(),
1259 "error": error.to_string(),
1260 "replay_of_event_id": replay_of_event_id,
1261 }),
1262 replay_of_event_id.as_ref(),
1263 )
1264 .await?;
1265 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1266 let sleep_result = sleep_or_cancel_or_request(
1267 &self.event_log,
1268 delay,
1269 &binding_key,
1270 &event.id.0,
1271 replay_of_event_id.as_ref(),
1272 &mut self.cancel_tx.subscribe(),
1273 )
1274 .await;
1275 decrement_retry_queue_depth(&self.state);
1276 if sleep_result.is_err() {
1277 finish_in_flight(
1278 binding.id.as_str(),
1279 binding.version,
1280 TriggerDispatchOutcome::Failed,
1281 )
1282 .await
1283 .map_err(|registry_error| {
1284 DispatchError::Registry(registry_error.to_string())
1285 })?;
1286 self.release_flow_control(&mut acquired_flow).await?;
1287 decrement_in_flight(&self.state);
1288 self.append_dispatch_trust_record(
1289 binding,
1290 &route,
1291 &event,
1292 replay_of_event_id.as_ref(),
1293 autonomy_tier,
1294 TrustOutcome::Failure,
1295 "cancelled",
1296 attempt,
1297 Some("dispatcher shutdown cancelled retry wait".to_string()),
1298 )
1299 .await?;
1300 return Ok(DispatchOutcome {
1301 trigger_id: binding.id.as_str().to_string(),
1302 binding_key: binding.binding_key(),
1303 event_id: event.id.0,
1304 attempt_count: attempt,
1305 status: DispatchStatus::Cancelled,
1306 handler_kind: route.kind().to_string(),
1307 target_uri: route.target_uri(),
1308 replay_of_event_id,
1309 result: None,
1310 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1311 });
1312 }
1313 continue;
1314 }
1315
1316 let final_error = error.to_string();
1317 let dlq_entry = DlqEntry {
1318 trigger_id: binding.id.as_str().to_string(),
1319 binding_key: binding.binding_key(),
1320 event: event.clone(),
1321 attempt_count: attempt,
1322 final_error: final_error.clone(),
1323 attempts: attempts.clone(),
1324 };
1325 self.state
1326 .dlq
1327 .lock()
1328 .expect("dispatcher dlq poisoned")
1329 .push(dlq_entry.clone());
1330 self.emit_action_graph(
1331 &event,
1332 vec![RunActionGraphNodeRecord {
1333 id: format!("dlq:{binding_key}:{}", event.id.0),
1334 label: binding.id.as_str().to_string(),
1335 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1336 status: "queued".to_string(),
1337 outcome: "retry_exhausted".to_string(),
1338 trace_id: Some(event.trace_id.0.clone()),
1339 stage_id: None,
1340 node_id: None,
1341 worker_id: None,
1342 run_id: None,
1343 run_path: None,
1344 }],
1345 vec![RunActionGraphEdgeRecord {
1346 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1347 to_id: format!("dlq:{binding_key}:{}", event.id.0),
1348 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1349 label: Some(format!("{attempt} attempts")),
1350 }],
1351 serde_json::json!({
1352 "source": "dispatcher",
1353 "trigger_id": binding.id.as_str(),
1354 "binding_key": binding.binding_key(),
1355 "event_id": event.id.0,
1356 "attempt_count": attempt,
1357 "final_error": final_error,
1358 "replay_of_event_id": replay_of_event_id,
1359 }),
1360 )
1361 .await?;
1362 self.append_lifecycle_event(
1363 "DlqMoved",
1364 &event,
1365 binding,
1366 serde_json::json!({
1367 "event_id": event.id.0,
1368 "attempt_count": attempt,
1369 "final_error": dlq_entry.final_error,
1370 "replay_of_event_id": replay_of_event_id,
1371 }),
1372 replay_of_event_id.as_ref(),
1373 )
1374 .await?;
1375 self.append_topic_event(
1376 TRIGGER_DLQ_TOPIC,
1377 "dlq_moved",
1378 &event,
1379 Some(binding),
1380 Some(attempt),
1381 serde_json::to_value(&dlq_entry)
1382 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1383 replay_of_event_id.as_ref(),
1384 )
1385 .await?;
1386 finish_in_flight(
1387 binding.id.as_str(),
1388 binding.version,
1389 TriggerDispatchOutcome::Dlq,
1390 )
1391 .await
1392 .map_err(|registry_error| {
1393 DispatchError::Registry(registry_error.to_string())
1394 })?;
1395 self.release_flow_control(&mut acquired_flow).await?;
1396 decrement_in_flight(&self.state);
1397 self.append_dispatch_trust_record(
1398 binding,
1399 &route,
1400 &event,
1401 replay_of_event_id.as_ref(),
1402 autonomy_tier,
1403 TrustOutcome::Failure,
1404 "dlq",
1405 attempt,
1406 Some(error.to_string()),
1407 )
1408 .await?;
1409 return Ok(DispatchOutcome {
1410 trigger_id: binding.id.as_str().to_string(),
1411 binding_key: binding.binding_key(),
1412 event_id: event.id.0,
1413 attempt_count: attempt,
1414 status: DispatchStatus::Dlq,
1415 handler_kind: route.kind().to_string(),
1416 target_uri: route.target_uri(),
1417 replay_of_event_id,
1418 result: None,
1419 error: Some(error.to_string()),
1420 });
1421 }
1422 }
1423 }
1424
1425 finish_in_flight(
1426 binding.id.as_str(),
1427 binding.version,
1428 TriggerDispatchOutcome::Failed,
1429 )
1430 .await
1431 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1432 self.release_flow_control(&mut acquired_flow).await?;
1433 decrement_in_flight(&self.state);
1434 self.append_dispatch_trust_record(
1435 binding,
1436 &route,
1437 &event,
1438 replay_of_event_id.as_ref(),
1439 autonomy_tier,
1440 TrustOutcome::Failure,
1441 "failed",
1442 max_attempts,
1443 Some("dispatch exhausted without terminal outcome".to_string()),
1444 )
1445 .await?;
1446 Ok(DispatchOutcome {
1447 trigger_id: binding.id.as_str().to_string(),
1448 binding_key: binding.binding_key(),
1449 event_id: event.id.0,
1450 attempt_count: max_attempts,
1451 status: DispatchStatus::Failed,
1452 handler_kind: route.kind().to_string(),
1453 target_uri: route.target_uri(),
1454 replay_of_event_id,
1455 result: None,
1456 error: Some("dispatch exhausted without terminal outcome".to_string()),
1457 })
1458 }
1459
1460 async fn dispatch_once(
1461 &self,
1462 binding: &TriggerBinding,
1463 route: &DispatchUri,
1464 event: &TriggerEvent,
1465 autonomy_tier: AutonomyTier,
1466 cancel_rx: &mut broadcast::Receiver<()>,
1467 ) -> Result<serde_json::Value, DispatchError> {
1468 match route {
1469 DispatchUri::Local { .. } => {
1470 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
1471 return Err(DispatchError::Local(format!(
1472 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
1473 binding.id.as_str()
1474 )));
1475 };
1476 let value = self
1477 .invoke_vm_callable(
1478 closure,
1479 &binding.binding_key(),
1480 event,
1481 None,
1482 binding.id.as_str(),
1483 &format!("{}.{}", event.provider.as_str(), event.kind),
1484 autonomy_tier,
1485 cancel_rx,
1486 )
1487 .await?;
1488 Ok(vm_value_to_json(&value))
1489 }
1490 DispatchUri::A2a {
1491 target,
1492 allow_cleartext,
1493 } => {
1494 if self.state.shutting_down.load(Ordering::SeqCst) {
1495 return Err(DispatchError::Cancelled(
1496 "dispatcher shutdown cancelled A2A dispatch".to_string(),
1497 ));
1498 }
1499 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
1500 target,
1501 *allow_cleartext,
1502 binding.id.as_str(),
1503 &binding.binding_key(),
1504 event,
1505 cancel_rx,
1506 )
1507 .await
1508 .map_err(|error| match error {
1509 crate::a2a::A2aClientError::Cancelled(message) => {
1510 DispatchError::Cancelled(message)
1511 }
1512 other => DispatchError::A2a(other.to_string()),
1513 })?;
1514 match ack {
1515 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
1516 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
1517 }
1518 }
1519 DispatchUri::Worker { queue } => {
1520 let receipt = crate::WorkerQueue::new(self.event_log.clone())
1521 .enqueue(&crate::WorkerQueueJob {
1522 queue: queue.clone(),
1523 trigger_id: binding.id.as_str().to_string(),
1524 binding_key: binding.binding_key(),
1525 binding_version: binding.version,
1526 event: event.clone(),
1527 replay_of_event_id: current_dispatch_context()
1528 .and_then(|context| context.replay_of_event_id),
1529 priority: worker_queue_priority(binding, event),
1530 })
1531 .await
1532 .map_err(DispatchError::from)?;
1533 Ok(serde_json::to_value(receipt)
1534 .map_err(|error| DispatchError::Serde(error.to_string()))?)
1535 }
1536 }
1537 }
1538
1539 async fn apply_flow_control(
1540 &self,
1541 binding: &TriggerBinding,
1542 event: &TriggerEvent,
1543 replay_of_event_id: Option<&String>,
1544 ) -> Result<FlowControlOutcome, DispatchError> {
1545 let flow = &binding.flow_control;
1546 let mut managed_event = event.clone();
1547
1548 if let Some(batch) = &flow.batch {
1549 let gate = self
1550 .resolve_flow_gate(
1551 &binding.binding_key(),
1552 batch.key.as_ref(),
1553 &managed_event,
1554 replay_of_event_id,
1555 )
1556 .await?;
1557 match self
1558 .state
1559 .flow_control
1560 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
1561 .await
1562 .map_err(DispatchError::from)?
1563 {
1564 BatchDecision::Dispatch(events) => {
1565 managed_event = build_batched_event(events)?;
1566 }
1567 BatchDecision::Merged => {
1568 return Ok(FlowControlOutcome::Skip {
1569 reason: "batch_merged".to_string(),
1570 })
1571 }
1572 }
1573 }
1574
1575 if let Some(debounce) = &flow.debounce {
1576 let gate = self
1577 .resolve_flow_gate(
1578 &binding.binding_key(),
1579 Some(&debounce.key),
1580 &managed_event,
1581 replay_of_event_id,
1582 )
1583 .await?;
1584 let latest = self
1585 .state
1586 .flow_control
1587 .debounce(&gate, debounce.period)
1588 .await
1589 .map_err(DispatchError::from)?;
1590 if !latest {
1591 return Ok(FlowControlOutcome::Skip {
1592 reason: "debounced".to_string(),
1593 });
1594 }
1595 }
1596
1597 if let Some(rate_limit) = &flow.rate_limit {
1598 let gate = self
1599 .resolve_flow_gate(
1600 &binding.binding_key(),
1601 rate_limit.key.as_ref(),
1602 &managed_event,
1603 replay_of_event_id,
1604 )
1605 .await?;
1606 let allowed = self
1607 .state
1608 .flow_control
1609 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
1610 .await
1611 .map_err(DispatchError::from)?;
1612 if !allowed {
1613 return Ok(FlowControlOutcome::Skip {
1614 reason: "rate_limited".to_string(),
1615 });
1616 }
1617 }
1618
1619 if let Some(throttle) = &flow.throttle {
1620 let gate = self
1621 .resolve_flow_gate(
1622 &binding.binding_key(),
1623 throttle.key.as_ref(),
1624 &managed_event,
1625 replay_of_event_id,
1626 )
1627 .await?;
1628 self.state
1629 .flow_control
1630 .wait_for_throttle(&gate, throttle.period, throttle.max)
1631 .await
1632 .map_err(DispatchError::from)?;
1633 }
1634
1635 let mut acquired = AcquiredFlowControl::default();
1636 if let Some(singleton) = &flow.singleton {
1637 let gate = self
1638 .resolve_flow_gate(
1639 &binding.binding_key(),
1640 singleton.key.as_ref(),
1641 &managed_event,
1642 replay_of_event_id,
1643 )
1644 .await?;
1645 let acquired_singleton = self
1646 .state
1647 .flow_control
1648 .try_acquire_singleton(&gate)
1649 .await
1650 .map_err(DispatchError::from)?;
1651 if !acquired_singleton {
1652 return Ok(FlowControlOutcome::Skip {
1653 reason: "singleton_active".to_string(),
1654 });
1655 }
1656 acquired.singleton_gate = Some(gate);
1657 }
1658
1659 if let Some(concurrency) = &flow.concurrency {
1660 let gate = self
1661 .resolve_flow_gate(
1662 &binding.binding_key(),
1663 concurrency.key.as_ref(),
1664 &managed_event,
1665 replay_of_event_id,
1666 )
1667 .await?;
1668 let priority_rank = self
1669 .resolve_priority_rank(
1670 &binding.binding_key(),
1671 flow.priority.as_ref(),
1672 &managed_event,
1673 replay_of_event_id,
1674 )
1675 .await?;
1676 acquired.concurrency = Some(
1677 self.state
1678 .flow_control
1679 .acquire_concurrency(&gate, concurrency.max, priority_rank)
1680 .await
1681 .map_err(DispatchError::from)?,
1682 );
1683 }
1684
1685 Ok(FlowControlOutcome::Dispatch {
1686 event: Box::new(managed_event),
1687 acquired,
1688 })
1689 }
1690
1691 async fn release_flow_control(
1692 &self,
1693 acquired: &mut AcquiredFlowControl,
1694 ) -> Result<(), DispatchError> {
1695 if let Some(gate) = acquired.singleton_gate.take() {
1696 self.state
1697 .flow_control
1698 .release_singleton(&gate)
1699 .await
1700 .map_err(DispatchError::from)?;
1701 }
1702 if let Some(permit) = acquired.concurrency.take() {
1703 self.state
1704 .flow_control
1705 .release_concurrency(permit)
1706 .await
1707 .map_err(DispatchError::from)?;
1708 }
1709 Ok(())
1710 }
1711
1712 async fn resolve_flow_gate(
1713 &self,
1714 binding_key: &str,
1715 expr: Option<&crate::triggers::TriggerExpressionSpec>,
1716 event: &TriggerEvent,
1717 replay_of_event_id: Option<&String>,
1718 ) -> Result<String, DispatchError> {
1719 let key = match expr {
1720 Some(expr) => {
1721 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
1722 .await?
1723 }
1724 None => "_global".to_string(),
1725 };
1726 Ok(format!("{binding_key}:{key}"))
1727 }
1728
1729 async fn resolve_priority_rank(
1730 &self,
1731 binding_key: &str,
1732 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
1733 event: &TriggerEvent,
1734 replay_of_event_id: Option<&String>,
1735 ) -> Result<usize, DispatchError> {
1736 let Some(priority) = priority else {
1737 return Ok(0);
1738 };
1739 let value = self
1740 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
1741 .await?;
1742 Ok(priority
1743 .order
1744 .iter()
1745 .position(|candidate| candidate == &value)
1746 .unwrap_or(priority.order.len()))
1747 }
1748
1749 async fn evaluate_flow_expression(
1750 &self,
1751 binding_key: &str,
1752 expr: &crate::triggers::TriggerExpressionSpec,
1753 event: &TriggerEvent,
1754 replay_of_event_id: Option<&String>,
1755 ) -> Result<String, DispatchError> {
1756 let value = self
1757 .invoke_vm_callable(
1758 &expr.closure,
1759 binding_key,
1760 event,
1761 replay_of_event_id,
1762 "",
1763 "flow_control",
1764 AutonomyTier::Suggest,
1765 &mut self.cancel_tx.subscribe(),
1766 )
1767 .await?;
1768 Ok(json_value_to_gate(&vm_value_to_json(&value)))
1769 }
1770
1771 #[allow(clippy::too_many_arguments)]
1772 async fn invoke_vm_callable(
1773 &self,
1774 closure: &crate::value::VmClosure,
1775 binding_key: &str,
1776 event: &TriggerEvent,
1777 replay_of_event_id: Option<&String>,
1778 agent_id: &str,
1779 action: &str,
1780 autonomy_tier: AutonomyTier,
1781 cancel_rx: &mut broadcast::Receiver<()>,
1782 ) -> Result<VmValue, DispatchError> {
1783 let mut vm = self.base_vm.child_vm();
1784 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
1785 if self.state.shutting_down.load(Ordering::SeqCst) {
1786 cancel_token.store(true, Ordering::SeqCst);
1787 }
1788 self.state
1789 .cancel_tokens
1790 .lock()
1791 .expect("dispatcher cancel tokens poisoned")
1792 .push(cancel_token.clone());
1793 vm.install_cancel_token(cancel_token.clone());
1794 let arg = event_to_handler_value(event)?;
1795 let args = [arg];
1796 let future = vm.call_closure_pub(closure, &args, &[]);
1797 pin_mut!(future);
1798 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1799 slot.borrow_mut().replace(DispatchContext {
1800 trigger_event: event.clone(),
1801 replay_of_event_id: replay_of_event_id.cloned(),
1802 agent_id: agent_id.to_string(),
1803 action: action.to_string(),
1804 autonomy_tier,
1805 })
1806 });
1807 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
1808 crate::stdlib::hitl::reset_hitl_state();
1809 let mut poll = tokio::time::interval(Duration::from_millis(100));
1810 let result = loop {
1811 tokio::select! {
1812 result = &mut future => break result,
1813 _ = recv_cancel(cancel_rx) => {
1814 cancel_token.store(true, Ordering::SeqCst);
1815 }
1816 _ = poll.tick() => {
1817 if dispatch_cancel_requested(
1818 &self.event_log,
1819 binding_key,
1820 &event.id.0,
1821 replay_of_event_id,
1822 )
1823 .await? {
1824 cancel_token.store(true, Ordering::SeqCst);
1825 }
1826 }
1827 }
1828 };
1829 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1830 *slot.borrow_mut() = prior_context;
1831 });
1832 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
1833 {
1834 let mut tokens = self
1835 .state
1836 .cancel_tokens
1837 .lock()
1838 .expect("dispatcher cancel tokens poisoned");
1839 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
1840 }
1841
1842 if cancel_token.load(Ordering::SeqCst) {
1843 if dispatch_cancel_requested(
1844 &self.event_log,
1845 binding_key,
1846 &event.id.0,
1847 replay_of_event_id,
1848 )
1849 .await?
1850 {
1851 Err(DispatchError::Cancelled(
1852 "trigger cancel request cancelled local handler".to_string(),
1853 ))
1854 } else {
1855 Err(DispatchError::Cancelled(
1856 "dispatcher shutdown cancelled local handler".to_string(),
1857 ))
1858 }
1859 } else {
1860 result.map_err(dispatch_error_from_vm_error)
1861 }
1862 }
1863
1864 async fn evaluate_predicate(
1865 &self,
1866 binding: &TriggerBinding,
1867 predicate: &super::registry::TriggerPredicateSpec,
1868 event: &TriggerEvent,
1869 replay_of_event_id: Option<&String>,
1870 autonomy_tier: AutonomyTier,
1871 ) -> Result<PredicateEvaluationRecord, DispatchError> {
1872 let event_id = event.id.0.clone();
1873 let trigger_id = binding.id.as_str().to_string();
1874 let now_ms = now_unix_ms();
1875 let today = utc_day_key();
1876
1877 let breaker_open_until = {
1878 let mut state = binding
1879 .predicate_state
1880 .lock()
1881 .expect("trigger predicate state poisoned");
1882 if state.budget_day_utc != Some(today) {
1883 state.budget_day_utc = Some(today);
1884 binding
1885 .metrics
1886 .cost_today_usd_micros
1887 .store(0, Ordering::Relaxed);
1888 }
1889 if state
1890 .breaker_open_until_ms
1891 .is_some_and(|until_ms| until_ms > now_ms)
1892 {
1893 state.breaker_open_until_ms
1894 } else {
1895 None
1896 }
1897 };
1898
1899 if breaker_open_until.is_some() {
1900 let mut metadata = BTreeMap::new();
1901 metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
1902 metadata.insert("event_id".to_string(), serde_json::json!(event_id));
1903 metadata.insert(
1904 "breaker_open_until_ms".to_string(),
1905 serde_json::json!(breaker_open_until),
1906 );
1907 crate::events::log_warn_meta(
1908 "trigger.predicate.circuit_breaker",
1909 "trigger predicate circuit breaker is open; short-circuiting to false",
1910 metadata,
1911 );
1912 let record = PredicateEvaluationRecord {
1913 result: false,
1914 reason: Some("circuit_open".to_string()),
1915 ..Default::default()
1916 };
1917 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1918 .await?;
1919 return Ok(record);
1920 }
1921
1922 if binding
1923 .daily_cost_usd
1924 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
1925 {
1926 self.append_lifecycle_event(
1927 "predicate.daily_budget_exceeded",
1928 event,
1929 binding,
1930 serde_json::json!({
1931 "trigger_id": binding.id.as_str(),
1932 "event_id": event.id.0,
1933 "limit_usd": binding.daily_cost_usd,
1934 "cost_today_usd": current_predicate_daily_cost(binding),
1935 "replay_of_event_id": replay_of_event_id,
1936 }),
1937 replay_of_event_id,
1938 )
1939 .await?;
1940 let record = PredicateEvaluationRecord {
1941 result: false,
1942 reason: Some("daily_budget_exceeded".to_string()),
1943 ..Default::default()
1944 };
1945 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1946 .await?;
1947 return Ok(record);
1948 }
1949
1950 let replay_cache = self
1951 .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
1952 .await?;
1953 let guard = start_predicate_evaluation(
1954 binding.when_budget.clone().unwrap_or_default(),
1955 replay_cache,
1956 );
1957 let started = std::time::Instant::now();
1958 let eval = self
1959 .invoke_vm_callable_with_timeout(
1960 &predicate.closure,
1961 &binding.binding_key(),
1962 event,
1963 replay_of_event_id,
1964 binding.id.as_str(),
1965 &format!("{}.{}", event.provider.as_str(), event.kind),
1966 autonomy_tier,
1967 &mut self.cancel_tx.subscribe(),
1968 binding
1969 .when_budget
1970 .as_ref()
1971 .and_then(|budget| budget.timeout()),
1972 )
1973 .await;
1974 let capture = guard.finish();
1975 let latency_ms = started.elapsed().as_millis() as u64;
1976 if replay_of_event_id.is_none() && !capture.entries.is_empty() {
1977 self.append_predicate_cache_record(binding, event, &capture.entries)
1978 .await?;
1979 }
1980
1981 let mut record = PredicateEvaluationRecord {
1982 result: false,
1983 cost_usd: capture.total_cost_usd,
1984 tokens: capture.total_tokens,
1985 latency_ms,
1986 cached: capture.cached,
1987 reason: None,
1988 };
1989
1990 let mut count_failure = false;
1991 let mut opened_breaker = false;
1992
1993 match eval {
1994 Ok(value) => match predicate_value_as_bool(value) {
1995 Ok(result) => {
1996 record.result = result;
1997 }
1998 Err(reason) => {
1999 count_failure = true;
2000 record.reason = Some(reason);
2001 }
2002 },
2003 Err(error) => {
2004 count_failure = true;
2005 record.reason = Some(error.to_string());
2006 }
2007 }
2008
2009 let cost_usd_micros = usd_to_micros(record.cost_usd);
2010 if cost_usd_micros > 0 {
2011 binding
2012 .metrics
2013 .cost_total_usd_micros
2014 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2015 binding
2016 .metrics
2017 .cost_today_usd_micros
2018 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2019 }
2020
2021 let timed_out = matches!(
2022 record.reason.as_deref(),
2023 Some("predicate evaluation timed out")
2024 );
2025 if capture.budget_exceeded || timed_out {
2026 record.result = false;
2027 record.reason = Some("budget_exceeded".to_string());
2028 self.append_lifecycle_event(
2029 "predicate.budget_exceeded",
2030 event,
2031 binding,
2032 serde_json::json!({
2033 "trigger_id": binding.id.as_str(),
2034 "event_id": event.id.0,
2035 "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2036 "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2037 "cost_usd": record.cost_usd,
2038 "tokens": record.tokens,
2039 "replay_of_event_id": replay_of_event_id,
2040 }),
2041 replay_of_event_id,
2042 )
2043 .await?;
2044 }
2045
2046 if binding
2047 .daily_cost_usd
2048 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2049 {
2050 record.result = false;
2051 record.reason = Some("daily_budget_exceeded".to_string());
2052 self.append_lifecycle_event(
2053 "predicate.daily_budget_exceeded",
2054 event,
2055 binding,
2056 serde_json::json!({
2057 "trigger_id": binding.id.as_str(),
2058 "event_id": event.id.0,
2059 "limit_usd": binding.daily_cost_usd,
2060 "cost_today_usd": current_predicate_daily_cost(binding),
2061 "replay_of_event_id": replay_of_event_id,
2062 }),
2063 replay_of_event_id,
2064 )
2065 .await?;
2066 }
2067
2068 {
2069 let mut state = binding
2070 .predicate_state
2071 .lock()
2072 .expect("trigger predicate state poisoned");
2073 if state.budget_day_utc != Some(today) {
2074 state.budget_day_utc = Some(today);
2075 binding
2076 .metrics
2077 .cost_today_usd_micros
2078 .store(cost_usd_micros, Ordering::Relaxed);
2079 }
2080 if count_failure {
2081 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2082 if state.consecutive_failures >= 3 {
2083 state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2084 opened_breaker = true;
2085 }
2086 } else {
2087 state.consecutive_failures = 0;
2088 state.breaker_open_until_ms = None;
2089 }
2090 }
2091
2092 if opened_breaker {
2093 let mut metadata = BTreeMap::new();
2094 metadata.insert(
2095 "trigger_id".to_string(),
2096 serde_json::json!(binding.id.as_str()),
2097 );
2098 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2099 metadata.insert("failure_count".to_string(), serde_json::json!(3));
2100 metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2101 crate::events::log_warn_meta(
2102 "trigger.predicate.circuit_breaker",
2103 "trigger predicate circuit breaker opened for 5 minutes",
2104 metadata,
2105 );
2106 }
2107
2108 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2109 .await?;
2110 Ok(record)
2111 }
2112
2113 #[allow(clippy::too_many_arguments)]
2114 #[allow(clippy::too_many_arguments)]
2115 async fn invoke_vm_callable_with_timeout(
2116 &self,
2117 closure: &crate::value::VmClosure,
2118 binding_key: &str,
2119 event: &TriggerEvent,
2120 replay_of_event_id: Option<&String>,
2121 agent_id: &str,
2122 action: &str,
2123 autonomy_tier: AutonomyTier,
2124 cancel_rx: &mut broadcast::Receiver<()>,
2125 timeout: Option<Duration>,
2126 ) -> Result<VmValue, DispatchError> {
2127 let future = self.invoke_vm_callable(
2128 closure,
2129 binding_key,
2130 event,
2131 replay_of_event_id,
2132 agent_id,
2133 action,
2134 autonomy_tier,
2135 cancel_rx,
2136 );
2137 pin_mut!(future);
2138 if let Some(timeout) = timeout {
2139 match tokio::time::timeout(timeout, future).await {
2140 Ok(result) => result,
2141 Err(_) => Err(DispatchError::Local(
2142 "predicate evaluation timed out".to_string(),
2143 )),
2144 }
2145 } else {
2146 future.await
2147 }
2148 }
2149
2150 async fn append_predicate_evaluated_event(
2151 &self,
2152 binding: &TriggerBinding,
2153 event: &TriggerEvent,
2154 record: &PredicateEvaluationRecord,
2155 replay_of_event_id: Option<&String>,
2156 ) -> Result<(), DispatchError> {
2157 self.append_lifecycle_event(
2158 "predicate.evaluated",
2159 event,
2160 binding,
2161 serde_json::json!({
2162 "trigger_id": binding.id.as_str(),
2163 "event_id": event.id.0,
2164 "result": record.result,
2165 "cost_usd": record.cost_usd,
2166 "tokens": record.tokens,
2167 "latency_ms": record.latency_ms,
2168 "cached": record.cached,
2169 "reason": record.reason,
2170 "replay_of_event_id": replay_of_event_id,
2171 }),
2172 replay_of_event_id,
2173 )
2174 .await
2175 }
2176
2177 async fn append_predicate_cache_record(
2178 &self,
2179 binding: &TriggerBinding,
2180 event: &TriggerEvent,
2181 entries: &[PredicateCacheEntry],
2182 ) -> Result<(), DispatchError> {
2183 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2184 .expect("static trigger inbox legacy topic name is valid");
2185 let payload = serde_json::to_value(PredicateCacheRecord {
2186 trigger_id: binding.id.as_str().to_string(),
2187 event_id: event.id.0.clone(),
2188 entries: entries.to_vec(),
2189 })
2190 .map_err(|error| DispatchError::Serde(error.to_string()))?;
2191 self.event_log
2192 .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2193 .await
2194 .map_err(DispatchError::from)
2195 .map(|_| ())
2196 }
2197
2198 async fn read_predicate_cache_record(
2199 &self,
2200 event_id: &str,
2201 ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2202 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2203 .expect("static trigger inbox legacy topic name is valid");
2204 let records = self
2205 .event_log
2206 .read_range(&topic, None, usize::MAX)
2207 .await
2208 .map_err(DispatchError::from)?;
2209 Ok(records
2210 .into_iter()
2211 .filter(|(_, event)| event.kind == "predicate_llm_cache")
2212 .filter_map(|(_, event)| {
2213 serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2214 })
2215 .filter(|record| record.event_id == event_id)
2216 .flat_map(|record| record.entries)
2217 .collect())
2218 }
2219
2220 #[allow(clippy::too_many_arguments)]
2221 async fn append_dispatch_trust_record(
2222 &self,
2223 binding: &TriggerBinding,
2224 route: &DispatchUri,
2225 event: &TriggerEvent,
2226 replay_of_event_id: Option<&String>,
2227 autonomy_tier: AutonomyTier,
2228 outcome: TrustOutcome,
2229 terminal_status: &str,
2230 attempt_count: u32,
2231 error: Option<String>,
2232 ) -> Result<(), DispatchError> {
2233 let mut record = TrustRecord::new(
2234 binding.id.as_str().to_string(),
2235 format!("{}.{}", event.provider.as_str(), event.kind),
2236 None,
2237 outcome,
2238 event.trace_id.0.clone(),
2239 autonomy_tier,
2240 );
2241 record.metadata.insert(
2242 "binding_key".to_string(),
2243 serde_json::json!(binding.binding_key()),
2244 );
2245 record.metadata.insert(
2246 "binding_version".to_string(),
2247 serde_json::json!(binding.version),
2248 );
2249 record.metadata.insert(
2250 "provider".to_string(),
2251 serde_json::json!(event.provider.as_str()),
2252 );
2253 record
2254 .metadata
2255 .insert("event_kind".to_string(), serde_json::json!(event.kind));
2256 record
2257 .metadata
2258 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2259 record.metadata.insert(
2260 "target_uri".to_string(),
2261 serde_json::json!(route.target_uri()),
2262 );
2263 record.metadata.insert(
2264 "terminal_status".to_string(),
2265 serde_json::json!(terminal_status),
2266 );
2267 record.metadata.insert(
2268 "attempt_count".to_string(),
2269 serde_json::json!(attempt_count),
2270 );
2271 if let Some(replay_of_event_id) = replay_of_event_id {
2272 record.metadata.insert(
2273 "replay_of_event_id".to_string(),
2274 serde_json::json!(replay_of_event_id),
2275 );
2276 }
2277 if let Some(error) = error {
2278 record
2279 .metadata
2280 .insert("error".to_string(), serde_json::json!(error));
2281 }
2282 append_trust_record(&self.event_log, &record)
2283 .await
2284 .map_err(DispatchError::from)
2285 }
2286
2287 async fn append_attempt_record(
2288 &self,
2289 event: &TriggerEvent,
2290 binding: &TriggerBinding,
2291 attempt: &DispatchAttemptRecord,
2292 replay_of_event_id: Option<&String>,
2293 ) -> Result<(), DispatchError> {
2294 self.append_topic_event(
2295 TRIGGER_ATTEMPTS_TOPIC,
2296 "attempt_recorded",
2297 event,
2298 Some(binding),
2299 Some(attempt.attempt),
2300 serde_json::to_value(attempt)
2301 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2302 replay_of_event_id,
2303 )
2304 .await
2305 }
2306
2307 async fn append_lifecycle_event(
2308 &self,
2309 kind: &str,
2310 event: &TriggerEvent,
2311 binding: &TriggerBinding,
2312 payload: serde_json::Value,
2313 replay_of_event_id: Option<&String>,
2314 ) -> Result<(), DispatchError> {
2315 self.append_topic_event(
2316 TRIGGERS_LIFECYCLE_TOPIC,
2317 kind,
2318 event,
2319 Some(binding),
2320 None,
2321 payload,
2322 replay_of_event_id,
2323 )
2324 .await
2325 }
2326
2327 async fn append_topic_event(
2328 &self,
2329 topic_name: &str,
2330 kind: &str,
2331 event: &TriggerEvent,
2332 binding: Option<&TriggerBinding>,
2333 attempt: Option<u32>,
2334 payload: serde_json::Value,
2335 replay_of_event_id: Option<&String>,
2336 ) -> Result<(), DispatchError> {
2337 let topic = Topic::new(topic_name)
2338 .expect("static trigger dispatcher topic names should always be valid");
2339 let headers = event_headers(event, binding, attempt, replay_of_event_id);
2340 self.event_log
2341 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
2342 .await
2343 .map_err(DispatchError::from)
2344 .map(|_| ())
2345 }
2346
2347 async fn emit_action_graph(
2348 &self,
2349 event: &TriggerEvent,
2350 nodes: Vec<RunActionGraphNodeRecord>,
2351 edges: Vec<RunActionGraphEdgeRecord>,
2352 extra: serde_json::Value,
2353 ) -> Result<(), DispatchError> {
2354 let mut headers = BTreeMap::new();
2355 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2356 headers.insert("event_id".to_string(), event.id.0.clone());
2357 let observability = RunObservabilityRecord {
2358 schema_version: 1,
2359 action_graph_nodes: nodes,
2360 action_graph_edges: edges,
2361 ..Default::default()
2362 };
2363 append_action_graph_update(
2364 headers,
2365 serde_json::json!({
2366 "source": "dispatcher",
2367 "trace_id": event.trace_id.0,
2368 "event_id": event.id.0,
2369 "observability": observability,
2370 "context": extra,
2371 }),
2372 )
2373 .await
2374 .map_err(DispatchError::from)
2375 }
2376}
2377
2378async fn dispatch_cancel_requested(
2379 event_log: &Arc<AnyEventLog>,
2380 binding_key: &str,
2381 event_id: &str,
2382 replay_of_event_id: Option<&String>,
2383) -> Result<bool, DispatchError> {
2384 if replay_of_event_id.is_some() {
2385 return Ok(false);
2386 }
2387 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
2388 .expect("static trigger cancel topic should always be valid");
2389 let events = event_log.read_range(&topic, None, usize::MAX).await?;
2390 let requested = events
2391 .into_iter()
2392 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
2393 .filter_map(|(_, event)| {
2394 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
2395 })
2396 .collect::<BTreeSet<_>>();
2397 Ok(requested
2398 .iter()
2399 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
2400}
2401
2402async fn sleep_or_cancel_or_request(
2403 event_log: &Arc<AnyEventLog>,
2404 delay: Duration,
2405 binding_key: &str,
2406 event_id: &str,
2407 replay_of_event_id: Option<&String>,
2408 cancel_rx: &mut broadcast::Receiver<()>,
2409) -> Result<(), DispatchError> {
2410 let sleep = tokio::time::sleep(delay);
2411 pin_mut!(sleep);
2412 let mut poll = tokio::time::interval(Duration::from_millis(100));
2413 loop {
2414 tokio::select! {
2415 _ = &mut sleep => return Ok(()),
2416 _ = recv_cancel(cancel_rx) => {
2417 return Err(DispatchError::Cancelled(
2418 "dispatcher shutdown cancelled retry wait".to_string(),
2419 ));
2420 }
2421 _ = poll.tick() => {
2422 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
2423 return Err(DispatchError::Cancelled(
2424 "trigger cancel request cancelled retry wait".to_string(),
2425 ));
2426 }
2427 }
2428 }
2429 }
2430}
2431
2432fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
2433 let mut iter = events.into_iter();
2434 let Some(mut root) = iter.next() else {
2435 return Err(DispatchError::Registry(
2436 "batch dispatch produced an empty event list".to_string(),
2437 ));
2438 };
2439 let mut batch = Vec::new();
2440 batch.push(
2441 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
2442 );
2443 for event in iter {
2444 batch.push(
2445 serde_json::to_value(&event)
2446 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2447 );
2448 }
2449 root.batch = Some(batch);
2450 Ok(root)
2451}
2452
2453fn json_value_to_gate(value: &serde_json::Value) -> String {
2454 match value {
2455 serde_json::Value::Null => "null".to_string(),
2456 serde_json::Value::String(text) => text.clone(),
2457 serde_json::Value::Bool(value) => value.to_string(),
2458 serde_json::Value::Number(value) => value.to_string(),
2459 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
2460 }
2461}
2462
2463fn event_to_handler_value(event: &TriggerEvent) -> Result<VmValue, DispatchError> {
2464 let json =
2465 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2466 let value = json_to_vm_value(&json);
2467 match (&event.raw_body, value) {
2468 (Some(raw_body), VmValue::Dict(dict)) => {
2469 let mut map = (*dict).clone();
2470 map.insert(
2471 "raw_body".to_string(),
2472 VmValue::Bytes(Rc::new(raw_body.clone())),
2473 );
2474 Ok(VmValue::Dict(Rc::new(map)))
2475 }
2476 (_, other) => Ok(other),
2477 }
2478}
2479
2480fn decrement_in_flight(state: &DispatcherRuntimeState) {
2481 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
2482 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
2483 state.idle_notify.notify_waiters();
2484 }
2485}
2486
2487fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
2488 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
2489 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
2490 state.idle_notify.notify_waiters();
2491 }
2492}
2493
/// Test-only: arms the thread-local `TEST_INBOX_DEQUEUED_SIGNAL` slot with `tx`.
///
/// Any previously armed sender is replaced and dropped.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().replace(tx));
}
2500
/// No-op outside of unit tests; the `#[cfg(test)]` variant follows.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test-only: fires and disarms the one-shot sender stored in
/// `TEST_INBOX_DEQUEUED_SIGNAL`, if one is armed.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    let armed = TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| slot.borrow_mut().take());
    if let Some(sender) = armed {
        // The receiving side may already be gone; a failed send is ignored.
        let _ = sender.send(());
    }
}
2512
2513pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
2514 event_log: &L,
2515 event: &TriggerEvent,
2516) -> Result<u64, DispatchError> {
2517 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
2518 .expect("static trigger.inbox.envelopes topic is valid");
2519 let headers = event_headers(event, None, None, None);
2520 let payload =
2521 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2522 event_log
2523 .append(
2524 &topic,
2525 LogEvent::new("event_ingested", payload).with_headers(headers),
2526 )
2527 .await
2528 .map_err(DispatchError::from)
2529}
2530
2531pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
2532 ACTIVE_DISPATCHER_STATE.with(|slot| {
2533 slot.borrow()
2534 .as_ref()
2535 .map(|state| DispatcherStatsSnapshot {
2536 in_flight: state.in_flight.load(Ordering::Relaxed),
2537 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
2538 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
2539 })
2540 .unwrap_or_default()
2541 })
2542}
2543
2544pub fn clear_dispatcher_state() {
2545 ACTIVE_DISPATCHER_STATE.with(|slot| {
2546 *slot.borrow_mut() = None;
2547 });
2548}
2549
2550fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
2551 if is_cancelled_vm_error(&error) {
2552 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
2553 }
2554 if let VmError::Thrown(VmValue::String(message)) = &error {
2555 return DispatchError::Local(message.to_string());
2556 }
2557 match error_to_category(&error) {
2558 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
2559 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
2560 ErrorCategory::Cancelled => {
2561 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
2562 }
2563 _ => DispatchError::Local(error.to_string()),
2564 }
2565}
2566
2567fn dispatch_error_label(error: &DispatchError) -> &'static str {
2568 match error {
2569 DispatchError::Denied(_) => "denied",
2570 DispatchError::Timeout(_) => "timeout",
2571 DispatchError::Cancelled(_) => "cancelled",
2572 _ => "failed",
2573 }
2574}
2575
2576fn dispatch_node_id(
2577 route: &DispatchUri,
2578 binding_key: &str,
2579 event_id: &str,
2580 attempt: u32,
2581) -> String {
2582 let prefix = match route {
2583 DispatchUri::A2a { .. } => "a2a",
2584 _ => "dispatch",
2585 };
2586 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
2587}
2588
2589fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
2590 match route {
2591 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
2592 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
2593 }
2594}
2595
2596fn dispatch_node_label(route: &DispatchUri) -> String {
2597 match route {
2598 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
2599 _ => route.target_uri(),
2600 }
2601}
2602
2603fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
2604 match route {
2605 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
2606 _ => None,
2607 }
2608}
2609
2610fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
2611 match route {
2612 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
2613 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
2614 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
2615 }
2616}
2617
2618fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
2619 match value {
2620 VmValue::Bool(result) => Ok(result),
2621 VmValue::EnumVariant {
2622 enum_name,
2623 variant,
2624 mut fields,
2625 } if enum_name == "Result" && variant == "Ok" => match fields.pop() {
2626 Some(VmValue::Bool(result)) => Ok(result),
2627 Some(other) => Err(format!(
2628 "predicate Result.Ok payload must be bool, got {}",
2629 other.type_name()
2630 )),
2631 None => Err("predicate Result.Ok payload is missing".to_string()),
2632 },
2633 VmValue::EnumVariant {
2634 enum_name,
2635 variant,
2636 fields,
2637 } if enum_name == "Result" && variant == "Err" => Err(fields
2638 .first()
2639 .map(VmValue::display)
2640 .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
2641 other => Err(format!(
2642 "predicate must return bool or Result<bool, _>, got {}",
2643 other.type_name()
2644 )),
2645 }
2646}
2647
/// Converts a USD amount to whole micro-dollars, rounding to the nearest unit.
///
/// Non-finite, zero, and negative inputs yield 0. Values too large for `u64`
/// saturate to `u64::MAX` through the `as` cast.
fn usd_to_micros(value: f64) -> u64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;
    if value.is_finite() && value > 0.0 {
        (value * MICROS_PER_USD).round() as u64
    } else {
        0
    }
}
2654
2655fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
2656 binding
2657 .metrics
2658 .cost_today_usd_micros
2659 .load(Ordering::Relaxed) as f64
2660 / 1_000_000.0
2661}
2662
2663fn is_cancelled_vm_error(error: &VmError) -> bool {
2664 matches!(
2665 error,
2666 VmError::Thrown(VmValue::String(message))
2667 if message.starts_with("kind:cancelled:")
2668 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
2669}
2670
2671fn event_headers(
2672 event: &TriggerEvent,
2673 binding: Option<&TriggerBinding>,
2674 attempt: Option<u32>,
2675 replay_of_event_id: Option<&String>,
2676) -> BTreeMap<String, String> {
2677 let mut headers = BTreeMap::new();
2678 headers.insert("event_id".to_string(), event.id.0.clone());
2679 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2680 headers.insert("provider".to_string(), event.provider.as_str().to_string());
2681 headers.insert("kind".to_string(), event.kind.clone());
2682 if let Some(replay_of_event_id) = replay_of_event_id {
2683 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
2684 }
2685 if let Some(binding) = binding {
2686 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
2687 headers.insert("binding_key".to_string(), binding.binding_key());
2688 headers.insert(
2689 "handler_kind".to_string(),
2690 DispatchUri::from(&binding.handler).kind().to_string(),
2691 );
2692 }
2693 if let Some(attempt) = attempt {
2694 headers.insert("attempt".to_string(), attempt.to_string());
2695 }
2696 headers
2697}
2698
2699fn worker_queue_priority(
2700 binding: &super::registry::TriggerBinding,
2701 event: &TriggerEvent,
2702) -> crate::WorkerQueuePriority {
2703 match event
2704 .headers
2705 .get("priority")
2706 .map(|value| value.trim().to_ascii_lowercase())
2707 .as_deref()
2708 {
2709 Some("high") => crate::WorkerQueuePriority::High,
2710 Some("low") => crate::WorkerQueuePriority::Low,
2711 _ => binding.dispatch_priority,
2712 }
2713}
2714
/// Environment variable that, when present with any value, makes
/// `maybe_fail_before_outbox` terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Fault-injection hook.
///
/// Exits the process with status 86 when `TEST_FAIL_BEFORE_OUTBOX_ENV` is set.
/// Otherwise it does nothing.
///
/// NOTE(review): judging by the name, this is presumably used by crash-recovery
/// tests to simulate dying just before an outbox write. Confirm against callers.
fn maybe_fail_before_outbox() {
    let injected_failure_requested = std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_some();
    if injected_failure_requested {
        std::process::exit(86);
    }
}
2722
2723fn now_rfc3339() -> String {
2724 time::OffsetDateTime::now_utc()
2725 .format(&time::format_description::well_known::Rfc3339)
2726 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2727}
2728
2729fn now_unix_ms() -> i64 {
2730 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
2731}
2732
2733fn utc_day_key() -> i32 {
2734 time::OffsetDateTime::now_utc().date().to_julian_day()
2735}
2736
2737fn cancelled_dispatch_outcome(
2738 binding: &TriggerBinding,
2739 route: &DispatchUri,
2740 event: &TriggerEvent,
2741 replay_of_event_id: Option<String>,
2742 attempt_count: u32,
2743 error: String,
2744) -> DispatchOutcome {
2745 DispatchOutcome {
2746 trigger_id: binding.id.as_str().to_string(),
2747 binding_key: binding.binding_key(),
2748 event_id: event.id.0.clone(),
2749 attempt_count,
2750 status: DispatchStatus::Cancelled,
2751 handler_kind: route.kind().to_string(),
2752 target_uri: route.target_uri(),
2753 replay_of_event_id,
2754 result: None,
2755 error: Some(error),
2756 }
2757}
2758
2759async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
2760 let _ = cancel_rx.recv().await;
2761}
2762
// Unit tests for this module, declared as an out-of-line module and compiled
// only under `cfg(test)`.
#[cfg(test)]
mod tests;