1use std::cell::RefCell;
2use std::collections::{BTreeMap, BTreeSet};
3use std::rc::Rc;
4use std::sync::atomic::{AtomicU64, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7#[cfg(feature = "otel")]
8use std::time::Instant;
9
10use futures::{pin_mut, StreamExt};
11use serde::{Deserialize, Serialize};
12use tokio::sync::broadcast;
13use tokio::sync::Notify;
14use tracing::Instrument as _;
15
16use crate::event_log::{active_event_log, AnyEventLog, EventLog, LogError, LogEvent, Topic};
17use crate::llm::trigger_predicate::{start_predicate_evaluation, PredicateCacheEntry};
18use crate::llm::vm_value_to_json;
19use crate::orchestration::{
20 append_action_graph_update, RunActionGraphEdgeRecord, RunActionGraphNodeRecord,
21 RunObservabilityRecord, ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH, ACTION_GRAPH_EDGE_KIND_DLQ_MOVE,
22 ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE, ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN,
23 ACTION_GRAPH_EDGE_KIND_RETRY, ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
24 ACTION_GRAPH_NODE_KIND_A2A_HOP, ACTION_GRAPH_NODE_KIND_DISPATCH, ACTION_GRAPH_NODE_KIND_DLQ,
25 ACTION_GRAPH_NODE_KIND_RETRY, ACTION_GRAPH_NODE_KIND_TRIGGER,
26 ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE,
27};
28use crate::stdlib::json_to_vm_value;
29use crate::trust_graph::{append_trust_record, AutonomyTier, TrustOutcome, TrustRecord};
30use crate::value::{error_to_category, ErrorCategory, VmError, VmValue};
31use crate::vm::Vm;
32
33use self::uri::DispatchUri;
34use super::registry::matching_bindings;
35use super::registry::{TriggerBinding, TriggerHandlerSpec};
36use super::{
37 begin_in_flight, finish_in_flight, TriggerDispatchOutcome, TriggerEvent,
38 TRIGGERS_LIFECYCLE_TOPIC, TRIGGER_ATTEMPTS_TOPIC, TRIGGER_CANCEL_REQUESTS_TOPIC,
39 TRIGGER_DLQ_TOPIC, TRIGGER_INBOX_ENVELOPES_TOPIC, TRIGGER_INBOX_LEGACY_TOPIC,
40 TRIGGER_OUTBOX_TOPIC,
41};
42use flow_control::{BatchDecision, ConcurrencyPermit, FlowControlManager};
43
44mod flow_control;
45pub mod retry;
46pub mod uri;
47
48pub use retry::{RetryPolicy, TriggerRetryConfig, DEFAULT_MAX_ATTEMPTS};
49
thread_local! {
    // Runtime state of the dispatcher most recently constructed on this thread;
    // overwritten by `Dispatcher::with_event_log_and_metrics`.
    static ACTIVE_DISPATCHER_STATE: RefCell<Option<Arc<DispatcherRuntimeState>>> = const { RefCell::new(None) };
    // Dispatch context handed out (cloned) by `current_dispatch_context()`;
    // starts out as `None`.
    static ACTIVE_DISPATCH_CONTEXT: RefCell<Option<DispatchContext>> = const { RefCell::new(None) };
    // Test-only one-shot sender. NOTE(review): presumably fired by
    // `notify_test_inbox_dequeued()` once `run` has decoded an inbox envelope —
    // confirm against that helper.
    #[cfg(test)]
    static TEST_INBOX_DEQUEUED_SIGNAL: RefCell<Option<tokio::sync::oneshot::Sender<()>>> = const { RefCell::new(None) };
}
56
tokio::task_local! {
    // Task-scoped flag that is `true` while the enclosing dispatch carries a
    // `replay_of_event_id`; scoped in `Dispatcher::dispatch_with_replay` and
    // read through `current_dispatch_is_replay()`.
    static ACTIVE_DISPATCH_IS_REPLAY: bool;
}
60
/// Context describing a dispatch; a clone of the active value is returned by
/// [`current_dispatch_context`].
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// Trigger event associated with the dispatch.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when the dispatch is a replay.
    /// NOTE(review): presumably `None` for first-time dispatches — confirm where
    /// the context is populated.
    pub replay_of_event_id: Option<String>,
    /// Identifier of the agent the dispatch is attributed to.
    pub agent_id: String,
    /// Name of the action being performed.
    pub action: String,
    /// Autonomy tier in effect for the dispatch.
    pub autonomy_tier: AutonomyTier,
}
69
/// Serializable record that groups predicate cache entries under the trigger
/// and event they belong to.
#[derive(Clone, Debug, Serialize, Deserialize)]
struct PredicateCacheRecord {
    /// Id of the trigger the entries were recorded for.
    trigger_id: String,
    /// Id of the event the entries were recorded for.
    event_id: String,
    /// Predicate cache entries stored for that trigger/event pair.
    entries: Vec<PredicateCacheEntry>,
}
76
/// Result of evaluating a binding's `when` predicate. Every field except
/// `result` is copied into the action-graph metadata emitted for the predicate
/// node; `result` decides whether the dispatch proceeds.
#[derive(Clone, Debug, Default)]
struct PredicateEvaluationRecord {
    /// Whether the predicate passed; a `false` result causes the dispatch to be skipped.
    result: bool,
    /// Cost attributed to the evaluation, reported as `cost_usd`.
    cost_usd: f64,
    /// Token count attributed to the evaluation.
    tokens: u64,
    /// Evaluation latency, reported as `latency_ms`.
    latency_ms: u64,
    /// Reported as `cached`. NOTE(review): presumably set when the result came
    /// from the predicate cache — confirm in `evaluate_predicate`.
    cached: bool,
    /// Optional explanation, reported as `reason` and included in skip results.
    reason: Option<String>,
}
86
87pub(crate) fn current_dispatch_context() -> Option<DispatchContext> {
88 ACTIVE_DISPATCH_CONTEXT.with(|slot| slot.borrow().clone())
89}
90
91pub(crate) fn current_dispatch_is_replay() -> bool {
92 ACTIVE_DISPATCH_IS_REPLAY
93 .try_with(|is_replay| *is_replay)
94 .unwrap_or(false)
95}
96
/// Dispatches trigger events to their bound handlers and records progress in
/// the event log. Clones share the same runtime state and cancel channel.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM supplied at construction time.
    /// NOTE(review): presumably the template VM handlers execute against —
    /// confirm in `dispatch_once`.
    base_vm: Rc<Vm>,
    /// Event log used for inbox, outbox, lifecycle and cancel-request topics.
    event_log: Arc<AnyEventLog>,
    /// Broadcast channel signalled by `shutdown`; `run` stops when it fires.
    cancel_tx: broadcast::Sender<()>,
    /// Shared counters, DLQ entries, cancel tokens and flow-control state.
    state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink that receives dispatch succeeded/failed counts.
    metrics: Option<Arc<crate::MetricsRegistry>>,
}
105
/// Mutable runtime state shared by all clones of a [`Dispatcher`].
#[derive(Debug)]
struct DispatcherRuntimeState {
    /// Number of dispatches currently in progress; reported by `snapshot`.
    in_flight: AtomicU64,
    /// Retry queue depth; reported by `snapshot` and checked by `drain`.
    retry_queue_depth: AtomicU64,
    /// Dead-letter entries; exposed through `dlq_entries`.
    dlq: Mutex<Vec<DlqEntry>>,
    /// Cancellation flags that `shutdown` sets to `true`.
    cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
    /// Set to `true` by `shutdown`.
    shutting_down: std::sync::atomic::AtomicBool,
    /// Notification that `drain` waits on between idle checks.
    idle_notify: Notify,
    /// Flow-control manager constructed from the dispatcher's event log.
    flow_control: FlowControlManager,
}
116
117impl DispatcherRuntimeState {
118 fn new(event_log: Arc<AnyEventLog>) -> Self {
119 Self {
120 in_flight: AtomicU64::new(0),
121 retry_queue_depth: AtomicU64::new(0),
122 dlq: Mutex::new(Vec::new()),
123 cancel_tokens: Mutex::new(Vec::new()),
124 shutting_down: std::sync::atomic::AtomicBool::new(false),
125 idle_notify: Notify::new(),
126 flow_control: FlowControlManager::new(event_log),
127 }
128 }
129}
130
/// Point-in-time view of the dispatcher counters, produced by
/// `Dispatcher::snapshot`. Fields missing during deserialization fall back to
/// their defaults.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches in progress when the snapshot was taken.
    pub in_flight: u64,
    /// Retry queue depth when the snapshot was taken.
    pub retry_queue_depth: u64,
    /// Number of DLQ entries when the snapshot was taken.
    pub dlq_depth: u64,
}
138
/// Terminal status of a dispatch; serialized in `snake_case`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    /// The handler completed successfully.
    Succeeded,
    /// The dispatch failed.
    Failed,
    /// NOTE(review): presumably the event was moved to the dead-letter queue —
    /// confirm where this status is produced.
    Dlq,
    /// The dispatch was skipped (a predicate evaluated to `false` or flow
    /// control declined it) without any attempt being made.
    Skipped,
    /// The dispatch was cancelled.
    Cancelled,
}
148
149impl DispatchStatus {
150 pub fn as_str(&self) -> &'static str {
151 match self {
152 Self::Succeeded => "succeeded",
153 Self::Failed => "failed",
154 Self::Dlq => "dlq",
155 Self::Skipped => "skipped",
156 Self::Cancelled => "cancelled",
157 }
158 }
159}
160
/// Final result of dispatching one event to one binding. Fields missing during
/// deserialization are taken from the `Default` implementation.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    /// Id of the trigger binding that handled the event.
    pub trigger_id: String,
    /// Value of `TriggerBinding::binding_key()` for that binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Number of attempts made; `0` when the dispatch was skipped or cancelled
    /// before the first attempt.
    pub attempt_count: u32,
    /// Terminal status of the dispatch.
    pub status: DispatchStatus,
    /// Handler kind taken from the dispatch route.
    pub handler_kind: String,
    /// Target URI taken from the dispatch route.
    pub target_uri: String,
    /// Id of the original event when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result on success, or skip details when the dispatch was skipped.
    pub result: Option<serde_json::Value>,
    /// Error message when the dispatch did not succeed.
    pub error: Option<String>,
}
175
/// Payload written to the inbox envelopes topic by `Dispatcher::enqueue_targeted`
/// and decoded again by `Dispatcher::run`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    /// When set, the event is dispatched only to this trigger's live binding.
    pub trigger_id: Option<String>,
    /// Binding version passed to `resolve_live_trigger_binding` when a target
    /// trigger is resolved.
    pub binding_version: Option<u32>,
    /// The event to dispatch.
    pub event: TriggerEvent,
}
182
/// Result of `Dispatcher::drain`: whether the dispatcher went idle, together
/// with the counters sampled when the wait ended.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// `true` when the dispatcher was observed with no in-flight dispatches and
    /// an empty retry queue.
    pub drained: bool,
    /// In-flight dispatch count from the reported snapshot.
    pub in_flight: u64,
    /// Retry queue depth from the reported snapshot.
    pub retry_queue_depth: u64,
    /// DLQ depth from the reported snapshot.
    pub dlq_depth: u64,
}
191
192impl Default for DispatchOutcome {
193 fn default() -> Self {
194 Self {
195 trigger_id: String::new(),
196 binding_key: String::new(),
197 event_id: String::new(),
198 attempt_count: 0,
199 status: DispatchStatus::Failed,
200 handler_kind: String::new(),
201 target_uri: String::new(),
202 replay_of_event_id: None,
203 result: None,
204 error: None,
205 }
206 }
207}
208
/// Record of a single handler attempt. Fields missing during deserialization
/// fall back to their defaults.
#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Value of `TriggerBinding::binding_key()` for the binding.
    pub binding_key: String,
    /// Id of the dispatched event.
    pub event_id: String,
    /// Attempt number, starting at 1.
    pub attempt: u32,
    /// Handler kind taken from the dispatch route.
    pub handler_kind: String,
    /// Timestamp string from `now_rfc3339()` taken before the attempt ran.
    pub started_at: String,
    /// Timestamp string from `now_rfc3339()` taken after the attempt returned.
    pub completed_at: String,
    /// `"success"` for a successful attempt, otherwise the label produced by
    /// `dispatch_error_label` for the error.
    pub outcome: String,
    /// Error message for a failed attempt; `None` on success.
    pub error_msg: Option<String>,
}
222
/// Request to cancel the dispatch of one event for one binding; persisted by
/// [`append_dispatch_cancel_request`].
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    /// Binding key of the dispatch to cancel.
    pub binding_key: String,
    /// Id of the event whose dispatch should be cancelled.
    pub event_id: String,
    /// When the cancellation was requested; serialized as an RFC 3339 string.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Optional identity of the requester; defaults to `None` when absent.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Optional audit identifier; defaults to `None` when absent.
    #[serde(default)]
    pub audit_id: Option<String>,
}
234
/// Dead-letter entry kept in the dispatcher's runtime state and returned by
/// `Dispatcher::dlq_entries`.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    /// Id of the trigger binding.
    pub trigger_id: String,
    /// Binding key of the trigger binding.
    pub binding_key: String,
    /// The event stored with the entry.
    pub event: TriggerEvent,
    /// Number of attempts made for the event.
    pub attempt_count: u32,
    /// Message of the final error.
    pub final_error: String,
    /// Per-attempt records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
244
/// Flow-control resources held for a dispatch; handed to `release_flow_control`
/// once the dispatch finishes.
#[derive(Default)]
struct AcquiredFlowControl {
    /// NOTE(review): presumably the key of a singleton gate held for this
    /// dispatch, if any — confirm in `apply_flow_control`.
    singleton_gate: Option<String>,
    /// Concurrency permit held for this dispatch, if any.
    concurrency: Option<ConcurrencyPermit>,
}
250
/// Decision returned by `apply_flow_control` for an incoming event.
enum FlowControlOutcome {
    /// Continue with the dispatch using `event`, holding `acquired`.
    Dispatch {
        /// Event to dispatch (boxed to keep the enum small).
        event: Box<TriggerEvent>,
        /// Flow-control resources acquired for the dispatch.
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; the caller reports the dispatch as skipped.
    Skip {
        /// Reason surfaced under the `flow_control` key of the skip result.
        reason: String,
    },
}
260
/// Errors produced by the dispatcher. Every variant carries a human-readable
/// message, which is what `Display` prints.
#[derive(Debug)]
pub enum DispatchError {
    /// The event log reported an error (see `From<LogError>`).
    EventLog(String),
    /// A trigger registry operation failed.
    Registry(String),
    /// JSON serialization or deserialization failed.
    Serde(String),
    /// NOTE(review): presumably a failure in a locally executed handler — confirm.
    Local(String),
    /// NOTE(review): presumably a failure in an agent-to-agent dispatch — confirm.
    A2a(String),
    /// The dispatch was denied; not retryable.
    Denied(String),
    /// The dispatch timed out.
    Timeout(String),
    /// The dispatch was cancelled; not retryable.
    Cancelled(String),
    /// The requested behaviour is not implemented; not retryable.
    NotImplemented(String),
}
273
274impl std::fmt::Display for DispatchError {
275 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
276 match self {
277 Self::EventLog(message)
278 | Self::Registry(message)
279 | Self::Serde(message)
280 | Self::Local(message)
281 | Self::A2a(message)
282 | Self::Denied(message)
283 | Self::Timeout(message)
284 | Self::Cancelled(message)
285 | Self::NotImplemented(message) => f.write_str(message),
286 }
287 }
288}
289
// Marker impl so `DispatchError` can be used wherever a `std::error::Error` is
// expected; no source error is exposed.
impl std::error::Error for DispatchError {}
291
292impl DispatchError {
293 fn retryable(&self) -> bool {
294 !matches!(
295 self,
296 Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_)
297 )
298 }
299}
300
301impl From<LogError> for DispatchError {
302 fn from(value: LogError) -> Self {
303 Self::EventLog(value.to_string())
304 }
305}
306
307pub async fn append_dispatch_cancel_request(
308 event_log: &Arc<AnyEventLog>,
309 request: &DispatchCancelRequest,
310) -> Result<u64, DispatchError> {
311 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
312 .expect("static trigger cancel topic should always be valid");
313 event_log
314 .append(
315 &topic,
316 LogEvent::new(
317 "dispatch_cancel_requested",
318 serde_json::to_value(request)
319 .map_err(|error| DispatchError::Serde(error.to_string()))?,
320 ),
321 )
322 .await
323 .map_err(DispatchError::from)
324}
325
326impl Dispatcher {
327 pub fn new(base_vm: Vm) -> Result<Self, DispatchError> {
328 let event_log = active_event_log().ok_or_else(|| {
329 DispatchError::EventLog("dispatcher requires an active event log".to_string())
330 })?;
331 Ok(Self::with_event_log(base_vm, event_log))
332 }
333
334 pub fn with_event_log(base_vm: Vm, event_log: Arc<AnyEventLog>) -> Self {
335 Self::with_event_log_and_metrics(base_vm, event_log, None)
336 }
337
338 pub fn with_event_log_and_metrics(
339 base_vm: Vm,
340 event_log: Arc<AnyEventLog>,
341 metrics: Option<Arc<crate::MetricsRegistry>>,
342 ) -> Self {
343 let state = Arc::new(DispatcherRuntimeState::new(event_log.clone()));
344 ACTIVE_DISPATCHER_STATE.with(|slot| {
345 *slot.borrow_mut() = Some(state.clone());
346 });
347 let (cancel_tx, _) = broadcast::channel(32);
348 Self {
349 base_vm: Rc::new(base_vm),
350 event_log,
351 cancel_tx,
352 state,
353 metrics,
354 }
355 }
356
357 pub fn snapshot(&self) -> DispatcherStatsSnapshot {
358 DispatcherStatsSnapshot {
359 in_flight: self.state.in_flight.load(Ordering::Relaxed),
360 retry_queue_depth: self.state.retry_queue_depth.load(Ordering::Relaxed),
361 dlq_depth: self
362 .state
363 .dlq
364 .lock()
365 .expect("dispatcher dlq poisoned")
366 .len() as u64,
367 }
368 }
369
370 pub fn dlq_entries(&self) -> Vec<DlqEntry> {
371 self.state
372 .dlq
373 .lock()
374 .expect("dispatcher dlq poisoned")
375 .clone()
376 }
377
378 pub fn shutdown(&self) {
379 self.state.shutting_down.store(true, Ordering::SeqCst);
380 for token in self
381 .state
382 .cancel_tokens
383 .lock()
384 .expect("dispatcher cancel tokens poisoned")
385 .iter()
386 {
387 token.store(true, Ordering::SeqCst);
388 }
389 let _ = self.cancel_tx.send(());
390 }
391
392 pub async fn enqueue(&self, event: TriggerEvent) -> Result<u64, DispatchError> {
393 self.enqueue_targeted(None, None, event).await
394 }
395
396 pub async fn enqueue_targeted(
397 &self,
398 trigger_id: Option<String>,
399 binding_version: Option<u32>,
400 event: TriggerEvent,
401 ) -> Result<u64, DispatchError> {
402 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
403 .expect("static trigger inbox envelopes topic is valid");
404 let headers = event_headers(&event, None, None, None);
405 let payload = serde_json::to_value(InboxEnvelope {
406 trigger_id,
407 binding_version,
408 event,
409 })
410 .map_err(|error| DispatchError::Serde(error.to_string()))?;
411 self.event_log
412 .append(
413 &topic,
414 LogEvent::new("event_ingested", payload).with_headers(headers),
415 )
416 .await
417 .map_err(DispatchError::from)
418 }
419
420 pub async fn run(&self) -> Result<(), DispatchError> {
421 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
422 .expect("static trigger inbox envelopes topic is valid");
423 let start_from = self.event_log.latest(&topic).await?;
424 let stream = self.event_log.clone().subscribe(&topic, start_from).await?;
425 pin_mut!(stream);
426 let mut cancel_rx = self.cancel_tx.subscribe();
427
428 loop {
429 tokio::select! {
430 received = stream.next() => {
431 let Some(received) = received else {
432 break;
433 };
434 let (_, event) = received.map_err(DispatchError::from)?;
435 if event.kind != "event_ingested" {
436 continue;
437 }
438 let parent_headers = event.headers.clone();
439 let envelope: InboxEnvelope = serde_json::from_value(event.payload)
440 .map_err(|error| DispatchError::Serde(error.to_string()))?;
441 notify_test_inbox_dequeued();
442 let _ = self
443 .dispatch_inbox_envelope_with_headers(envelope, Some(&parent_headers))
444 .await;
445 }
446 _ = recv_cancel(&mut cancel_rx) => break,
447 }
448 }
449
450 Ok(())
451 }
452
453 pub async fn drain(&self, timeout: Duration) -> Result<DispatcherDrainReport, DispatchError> {
454 let deadline = tokio::time::Instant::now() + timeout;
455 loop {
456 let snapshot = self.snapshot();
457 if snapshot.in_flight == 0 && snapshot.retry_queue_depth == 0 {
458 return Ok(DispatcherDrainReport {
459 drained: true,
460 in_flight: snapshot.in_flight,
461 retry_queue_depth: snapshot.retry_queue_depth,
462 dlq_depth: snapshot.dlq_depth,
463 });
464 }
465
466 let notified = self.state.idle_notify.notified();
467 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
468 if remaining.is_zero() {
469 return Ok(DispatcherDrainReport {
470 drained: false,
471 in_flight: snapshot.in_flight,
472 retry_queue_depth: snapshot.retry_queue_depth,
473 dlq_depth: snapshot.dlq_depth,
474 });
475 }
476 if tokio::time::timeout(remaining, notified).await.is_err() {
477 let snapshot = self.snapshot();
478 return Ok(DispatcherDrainReport {
479 drained: false,
480 in_flight: snapshot.in_flight,
481 retry_queue_depth: snapshot.retry_queue_depth,
482 dlq_depth: snapshot.dlq_depth,
483 });
484 }
485 }
486 }
487
488 pub async fn dispatch_inbox_envelope(
489 &self,
490 envelope: InboxEnvelope,
491 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
492 self.dispatch_inbox_envelope_with_headers(envelope, None)
493 .await
494 }
495
496 async fn dispatch_inbox_envelope_with_headers(
497 &self,
498 envelope: InboxEnvelope,
499 parent_headers: Option<&BTreeMap<String, String>>,
500 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
501 if let Some(trigger_id) = envelope.trigger_id {
502 let binding = super::registry::resolve_live_trigger_binding(
503 &trigger_id,
504 envelope.binding_version,
505 )
506 .map_err(|error| DispatchError::Registry(error.to_string()))?;
507 return Ok(vec![
508 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
509 .await?,
510 ]);
511 }
512
513 let cron_target = match &envelope.event.provider_payload {
514 crate::triggers::ProviderPayload::Known(
515 crate::triggers::event::KnownProviderPayload::Cron(payload),
516 ) => payload.cron_id.clone(),
517 _ => None,
518 };
519 if let Some(trigger_id) = cron_target {
520 let binding = super::registry::resolve_live_trigger_binding(
521 &trigger_id,
522 envelope.binding_version,
523 )
524 .map_err(|error| DispatchError::Registry(error.to_string()))?;
525 return Ok(vec![
526 self.dispatch_with_replay(&binding, envelope.event, None, None, parent_headers)
527 .await?,
528 ]);
529 }
530
531 self.dispatch_event(envelope.event).await
532 }
533
534 pub async fn dispatch_event(
535 &self,
536 event: TriggerEvent,
537 ) -> Result<Vec<DispatchOutcome>, DispatchError> {
538 let bindings = matching_bindings(&event);
539 let mut outcomes = Vec::new();
540 for binding in bindings {
541 outcomes.push(self.dispatch(&binding, event.clone()).await?);
542 }
543 Ok(outcomes)
544 }
545
546 pub async fn dispatch(
547 &self,
548 binding: &TriggerBinding,
549 event: TriggerEvent,
550 ) -> Result<DispatchOutcome, DispatchError> {
551 self.dispatch_with_replay(binding, event, None, None, None)
552 .await
553 }
554
555 pub async fn dispatch_replay(
556 &self,
557 binding: &TriggerBinding,
558 event: TriggerEvent,
559 replay_of_event_id: String,
560 ) -> Result<DispatchOutcome, DispatchError> {
561 self.dispatch_with_replay(binding, event, Some(replay_of_event_id), None, None)
562 .await
563 }
564
565 pub async fn dispatch_with_parent_span_id(
566 &self,
567 binding: &TriggerBinding,
568 event: TriggerEvent,
569 parent_span_id: Option<String>,
570 ) -> Result<DispatchOutcome, DispatchError> {
571 self.dispatch_with_replay(binding, event, None, parent_span_id, None)
572 .await
573 }
574
575 async fn dispatch_with_replay(
576 &self,
577 binding: &TriggerBinding,
578 event: TriggerEvent,
579 replay_of_event_id: Option<String>,
580 parent_span_id: Option<String>,
581 parent_headers: Option<&BTreeMap<String, String>>,
582 ) -> Result<DispatchOutcome, DispatchError> {
583 let span = tracing::info_span!(
584 "dispatch",
585 trigger_id = %binding.id.as_str(),
586 binding_version = binding.version,
587 trace_id = %event.trace_id.0
588 );
589 #[cfg(feature = "otel")]
590 let span_for_otel = span.clone();
591 let _ = if let Some(headers) = parent_headers {
592 crate::observability::otel::set_span_parent_from_headers(
593 &span,
594 headers,
595 &event.trace_id,
596 parent_span_id.as_deref(),
597 )
598 } else {
599 crate::observability::otel::set_span_parent(
600 &span,
601 &event.trace_id,
602 parent_span_id.as_deref(),
603 )
604 };
605 #[cfg(feature = "otel")]
606 let started_at = Instant::now();
607 let metrics = self.metrics.clone();
608 let outcome = ACTIVE_DISPATCH_IS_REPLAY
609 .scope(
610 replay_of_event_id.is_some(),
611 self.dispatch_with_replay_inner(binding, event, replay_of_event_id)
612 .instrument(span),
613 )
614 .await;
615 if let Some(metrics) = metrics.as_ref() {
616 match &outcome {
617 Ok(dispatch_outcome) => match dispatch_outcome.status {
618 DispatchStatus::Succeeded | DispatchStatus::Skipped => {
619 metrics.record_dispatch_succeeded();
620 }
621 _ => metrics.record_dispatch_failed(),
622 },
623 Err(_) => metrics.record_dispatch_failed(),
624 }
625 }
626 #[cfg(feature = "otel")]
627 {
628 use tracing_opentelemetry::OpenTelemetrySpanExt as _;
629
630 let duration_ms = started_at.elapsed().as_millis() as i64;
631 let status = match &outcome {
632 Ok(dispatch_outcome) => match dispatch_outcome.status {
633 DispatchStatus::Succeeded => "succeeded",
634 DispatchStatus::Skipped => "skipped",
635 DispatchStatus::Cancelled => "cancelled",
636 DispatchStatus::Failed => "failed",
637 DispatchStatus::Dlq => "dlq",
638 },
639 Err(DispatchError::Cancelled(_)) => "cancelled",
640 Err(_) => "failed",
641 };
642 span_for_otel.set_attribute("result.status", status);
643 span_for_otel.set_attribute("result.duration_ms", duration_ms);
644 }
645 outcome
646 }
647
648 async fn dispatch_with_replay_inner(
649 &self,
650 binding: &TriggerBinding,
651 event: TriggerEvent,
652 replay_of_event_id: Option<String>,
653 ) -> Result<DispatchOutcome, DispatchError> {
654 let autonomy_tier = crate::resolve_agent_autonomy_tier(
655 &self.event_log,
656 binding.id.as_str(),
657 binding.autonomy_tier,
658 )
659 .await
660 .unwrap_or(binding.autonomy_tier);
661 let binding_key = binding.binding_key();
662 let route = DispatchUri::from(&binding.handler);
663 let trigger_id = binding.id.as_str().to_string();
664 let event_id = event.id.0.clone();
665 self.state.in_flight.fetch_add(1, Ordering::Relaxed);
666 let begin = if replay_of_event_id.is_some() {
667 super::registry::begin_replay_in_flight(binding.id.as_str(), binding.version)
668 } else {
669 begin_in_flight(binding.id.as_str(), binding.version)
670 };
671 begin.map_err(|error| DispatchError::Registry(error.to_string()))?;
672
673 let mut attempts = Vec::new();
674 let mut source_node_id = format!("trigger:{}", event.id.0);
675 let mut initial_nodes = Vec::new();
676 let mut initial_edges = Vec::new();
677 if let Some(original_event_id) = replay_of_event_id.as_ref() {
678 let original_node_id = format!("trigger:{original_event_id}");
679 initial_nodes.push(RunActionGraphNodeRecord {
680 id: original_node_id.clone(),
681 label: format!(
682 "{}:{} (original {})",
683 event.provider.as_str(),
684 event.kind,
685 original_event_id
686 ),
687 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
688 status: "historical".to_string(),
689 outcome: "replayed_from".to_string(),
690 trace_id: Some(event.trace_id.0.clone()),
691 stage_id: None,
692 node_id: None,
693 worker_id: None,
694 run_id: None,
695 run_path: None,
696 });
697 initial_edges.push(RunActionGraphEdgeRecord {
698 from_id: original_node_id,
699 to_id: source_node_id.clone(),
700 kind: ACTION_GRAPH_EDGE_KIND_REPLAY_CHAIN.to_string(),
701 label: Some("replay chain".to_string()),
702 });
703 }
704 initial_nodes.push(RunActionGraphNodeRecord {
705 id: source_node_id.clone(),
706 label: format!("{}:{}", event.provider.as_str(), event.kind),
707 kind: ACTION_GRAPH_NODE_KIND_TRIGGER.to_string(),
708 status: "received".to_string(),
709 outcome: "received".to_string(),
710 trace_id: Some(event.trace_id.0.clone()),
711 stage_id: None,
712 node_id: None,
713 worker_id: None,
714 run_id: None,
715 run_path: None,
716 });
717 self.emit_action_graph(
718 &event,
719 initial_nodes,
720 initial_edges,
721 serde_json::json!({
722 "source": "dispatcher",
723 "trigger_id": trigger_id,
724 "binding_key": binding_key,
725 "event_id": event_id,
726 "replay_of_event_id": replay_of_event_id,
727 }),
728 )
729 .await?;
730
731 if dispatch_cancel_requested(
732 &self.event_log,
733 &binding_key,
734 &event.id.0,
735 replay_of_event_id.as_ref(),
736 )
737 .await?
738 {
739 finish_in_flight(
740 binding.id.as_str(),
741 binding.version,
742 TriggerDispatchOutcome::Failed,
743 )
744 .await
745 .map_err(|error| DispatchError::Registry(error.to_string()))?;
746 decrement_in_flight(&self.state);
747 return Ok(cancelled_dispatch_outcome(
748 binding,
749 &route,
750 &event,
751 replay_of_event_id,
752 0,
753 "trigger cancel request cancelled dispatch before attempt 1".to_string(),
754 ));
755 }
756
757 if let Some(predicate) = binding.when.as_ref() {
758 let predicate_node_id = format!("predicate:{binding_key}:{}", event.id.0);
759 let evaluation = self
760 .evaluate_predicate(
761 binding,
762 predicate,
763 &event,
764 replay_of_event_id.as_ref(),
765 autonomy_tier,
766 )
767 .await?;
768 let passed = evaluation.result;
769 self.emit_action_graph(
770 &event,
771 vec![RunActionGraphNodeRecord {
772 id: predicate_node_id.clone(),
773 label: predicate.raw.clone(),
774 kind: ACTION_GRAPH_NODE_KIND_TRIGGER_PREDICATE.to_string(),
775 status: "completed".to_string(),
776 outcome: passed.to_string(),
777 trace_id: Some(event.trace_id.0.clone()),
778 stage_id: None,
779 node_id: None,
780 worker_id: None,
781 run_id: None,
782 run_path: None,
783 }],
784 vec![RunActionGraphEdgeRecord {
785 from_id: source_node_id.clone(),
786 to_id: predicate_node_id.clone(),
787 kind: ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH.to_string(),
788 label: None,
789 }],
790 serde_json::json!({
791 "source": "dispatcher",
792 "trigger_id": binding.id.as_str(),
793 "binding_key": binding.binding_key(),
794 "event_id": event.id.0,
795 "predicate": predicate.raw,
796 "reason": evaluation.reason,
797 "cached": evaluation.cached,
798 "cost_usd": evaluation.cost_usd,
799 "tokens": evaluation.tokens,
800 "latency_ms": evaluation.latency_ms,
801 "replay_of_event_id": replay_of_event_id,
802 }),
803 )
804 .await?;
805
806 if !passed {
807 finish_in_flight(
808 binding.id.as_str(),
809 binding.version,
810 TriggerDispatchOutcome::Dispatched,
811 )
812 .await
813 .map_err(|error| DispatchError::Registry(error.to_string()))?;
814 decrement_in_flight(&self.state);
815 self.append_dispatch_trust_record(
816 binding,
817 &route,
818 &event,
819 replay_of_event_id.as_ref(),
820 autonomy_tier,
821 TrustOutcome::Denied,
822 "skipped",
823 0,
824 None,
825 )
826 .await?;
827 return Ok(DispatchOutcome {
828 trigger_id: binding.id.as_str().to_string(),
829 binding_key: binding.binding_key(),
830 event_id: event.id.0,
831 attempt_count: 0,
832 status: DispatchStatus::Skipped,
833 handler_kind: route.kind().to_string(),
834 target_uri: route.target_uri(),
835 replay_of_event_id,
836 result: Some(serde_json::json!({
837 "skipped": true,
838 "predicate": predicate.raw,
839 "reason": evaluation.reason,
840 })),
841 error: None,
842 });
843 }
844
845 source_node_id = predicate_node_id;
846 }
847
848 let (event, mut acquired_flow) = match self
849 .apply_flow_control(binding, &event, replay_of_event_id.as_ref())
850 .await?
851 {
852 FlowControlOutcome::Dispatch { event, acquired } => (*event, acquired),
853 FlowControlOutcome::Skip { reason } => {
854 finish_in_flight(
855 binding.id.as_str(),
856 binding.version,
857 TriggerDispatchOutcome::Dispatched,
858 )
859 .await
860 .map_err(|error| DispatchError::Registry(error.to_string()))?;
861 decrement_in_flight(&self.state);
862 return Ok(DispatchOutcome {
863 trigger_id: binding.id.as_str().to_string(),
864 binding_key: binding.binding_key(),
865 event_id: event.id.0,
866 attempt_count: 0,
867 status: DispatchStatus::Skipped,
868 handler_kind: route.kind().to_string(),
869 target_uri: route.target_uri(),
870 replay_of_event_id,
871 result: Some(serde_json::json!({
872 "skipped": true,
873 "flow_control": reason,
874 })),
875 error: None,
876 });
877 }
878 };
879
880 let mut previous_retry_node = None;
881 let max_attempts = binding.retry.max_attempts();
882 for attempt in 1..=max_attempts {
883 if dispatch_cancel_requested(
884 &self.event_log,
885 &binding_key,
886 &event.id.0,
887 replay_of_event_id.as_ref(),
888 )
889 .await?
890 {
891 finish_in_flight(
892 binding.id.as_str(),
893 binding.version,
894 TriggerDispatchOutcome::Failed,
895 )
896 .await
897 .map_err(|error| DispatchError::Registry(error.to_string()))?;
898 decrement_in_flight(&self.state);
899 return Ok(cancelled_dispatch_outcome(
900 binding,
901 &route,
902 &event,
903 replay_of_event_id,
904 attempt.saturating_sub(1),
905 format!("trigger cancel request cancelled dispatch before attempt {attempt}"),
906 ));
907 }
908 maybe_fail_before_outbox();
909 let started_at = now_rfc3339();
910 let attempt_node_id = dispatch_node_id(&route, &binding_key, &event.id.0, attempt);
911 self.append_lifecycle_event(
912 "DispatchStarted",
913 &event,
914 binding,
915 serde_json::json!({
916 "event_id": event.id.0,
917 "attempt": attempt,
918 "handler_kind": route.kind(),
919 "target_uri": route.target_uri(),
920 "replay_of_event_id": replay_of_event_id,
921 }),
922 replay_of_event_id.as_ref(),
923 )
924 .await?;
925 self.append_topic_event(
926 TRIGGER_OUTBOX_TOPIC,
927 "dispatch_started",
928 &event,
929 Some(binding),
930 Some(attempt),
931 serde_json::json!({
932 "event_id": event.id.0,
933 "attempt": attempt,
934 "trigger_id": binding.id.as_str(),
935 "binding_key": binding.binding_key(),
936 "handler_kind": route.kind(),
937 "target_uri": route.target_uri(),
938 "replay_of_event_id": replay_of_event_id,
939 }),
940 replay_of_event_id.as_ref(),
941 )
942 .await?;
943
944 let mut dispatch_edges = Vec::new();
945 if attempt == 1 {
946 dispatch_edges.push(RunActionGraphEdgeRecord {
947 from_id: source_node_id.clone(),
948 to_id: attempt_node_id.clone(),
949 kind: dispatch_entry_edge_kind(&route, binding.when.is_some()).to_string(),
950 label: binding.when.as_ref().map(|_| "true".to_string()),
951 });
952 } else if let Some(retry_node_id) = previous_retry_node.take() {
953 dispatch_edges.push(RunActionGraphEdgeRecord {
954 from_id: retry_node_id,
955 to_id: attempt_node_id.clone(),
956 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
957 label: Some(format!("attempt {attempt}")),
958 });
959 }
960
961 self.emit_action_graph(
962 &event,
963 vec![RunActionGraphNodeRecord {
964 id: attempt_node_id.clone(),
965 label: dispatch_node_label(&route),
966 kind: dispatch_node_kind(&route).to_string(),
967 status: "running".to_string(),
968 outcome: format!("attempt_{attempt}"),
969 trace_id: Some(event.trace_id.0.clone()),
970 stage_id: None,
971 node_id: None,
972 worker_id: None,
973 run_id: None,
974 run_path: None,
975 }],
976 dispatch_edges,
977 serde_json::json!({
978 "source": "dispatcher",
979 "trigger_id": binding.id.as_str(),
980 "binding_key": binding.binding_key(),
981 "event_id": event.id.0,
982 "attempt": attempt,
983 "handler_kind": route.kind(),
984 "target_uri": route.target_uri(),
985 "target_agent": dispatch_target_agent(&route),
986 "replay_of_event_id": replay_of_event_id,
987 }),
988 )
989 .await?;
990
991 let result = self
992 .dispatch_once(
993 binding,
994 &route,
995 &event,
996 autonomy_tier,
997 &mut self.cancel_tx.subscribe(),
998 )
999 .await;
1000 let completed_at = now_rfc3339();
1001
1002 match result {
1003 Ok(result) => {
1004 let attempt_record = DispatchAttemptRecord {
1005 trigger_id: binding.id.as_str().to_string(),
1006 binding_key: binding.binding_key(),
1007 event_id: event.id.0.clone(),
1008 attempt,
1009 handler_kind: route.kind().to_string(),
1010 started_at,
1011 completed_at,
1012 outcome: "success".to_string(),
1013 error_msg: None,
1014 };
1015 attempts.push(attempt_record.clone());
1016 self.append_attempt_record(
1017 &event,
1018 binding,
1019 &attempt_record,
1020 replay_of_event_id.as_ref(),
1021 )
1022 .await?;
1023 self.append_lifecycle_event(
1024 "DispatchSucceeded",
1025 &event,
1026 binding,
1027 serde_json::json!({
1028 "event_id": event.id.0,
1029 "attempt": attempt,
1030 "handler_kind": route.kind(),
1031 "target_uri": route.target_uri(),
1032 "result": result,
1033 "replay_of_event_id": replay_of_event_id,
1034 }),
1035 replay_of_event_id.as_ref(),
1036 )
1037 .await?;
1038 self.append_topic_event(
1039 TRIGGER_OUTBOX_TOPIC,
1040 "dispatch_succeeded",
1041 &event,
1042 Some(binding),
1043 Some(attempt),
1044 serde_json::json!({
1045 "event_id": event.id.0,
1046 "attempt": attempt,
1047 "trigger_id": binding.id.as_str(),
1048 "binding_key": binding.binding_key(),
1049 "handler_kind": route.kind(),
1050 "target_uri": route.target_uri(),
1051 "result": result,
1052 "replay_of_event_id": replay_of_event_id,
1053 }),
1054 replay_of_event_id.as_ref(),
1055 )
1056 .await?;
1057 finish_in_flight(
1058 binding.id.as_str(),
1059 binding.version,
1060 TriggerDispatchOutcome::Dispatched,
1061 )
1062 .await
1063 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1064 self.release_flow_control(&mut acquired_flow).await?;
1065 decrement_in_flight(&self.state);
1066 self.append_dispatch_trust_record(
1067 binding,
1068 &route,
1069 &event,
1070 replay_of_event_id.as_ref(),
1071 autonomy_tier,
1072 TrustOutcome::Success,
1073 "succeeded",
1074 attempt,
1075 None,
1076 )
1077 .await?;
1078 return Ok(DispatchOutcome {
1079 trigger_id: binding.id.as_str().to_string(),
1080 binding_key: binding.binding_key(),
1081 event_id: event.id.0,
1082 attempt_count: attempt,
1083 status: DispatchStatus::Succeeded,
1084 handler_kind: route.kind().to_string(),
1085 target_uri: route.target_uri(),
1086 replay_of_event_id,
1087 result: Some(result),
1088 error: None,
1089 });
1090 }
1091 Err(error) => {
1092 let attempt_record = DispatchAttemptRecord {
1093 trigger_id: binding.id.as_str().to_string(),
1094 binding_key: binding.binding_key(),
1095 event_id: event.id.0.clone(),
1096 attempt,
1097 handler_kind: route.kind().to_string(),
1098 started_at,
1099 completed_at,
1100 outcome: dispatch_error_label(&error).to_string(),
1101 error_msg: Some(error.to_string()),
1102 };
1103 attempts.push(attempt_record.clone());
1104 self.append_attempt_record(
1105 &event,
1106 binding,
1107 &attempt_record,
1108 replay_of_event_id.as_ref(),
1109 )
1110 .await?;
1111 self.append_lifecycle_event(
1112 "DispatchFailed",
1113 &event,
1114 binding,
1115 serde_json::json!({
1116 "event_id": event.id.0,
1117 "attempt": attempt,
1118 "handler_kind": route.kind(),
1119 "target_uri": route.target_uri(),
1120 "error": error.to_string(),
1121 "replay_of_event_id": replay_of_event_id,
1122 }),
1123 replay_of_event_id.as_ref(),
1124 )
1125 .await?;
1126 self.append_topic_event(
1127 TRIGGER_OUTBOX_TOPIC,
1128 "dispatch_failed",
1129 &event,
1130 Some(binding),
1131 Some(attempt),
1132 serde_json::json!({
1133 "event_id": event.id.0,
1134 "attempt": attempt,
1135 "trigger_id": binding.id.as_str(),
1136 "binding_key": binding.binding_key(),
1137 "handler_kind": route.kind(),
1138 "target_uri": route.target_uri(),
1139 "error": error.to_string(),
1140 "replay_of_event_id": replay_of_event_id,
1141 }),
1142 replay_of_event_id.as_ref(),
1143 )
1144 .await?;
1145
1146 if !error.retryable() {
1147 finish_in_flight(
1148 binding.id.as_str(),
1149 binding.version,
1150 TriggerDispatchOutcome::Failed,
1151 )
1152 .await
1153 .map_err(|registry_error| {
1154 DispatchError::Registry(registry_error.to_string())
1155 })?;
1156 self.release_flow_control(&mut acquired_flow).await?;
1157 decrement_in_flight(&self.state);
1158 let trust_outcome = match error {
1159 DispatchError::Denied(_) => TrustOutcome::Denied,
1160 DispatchError::Timeout(_) => TrustOutcome::Timeout,
1161 _ => TrustOutcome::Failure,
1162 };
1163 let terminal_status = if matches!(error, DispatchError::Cancelled(_)) {
1164 "cancelled"
1165 } else {
1166 "failed"
1167 };
1168 self.append_dispatch_trust_record(
1169 binding,
1170 &route,
1171 &event,
1172 replay_of_event_id.as_ref(),
1173 autonomy_tier,
1174 trust_outcome,
1175 terminal_status,
1176 attempt,
1177 Some(error.to_string()),
1178 )
1179 .await?;
1180 return Ok(DispatchOutcome {
1181 trigger_id: binding.id.as_str().to_string(),
1182 binding_key: binding.binding_key(),
1183 event_id: event.id.0,
1184 attempt_count: attempt,
1185 status: if matches!(error, DispatchError::Cancelled(_)) {
1186 DispatchStatus::Cancelled
1187 } else {
1188 DispatchStatus::Failed
1189 },
1190 handler_kind: route.kind().to_string(),
1191 target_uri: route.target_uri(),
1192 replay_of_event_id,
1193 result: None,
1194 error: Some(error.to_string()),
1195 });
1196 }
1197
1198 if let Some(delay) = binding.retry.next_retry_delay(attempt) {
1199 let retry_node_id = format!("retry:{binding_key}:{}:{attempt}", event.id.0);
1200 previous_retry_node = Some(retry_node_id.clone());
1201 self.emit_action_graph(
1202 &event,
1203 vec![RunActionGraphNodeRecord {
1204 id: retry_node_id.clone(),
1205 label: format!("retry in {}ms", delay.as_millis()),
1206 kind: ACTION_GRAPH_NODE_KIND_RETRY.to_string(),
1207 status: "scheduled".to_string(),
1208 outcome: format!("attempt_{}", attempt + 1),
1209 trace_id: Some(event.trace_id.0.clone()),
1210 stage_id: None,
1211 node_id: None,
1212 worker_id: None,
1213 run_id: None,
1214 run_path: None,
1215 }],
1216 vec![RunActionGraphEdgeRecord {
1217 from_id: attempt_node_id,
1218 to_id: retry_node_id.clone(),
1219 kind: ACTION_GRAPH_EDGE_KIND_RETRY.to_string(),
1220 label: Some(format!("attempt {}", attempt + 1)),
1221 }],
1222 serde_json::json!({
1223 "source": "dispatcher",
1224 "trigger_id": binding.id.as_str(),
1225 "binding_key": binding.binding_key(),
1226 "event_id": event.id.0,
1227 "attempt": attempt + 1,
1228 "delay_ms": delay.as_millis(),
1229 "replay_of_event_id": replay_of_event_id,
1230 }),
1231 )
1232 .await?;
1233 self.append_lifecycle_event(
1234 "RetryScheduled",
1235 &event,
1236 binding,
1237 serde_json::json!({
1238 "event_id": event.id.0,
1239 "attempt": attempt + 1,
1240 "delay_ms": delay.as_millis(),
1241 "error": error.to_string(),
1242 "replay_of_event_id": replay_of_event_id,
1243 }),
1244 replay_of_event_id.as_ref(),
1245 )
1246 .await?;
1247 self.append_topic_event(
1248 TRIGGER_ATTEMPTS_TOPIC,
1249 "retry_scheduled",
1250 &event,
1251 Some(binding),
1252 Some(attempt + 1),
1253 serde_json::json!({
1254 "event_id": event.id.0,
1255 "attempt": attempt + 1,
1256 "trigger_id": binding.id.as_str(),
1257 "binding_key": binding.binding_key(),
1258 "delay_ms": delay.as_millis(),
1259 "error": error.to_string(),
1260 "replay_of_event_id": replay_of_event_id,
1261 }),
1262 replay_of_event_id.as_ref(),
1263 )
1264 .await?;
1265 self.state.retry_queue_depth.fetch_add(1, Ordering::Relaxed);
1266 let sleep_result = sleep_or_cancel_or_request(
1267 &self.event_log,
1268 delay,
1269 &binding_key,
1270 &event.id.0,
1271 replay_of_event_id.as_ref(),
1272 &mut self.cancel_tx.subscribe(),
1273 )
1274 .await;
1275 decrement_retry_queue_depth(&self.state);
1276 if sleep_result.is_err() {
1277 finish_in_flight(
1278 binding.id.as_str(),
1279 binding.version,
1280 TriggerDispatchOutcome::Failed,
1281 )
1282 .await
1283 .map_err(|registry_error| {
1284 DispatchError::Registry(registry_error.to_string())
1285 })?;
1286 self.release_flow_control(&mut acquired_flow).await?;
1287 decrement_in_flight(&self.state);
1288 self.append_dispatch_trust_record(
1289 binding,
1290 &route,
1291 &event,
1292 replay_of_event_id.as_ref(),
1293 autonomy_tier,
1294 TrustOutcome::Failure,
1295 "cancelled",
1296 attempt,
1297 Some("dispatcher shutdown cancelled retry wait".to_string()),
1298 )
1299 .await?;
1300 return Ok(DispatchOutcome {
1301 trigger_id: binding.id.as_str().to_string(),
1302 binding_key: binding.binding_key(),
1303 event_id: event.id.0,
1304 attempt_count: attempt,
1305 status: DispatchStatus::Cancelled,
1306 handler_kind: route.kind().to_string(),
1307 target_uri: route.target_uri(),
1308 replay_of_event_id,
1309 result: None,
1310 error: Some("dispatcher shutdown cancelled retry wait".to_string()),
1311 });
1312 }
1313 continue;
1314 }
1315
1316 let final_error = error.to_string();
1317 let dlq_entry = DlqEntry {
1318 trigger_id: binding.id.as_str().to_string(),
1319 binding_key: binding.binding_key(),
1320 event: event.clone(),
1321 attempt_count: attempt,
1322 final_error: final_error.clone(),
1323 attempts: attempts.clone(),
1324 };
1325 self.state
1326 .dlq
1327 .lock()
1328 .expect("dispatcher dlq poisoned")
1329 .push(dlq_entry.clone());
1330 self.emit_action_graph(
1331 &event,
1332 vec![RunActionGraphNodeRecord {
1333 id: format!("dlq:{binding_key}:{}", event.id.0),
1334 label: binding.id.as_str().to_string(),
1335 kind: ACTION_GRAPH_NODE_KIND_DLQ.to_string(),
1336 status: "queued".to_string(),
1337 outcome: "retry_exhausted".to_string(),
1338 trace_id: Some(event.trace_id.0.clone()),
1339 stage_id: None,
1340 node_id: None,
1341 worker_id: None,
1342 run_id: None,
1343 run_path: None,
1344 }],
1345 vec![RunActionGraphEdgeRecord {
1346 from_id: dispatch_node_id(&route, &binding_key, &event.id.0, attempt),
1347 to_id: format!("dlq:{binding_key}:{}", event.id.0),
1348 kind: ACTION_GRAPH_EDGE_KIND_DLQ_MOVE.to_string(),
1349 label: Some(format!("{attempt} attempts")),
1350 }],
1351 serde_json::json!({
1352 "source": "dispatcher",
1353 "trigger_id": binding.id.as_str(),
1354 "binding_key": binding.binding_key(),
1355 "event_id": event.id.0,
1356 "attempt_count": attempt,
1357 "final_error": final_error,
1358 "replay_of_event_id": replay_of_event_id,
1359 }),
1360 )
1361 .await?;
1362 self.append_lifecycle_event(
1363 "DlqMoved",
1364 &event,
1365 binding,
1366 serde_json::json!({
1367 "event_id": event.id.0,
1368 "attempt_count": attempt,
1369 "final_error": dlq_entry.final_error,
1370 "replay_of_event_id": replay_of_event_id,
1371 }),
1372 replay_of_event_id.as_ref(),
1373 )
1374 .await?;
1375 self.append_topic_event(
1376 TRIGGER_DLQ_TOPIC,
1377 "dlq_moved",
1378 &event,
1379 Some(binding),
1380 Some(attempt),
1381 serde_json::to_value(&dlq_entry)
1382 .map_err(|serde_error| DispatchError::Serde(serde_error.to_string()))?,
1383 replay_of_event_id.as_ref(),
1384 )
1385 .await?;
1386 finish_in_flight(
1387 binding.id.as_str(),
1388 binding.version,
1389 TriggerDispatchOutcome::Dlq,
1390 )
1391 .await
1392 .map_err(|registry_error| {
1393 DispatchError::Registry(registry_error.to_string())
1394 })?;
1395 self.release_flow_control(&mut acquired_flow).await?;
1396 decrement_in_flight(&self.state);
1397 self.append_dispatch_trust_record(
1398 binding,
1399 &route,
1400 &event,
1401 replay_of_event_id.as_ref(),
1402 autonomy_tier,
1403 TrustOutcome::Failure,
1404 "dlq",
1405 attempt,
1406 Some(error.to_string()),
1407 )
1408 .await?;
1409 return Ok(DispatchOutcome {
1410 trigger_id: binding.id.as_str().to_string(),
1411 binding_key: binding.binding_key(),
1412 event_id: event.id.0,
1413 attempt_count: attempt,
1414 status: DispatchStatus::Dlq,
1415 handler_kind: route.kind().to_string(),
1416 target_uri: route.target_uri(),
1417 replay_of_event_id,
1418 result: None,
1419 error: Some(error.to_string()),
1420 });
1421 }
1422 }
1423 }
1424
1425 finish_in_flight(
1426 binding.id.as_str(),
1427 binding.version,
1428 TriggerDispatchOutcome::Failed,
1429 )
1430 .await
1431 .map_err(|error| DispatchError::Registry(error.to_string()))?;
1432 self.release_flow_control(&mut acquired_flow).await?;
1433 decrement_in_flight(&self.state);
1434 self.append_dispatch_trust_record(
1435 binding,
1436 &route,
1437 &event,
1438 replay_of_event_id.as_ref(),
1439 autonomy_tier,
1440 TrustOutcome::Failure,
1441 "failed",
1442 max_attempts,
1443 Some("dispatch exhausted without terminal outcome".to_string()),
1444 )
1445 .await?;
1446 Ok(DispatchOutcome {
1447 trigger_id: binding.id.as_str().to_string(),
1448 binding_key: binding.binding_key(),
1449 event_id: event.id.0,
1450 attempt_count: max_attempts,
1451 status: DispatchStatus::Failed,
1452 handler_kind: route.kind().to_string(),
1453 target_uri: route.target_uri(),
1454 replay_of_event_id,
1455 result: None,
1456 error: Some("dispatch exhausted without terminal outcome".to_string()),
1457 })
1458 }
1459
1460 async fn dispatch_once(
1461 &self,
1462 binding: &TriggerBinding,
1463 route: &DispatchUri,
1464 event: &TriggerEvent,
1465 autonomy_tier: AutonomyTier,
1466 cancel_rx: &mut broadcast::Receiver<()>,
1467 ) -> Result<serde_json::Value, DispatchError> {
1468 match route {
1469 DispatchUri::Local { .. } => {
1470 let TriggerHandlerSpec::Local { closure, .. } = &binding.handler else {
1471 return Err(DispatchError::Local(format!(
1472 "trigger '{}' resolved to a local dispatch URI but does not carry a local closure",
1473 binding.id.as_str()
1474 )));
1475 };
1476 let value = self
1477 .invoke_vm_callable(
1478 closure,
1479 &binding.binding_key(),
1480 event,
1481 None,
1482 binding.id.as_str(),
1483 &format!("{}.{}", event.provider.as_str(), event.kind),
1484 autonomy_tier,
1485 cancel_rx,
1486 )
1487 .await?;
1488 Ok(vm_value_to_json(&value))
1489 }
1490 DispatchUri::A2a {
1491 target,
1492 allow_cleartext,
1493 } => {
1494 if self.state.shutting_down.load(Ordering::SeqCst) {
1495 return Err(DispatchError::Cancelled(
1496 "dispatcher shutdown cancelled A2A dispatch".to_string(),
1497 ));
1498 }
1499 let (_endpoint, ack) = crate::a2a::dispatch_trigger_event(
1500 target,
1501 *allow_cleartext,
1502 binding.id.as_str(),
1503 &binding.binding_key(),
1504 event,
1505 cancel_rx,
1506 )
1507 .await
1508 .map_err(|error| match error {
1509 crate::a2a::A2aClientError::Cancelled(message) => {
1510 DispatchError::Cancelled(message)
1511 }
1512 other => DispatchError::A2a(other.to_string()),
1513 })?;
1514 match ack {
1515 crate::a2a::DispatchAck::InlineResult { result, .. } => Ok(result),
1516 crate::a2a::DispatchAck::PendingTask { handle, .. } => Ok(handle),
1517 }
1518 }
1519 DispatchUri::Worker { queue } => Err(DispatchError::NotImplemented(format!(
1520 "worker:// dispatch to '{queue}' is not implemented yet; see O-05 #182"
1521 ))),
1522 }
1523 }
1524
1525 async fn apply_flow_control(
1526 &self,
1527 binding: &TriggerBinding,
1528 event: &TriggerEvent,
1529 replay_of_event_id: Option<&String>,
1530 ) -> Result<FlowControlOutcome, DispatchError> {
1531 let flow = &binding.flow_control;
1532 let mut managed_event = event.clone();
1533
1534 if let Some(batch) = &flow.batch {
1535 let gate = self
1536 .resolve_flow_gate(
1537 &binding.binding_key(),
1538 batch.key.as_ref(),
1539 &managed_event,
1540 replay_of_event_id,
1541 )
1542 .await?;
1543 match self
1544 .state
1545 .flow_control
1546 .consume_batch(&gate, batch.size, batch.timeout, managed_event.clone())
1547 .await
1548 .map_err(DispatchError::from)?
1549 {
1550 BatchDecision::Dispatch(events) => {
1551 managed_event = build_batched_event(events)?;
1552 }
1553 BatchDecision::Merged => {
1554 return Ok(FlowControlOutcome::Skip {
1555 reason: "batch_merged".to_string(),
1556 })
1557 }
1558 }
1559 }
1560
1561 if let Some(debounce) = &flow.debounce {
1562 let gate = self
1563 .resolve_flow_gate(
1564 &binding.binding_key(),
1565 Some(&debounce.key),
1566 &managed_event,
1567 replay_of_event_id,
1568 )
1569 .await?;
1570 let latest = self
1571 .state
1572 .flow_control
1573 .debounce(&gate, debounce.period)
1574 .await
1575 .map_err(DispatchError::from)?;
1576 if !latest {
1577 return Ok(FlowControlOutcome::Skip {
1578 reason: "debounced".to_string(),
1579 });
1580 }
1581 }
1582
1583 if let Some(rate_limit) = &flow.rate_limit {
1584 let gate = self
1585 .resolve_flow_gate(
1586 &binding.binding_key(),
1587 rate_limit.key.as_ref(),
1588 &managed_event,
1589 replay_of_event_id,
1590 )
1591 .await?;
1592 let allowed = self
1593 .state
1594 .flow_control
1595 .check_rate_limit(&gate, rate_limit.period, rate_limit.max)
1596 .await
1597 .map_err(DispatchError::from)?;
1598 if !allowed {
1599 return Ok(FlowControlOutcome::Skip {
1600 reason: "rate_limited".to_string(),
1601 });
1602 }
1603 }
1604
1605 if let Some(throttle) = &flow.throttle {
1606 let gate = self
1607 .resolve_flow_gate(
1608 &binding.binding_key(),
1609 throttle.key.as_ref(),
1610 &managed_event,
1611 replay_of_event_id,
1612 )
1613 .await?;
1614 self.state
1615 .flow_control
1616 .wait_for_throttle(&gate, throttle.period, throttle.max)
1617 .await
1618 .map_err(DispatchError::from)?;
1619 }
1620
1621 let mut acquired = AcquiredFlowControl::default();
1622 if let Some(singleton) = &flow.singleton {
1623 let gate = self
1624 .resolve_flow_gate(
1625 &binding.binding_key(),
1626 singleton.key.as_ref(),
1627 &managed_event,
1628 replay_of_event_id,
1629 )
1630 .await?;
1631 let acquired_singleton = self
1632 .state
1633 .flow_control
1634 .try_acquire_singleton(&gate)
1635 .await
1636 .map_err(DispatchError::from)?;
1637 if !acquired_singleton {
1638 return Ok(FlowControlOutcome::Skip {
1639 reason: "singleton_active".to_string(),
1640 });
1641 }
1642 acquired.singleton_gate = Some(gate);
1643 }
1644
1645 if let Some(concurrency) = &flow.concurrency {
1646 let gate = self
1647 .resolve_flow_gate(
1648 &binding.binding_key(),
1649 concurrency.key.as_ref(),
1650 &managed_event,
1651 replay_of_event_id,
1652 )
1653 .await?;
1654 let priority_rank = self
1655 .resolve_priority_rank(
1656 &binding.binding_key(),
1657 flow.priority.as_ref(),
1658 &managed_event,
1659 replay_of_event_id,
1660 )
1661 .await?;
1662 acquired.concurrency = Some(
1663 self.state
1664 .flow_control
1665 .acquire_concurrency(&gate, concurrency.max, priority_rank)
1666 .await
1667 .map_err(DispatchError::from)?,
1668 );
1669 }
1670
1671 Ok(FlowControlOutcome::Dispatch {
1672 event: Box::new(managed_event),
1673 acquired,
1674 })
1675 }
1676
1677 async fn release_flow_control(
1678 &self,
1679 acquired: &mut AcquiredFlowControl,
1680 ) -> Result<(), DispatchError> {
1681 if let Some(gate) = acquired.singleton_gate.take() {
1682 self.state
1683 .flow_control
1684 .release_singleton(&gate)
1685 .await
1686 .map_err(DispatchError::from)?;
1687 }
1688 if let Some(permit) = acquired.concurrency.take() {
1689 self.state
1690 .flow_control
1691 .release_concurrency(permit)
1692 .await
1693 .map_err(DispatchError::from)?;
1694 }
1695 Ok(())
1696 }
1697
1698 async fn resolve_flow_gate(
1699 &self,
1700 binding_key: &str,
1701 expr: Option<&crate::triggers::TriggerExpressionSpec>,
1702 event: &TriggerEvent,
1703 replay_of_event_id: Option<&String>,
1704 ) -> Result<String, DispatchError> {
1705 let key = match expr {
1706 Some(expr) => {
1707 self.evaluate_flow_expression(binding_key, expr, event, replay_of_event_id)
1708 .await?
1709 }
1710 None => "_global".to_string(),
1711 };
1712 Ok(format!("{binding_key}:{key}"))
1713 }
1714
1715 async fn resolve_priority_rank(
1716 &self,
1717 binding_key: &str,
1718 priority: Option<&crate::triggers::TriggerPriorityOrderConfig>,
1719 event: &TriggerEvent,
1720 replay_of_event_id: Option<&String>,
1721 ) -> Result<usize, DispatchError> {
1722 let Some(priority) = priority else {
1723 return Ok(0);
1724 };
1725 let value = self
1726 .evaluate_flow_expression(binding_key, &priority.key, event, replay_of_event_id)
1727 .await?;
1728 Ok(priority
1729 .order
1730 .iter()
1731 .position(|candidate| candidate == &value)
1732 .unwrap_or(priority.order.len()))
1733 }
1734
1735 async fn evaluate_flow_expression(
1736 &self,
1737 binding_key: &str,
1738 expr: &crate::triggers::TriggerExpressionSpec,
1739 event: &TriggerEvent,
1740 replay_of_event_id: Option<&String>,
1741 ) -> Result<String, DispatchError> {
1742 let value = self
1743 .invoke_vm_callable(
1744 &expr.closure,
1745 binding_key,
1746 event,
1747 replay_of_event_id,
1748 "",
1749 "flow_control",
1750 AutonomyTier::Suggest,
1751 &mut self.cancel_tx.subscribe(),
1752 )
1753 .await?;
1754 Ok(json_value_to_gate(&vm_value_to_json(&value)))
1755 }
1756
1757 #[allow(clippy::too_many_arguments)]
1758 async fn invoke_vm_callable(
1759 &self,
1760 closure: &crate::value::VmClosure,
1761 binding_key: &str,
1762 event: &TriggerEvent,
1763 replay_of_event_id: Option<&String>,
1764 agent_id: &str,
1765 action: &str,
1766 autonomy_tier: AutonomyTier,
1767 cancel_rx: &mut broadcast::Receiver<()>,
1768 ) -> Result<VmValue, DispatchError> {
1769 let mut vm = self.base_vm.child_vm();
1770 let cancel_token = Arc::new(std::sync::atomic::AtomicBool::new(false));
1771 if self.state.shutting_down.load(Ordering::SeqCst) {
1772 cancel_token.store(true, Ordering::SeqCst);
1773 }
1774 self.state
1775 .cancel_tokens
1776 .lock()
1777 .expect("dispatcher cancel tokens poisoned")
1778 .push(cancel_token.clone());
1779 vm.install_cancel_token(cancel_token.clone());
1780 let arg = json_to_vm_value(
1781 &serde_json::to_value(event)
1782 .map_err(|error| DispatchError::Serde(error.to_string()))?,
1783 );
1784 let args = [arg];
1785 let future = vm.call_closure_pub(closure, &args, &[]);
1786 pin_mut!(future);
1787 let prior_context = ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1788 slot.borrow_mut().replace(DispatchContext {
1789 trigger_event: event.clone(),
1790 replay_of_event_id: replay_of_event_id.cloned(),
1791 agent_id: agent_id.to_string(),
1792 action: action.to_string(),
1793 autonomy_tier,
1794 })
1795 });
1796 let prior_hitl_state = crate::stdlib::hitl::take_hitl_state();
1797 crate::stdlib::hitl::reset_hitl_state();
1798 let mut poll = tokio::time::interval(Duration::from_millis(100));
1799 let result = loop {
1800 tokio::select! {
1801 result = &mut future => break result,
1802 _ = recv_cancel(cancel_rx) => {
1803 cancel_token.store(true, Ordering::SeqCst);
1804 }
1805 _ = poll.tick() => {
1806 if dispatch_cancel_requested(
1807 &self.event_log,
1808 binding_key,
1809 &event.id.0,
1810 replay_of_event_id,
1811 )
1812 .await? {
1813 cancel_token.store(true, Ordering::SeqCst);
1814 }
1815 }
1816 }
1817 };
1818 ACTIVE_DISPATCH_CONTEXT.with(|slot| {
1819 *slot.borrow_mut() = prior_context;
1820 });
1821 crate::stdlib::hitl::restore_hitl_state(prior_hitl_state);
1822 {
1823 let mut tokens = self
1824 .state
1825 .cancel_tokens
1826 .lock()
1827 .expect("dispatcher cancel tokens poisoned");
1828 tokens.retain(|token| !Arc::ptr_eq(token, &cancel_token));
1829 }
1830
1831 if cancel_token.load(Ordering::SeqCst) {
1832 if dispatch_cancel_requested(
1833 &self.event_log,
1834 binding_key,
1835 &event.id.0,
1836 replay_of_event_id,
1837 )
1838 .await?
1839 {
1840 Err(DispatchError::Cancelled(
1841 "trigger cancel request cancelled local handler".to_string(),
1842 ))
1843 } else {
1844 Err(DispatchError::Cancelled(
1845 "dispatcher shutdown cancelled local handler".to_string(),
1846 ))
1847 }
1848 } else {
1849 result.map_err(dispatch_error_from_vm_error)
1850 }
1851 }
1852
1853 async fn evaluate_predicate(
1854 &self,
1855 binding: &TriggerBinding,
1856 predicate: &super::registry::TriggerPredicateSpec,
1857 event: &TriggerEvent,
1858 replay_of_event_id: Option<&String>,
1859 autonomy_tier: AutonomyTier,
1860 ) -> Result<PredicateEvaluationRecord, DispatchError> {
1861 let event_id = event.id.0.clone();
1862 let trigger_id = binding.id.as_str().to_string();
1863 let now_ms = now_unix_ms();
1864 let today = utc_day_key();
1865
1866 let breaker_open_until = {
1867 let mut state = binding
1868 .predicate_state
1869 .lock()
1870 .expect("trigger predicate state poisoned");
1871 if state.budget_day_utc != Some(today) {
1872 state.budget_day_utc = Some(today);
1873 binding
1874 .metrics
1875 .cost_today_usd_micros
1876 .store(0, Ordering::Relaxed);
1877 }
1878 if state
1879 .breaker_open_until_ms
1880 .is_some_and(|until_ms| until_ms > now_ms)
1881 {
1882 state.breaker_open_until_ms
1883 } else {
1884 None
1885 }
1886 };
1887
1888 if breaker_open_until.is_some() {
1889 let mut metadata = BTreeMap::new();
1890 metadata.insert("trigger_id".to_string(), serde_json::json!(trigger_id));
1891 metadata.insert("event_id".to_string(), serde_json::json!(event_id));
1892 metadata.insert(
1893 "breaker_open_until_ms".to_string(),
1894 serde_json::json!(breaker_open_until),
1895 );
1896 crate::events::log_warn_meta(
1897 "trigger.predicate.circuit_breaker",
1898 "trigger predicate circuit breaker is open; short-circuiting to false",
1899 metadata,
1900 );
1901 let record = PredicateEvaluationRecord {
1902 result: false,
1903 reason: Some("circuit_open".to_string()),
1904 ..Default::default()
1905 };
1906 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1907 .await?;
1908 return Ok(record);
1909 }
1910
1911 if binding
1912 .daily_cost_usd
1913 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
1914 {
1915 self.append_lifecycle_event(
1916 "predicate.daily_budget_exceeded",
1917 event,
1918 binding,
1919 serde_json::json!({
1920 "trigger_id": binding.id.as_str(),
1921 "event_id": event.id.0,
1922 "limit_usd": binding.daily_cost_usd,
1923 "cost_today_usd": current_predicate_daily_cost(binding),
1924 "replay_of_event_id": replay_of_event_id,
1925 }),
1926 replay_of_event_id,
1927 )
1928 .await?;
1929 let record = PredicateEvaluationRecord {
1930 result: false,
1931 reason: Some("daily_budget_exceeded".to_string()),
1932 ..Default::default()
1933 };
1934 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
1935 .await?;
1936 return Ok(record);
1937 }
1938
1939 let replay_cache = self
1940 .read_predicate_cache_record(replay_of_event_id.unwrap_or(&event_id))
1941 .await?;
1942 let guard = start_predicate_evaluation(
1943 binding.when_budget.clone().unwrap_or_default(),
1944 replay_cache,
1945 );
1946 let started = std::time::Instant::now();
1947 let eval = self
1948 .invoke_vm_callable_with_timeout(
1949 &predicate.closure,
1950 &binding.binding_key(),
1951 event,
1952 replay_of_event_id,
1953 binding.id.as_str(),
1954 &format!("{}.{}", event.provider.as_str(), event.kind),
1955 autonomy_tier,
1956 &mut self.cancel_tx.subscribe(),
1957 binding
1958 .when_budget
1959 .as_ref()
1960 .and_then(|budget| budget.timeout()),
1961 )
1962 .await;
1963 let capture = guard.finish();
1964 let latency_ms = started.elapsed().as_millis() as u64;
1965 if replay_of_event_id.is_none() && !capture.entries.is_empty() {
1966 self.append_predicate_cache_record(binding, event, &capture.entries)
1967 .await?;
1968 }
1969
1970 let mut record = PredicateEvaluationRecord {
1971 result: false,
1972 cost_usd: capture.total_cost_usd,
1973 tokens: capture.total_tokens,
1974 latency_ms,
1975 cached: capture.cached,
1976 reason: None,
1977 };
1978
1979 let mut count_failure = false;
1980 let mut opened_breaker = false;
1981
1982 match eval {
1983 Ok(value) => match predicate_value_as_bool(value) {
1984 Ok(result) => {
1985 record.result = result;
1986 }
1987 Err(reason) => {
1988 count_failure = true;
1989 record.reason = Some(reason);
1990 }
1991 },
1992 Err(error) => {
1993 count_failure = true;
1994 record.reason = Some(error.to_string());
1995 }
1996 }
1997
1998 let cost_usd_micros = usd_to_micros(record.cost_usd);
1999 if cost_usd_micros > 0 {
2000 binding
2001 .metrics
2002 .cost_total_usd_micros
2003 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2004 binding
2005 .metrics
2006 .cost_today_usd_micros
2007 .fetch_add(cost_usd_micros, Ordering::Relaxed);
2008 }
2009
2010 let timed_out = matches!(
2011 record.reason.as_deref(),
2012 Some("predicate evaluation timed out")
2013 );
2014 if capture.budget_exceeded || timed_out {
2015 record.result = false;
2016 record.reason = Some("budget_exceeded".to_string());
2017 self.append_lifecycle_event(
2018 "predicate.budget_exceeded",
2019 event,
2020 binding,
2021 serde_json::json!({
2022 "trigger_id": binding.id.as_str(),
2023 "event_id": event.id.0,
2024 "max_cost_usd": binding.when_budget.as_ref().and_then(|budget| budget.max_cost_usd),
2025 "tokens_max": binding.when_budget.as_ref().and_then(|budget| budget.tokens_max),
2026 "cost_usd": record.cost_usd,
2027 "tokens": record.tokens,
2028 "replay_of_event_id": replay_of_event_id,
2029 }),
2030 replay_of_event_id,
2031 )
2032 .await?;
2033 }
2034
2035 if binding
2036 .daily_cost_usd
2037 .is_some_and(|limit| current_predicate_daily_cost(binding) > limit)
2038 {
2039 record.result = false;
2040 record.reason = Some("daily_budget_exceeded".to_string());
2041 self.append_lifecycle_event(
2042 "predicate.daily_budget_exceeded",
2043 event,
2044 binding,
2045 serde_json::json!({
2046 "trigger_id": binding.id.as_str(),
2047 "event_id": event.id.0,
2048 "limit_usd": binding.daily_cost_usd,
2049 "cost_today_usd": current_predicate_daily_cost(binding),
2050 "replay_of_event_id": replay_of_event_id,
2051 }),
2052 replay_of_event_id,
2053 )
2054 .await?;
2055 }
2056
2057 {
2058 let mut state = binding
2059 .predicate_state
2060 .lock()
2061 .expect("trigger predicate state poisoned");
2062 if state.budget_day_utc != Some(today) {
2063 state.budget_day_utc = Some(today);
2064 binding
2065 .metrics
2066 .cost_today_usd_micros
2067 .store(cost_usd_micros, Ordering::Relaxed);
2068 }
2069 if count_failure {
2070 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
2071 if state.consecutive_failures >= 3 {
2072 state.breaker_open_until_ms = Some(now_ms.saturating_add(5 * 60 * 1000));
2073 opened_breaker = true;
2074 }
2075 } else {
2076 state.consecutive_failures = 0;
2077 state.breaker_open_until_ms = None;
2078 }
2079 }
2080
2081 if opened_breaker {
2082 let mut metadata = BTreeMap::new();
2083 metadata.insert(
2084 "trigger_id".to_string(),
2085 serde_json::json!(binding.id.as_str()),
2086 );
2087 metadata.insert("event_id".to_string(), serde_json::json!(event.id.0));
2088 metadata.insert("failure_count".to_string(), serde_json::json!(3));
2089 metadata.insert("reason".to_string(), serde_json::json!(record.reason));
2090 crate::events::log_warn_meta(
2091 "trigger.predicate.circuit_breaker",
2092 "trigger predicate circuit breaker opened for 5 minutes",
2093 metadata,
2094 );
2095 }
2096
2097 self.append_predicate_evaluated_event(binding, event, &record, replay_of_event_id)
2098 .await?;
2099 Ok(record)
2100 }
2101
2102 #[allow(clippy::too_many_arguments)]
2103 #[allow(clippy::too_many_arguments)]
2104 async fn invoke_vm_callable_with_timeout(
2105 &self,
2106 closure: &crate::value::VmClosure,
2107 binding_key: &str,
2108 event: &TriggerEvent,
2109 replay_of_event_id: Option<&String>,
2110 agent_id: &str,
2111 action: &str,
2112 autonomy_tier: AutonomyTier,
2113 cancel_rx: &mut broadcast::Receiver<()>,
2114 timeout: Option<Duration>,
2115 ) -> Result<VmValue, DispatchError> {
2116 let future = self.invoke_vm_callable(
2117 closure,
2118 binding_key,
2119 event,
2120 replay_of_event_id,
2121 agent_id,
2122 action,
2123 autonomy_tier,
2124 cancel_rx,
2125 );
2126 pin_mut!(future);
2127 if let Some(timeout) = timeout {
2128 match tokio::time::timeout(timeout, future).await {
2129 Ok(result) => result,
2130 Err(_) => Err(DispatchError::Local(
2131 "predicate evaluation timed out".to_string(),
2132 )),
2133 }
2134 } else {
2135 future.await
2136 }
2137 }
2138
2139 async fn append_predicate_evaluated_event(
2140 &self,
2141 binding: &TriggerBinding,
2142 event: &TriggerEvent,
2143 record: &PredicateEvaluationRecord,
2144 replay_of_event_id: Option<&String>,
2145 ) -> Result<(), DispatchError> {
2146 self.append_lifecycle_event(
2147 "predicate.evaluated",
2148 event,
2149 binding,
2150 serde_json::json!({
2151 "trigger_id": binding.id.as_str(),
2152 "event_id": event.id.0,
2153 "result": record.result,
2154 "cost_usd": record.cost_usd,
2155 "tokens": record.tokens,
2156 "latency_ms": record.latency_ms,
2157 "cached": record.cached,
2158 "reason": record.reason,
2159 "replay_of_event_id": replay_of_event_id,
2160 }),
2161 replay_of_event_id,
2162 )
2163 .await
2164 }
2165
2166 async fn append_predicate_cache_record(
2167 &self,
2168 binding: &TriggerBinding,
2169 event: &TriggerEvent,
2170 entries: &[PredicateCacheEntry],
2171 ) -> Result<(), DispatchError> {
2172 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2173 .expect("static trigger inbox legacy topic name is valid");
2174 let payload = serde_json::to_value(PredicateCacheRecord {
2175 trigger_id: binding.id.as_str().to_string(),
2176 event_id: event.id.0.clone(),
2177 entries: entries.to_vec(),
2178 })
2179 .map_err(|error| DispatchError::Serde(error.to_string()))?;
2180 self.event_log
2181 .append(&topic, LogEvent::new("predicate_llm_cache", payload))
2182 .await
2183 .map_err(DispatchError::from)
2184 .map(|_| ())
2185 }
2186
2187 async fn read_predicate_cache_record(
2188 &self,
2189 event_id: &str,
2190 ) -> Result<Vec<PredicateCacheEntry>, DispatchError> {
2191 let topic = Topic::new(TRIGGER_INBOX_LEGACY_TOPIC)
2192 .expect("static trigger inbox legacy topic name is valid");
2193 let records = self
2194 .event_log
2195 .read_range(&topic, None, usize::MAX)
2196 .await
2197 .map_err(DispatchError::from)?;
2198 Ok(records
2199 .into_iter()
2200 .filter(|(_, event)| event.kind == "predicate_llm_cache")
2201 .filter_map(|(_, event)| {
2202 serde_json::from_value::<PredicateCacheRecord>(event.payload).ok()
2203 })
2204 .filter(|record| record.event_id == event_id)
2205 .flat_map(|record| record.entries)
2206 .collect())
2207 }
2208
2209 #[allow(clippy::too_many_arguments)]
2210 async fn append_dispatch_trust_record(
2211 &self,
2212 binding: &TriggerBinding,
2213 route: &DispatchUri,
2214 event: &TriggerEvent,
2215 replay_of_event_id: Option<&String>,
2216 autonomy_tier: AutonomyTier,
2217 outcome: TrustOutcome,
2218 terminal_status: &str,
2219 attempt_count: u32,
2220 error: Option<String>,
2221 ) -> Result<(), DispatchError> {
2222 let mut record = TrustRecord::new(
2223 binding.id.as_str().to_string(),
2224 format!("{}.{}", event.provider.as_str(), event.kind),
2225 None,
2226 outcome,
2227 event.trace_id.0.clone(),
2228 autonomy_tier,
2229 );
2230 record.metadata.insert(
2231 "binding_key".to_string(),
2232 serde_json::json!(binding.binding_key()),
2233 );
2234 record.metadata.insert(
2235 "binding_version".to_string(),
2236 serde_json::json!(binding.version),
2237 );
2238 record.metadata.insert(
2239 "provider".to_string(),
2240 serde_json::json!(event.provider.as_str()),
2241 );
2242 record
2243 .metadata
2244 .insert("event_kind".to_string(), serde_json::json!(event.kind));
2245 record
2246 .metadata
2247 .insert("handler_kind".to_string(), serde_json::json!(route.kind()));
2248 record.metadata.insert(
2249 "target_uri".to_string(),
2250 serde_json::json!(route.target_uri()),
2251 );
2252 record.metadata.insert(
2253 "terminal_status".to_string(),
2254 serde_json::json!(terminal_status),
2255 );
2256 record.metadata.insert(
2257 "attempt_count".to_string(),
2258 serde_json::json!(attempt_count),
2259 );
2260 if let Some(replay_of_event_id) = replay_of_event_id {
2261 record.metadata.insert(
2262 "replay_of_event_id".to_string(),
2263 serde_json::json!(replay_of_event_id),
2264 );
2265 }
2266 if let Some(error) = error {
2267 record
2268 .metadata
2269 .insert("error".to_string(), serde_json::json!(error));
2270 }
2271 append_trust_record(&self.event_log, &record)
2272 .await
2273 .map_err(DispatchError::from)
2274 }
2275
2276 async fn append_attempt_record(
2277 &self,
2278 event: &TriggerEvent,
2279 binding: &TriggerBinding,
2280 attempt: &DispatchAttemptRecord,
2281 replay_of_event_id: Option<&String>,
2282 ) -> Result<(), DispatchError> {
2283 self.append_topic_event(
2284 TRIGGER_ATTEMPTS_TOPIC,
2285 "attempt_recorded",
2286 event,
2287 Some(binding),
2288 Some(attempt.attempt),
2289 serde_json::to_value(attempt)
2290 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2291 replay_of_event_id,
2292 )
2293 .await
2294 }
2295
2296 async fn append_lifecycle_event(
2297 &self,
2298 kind: &str,
2299 event: &TriggerEvent,
2300 binding: &TriggerBinding,
2301 payload: serde_json::Value,
2302 replay_of_event_id: Option<&String>,
2303 ) -> Result<(), DispatchError> {
2304 self.append_topic_event(
2305 TRIGGERS_LIFECYCLE_TOPIC,
2306 kind,
2307 event,
2308 Some(binding),
2309 None,
2310 payload,
2311 replay_of_event_id,
2312 )
2313 .await
2314 }
2315
2316 async fn append_topic_event(
2317 &self,
2318 topic_name: &str,
2319 kind: &str,
2320 event: &TriggerEvent,
2321 binding: Option<&TriggerBinding>,
2322 attempt: Option<u32>,
2323 payload: serde_json::Value,
2324 replay_of_event_id: Option<&String>,
2325 ) -> Result<(), DispatchError> {
2326 let topic = Topic::new(topic_name)
2327 .expect("static trigger dispatcher topic names should always be valid");
2328 let headers = event_headers(event, binding, attempt, replay_of_event_id);
2329 self.event_log
2330 .append(&topic, LogEvent::new(kind, payload).with_headers(headers))
2331 .await
2332 .map_err(DispatchError::from)
2333 .map(|_| ())
2334 }
2335
2336 async fn emit_action_graph(
2337 &self,
2338 event: &TriggerEvent,
2339 nodes: Vec<RunActionGraphNodeRecord>,
2340 edges: Vec<RunActionGraphEdgeRecord>,
2341 extra: serde_json::Value,
2342 ) -> Result<(), DispatchError> {
2343 let mut headers = BTreeMap::new();
2344 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2345 headers.insert("event_id".to_string(), event.id.0.clone());
2346 let observability = RunObservabilityRecord {
2347 schema_version: 1,
2348 action_graph_nodes: nodes,
2349 action_graph_edges: edges,
2350 ..Default::default()
2351 };
2352 append_action_graph_update(
2353 headers,
2354 serde_json::json!({
2355 "source": "dispatcher",
2356 "trace_id": event.trace_id.0,
2357 "event_id": event.id.0,
2358 "observability": observability,
2359 "context": extra,
2360 }),
2361 )
2362 .await
2363 .map_err(DispatchError::from)
2364 }
2365}
2366
2367async fn dispatch_cancel_requested(
2368 event_log: &Arc<AnyEventLog>,
2369 binding_key: &str,
2370 event_id: &str,
2371 replay_of_event_id: Option<&String>,
2372) -> Result<bool, DispatchError> {
2373 if replay_of_event_id.is_some() {
2374 return Ok(false);
2375 }
2376 let topic = Topic::new(TRIGGER_CANCEL_REQUESTS_TOPIC)
2377 .expect("static trigger cancel topic should always be valid");
2378 let events = event_log.read_range(&topic, None, usize::MAX).await?;
2379 let requested = events
2380 .into_iter()
2381 .filter(|(_, event)| event.kind == "dispatch_cancel_requested")
2382 .filter_map(|(_, event)| {
2383 serde_json::from_value::<DispatchCancelRequest>(event.payload).ok()
2384 })
2385 .collect::<BTreeSet<_>>();
2386 Ok(requested
2387 .iter()
2388 .any(|request| request.binding_key == binding_key && request.event_id == event_id))
2389}
2390
2391async fn sleep_or_cancel_or_request(
2392 event_log: &Arc<AnyEventLog>,
2393 delay: Duration,
2394 binding_key: &str,
2395 event_id: &str,
2396 replay_of_event_id: Option<&String>,
2397 cancel_rx: &mut broadcast::Receiver<()>,
2398) -> Result<(), DispatchError> {
2399 let sleep = tokio::time::sleep(delay);
2400 pin_mut!(sleep);
2401 let mut poll = tokio::time::interval(Duration::from_millis(100));
2402 loop {
2403 tokio::select! {
2404 _ = &mut sleep => return Ok(()),
2405 _ = recv_cancel(cancel_rx) => {
2406 return Err(DispatchError::Cancelled(
2407 "dispatcher shutdown cancelled retry wait".to_string(),
2408 ));
2409 }
2410 _ = poll.tick() => {
2411 if dispatch_cancel_requested(event_log, binding_key, event_id, replay_of_event_id).await? {
2412 return Err(DispatchError::Cancelled(
2413 "trigger cancel request cancelled retry wait".to_string(),
2414 ));
2415 }
2416 }
2417 }
2418 }
2419}
2420
2421fn build_batched_event(events: Vec<TriggerEvent>) -> Result<TriggerEvent, DispatchError> {
2422 let mut iter = events.into_iter();
2423 let Some(mut root) = iter.next() else {
2424 return Err(DispatchError::Registry(
2425 "batch dispatch produced an empty event list".to_string(),
2426 ));
2427 };
2428 let mut batch = Vec::new();
2429 batch.push(
2430 serde_json::to_value(&root).map_err(|error| DispatchError::Serde(error.to_string()))?,
2431 );
2432 for event in iter {
2433 batch.push(
2434 serde_json::to_value(&event)
2435 .map_err(|error| DispatchError::Serde(error.to_string()))?,
2436 );
2437 }
2438 root.batch = Some(batch);
2439 Ok(root)
2440}
2441
2442fn json_value_to_gate(value: &serde_json::Value) -> String {
2443 match value {
2444 serde_json::Value::Null => "null".to_string(),
2445 serde_json::Value::String(text) => text.clone(),
2446 serde_json::Value::Bool(value) => value.to_string(),
2447 serde_json::Value::Number(value) => value.to_string(),
2448 other => serde_json::to_string(other).unwrap_or_else(|_| "unserializable".to_string()),
2449 }
2450}
2451
2452fn decrement_in_flight(state: &DispatcherRuntimeState) {
2453 let previous = state.in_flight.fetch_sub(1, Ordering::Relaxed);
2454 if previous == 1 && state.retry_queue_depth.load(Ordering::Relaxed) == 0 {
2455 state.idle_notify.notify_waiters();
2456 }
2457}
2458
2459fn decrement_retry_queue_depth(state: &DispatcherRuntimeState) {
2460 let previous = state.retry_queue_depth.fetch_sub(1, Ordering::Relaxed);
2461 if previous == 1 && state.in_flight.load(Ordering::Relaxed) == 0 {
2462 state.idle_notify.notify_waiters();
2463 }
2464}
2465
/// Test-only: stores `tx` in `TEST_INBOX_DEQUEUED_SIGNAL`, replacing any
/// sender that was installed before.
#[cfg(test)]
fn install_test_inbox_dequeued_signal(tx: tokio::sync::oneshot::Sender<()>) {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut installed = slot.borrow_mut();
        *installed = Some(tx);
    });
}
2472
/// No-op outside of test builds.
#[cfg(not(test))]
fn notify_test_inbox_dequeued() {}

/// Test-only: takes the sender stored in `TEST_INBOX_DEQUEUED_SIGNAL`, if
/// any, and signals it once. A failed send is ignored.
#[cfg(test)]
fn notify_test_inbox_dequeued() {
    TEST_INBOX_DEQUEUED_SIGNAL.with(|slot| {
        let mut installed = slot.borrow_mut();
        if let Some(tx) = installed.take() {
            let _ = tx.send(());
        }
    });
}
2484
2485pub async fn enqueue_trigger_event<L: EventLog + ?Sized>(
2486 event_log: &L,
2487 event: &TriggerEvent,
2488) -> Result<u64, DispatchError> {
2489 let topic = Topic::new(TRIGGER_INBOX_ENVELOPES_TOPIC)
2490 .expect("static trigger.inbox.envelopes topic is valid");
2491 let headers = event_headers(event, None, None, None);
2492 let payload =
2493 serde_json::to_value(event).map_err(|error| DispatchError::Serde(error.to_string()))?;
2494 event_log
2495 .append(
2496 &topic,
2497 LogEvent::new("event_ingested", payload).with_headers(headers),
2498 )
2499 .await
2500 .map_err(DispatchError::from)
2501}
2502
2503pub fn snapshot_dispatcher_stats() -> DispatcherStatsSnapshot {
2504 ACTIVE_DISPATCHER_STATE.with(|slot| {
2505 slot.borrow()
2506 .as_ref()
2507 .map(|state| DispatcherStatsSnapshot {
2508 in_flight: state.in_flight.load(Ordering::Relaxed),
2509 retry_queue_depth: state.retry_queue_depth.load(Ordering::Relaxed),
2510 dlq_depth: state.dlq.lock().expect("dispatcher dlq poisoned").len() as u64,
2511 })
2512 .unwrap_or_default()
2513 })
2514}
2515
2516pub fn clear_dispatcher_state() {
2517 ACTIVE_DISPATCHER_STATE.with(|slot| {
2518 *slot.borrow_mut() = None;
2519 });
2520}
2521
2522fn dispatch_error_from_vm_error(error: VmError) -> DispatchError {
2523 if is_cancelled_vm_error(&error) {
2524 return DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string());
2525 }
2526 if let VmError::Thrown(VmValue::String(message)) = &error {
2527 return DispatchError::Local(message.to_string());
2528 }
2529 match error_to_category(&error) {
2530 ErrorCategory::Timeout => DispatchError::Timeout(error.to_string()),
2531 ErrorCategory::ToolRejected => DispatchError::Denied(error.to_string()),
2532 ErrorCategory::Cancelled => {
2533 DispatchError::Cancelled("dispatcher shutdown cancelled local handler".to_string())
2534 }
2535 _ => DispatchError::Local(error.to_string()),
2536 }
2537}
2538
2539fn dispatch_error_label(error: &DispatchError) -> &'static str {
2540 match error {
2541 DispatchError::Denied(_) => "denied",
2542 DispatchError::Timeout(_) => "timeout",
2543 DispatchError::Cancelled(_) => "cancelled",
2544 _ => "failed",
2545 }
2546}
2547
2548fn dispatch_node_id(
2549 route: &DispatchUri,
2550 binding_key: &str,
2551 event_id: &str,
2552 attempt: u32,
2553) -> String {
2554 let prefix = match route {
2555 DispatchUri::A2a { .. } => "a2a",
2556 _ => "dispatch",
2557 };
2558 format!("{prefix}:{binding_key}:{event_id}:{attempt}")
2559}
2560
2561fn dispatch_node_kind(route: &DispatchUri) -> &'static str {
2562 match route {
2563 DispatchUri::A2a { .. } => ACTION_GRAPH_NODE_KIND_A2A_HOP,
2564 _ => ACTION_GRAPH_NODE_KIND_DISPATCH,
2565 }
2566}
2567
2568fn dispatch_node_label(route: &DispatchUri) -> String {
2569 match route {
2570 DispatchUri::A2a { target, .. } => crate::a2a::target_agent_label(target),
2571 _ => route.target_uri(),
2572 }
2573}
2574
2575fn dispatch_target_agent(route: &DispatchUri) -> Option<String> {
2576 match route {
2577 DispatchUri::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
2578 _ => None,
2579 }
2580}
2581
2582fn dispatch_entry_edge_kind(route: &DispatchUri, has_predicate: bool) -> &'static str {
2583 match route {
2584 DispatchUri::A2a { .. } => ACTION_GRAPH_EDGE_KIND_A2A_DISPATCH,
2585 _ if has_predicate => ACTION_GRAPH_EDGE_KIND_PREDICATE_GATE,
2586 _ => ACTION_GRAPH_EDGE_KIND_TRIGGER_DISPATCH,
2587 }
2588}
2589
2590fn predicate_value_as_bool(value: VmValue) -> Result<bool, String> {
2591 match value {
2592 VmValue::Bool(result) => Ok(result),
2593 VmValue::EnumVariant {
2594 enum_name,
2595 variant,
2596 mut fields,
2597 } if enum_name == "Result" && variant == "Ok" => match fields.pop() {
2598 Some(VmValue::Bool(result)) => Ok(result),
2599 Some(other) => Err(format!(
2600 "predicate Result.Ok payload must be bool, got {}",
2601 other.type_name()
2602 )),
2603 None => Err("predicate Result.Ok payload is missing".to_string()),
2604 },
2605 VmValue::EnumVariant {
2606 enum_name,
2607 variant,
2608 fields,
2609 } if enum_name == "Result" && variant == "Err" => Err(fields
2610 .first()
2611 .map(VmValue::display)
2612 .unwrap_or_else(|| "predicate returned Result.Err".to_string())),
2613 other => Err(format!(
2614 "predicate must return bool or Result<bool, _>, got {}",
2615 other.type_name()
2616 )),
2617 }
2618}
2619
/// Converts a dollar amount into whole micro-dollars, rounded to nearest.
///
/// Non-finite, zero and negative amounts all map to `0`.
fn usd_to_micros(value: f64) -> u64 {
    const MICROS_PER_USD: f64 = 1_000_000.0;

    let is_positive_amount = value.is_finite() && value > 0.0;
    if is_positive_amount {
        (value * MICROS_PER_USD).round() as u64
    } else {
        0
    }
}
2626
2627fn current_predicate_daily_cost(binding: &TriggerBinding) -> f64 {
2628 binding
2629 .metrics
2630 .cost_today_usd_micros
2631 .load(Ordering::Relaxed) as f64
2632 / 1_000_000.0
2633}
2634
2635fn is_cancelled_vm_error(error: &VmError) -> bool {
2636 matches!(
2637 error,
2638 VmError::Thrown(VmValue::String(message))
2639 if message.starts_with("kind:cancelled:")
2640 ) || matches!(error_to_category(error), ErrorCategory::Cancelled)
2641}
2642
2643fn event_headers(
2644 event: &TriggerEvent,
2645 binding: Option<&TriggerBinding>,
2646 attempt: Option<u32>,
2647 replay_of_event_id: Option<&String>,
2648) -> BTreeMap<String, String> {
2649 let mut headers = BTreeMap::new();
2650 headers.insert("event_id".to_string(), event.id.0.clone());
2651 headers.insert("trace_id".to_string(), event.trace_id.0.clone());
2652 headers.insert("provider".to_string(), event.provider.as_str().to_string());
2653 headers.insert("kind".to_string(), event.kind.clone());
2654 if let Some(replay_of_event_id) = replay_of_event_id {
2655 headers.insert("replay_of_event_id".to_string(), replay_of_event_id.clone());
2656 }
2657 if let Some(binding) = binding {
2658 headers.insert("trigger_id".to_string(), binding.id.as_str().to_string());
2659 headers.insert("binding_key".to_string(), binding.binding_key());
2660 headers.insert(
2661 "handler_kind".to_string(),
2662 DispatchUri::from(&binding.handler).kind().to_string(),
2663 );
2664 }
2665 if let Some(attempt) = attempt {
2666 headers.insert("attempt".to_string(), attempt.to_string());
2667 }
2668 headers
2669}
2670
/// Name of the environment variable that makes `maybe_fail_before_outbox`
/// terminate the process.
const TEST_FAIL_BEFORE_OUTBOX_ENV: &str = "HARN_TEST_DISPATCHER_FAIL_BEFORE_OUTBOX";

/// Exits the process with status 86 when `TEST_FAIL_BEFORE_OUTBOX_ENV` is set
/// (to any value); otherwise returns without doing anything.
fn maybe_fail_before_outbox() {
    if std::env::var_os(TEST_FAIL_BEFORE_OUTBOX_ENV).is_none() {
        return;
    }
    std::process::exit(86);
}
2678
2679fn now_rfc3339() -> String {
2680 time::OffsetDateTime::now_utc()
2681 .format(&time::format_description::well_known::Rfc3339)
2682 .unwrap_or_else(|_| "1970-01-01T00:00:00Z".to_string())
2683}
2684
2685fn now_unix_ms() -> i64 {
2686 (time::OffsetDateTime::now_utc().unix_timestamp_nanos() / 1_000_000) as i64
2687}
2688
2689fn utc_day_key() -> i32 {
2690 time::OffsetDateTime::now_utc().date().to_julian_day()
2691}
2692
2693fn cancelled_dispatch_outcome(
2694 binding: &TriggerBinding,
2695 route: &DispatchUri,
2696 event: &TriggerEvent,
2697 replay_of_event_id: Option<String>,
2698 attempt_count: u32,
2699 error: String,
2700) -> DispatchOutcome {
2701 DispatchOutcome {
2702 trigger_id: binding.id.as_str().to_string(),
2703 binding_key: binding.binding_key(),
2704 event_id: event.id.0.clone(),
2705 attempt_count,
2706 status: DispatchStatus::Cancelled,
2707 handler_kind: route.kind().to_string(),
2708 target_uri: route.target_uri(),
2709 replay_of_event_id,
2710 result: None,
2711 error: Some(error),
2712 }
2713}
2714
2715async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
2716 let _ = cancel_rx.recv().await;
2717}
2718
2719#[cfg(test)]
2720mod tests;