Skip to main content

harn_vm/triggers/dispatcher/
types.rs

1use std::collections::BTreeMap;
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::{Arc, Mutex};
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::broadcast;
7use tokio::sync::{Mutex as AsyncMutex, Notify};
8
9use crate::event_log::{AnyEventLog, LogError};
10use crate::trust_graph::AutonomyTier;
11use crate::vm::Vm;
12
13use super::super::{ProviderId, SignatureStatus, TenantId, TraceId, TriggerEvent, TriggerEventId};
14use super::circuits::DestinationCircuitRegistry;
15use super::flow_control::{ConcurrencyPermit, FlowControlManager};
16
/// Default reviewer identity (`"operator"`) for autonomy-budget handling.
///
/// NOTE(review): presumably used when no explicit reviewer is configured for a
/// binding — confirm at the call sites in the sibling dispatcher modules.
pub(super) const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
18
/// Per-dispatch context: the trigger event being handled plus the binding,
/// agent, action, and autonomy tier it is being handled under.
#[derive(Clone, Debug)]
pub(crate) struct DispatchContext {
    /// The event being dispatched.
    pub trigger_event: TriggerEvent,
    /// Id of the original event when this dispatch is a replay.
    /// NOTE(review): presumably `None` for first-time dispatches — confirm.
    pub replay_of_event_id: Option<String>,
    /// Identifier of the trigger binding handling the event.
    pub binding_id: String,
    /// Version of that binding.
    pub binding_version: u32,
    /// Identifier of the agent the dispatch runs on behalf of.
    pub agent_id: String,
    /// Action name associated with this dispatch.
    pub action: String,
    /// Autonomy tier (from `crate::trust_graph`) applied to this dispatch.
    pub autonomy_tier: AutonomyTier,
}
29
/// Drop guard that calls `crate::orchestration::pop_execution_policy()` when it
/// goes out of scope.
///
/// Because the pop lives in `Drop`, it also runs on early returns and during
/// panic unwinding.
/// NOTE(review): presumably constructed right after a matching push of an
/// execution policy — confirm at the construction site.
pub(super) struct DispatchExecutionPolicyGuard;

impl Drop for DispatchExecutionPolicyGuard {
    /// Pops the execution policy pushed before this guard was created.
    fn drop(&mut self) {
        crate::orchestration::pop_execution_policy();
    }
}
37
/// Cloneable handle to the trigger dispatcher.
///
/// Every field is an `Arc` or a `broadcast::Sender`, so clones share the same
/// base VM, event log, runtime state, metrics registry, and A2A client.
#[derive(Clone)]
pub struct Dispatcher {
    /// VM shared by all dispatches.
    pub(super) base_vm: Arc<Vm>,
    /// Event log the dispatcher reads from and writes to.
    pub(super) event_log: Arc<AnyEventLog>,
    /// Broadcast sender; all clones publish to the same channel.
    /// NOTE(review): presumably used to signal cancellation to running
    /// dispatches — confirm.
    pub(super) cancel_tx: broadcast::Sender<()>,
    /// Counters, DLQ, flow control, and circuit state shared across clones.
    pub(super) state: Arc<DispatcherRuntimeState>,
    /// Optional metrics sink; `None` disables metrics reporting.
    pub(super) metrics: Option<Arc<crate::MetricsRegistry>>,
    /// Client used for agent-to-agent (A2A) dispatch targets.
    pub(super) a2a_client: Arc<dyn crate::a2a::A2aClient>,
}
47
48#[derive(Debug)]
49pub(super) struct DispatcherRuntimeState {
50    pub(super) in_flight: AtomicU64,
51    pub(super) retry_queue_depth: AtomicU64,
52    pub(super) dlq: Mutex<Vec<DlqEntry>>,
53    pub(super) cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
54    pub(super) shutting_down: std::sync::atomic::AtomicBool,
55    pub(super) idle_notify: Notify,
56    pub(super) flow_control: FlowControlManager,
57    pub(super) destination_circuits: DestinationCircuitRegistry,
58}
59
60impl DispatcherRuntimeState {
61    pub(super) fn new(event_log: Arc<AnyEventLog>) -> Self {
62        Self {
63            in_flight: AtomicU64::new(0),
64            retry_queue_depth: AtomicU64::new(0),
65            dlq: Mutex::new(Vec::new()),
66            cancel_tokens: Mutex::new(Vec::new()),
67            shutting_down: std::sync::atomic::AtomicBool::new(false),
68            idle_notify: Notify::new(),
69            flow_control: FlowControlManager::new(event_log),
70            destination_circuits: DestinationCircuitRegistry::default(),
71        }
72    }
73}
74
/// Snapshot of the dispatcher's queue counters.
///
/// With `#[serde(default)]`, any counter missing from serialized input
/// deserializes as `0`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherStatsSnapshot {
    /// Dispatches in flight at snapshot time.
    pub in_flight: u64,
    /// Events waiting for a retry at snapshot time.
    pub retry_queue_depth: u64,
    /// Dead-letter entries at snapshot time.
    pub dlq_depth: u64,
}
82
/// Status of a dispatch.
///
/// Serialized in `snake_case` (e.g. `Dlq` -> `"dlq"`), matching
/// `DispatchStatus::as_str`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum DispatchStatus {
    Succeeded,
    Failed,
    /// NOTE(review): presumably the event was moved to the dead-letter queue.
    Dlq,
    Skipped,
    Waiting,
    Cancelled,
}
93
94impl DispatchStatus {
95    pub fn as_str(&self) -> &'static str {
96        match self {
97            Self::Succeeded => "succeeded",
98            Self::Failed => "failed",
99            Self::Dlq => "dlq",
100            Self::Skipped => "skipped",
101            Self::Waiting => "waiting",
102            Self::Cancelled => "cancelled",
103        }
104    }
105}
106
/// Outcome of dispatching one event to one trigger binding.
///
/// With `#[serde(default)]`, fields missing from serialized input take their
/// values from this type's `Default` impl. That impl sets `status` to `Failed`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchOutcome {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// Number of attempts made.
    pub attempt_count: u32,
    pub status: DispatchStatus,
    pub handler_kind: String,
    pub target_uri: String,
    /// Id of the original event when this dispatch was a replay.
    pub replay_of_event_id: Option<String>,
    /// Handler result, if any.
    pub result: Option<serde_json::Value>,
    /// Error message, if any.
    pub error: Option<String>,
}
121
/// A trigger event together with the optional trigger id and binding version
/// it is associated with.
///
/// NOTE(review): presumably the payload stored on the trigger inbox topic —
/// confirm against the writer.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct InboxEnvelope {
    pub trigger_id: Option<String>,
    pub binding_version: Option<u32>,
    pub event: TriggerEvent,
}
128
/// Scope of a trigger inbox topic.
///
/// NOTE(review): presumably `Shared` is one topic for all tenants and `Tenant`
/// is a per-tenant topic — confirm where topic names are built.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum TriggerInboxTopicScope {
    Shared,
    Tenant,
}
134
/// Schema version stamped into every `InboxObservationRecord` built with
/// `InboxObservationRecord::new`.
///
/// NOTE(review): presumably bumped whenever the record layout changes.
pub const INBOX_OBSERVATION_SCHEMA_VERSION: u32 = 1;
136
/// Versioned record pairing trigger routing info with a
/// [`TriggerEventObservation`].
///
/// Build it with [`InboxObservationRecord::new`] so `schema_version` is set to
/// [`INBOX_OBSERVATION_SCHEMA_VERSION`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct InboxObservationRecord {
    /// Layout version of this record.
    pub schema_version: u32,
    pub trigger_id: Option<String>,
    pub binding_version: Option<u32>,
    /// Observation of the trigger event itself.
    pub event: TriggerEventObservation,
}
144
145impl InboxObservationRecord {
146    pub fn new(
147        trigger_id: Option<String>,
148        binding_version: Option<u32>,
149        event: TriggerEventObservation,
150    ) -> Self {
151        Self {
152            schema_version: INBOX_OBSERVATION_SCHEMA_VERSION,
153            trigger_id,
154            binding_version,
155            event,
156        }
157    }
158}
159
/// Summary of a trigger event for observation records.
///
/// Timestamps are (de)serialized as RFC 3339 strings. The raw body is not
/// stored; only its length and SHA-256 digest are kept via
/// [`RawBodyObservation`].
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct TriggerEventObservation {
    pub id: TriggerEventId,
    pub provider: ProviderId,
    pub payload_provider: String,
    pub kind: String,
    #[serde(with = "time::serde::rfc3339")]
    pub received_at: time::OffsetDateTime,
    /// NOTE(review): because this field uses `#[serde(with = ...)]`, serde does
    /// not treat a missing key as `None`. Input must contain the key (possibly
    /// `null`). Add `#[serde(default)]` if absent keys should be accepted.
    #[serde(with = "time::serde::rfc3339::option")]
    pub occurred_at: Option<time::OffsetDateTime>,
    pub dedupe_key: String,
    pub trace_id: TraceId,
    pub tenant_id: Option<TenantId>,
    pub headers: BTreeMap<String, String>,
    /// Size and digest of the raw body, if one was present.
    pub raw_body: Option<RawBodyObservation>,
    pub signature_status: SignatureStatus,
}
177
/// Fingerprint of a raw request body: its length and SHA-256 digest, without
/// the body bytes themselves.
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct RawBodyObservation {
    /// Body length in bytes.
    pub byte_len: usize,
    /// SHA-256 digest of the body.
    /// NOTE(review): encoding (presumably lowercase hex) is decided by the
    /// producer — confirm.
    pub sha256: String,
}
183
/// Report produced when draining the dispatcher.
///
/// With `#[serde(default)]`, missing fields deserialize as `false` / `0`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatcherDrainReport {
    /// NOTE(review): presumably `true` when the drain finished with no work left.
    pub drained: bool,
    pub in_flight: u64,
    pub retry_queue_depth: u64,
    pub dlq_depth: u64,
}
192
193impl Default for DispatchOutcome {
194    fn default() -> Self {
195        Self {
196            trigger_id: String::new(),
197            binding_key: String::new(),
198            event_id: String::new(),
199            attempt_count: 0,
200            status: DispatchStatus::Failed,
201            handler_kind: String::new(),
202            target_uri: String::new(),
203            replay_of_event_id: None,
204            result: None,
205            error: None,
206        }
207    }
208}
209
/// Record of a single dispatch attempt.
///
/// With `#[serde(default)]`, missing fields deserialize as empty strings, `0`,
/// or `None`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct DispatchAttemptRecord {
    pub trigger_id: String,
    pub binding_key: String,
    pub event_id: String,
    /// Attempt number.
    pub attempt: u32,
    pub handler_kind: String,
    /// Start timestamp as text.
    /// NOTE(review): format is not enforced by this type — presumably RFC 3339.
    pub started_at: String,
    /// Completion timestamp as text (same caveat as `started_at`).
    pub completed_at: String,
    /// Outcome label for this attempt.
    pub outcome: String,
    /// Error message when the attempt failed.
    pub error_msg: Option<String>,
}
223
/// Request to cancel the dispatch of `event_id` for `binding_key`.
///
/// The derived `Ord` compares fields in declaration order. Requests therefore
/// sort by `binding_key`, then `event_id`, then `requested_at`, so field order
/// is significant.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct DispatchCancelRequest {
    pub binding_key: String,
    pub event_id: String,
    /// When the cancellation was requested, (de)serialized as RFC 3339.
    #[serde(with = "time::serde::rfc3339")]
    pub requested_at: time::OffsetDateTime,
    /// Who asked for the cancellation; may be absent in serialized input.
    #[serde(default)]
    pub requested_by: Option<String>,
    /// Associated audit id; may be absent in serialized input.
    #[serde(default)]
    pub audit_id: Option<String>,
}
235
/// Dead-letter-queue entry: the event, the binding it failed for, and the
/// history of attempts.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct DlqEntry {
    pub trigger_id: String,
    pub binding_key: String,
    pub event: TriggerEvent,
    /// Number of attempts made before the entry was recorded.
    pub attempt_count: u32,
    /// Message of the last error.
    pub final_error: String,
    /// Error classification. Deserializes as `"unknown"` when absent
    /// (see `default_dlq_error_class`).
    #[serde(default = "default_dlq_error_class")]
    pub error_class: String,
    /// Per-attempt records.
    pub attempts: Vec<DispatchAttemptRecord>,
}
247
/// Serde fallback for `DlqEntry::error_class` when the field is absent from
/// serialized input.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
251
/// Singleton flow-control gate a dispatch may hold.
#[derive(Clone, Debug)]
pub(super) struct SingletonLease {
    /// Name of the singleton gate.
    pub(super) gate: String,
    /// Whether the gate is currently held.
    /// `DispatchWaitLease::suspend` clears it and `resume` sets it again.
    pub(super) held: bool,
}
257
/// Concurrency flow-control gate a dispatch may hold a permit for.
#[derive(Clone, Debug)]
pub(super) struct ConcurrencyLease {
    /// Name of the concurrency gate.
    pub(super) gate: String,
    /// Limit passed to `FlowControlManager::acquire_concurrency`.
    pub(super) max: u32,
    /// Priority rank passed to `FlowControlManager::acquire_concurrency`.
    pub(super) priority_rank: usize,
    /// The held permit.
    /// `DispatchWaitLease::suspend` takes it (leaving `None`) and `resume`
    /// refills it.
    pub(super) permit: Option<ConcurrencyPermit>,
}
265
/// Flow-control leases acquired for one dispatch; either kind may be absent.
/// `Default` yields no leases at all.
#[derive(Default, Debug)]
pub(super) struct AcquiredFlowControl {
    pub(super) singleton: Option<SingletonLease>,
    pub(super) concurrency: Option<ConcurrencyLease>,
}
271
/// Cloneable handle that lets a waiting dispatch give back its flow-control
/// slots (`suspend`) and later take them again (`resume`).
///
/// All fields are `Arc`s, so clones share the same leases and the same
/// `suspended` flag.
#[derive(Clone)]
pub(crate) struct DispatchWaitLease {
    /// Dispatcher state owning the flow-control manager.
    state: Arc<DispatcherRuntimeState>,
    /// Leases this dispatch acquired; updated in place by suspend/resume.
    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
    /// `true` while the leases are given back.
    suspended: Arc<AtomicBool>,
}
278
279impl DispatchWaitLease {
280    pub(super) fn new(
281        state: Arc<DispatcherRuntimeState>,
282        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
283    ) -> Self {
284        Self {
285            state,
286            acquired,
287            suspended: Arc::new(AtomicBool::new(false)),
288        }
289    }
290
291    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
292        if self.suspended.swap(true, Ordering::SeqCst) {
293            return Ok(());
294        }
295        let (singleton_gate, concurrency_permit) = {
296            let mut acquired = self.acquired.lock().await;
297            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
298                if lease.held {
299                    lease.held = false;
300                    Some(lease.gate.clone())
301                } else {
302                    None
303                }
304            });
305            let concurrency_permit = acquired
306                .concurrency
307                .as_mut()
308                .and_then(|lease| lease.permit.take());
309            (singleton_gate, concurrency_permit)
310        };
311
312        if let Some(gate) = singleton_gate {
313            self.state
314                .flow_control
315                .release_singleton(&gate)
316                .await
317                .map_err(DispatchError::from)?;
318        }
319        if let Some(permit) = concurrency_permit {
320            self.state
321                .flow_control
322                .release_concurrency(permit)
323                .await
324                .map_err(DispatchError::from)?;
325        }
326        Ok(())
327    }
328
329    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
330        if !self.suspended.swap(false, Ordering::SeqCst) {
331            return Ok(());
332        }
333
334        let singleton_gate = {
335            let acquired = self.acquired.lock().await;
336            acquired.singleton.as_ref().and_then(|lease| {
337                if lease.held {
338                    None
339                } else {
340                    Some(lease.gate.clone())
341                }
342            })
343        };
344        if let Some(gate) = singleton_gate {
345            self.state
346                .flow_control
347                .acquire_singleton(&gate)
348                .await
349                .map_err(DispatchError::from)?;
350            let mut acquired = self.acquired.lock().await;
351            if let Some(lease) = acquired.singleton.as_mut() {
352                lease.held = true;
353            }
354        }
355
356        let concurrency_spec = {
357            let acquired = self.acquired.lock().await;
358            acquired.concurrency.as_ref().and_then(|lease| {
359                if lease.permit.is_some() {
360                    None
361                } else {
362                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
363                }
364            })
365        };
366        if let Some((gate, max, priority_rank)) = concurrency_spec {
367            let permit = self
368                .state
369                .flow_control
370                .acquire_concurrency(&gate, max, priority_rank)
371                .await
372                .map_err(DispatchError::from)?;
373            let mut acquired = self.acquired.lock().await;
374            if let Some(lease) = acquired.concurrency.as_mut() {
375                lease.permit = Some(permit);
376            }
377        }
378        Ok(())
379    }
380}
381
/// Result of running an event through flow control.
pub(super) enum FlowControlOutcome {
    /// Proceed with `event`, holding the leases in `acquired`.
    /// NOTE(review): the event is boxed, presumably to keep this enum small.
    Dispatch {
        event: Box<TriggerEvent>,
        acquired: AcquiredFlowControl,
    },
    /// Do not dispatch; `reason` explains why.
    Skip {
        reason: String,
    },
}
391
/// Value returned by a handler call, plus metadata about the call.
#[derive(Debug)]
pub(super) struct DispatchCallResult {
    /// Handler output.
    pub(super) output: serde_json::Value,
    /// Additional key/value metadata about the call, ordered by key.
    pub(super) metadata: BTreeMap<String, serde_json::Value>,
}
397
/// Stage at which a dispatch was skipped; `as_str` gives its `snake_case` label.
#[derive(Clone, Debug)]
pub(super) enum DispatchSkipStage {
    Budget,
    Predicate,
    FlowControl,
}
404
/// Errors produced while dispatching a trigger event.
///
/// Every variant carries a human-readable message, which `Display` prints
/// verbatim. `retryable` treats `Cancelled`, `Denied`, `NotImplemented`, and
/// `Waiting` as non-retryable and all other variants as retryable.
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log failure; `From<LogError>` produces this variant.
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}
418
419impl std::fmt::Display for DispatchError {
420    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
421        match self {
422            Self::EventLog(message)
423            | Self::Registry(message)
424            | Self::Serde(message)
425            | Self::Local(message)
426            | Self::A2a(message)
427            | Self::Denied(message)
428            | Self::Timeout(message)
429            | Self::Waiting(message)
430            | Self::Cancelled(message)
431            | Self::NotImplemented(message) => f.write_str(message),
432        }
433    }
434}
435
/// Uses the default `source()` (always `None`); the message comes from `Display`.
impl std::error::Error for DispatchError {}
437
438impl DispatchError {
439    pub(super) fn retryable(&self) -> bool {
440        !matches!(
441            self,
442            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_)
443        )
444    }
445}
446
447impl DispatchSkipStage {
448    pub(super) fn as_str(&self) -> &'static str {
449        match self {
450            Self::Budget => "budget",
451            Self::Predicate => "predicate",
452            Self::FlowControl => "flow_control",
453        }
454    }
455}
456
457impl From<LogError> for DispatchError {
458    fn from(value: LogError) -> Self {
459        Self::EventLog(value.to_string())
460    }
461}