harn_vm/triggers/dispatcher/
uri.rs1use crate::triggers::TriggerHandlerSpec;
2
/// Reasons a handler URI string can be rejected by `DispatchUri::parse`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchUriError {
    /// The input was empty once surrounding whitespace had been trimmed.
    Empty,
    /// A recognised `scheme://` prefix was present but nothing followed it.
    MissingTarget { scheme: String },
    /// The text before `://` is not one of the schemes the parser accepts.
    UnknownScheme(String),
}

impl std::fmt::Display for DispatchUriError {
    /// Writes the human-readable message for this error.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Handle the payload-free case up front, then the two that embed a scheme.
        if let Self::Empty = self {
            return write!(out, "handler URI cannot be empty");
        }
        if let Self::MissingTarget { scheme: name } = self {
            return write!(out, "{name} handler URI target cannot be empty");
        }
        if let Self::UnknownScheme(name) = self {
            return write!(out, "unsupported handler URI scheme '{name}'");
        }
        Ok(())
    }
}

// Marker impl: lets the error be boxed as `dyn std::error::Error` and used with `?`.
impl std::error::Error for DispatchUriError {}
23
/// A trigger handler destination, classified by the way it is dispatched.
///
/// Values come either from `DispatchUri::parse` (string form) or from the
/// `From<&TriggerHandlerSpec>` conversion (structured form).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchUri {
    /// A handler reference with no `scheme://` part; `raw` is kept verbatim.
    Local { raw: String },
    /// An `a2a://` destination.
    ///
    /// `parse` always sets `allow_cleartext` to `false`; the spec conversion
    /// copies the flag from the spec.
    /// NOTE(review): presumably the flag permits non-encrypted transport — confirm.
    A2a { target: String, allow_cleartext: bool },
    /// A `worker://` destination identified by its queue name.
    Worker { queue: String },
    /// A `persona://` destination identified by persona name.
    Persona { name: String },
    /// An `eval_pack://` destination.
    ///
    /// When parsed from a string, `pack_id` is a copy of `target`; when built
    /// from a spec it is the manifest's id.
    EvalPack { target: String, pack_id: String },
    /// Rendered as `auto_resume://{worker_id}`; only produced from a spec.
    AutoResume { worker_id: String },
    /// A `pool://` destination.
    ///
    /// `priority_from` and `key_from` are `None` when parsed from a string and
    /// copied from the spec otherwise.
    SpawnToPool {
        pool: String,
        priority_from: Option<String>,
        key_from: Option<String>,
    },
    /// Rendered as `reminder_inject://{target_kind}[/{target_session_id}]`.
    ///
    /// The session id is present only when the spec's target expression is concrete.
    ReminderInject {
        target_kind: &'static str,
        target_session_id: Option<String>,
    },
    /// Rendered as `interrupt_and_suspend://{scope_kind}[/{concrete_count}]`.
    ///
    /// The count is the number of ids when the spec's agent scope is concrete.
    InterruptAndSuspend {
        scope_kind: &'static str,
        concrete_count: Option<usize>,
    },
}
73
74impl DispatchUri {
75 pub fn parse(raw: &str) -> Result<Self, DispatchUriError> {
76 let raw = raw.trim();
77 if raw.is_empty() {
78 return Err(DispatchUriError::Empty);
79 }
80 if let Some(target) = raw.strip_prefix("a2a://") {
81 if target.is_empty() {
82 return Err(DispatchUriError::MissingTarget {
83 scheme: "a2a".to_string(),
84 });
85 }
86 return Ok(Self::A2a {
87 target: target.to_string(),
88 allow_cleartext: false,
89 });
90 }
91 if let Some(queue) = raw.strip_prefix("worker://") {
92 if queue.is_empty() {
93 return Err(DispatchUriError::MissingTarget {
94 scheme: "worker".to_string(),
95 });
96 }
97 return Ok(Self::Worker {
98 queue: queue.to_string(),
99 });
100 }
101 if let Some(name) = raw.strip_prefix("persona://") {
102 if name.is_empty() {
103 return Err(DispatchUriError::MissingTarget {
104 scheme: "persona".to_string(),
105 });
106 }
107 return Ok(Self::Persona {
108 name: name.to_string(),
109 });
110 }
111 if let Some(target) = raw.strip_prefix("eval_pack://") {
112 if target.is_empty() {
113 return Err(DispatchUriError::MissingTarget {
114 scheme: "eval_pack".to_string(),
115 });
116 }
117 return Ok(Self::EvalPack {
118 target: target.to_string(),
119 pack_id: target.to_string(),
120 });
121 }
122 if let Some(pool) = raw.strip_prefix("pool://") {
123 if pool.is_empty() {
124 return Err(DispatchUriError::MissingTarget {
125 scheme: "pool".to_string(),
126 });
127 }
128 return Ok(Self::SpawnToPool {
129 pool: pool.to_string(),
130 priority_from: None,
131 key_from: None,
132 });
133 }
134 if let Some((scheme, _)) = raw.split_once("://") {
135 return Err(DispatchUriError::UnknownScheme(scheme.to_string()));
136 }
137 Ok(Self::Local {
138 raw: raw.to_string(),
139 })
140 }
141
142 pub fn kind(&self) -> &'static str {
143 match self {
144 Self::Local { .. } => "local",
145 Self::A2a { .. } => "a2a",
146 Self::Worker { .. } => "worker",
147 Self::Persona { .. } => "persona",
148 Self::EvalPack { .. } => "eval_pack",
149 Self::AutoResume { .. } => "auto_resume",
150 Self::SpawnToPool { .. } => "spawn_to_pool",
151 Self::ReminderInject { .. } => "reminder_inject",
152 Self::InterruptAndSuspend { .. } => "interrupt_and_suspend",
153 }
154 }
155
156 pub fn target_uri(&self) -> String {
157 match self {
158 Self::Local { raw } => raw.clone(),
159 Self::A2a { target, .. } => format!("a2a://{target}"),
160 Self::Worker { queue } => format!("worker://{queue}"),
161 Self::Persona { name } => format!("persona://{name}"),
162 Self::EvalPack { target, .. } => format!("eval_pack://{target}"),
163 Self::AutoResume { worker_id } => format!("auto_resume://{worker_id}"),
164 Self::SpawnToPool { pool, .. } => format!("pool://{pool}"),
165 Self::ReminderInject {
166 target_kind,
167 target_session_id,
168 } => match target_session_id {
169 Some(id) => format!("reminder_inject://{target_kind}/{id}"),
170 None => format!("reminder_inject://{target_kind}"),
171 },
172 Self::InterruptAndSuspend {
173 scope_kind,
174 concrete_count,
175 } => match concrete_count {
176 Some(n) => format!("interrupt_and_suspend://{scope_kind}/{n}"),
177 None => format!("interrupt_and_suspend://{scope_kind}"),
178 },
179 }
180 }
181
182 pub fn trust_boundary(&self) -> &'static str {
183 match self {
184 Self::Local { .. } => "local_process",
185 Self::A2a { .. } => "federated_a2a",
186 Self::Worker { .. } => "event_log_worker_queue",
187 Self::Persona { .. } => "persona_runtime",
188 Self::EvalPack { .. } => "local_process",
189 Self::AutoResume { .. } => "local_process",
190 Self::SpawnToPool { .. } => "local_process",
191 Self::ReminderInject { .. } => "local_process",
192 Self::InterruptAndSuspend { .. } => "local_process",
193 }
194 }
195
196 pub fn execution_location(&self) -> &'static str {
197 match self {
198 Self::Local { .. } => "in_process",
199 Self::A2a { .. } => "remote",
200 Self::Worker { .. } => "queued",
201 Self::Persona { .. } => "managed_persona",
202 Self::EvalPack { .. } => "in_process",
203 Self::AutoResume { .. } => "in_process",
204 Self::SpawnToPool { .. } => "pool_queued",
205 Self::ReminderInject { .. } => "in_process",
206 Self::InterruptAndSuspend { .. } => "in_process",
207 }
208 }
209
210 pub fn remote_identity(&self) -> Option<String> {
211 match self {
212 Self::A2a { target, .. } => Some(crate::a2a::target_agent_label(target)),
213 _ => None,
214 }
215 }
216
217 pub fn dispatch_boundary_metadata(
218 &self,
219 ) -> std::collections::BTreeMap<String, serde_json::Value> {
220 let mut metadata = std::collections::BTreeMap::new();
221 metadata.insert(
222 "trust_boundary".to_string(),
223 serde_json::json!(self.trust_boundary()),
224 );
225 metadata.insert(
226 "execution_location".to_string(),
227 serde_json::json!(self.execution_location()),
228 );
229 if let Some(remote_identity) = self.remote_identity() {
230 metadata.insert(
231 "remote_identity".to_string(),
232 serde_json::json!(remote_identity),
233 );
234 }
235 metadata
236 }
237}
238
239impl From<&TriggerHandlerSpec> for DispatchUri {
240 fn from(value: &TriggerHandlerSpec) -> Self {
241 match value {
242 TriggerHandlerSpec::Local { raw, .. } => Self::Local { raw: raw.clone() },
243 TriggerHandlerSpec::A2a {
244 target,
245 allow_cleartext,
246 } => Self::A2a {
247 target: target.clone(),
248 allow_cleartext: *allow_cleartext,
249 },
250 TriggerHandlerSpec::Worker { queue } => Self::Worker {
251 queue: queue.clone(),
252 },
253 TriggerHandlerSpec::Persona { binding } => Self::Persona {
254 name: binding.name.clone(),
255 },
256 TriggerHandlerSpec::EvalPack {
257 target, manifest, ..
258 } => Self::EvalPack {
259 target: target.clone(),
260 pack_id: manifest.id.clone(),
261 },
262 TriggerHandlerSpec::AutoResume { worker_id } => Self::AutoResume {
263 worker_id: worker_id.clone(),
264 },
265 TriggerHandlerSpec::SpawnToPool {
266 pool,
267 priority_from,
268 key_from,
269 ..
270 } => Self::SpawnToPool {
271 pool: pool.clone(),
272 priority_from: priority_from.clone(),
273 key_from: key_from.clone(),
274 },
275 TriggerHandlerSpec::ReminderInject { target, .. } => {
276 let target_session_id = match target {
277 crate::triggers::registry::TargetExpr::Concrete(id) => Some(id.clone()),
278 _ => None,
279 };
280 Self::ReminderInject {
281 target_kind: target.kind(),
282 target_session_id,
283 }
284 }
285 TriggerHandlerSpec::InterruptAndSuspend { target_agents, .. } => {
286 let concrete_count = match target_agents {
287 crate::triggers::registry::AgentScope::Concrete(ids) => Some(ids.len()),
288 _ => None,
289 };
290 Self::InterruptAndSuspend {
291 scope_kind: target_agents.kind(),
292 concrete_count,
293 }
294 }
295 }
296 }
297}