Skip to main content

harn_vm/triggers/dispatcher/
uri.rs

1use crate::triggers::TriggerHandlerSpec;
2
3#[derive(Clone, Debug, PartialEq, Eq)]
4pub enum DispatchUriError {
5    Empty,
6    MissingTarget { scheme: String },
7    UnknownScheme(String),
8}
9
10impl std::fmt::Display for DispatchUriError {
11    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
12        match self {
13            Self::Empty => f.write_str("handler URI cannot be empty"),
14            Self::MissingTarget { scheme } => {
15                write!(f, "{scheme} handler URI target cannot be empty")
16            }
17            Self::UnknownScheme(scheme) => write!(f, "unsupported handler URI scheme '{scheme}'"),
18        }
19    }
20}
21
22impl std::error::Error for DispatchUriError {}
23
24#[derive(Clone, Debug, PartialEq, Eq)]
25pub enum DispatchUri {
26    Local {
27        raw: String,
28    },
29    A2a {
30        target: String,
31        allow_cleartext: bool,
32    },
33    Worker {
34        queue: String,
35    },
36    Persona {
37        name: String,
38    },
39    EvalPack {
40        target: String,
41        pack_id: String,
42    },
43    AutoResume {
44        worker_id: String,
45    },
46    /// Dispatcher composes triggers with named agent pools (#1889). The
47    /// closure handed to the pool is built per-event by the binding's
48    /// `task_factory`; the descriptor here carries only routing metadata so
49    /// the URI stays serializable and comparable.
50    SpawnToPool {
51        pool: String,
52        priority_from: Option<String>,
53        key_from: Option<String>,
54    },
55    /// Dispatcher injects a `SystemReminder` into the target running session
56    /// when the trigger matches (#1876). The descriptor carries the
57    /// resolution-mode label and (for the `Concrete` form) the literal
58    /// session id so the URI stays serializable and comparable; the body
59    /// template + reminder metadata live on the `TriggerHandlerSpec`.
60    ReminderInject {
61        target_kind: &'static str,
62        target_session_id: Option<String>,
63    },
64    /// CH-10 (#1910): emergency panic broadcast. The descriptor carries the
65    /// scope-mode label and (for the `Concrete` form) the explicit worker
66    /// count so the URI stays serializable and comparable; the closure /
67    /// concrete worker-id list lives on the `TriggerHandlerSpec`.
68    InterruptAndSuspend {
69        scope_kind: &'static str,
70        concrete_count: Option<usize>,
71    },
72}
73
74impl DispatchUri {
75    pub fn parse(raw: &str) -> Result<Self, DispatchUriError> {
76        let raw = raw.trim();
77        if raw.is_empty() {
78            return Err(DispatchUriError::Empty);
79        }
80        if let Some(target) = raw.strip_prefix("a2a://") {
81            if target.is_empty() {
82                return Err(DispatchUriError::MissingTarget {
83                    scheme: "a2a".to_string(),
84                });
85            }
86            return Ok(Self::A2a {
87                target: target.to_string(),
88                allow_cleartext: false,
89            });
90        }
91        if let Some(queue) = raw.strip_prefix("worker://") {
92            if queue.is_empty() {
93                return Err(DispatchUriError::MissingTarget {
94                    scheme: "worker".to_string(),
95                });
96            }
97            return Ok(Self::Worker {
98                queue: queue.to_string(),
99            });
100        }
101        if let Some(name) = raw.strip_prefix("persona://") {
102            if name.is_empty() {
103                return Err(DispatchUriError::MissingTarget {
104                    scheme: "persona".to_string(),
105                });
106            }
107            return Ok(Self::Persona {
108                name: name.to_string(),
109            });
110        }
111        if let Some(target) = raw.strip_prefix("eval_pack://") {
112            if target.is_empty() {
113                return Err(DispatchUriError::MissingTarget {
114                    scheme: "eval_pack".to_string(),
115                });
116            }
117            return Ok(Self::EvalPack {
118                target: target.to_string(),
119                pack_id: target.to_string(),
120            });
121        }
122        if let Some(pool) = raw.strip_prefix("pool://") {
123            if pool.is_empty() {
124                return Err(DispatchUriError::MissingTarget {
125                    scheme: "pool".to_string(),
126                });
127            }
128            return Ok(Self::SpawnToPool {
129                pool: pool.to_string(),
130                priority_from: None,
131                key_from: None,
132            });
133        }
134        if let Some((scheme, _)) = raw.split_once("://") {
135            return Err(DispatchUriError::UnknownScheme(scheme.to_string()));
136        }
137        Ok(Self::Local {
138            raw: raw.to_string(),
139        })
140    }
141
142    pub fn kind(&self) -> &'static str {
143        match self {
144            Self::Local { .. } => "local",
145            Self::A2a { .. } => "a2a",
146            Self::Worker { .. } => "worker",
147            Self::Persona { .. } => "persona",
148            Self::EvalPack { .. } => "eval_pack",
149            Self::AutoResume { .. } => "auto_resume",
150            Self::SpawnToPool { .. } => "spawn_to_pool",
151            Self::ReminderInject { .. } => "reminder_inject",
152            Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
153        }
154    }
155
156    pub fn target_uri(&self) -> String {
157        match self {
158            Self::Local { raw } => raw.clone(),
159            Self::A2a { target, .. } => format!("a2a://{target}"),
160            Self::Worker { queue } => format!("worker://{queue}"),
161            Self::Persona { name } => format!("persona://{name}"),
162            Self::EvalPack { target, .. } => format!("eval_pack://{target}"),
163            Self::AutoResume { worker_id } => format!("auto_resume://{worker_id}"),
164            Self::SpawnToPool { pool, .. } => format!("pool://{pool}"),
165            Self::ReminderInject {
166                target_kind,
167                target_session_id,
168            } => match target_session_id {
169                Some(id) => format!("reminder_inject://{target_kind}/{id}"),
170                None => format!("reminder_inject://{target_kind}"),
171            },
172            Self::InterruptAndSuspend {
173                scope_kind,
174                concrete_count,
175            } => match concrete_count {
176                Some(n) => format!("interrupt_and_suspend://{scope_kind}/{n}"),
177                None => format!("interrupt_and_suspend://{scope_kind}"),
178            },
179        }
180    }
181
182    pub fn trust_boundary(&self) -> &'static str {
183        match self {
184            Self::Local { .. } => "local_process",
185            Self::A2a { .. } => "federated_a2a",
186            Self::Worker { .. } => "event_log_worker_queue",
187            Self::Persona { .. } => "persona_runtime",
188            Self::EvalPack { .. } => "local_process",
189            Self::AutoResume { .. } => "local_process",
190            Self::SpawnToPool { .. } => "local_process",
191            Self::ReminderInject { .. } => "local_process",
192            Self::InterruptAndSuspend { .. } => "local_process",
193        }
194    }
195
196    pub fn execution_location(&self) -> &'static str {
197        match self {
198            Self::Local { .. } => "in_process",
199            Self::A2a { .. } => "remote",
200            Self::Worker { .. } => "queued",
201            Self::Persona { .. } => "managed_persona",
202            Self::EvalPack { .. } => "in_process",
203            Self::AutoResume { .. } => "in_process",
204            Self::SpawnToPool { .. } => "pool_queued",
205            Self::ReminderInject { .. } => "in_process",
206            Self::InterruptAndSuspend { .. } => "in_process",
207        }
208    }
209
210    pub fn remote_identity(&self) -> Option<String> {
211        match self {
212            Self::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
213            _ => None,
214        }
215    }
216
217    pub fn dispatch_boundary_metadata(
218        &self,
219    ) -> std::collections::BTreeMap<String, serde_json::Value> {
220        let mut metadata = std::collections::BTreeMap::new();
221        metadata.insert(
222            "trust_boundary".to_string(),
223            serde_json::json!(self.trust_boundary()),
224        );
225        metadata.insert(
226            "execution_location".to_string(),
227            serde_json::json!(self.execution_location()),
228        );
229        if let Some(remote_identity) = self.remote_identity() {
230            metadata.insert(
231                "remote_identity".to_string(),
232                serde_json::json!(remote_identity),
233            );
234        }
235        metadata
236    }
237}
238
239impl From<&TriggerHandlerSpec> for DispatchUri {
240    fn from(value: &TriggerHandlerSpec) -> Self {
241        match value {
242            TriggerHandlerSpec::Local { raw, .. } => Self::Local { raw: raw.clone() },
243            TriggerHandlerSpec::A2a {
244                target,
245                allow_cleartext,
246            } => Self::A2a {
247                target: target.clone(),
248                allow_cleartext: *allow_cleartext,
249            },
250            TriggerHandlerSpec::Worker { queue } => Self::Worker {
251                queue: queue.clone(),
252            },
253            TriggerHandlerSpec::Persona { binding } => Self::Persona {
254                name: binding.name.clone(),
255            },
256            TriggerHandlerSpec::EvalPack {
257                target, manifest, ..
258            } => Self::EvalPack {
259                target: target.clone(),
260                pack_id: manifest.id.clone(),
261            },
262            TriggerHandlerSpec::AutoResume { worker_id } => Self::AutoResume {
263                worker_id: worker_id.clone(),
264            },
265            TriggerHandlerSpec::SpawnToPool {
266                pool,
267                priority_from,
268                key_from,
269                ..
270            } => Self::SpawnToPool {
271                pool: pool.clone(),
272                priority_from: priority_from.clone(),
273                key_from: key_from.clone(),
274            },
275            TriggerHandlerSpec::ReminderInject { target, .. } => {
276                let target_session_id = match target {
277                    crate::triggers::registry::TargetExpr::Concrete(id) => Some(id.clone()),
278                    _ => None,
279                };
280                Self::ReminderInject {
281                    target_kind: target.kind(),
282                    target_session_id,
283                }
284            }
285            TriggerHandlerSpec::InterruptAndSuspend { target_agents, .. } => {
286                let concrete_count = match target_agents {
287                    crate::triggers::registry::AgentScope::Concrete(ids) => Some(ids.len()),
288                    _ => None,
289                };
290                Self::InterruptAndSuspend {
291                    scope_kind: target_agents.kind(),
292                    concrete_count,
293                }
294            }
295        }
296    }
297}