Skip to main content

harn_vm/triggers/dispatcher/
types.rs

1use std::collections::BTreeMap;
2use std::rc::Rc;
3use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
4use std::sync::{Arc, Mutex};
5
6use serde::{Deserialize, Serialize};
7use tokio::sync::broadcast;
8use tokio::sync::{Mutex as AsyncMutex, Notify};
9
10use crate::event_log::{AnyEventLog, LogError};
11use crate::trust_graph::AutonomyTier;
12use crate::vm::Vm;
13
14use super::circuits::DestinationCircuitRegistry;
15use super::flow_control::{ConcurrencyPermit, FlowControlManager};
16use super::TriggerEvent;
17
18pub(super) const DEFAULT_AUTONOMY_BUDGET_REVIEWER: &str = "operator";
19
20#[derive(Clone, Debug)]
21pub(crate) struct DispatchContext {
22    pub trigger_event: TriggerEvent,
23    pub replay_of_event_id: Option<String>,
24    pub binding_id: String,
25    pub binding_version: u32,
26    pub agent_id: String,
27    pub action: String,
28    pub autonomy_tier: AutonomyTier,
29}
30
31pub(super) struct DispatchExecutionPolicyGuard;
32
33impl Drop for DispatchExecutionPolicyGuard {
34    fn drop(&mut self) {
35        crate::orchestration::pop_execution_policy();
36    }
37}
38
39#[derive(Clone)]
40pub struct Dispatcher {
41    pub(super) base_vm: Rc<Vm>,
42    pub(super) event_log: Arc<AnyEventLog>,
43    pub(super) cancel_tx: broadcast::Sender<()>,
44    pub(super) state: Arc<DispatcherRuntimeState>,
45    pub(super) metrics: Option<Arc<crate::MetricsRegistry>>,
46    pub(super) a2a_client: Arc<dyn crate::a2a::A2aClient>,
47}
48
49#[derive(Debug)]
50pub(super) struct DispatcherRuntimeState {
51    pub(super) in_flight: AtomicU64,
52    pub(super) retry_queue_depth: AtomicU64,
53    pub(super) dlq: Mutex<Vec<DlqEntry>>,
54    pub(super) cancel_tokens: Mutex<Vec<Arc<std::sync::atomic::AtomicBool>>>,
55    pub(super) shutting_down: std::sync::atomic::AtomicBool,
56    pub(super) idle_notify: Notify,
57    pub(super) flow_control: FlowControlManager,
58    pub(super) destination_circuits: DestinationCircuitRegistry,
59}
60
61impl DispatcherRuntimeState {
62    pub(super) fn new(event_log: Arc<AnyEventLog>) -> Self {
63        Self {
64            in_flight: AtomicU64::new(0),
65            retry_queue_depth: AtomicU64::new(0),
66            dlq: Mutex::new(Vec::new()),
67            cancel_tokens: Mutex::new(Vec::new()),
68            shutting_down: std::sync::atomic::AtomicBool::new(false),
69            idle_notify: Notify::new(),
70            flow_control: FlowControlManager::new(event_log),
71            destination_circuits: DestinationCircuitRegistry::default(),
72        }
73    }
74}
75
76#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
77#[serde(default)]
78pub struct DispatcherStatsSnapshot {
79    pub in_flight: u64,
80    pub retry_queue_depth: u64,
81    pub dlq_depth: u64,
82}
83
84#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
85#[serde(rename_all = "snake_case")]
86pub enum DispatchStatus {
87    Succeeded,
88    Failed,
89    Dlq,
90    Skipped,
91    Waiting,
92    Cancelled,
93}
94
95impl DispatchStatus {
96    pub fn as_str(&self) -> &'static str {
97        match self {
98            Self::Succeeded => "succeeded",
99            Self::Failed => "failed",
100            Self::Dlq => "dlq",
101            Self::Skipped => "skipped",
102            Self::Waiting => "waiting",
103            Self::Cancelled => "cancelled",
104        }
105    }
106}
107
108#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
109#[serde(default)]
110pub struct DispatchOutcome {
111    pub trigger_id: String,
112    pub binding_key: String,
113    pub event_id: String,
114    pub attempt_count: u32,
115    pub status: DispatchStatus,
116    pub handler_kind: String,
117    pub target_uri: String,
118    pub replay_of_event_id: Option<String>,
119    pub result: Option<serde_json::Value>,
120    pub error: Option<String>,
121}
122
123#[derive(Clone, Debug, Serialize, Deserialize)]
124pub struct InboxEnvelope {
125    pub trigger_id: Option<String>,
126    pub binding_version: Option<u32>,
127    pub event: TriggerEvent,
128}
129
130#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
131#[serde(default)]
132pub struct DispatcherDrainReport {
133    pub drained: bool,
134    pub in_flight: u64,
135    pub retry_queue_depth: u64,
136    pub dlq_depth: u64,
137}
138
139impl Default for DispatchOutcome {
140    fn default() -> Self {
141        Self {
142            trigger_id: String::new(),
143            binding_key: String::new(),
144            event_id: String::new(),
145            attempt_count: 0,
146            status: DispatchStatus::Failed,
147            handler_kind: String::new(),
148            target_uri: String::new(),
149            replay_of_event_id: None,
150            result: None,
151            error: None,
152        }
153    }
154}
155
156#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)]
157#[serde(default)]
158pub struct DispatchAttemptRecord {
159    pub trigger_id: String,
160    pub binding_key: String,
161    pub event_id: String,
162    pub attempt: u32,
163    pub handler_kind: String,
164    pub started_at: String,
165    pub completed_at: String,
166    pub outcome: String,
167    pub error_msg: Option<String>,
168}
169
170#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
171pub struct DispatchCancelRequest {
172    pub binding_key: String,
173    pub event_id: String,
174    #[serde(with = "time::serde::rfc3339")]
175    pub requested_at: time::OffsetDateTime,
176    #[serde(default)]
177    pub requested_by: Option<String>,
178    #[serde(default)]
179    pub audit_id: Option<String>,
180}
181
182#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
183pub struct DlqEntry {
184    pub trigger_id: String,
185    pub binding_key: String,
186    pub event: TriggerEvent,
187    pub attempt_count: u32,
188    pub final_error: String,
189    #[serde(default = "default_dlq_error_class")]
190    pub error_class: String,
191    pub attempts: Vec<DispatchAttemptRecord>,
192}
193
/// Serde default for `DlqEntry::error_class`: the literal class name `"unknown"`.
fn default_dlq_error_class() -> String {
    String::from("unknown")
}
197
198#[derive(Clone, Debug)]
199pub(super) struct SingletonLease {
200    pub(super) gate: String,
201    pub(super) held: bool,
202}
203
204#[derive(Clone, Debug)]
205pub(super) struct ConcurrencyLease {
206    pub(super) gate: String,
207    pub(super) max: u32,
208    pub(super) priority_rank: usize,
209    pub(super) permit: Option<ConcurrencyPermit>,
210}
211
212#[derive(Default, Debug)]
213pub(super) struct AcquiredFlowControl {
214    pub(super) singleton: Option<SingletonLease>,
215    pub(super) concurrency: Option<ConcurrencyLease>,
216}
217
218#[derive(Clone)]
219pub(crate) struct DispatchWaitLease {
220    state: Arc<DispatcherRuntimeState>,
221    acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
222    suspended: Arc<AtomicBool>,
223}
224
225impl DispatchWaitLease {
226    pub(super) fn new(
227        state: Arc<DispatcherRuntimeState>,
228        acquired: Arc<AsyncMutex<AcquiredFlowControl>>,
229    ) -> Self {
230        Self {
231            state,
232            acquired,
233            suspended: Arc::new(AtomicBool::new(false)),
234        }
235    }
236
237    pub(crate) async fn suspend(&self) -> Result<(), DispatchError> {
238        if self.suspended.swap(true, Ordering::SeqCst) {
239            return Ok(());
240        }
241        let (singleton_gate, concurrency_permit) = {
242            let mut acquired = self.acquired.lock().await;
243            let singleton_gate = acquired.singleton.as_mut().and_then(|lease| {
244                if lease.held {
245                    lease.held = false;
246                    Some(lease.gate.clone())
247                } else {
248                    None
249                }
250            });
251            let concurrency_permit = acquired
252                .concurrency
253                .as_mut()
254                .and_then(|lease| lease.permit.take());
255            (singleton_gate, concurrency_permit)
256        };
257
258        if let Some(gate) = singleton_gate {
259            self.state
260                .flow_control
261                .release_singleton(&gate)
262                .await
263                .map_err(DispatchError::from)?;
264        }
265        if let Some(permit) = concurrency_permit {
266            self.state
267                .flow_control
268                .release_concurrency(permit)
269                .await
270                .map_err(DispatchError::from)?;
271        }
272        Ok(())
273    }
274
275    pub(crate) async fn resume(&self) -> Result<(), DispatchError> {
276        if !self.suspended.swap(false, Ordering::SeqCst) {
277            return Ok(());
278        }
279
280        let singleton_gate = {
281            let acquired = self.acquired.lock().await;
282            acquired.singleton.as_ref().and_then(|lease| {
283                if lease.held {
284                    None
285                } else {
286                    Some(lease.gate.clone())
287                }
288            })
289        };
290        if let Some(gate) = singleton_gate {
291            self.state
292                .flow_control
293                .acquire_singleton(&gate)
294                .await
295                .map_err(DispatchError::from)?;
296            let mut acquired = self.acquired.lock().await;
297            if let Some(lease) = acquired.singleton.as_mut() {
298                lease.held = true;
299            }
300        }
301
302        let concurrency_spec = {
303            let acquired = self.acquired.lock().await;
304            acquired.concurrency.as_ref().and_then(|lease| {
305                if lease.permit.is_some() {
306                    None
307                } else {
308                    Some((lease.gate.clone(), lease.max, lease.priority_rank))
309                }
310            })
311        };
312        if let Some((gate, max, priority_rank)) = concurrency_spec {
313            let permit = self
314                .state
315                .flow_control
316                .acquire_concurrency(&gate, max, priority_rank)
317                .await
318                .map_err(DispatchError::from)?;
319            let mut acquired = self.acquired.lock().await;
320            if let Some(lease) = acquired.concurrency.as_mut() {
321                lease.permit = Some(permit);
322            }
323        }
324        Ok(())
325    }
326}
327
328pub(super) enum FlowControlOutcome {
329    Dispatch {
330        event: Box<TriggerEvent>,
331        acquired: AcquiredFlowControl,
332    },
333    Skip {
334        reason: String,
335    },
336}
337
338#[derive(Debug)]
339pub(super) struct DispatchCallResult {
340    pub(super) output: serde_json::Value,
341    pub(super) metadata: BTreeMap<String, serde_json::Value>,
342}
343
344#[derive(Clone, Debug)]
345pub(super) enum DispatchSkipStage {
346    Predicate,
347    FlowControl,
348}
349
/// Error produced while dispatching a trigger event.
///
/// Every variant carries a human-readable message, and `Display` prints that message
/// verbatim (no variant prefix).
#[derive(Debug)]
pub enum DispatchError {
    /// Event-log failure.
    EventLog(String),
    /// Registry lookup failure.
    Registry(String),
    /// (De)serialization failure.
    Serde(String),
    /// Failure from a local handler.
    Local(String),
    /// Failure from an A2A handler.
    A2a(String),
    /// The dispatch was denied.
    Denied(String),
    /// The dispatch timed out.
    Timeout(String),
    /// The dispatch is waiting.
    Waiting(String),
    /// The dispatch was cancelled.
    Cancelled(String),
    /// The requested handler or feature is not implemented.
    NotImplemented(String),
}

impl std::fmt::Display for DispatchError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::EventLog(text)
            | Self::Registry(text)
            | Self::Serde(text)
            | Self::Local(text)
            | Self::A2a(text)
            | Self::Denied(text)
            | Self::Timeout(text)
            | Self::Waiting(text)
            | Self::Cancelled(text)
            | Self::NotImplemented(text) => text,
        };
        f.write_str(text)
    }
}

impl std::error::Error for DispatchError {}
382
383impl DispatchError {
384    pub(super) fn retryable(&self) -> bool {
385        !matches!(
386            self,
387            Self::Cancelled(_) | Self::Denied(_) | Self::NotImplemented(_) | Self::Waiting(_)
388        )
389    }
390}
391
392impl DispatchSkipStage {
393    pub(super) fn as_str(&self) -> &'static str {
394        match self {
395            Self::Predicate => "predicate",
396            Self::FlowControl => "flow_control",
397        }
398    }
399}
400
401impl From<LogError> for DispatchError {
402    fn from(value: LogError) -> Self {
403        Self::EventLog(value.to_string())
404    }
405}