// harn_vm/triggers/dispatcher/types.rs

1use std::rc::Rc;
2use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
3use std::sync::{Arc, Mutex};
4
5use serde::{Deserialize, Serialize};
6use tokio::sync::broadcast;
7use tokio::sync::{Mutex as AsyncMutex, Notify};
8
9use crate::event_log::{AnyEventLog, LogError};
10use crate::trust_graph::AutonomyTier;
11use crate::vm::Vm;
12
13use super::circuits::DestinationCircuitRegistry;
14use super::flow_control::{ConcurrencyPermit, FlowControlManager};
15use super::TriggerEvent;
16
17pub(super) const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
18
19#[derive(Clone, Debug)]
20pub(crate) struct DispatchContext {
21    pub trigger_event: TriggerEvent,
22    pub replay_of_event_id: Option<String>,
23    pub binding_id: String,
24    pub binding_version: u32,
25    pub agent_id: String,
26    pub action: String,
27    pub autonomy_tier: AutonomyTier,
28}
29
30pub(super) struct DispatchExecutionPolicyGuard;
31
32impl Drop for DispatchExecutionPolicyGuard {
33    fn drop(&mut self) {
34        crate::orchestration::pop_execution_policy();
35    }
36}
37
38#[derive(Clone)]
39pub struct Dispatcher {
40    pub(super) base_vm: Rc<Vm>,
41    pub(super) event_log: Arc<AnyEventLog>,
42    pub(super) cancel_tx: broadcast::Sender<()>,
43    pub(super) state: Arc<DispatcherRuntimeState>,
44    pub(super) metrics: Option<Arc<crate::MetricsRegistry>>,
45    pub(super) a2a_client: Arc<dyn crate::a2a::A2aClient>,
46}
47
48#[derive(Debug)]
49pub(super) struct DispatcherRuntimeState {
50    pub(super) in_flight: AtomicU64,
51    pub(super) retry_queue_depth: AtomicU64,
52    pub(super) dlq: Mutex<Vec<DlqEntry>>,
53    pub(super) cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
54    pub(super) shutting_down: std::sync::atomic::AtomicBool,
55    pub(super) idle_notify: Notify,
56    pub(super) flow_control: FlowControlManager,
57    pub(super) destination_circuits: DestinationCircuitRegistry,
58}
59
60impl DispatcherRuntimeState {
61    pub(super) fn new(event_log: Arc<AnyEventLog>) -> Self {
62        Self {
63            in_flight: AtomicU64::new(0),
64            retry_queue_depth: AtomicU64::new(0),
65            dlq: Mutex::new(Vec::new()),
66            cancel_tokens: Mutex::new(Vec::new()),
67            shutting_down: std::sync::atomic::AtomicBool::new(false),
68            idle_notify: Notify::new(),
69            flow_control: FlowControlManager::new(event_log),
70            destination_circuits: DestinationCircuitRegistry::default(),
71        }
72    }
73}
74
75#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
76#[serde(default)]
77pub struct DispatcherStatsSnapshot {
78    pub in_flight: u64,
79    pub retry_queue_depth: u64,
80    pub dlq_depth: u64,
81}
82
83#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
84#[serde(rename_all = "snake_case")]
85pub enum DispatchStatus {
86    Succeeded,
87    Failed,
88    Dlq,
89    Skipped,
90    Waiting,
91    Cancelled,
92}
93
94impl DispatchStatus {
95    pub fn as_str(&self) -> &'static str {
96        match self {
97            Self::Succeeded => "succeeded",
98            Self::Failed => "failed",
99            Self::Dlq => "dlq",
100            Self::Skipped => "skipped",
101            Self::Waiting => "waiting",
102            Self::Cancelled => "cancelled",
103        }
104    }
105}
106
107#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
108#[serde(default)]
109pub struct DispatchOutcome {
110    pub trigger_id: String,
111    pub binding_key: String,
112    pub event_id: String,
113    pub attempt_count: u32,
114    pub status: DispatchStatus,
115    pub handler_kind: String,
116    pub target_uri: String,
117    pub replay_of_event_id: Option<String>,
118    pub result: Option<serde_json::Value>,
119    pub error: Option<String>,
120}
121
122#[derive(Clone, Debug, Serialize, Deserialize)]
123pub struct InboxEnvelope {
124    pub trigger_id: Option<String>,
125    pub binding_version: Option<u32>,
126    pub event: TriggerEvent,
127}
128
129#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
130#[serde(default)]
131pub struct DispatcherDrainReport {
132    pub drained: bool,
133    pub in_flight: u64,
134    pub retry_queue_depth: u64,
135    pub dlq_depth: u64,
136}
137
138impl Default for DispatchOutcome {
139    fn default() -> Self {
140        Self {
141            trigger_id: String::new(),
142            binding_key: String::new(),
143            event_id: String::new(),
144            attempt_count: 0,
145            status: DispatchStatus::Failed,
146            handler_kind: String::new(),
147            target_uri: String::new(),
148            replay_of_event_id: None,
149            result: None,
150            error: None,
151        }
152    }
153}
154
155#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
156#[serde(default)]
157pub struct DispatchAttemptRecord {
158    pub trigger_id: String,
159    pub binding_key: String,
160    pub event_id: String,
161    pub attempt: u32,
162    pub handler_kind: String,
163    pub started_at: String,
164    pub completed_at: String,
165    pub outcome: String,
166    pub error_msg: Option<String>,
167}
168
169#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
170pub struct DispatchCancelRequest {
171    pub binding_key: String,
172    pub event_id: String,
173    #[serde(with = "time::serde::rfc3339")]
174    pub requested_at: time::OffsetDateTime,
175    #[serde(default)]
176    pub requested_by: Option<String>,
177    #[serde(default)]
178    pub audit_id: Option<String>,
179}
180
181#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
182pub struct DlqEntry {
183    pub trigger_id: String,
184    pub binding_key: String,
185    pub event: TriggerEvent,
186    pub attempt_count: u32,
187    pub final_error: String,
188    #[serde(default = "default_dlq_error_class")]
189    pub error_class: String,
190    pub attempts: Vec<DispatchAttemptRecord>,
191}
192
/// Serde fallback for `DlqEntry::error_class`: returns the string `"unknown"`.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
196
197#[derive(Clone, Debug)]
198pub(super) struct SingletonLease {
199    pub(super) gate: String,
200    pub(super) held: bool,
201}
202
203#[derive(Clone, Debug)]
204pub(super) struct ConcurrencyLease {
205    pub(super) gate: String,
206    pub(super) max: u32,
207    pub(super) priority_rank: usize,
208    pub(super) permit: Option<ConcurrencyPermit>,
209}
210
211#[derive(Default, Debug)]
212pub(super) struct AcquiredFlowControl {
213    pub(super) singleton: Option<SingletonLease>,
214    pub(super) concurrency: Option<ConcurrencyLease>,
215}
216
217#[derive(Clone)]
218pub(crate) struct DispatchWaitLease {
219    state: Arc<DispatcherRuntimeState>,
220    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
221    suspended: Arc<AtomicBool>,
222}
223
224impl DispatchWaitLease {
225    pub(super) fn new(
226        state: Arc<DispatcherRuntimeState>,
227        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
228    ) -> Self {
229        Self {
230            state,
231            acquired,
232            suspended: Arc::new(AtomicBool::new(false)),
233        }
234    }
235
236    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
237        if self.suspended.swap(true, Ordering::SeqCst) {
238            return Ok(());
239        }
240        let (singleton_gate, concurrency_permit) = {
241            let mut acquired = self.acquired.lock().await;
242            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
243                if lease.held {
244                    lease.held = false;
245                    Some(lease.gate.clone())
246                } else {
247                    None
248                }
249            });
250            let concurrency_permit = acquired
251                .concurrency
252                .as_mut()
253                .and_then(|lease| lease.permit.take());
254            (singleton_gate, concurrency_permit)
255        };
256
257        if let Some(gate) = singleton_gate {
258            self.state
259                .flow_control
260                .release_singleton(&gate)
261                .await
262                .map_err(DispatchError::from)?;
263        }
264        if let Some(permit) = concurrency_permit {
265            self.state
266                .flow_control
267                .release_concurrency(permit)
268                .await
269                .map_err(DispatchError::from)?;
270        }
271        Ok(())
272    }
273
274    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
275        if !self.suspended.swap(false, Ordering::SeqCst) {
276            return Ok(());
277        }
278
279        let singleton_gate = {
280            let acquired = self.acquired.lock().await;
281            acquired.singleton.as_ref().and_then(|lease| {
282                if lease.held {
283                    None
284                } else {
285                    Some(lease.gate.clone())
286                }
287            })
288        };
289        if let Some(gate) = singleton_gate {
290            self.state
291                .flow_control
292                .acquire_singleton(&gate)
293                .await
294                .map_err(DispatchError::from)?;
295            let mut acquired = self.acquired.lock().await;
296            if let Some(lease) = acquired.singleton.as_mut() {
297                lease.held = true;
298            }
299        }
300
301        let concurrency_spec = {
302            let acquired = self.acquired.lock().await;
303            acquired.concurrency.as_ref().and_then(|lease| {
304                if lease.permit.is_some() {
305                    None
306                } else {
307                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
308                }
309            })
310        };
311        if let Some((gate, max, priority_rank)) = concurrency_spec {
312            let permit = self
313                .state
314                .flow_control
315                .acquire_concurrency(&gate, max, priority_rank)
316                .await
317                .map_err(DispatchError::from)?;
318            let mut acquired = self.acquired.lock().await;
319            if let Some(lease) = acquired.concurrency.as_mut() {
320                lease.permit = Some(permit);
321            }
322        }
323        Ok(())
324    }
325}
326
327pub(super) enum FlowControlOutcome {
328    Dispatch {
329        event: Box<TriggerEvent>,
330        acquired: AcquiredFlowControl,
331    },
332    Skip {
333        reason: String,
334    },
335}
336
337#[derive(Clone, Debug)]
338pub(super) enum DispatchSkipStage {
339    Predicate,
340    FlowControl,
341}
342
/// Error raised while dispatching a trigger event.
///
/// Every variant carries a human-readable message. `Display` prints only that message; the
/// variant name is visible through `Debug` only.
#[derive(Debug)]
pub enum DispatchError {
    EventLog(String),
    Registry(String),
    Serde(String),
    Local(String),
    A2a(String),
    Denied(String),
    Timeout(String),
    Waiting(String),
    Cancelled(String),
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    /// Writes the variant's message verbatim, with no prefix or suffix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text: &str = match self {
            DispatchError::EventLog(text)
            | DispatchError::Registry(text)
            | DispatchError::Serde(text)
            | DispatchError::Local(text)
            | DispatchError::A2a(text)
            | DispatchError::Denied(text)
            | DispatchError::Timeout(text)
            | DispatchError::Waiting(text)
            | DispatchError::Cancelled(text)
            | DispatchError::NotImplemented(text) => text.as_str(),
        };
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}
375
376impl DispatchError {
377    pub(super) fn retryable(&self) -> bool {
378        !matches!(
379            self,
380            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_)
381        )
382    }
383}
384
385impl DispatchSkipStage {
386    pub(super) fn as_str(&self) -> &'static str {
387        match self {
388            Self::Predicate => "predicate",
389            Self::FlowControl => "flow_control",
390        }
391    }
392}
393
394impl From<LogError> for DispatchError {
395    fn from(value: LogError) -> Self {
396        Self::EventLog(value.to_string())
397    }
398}