1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
/// Agent-card locations probed, in this order, beneath the target authority
/// while resolving an endpoint.
const A2A_AGENT_CARD_PATHS: &[&str] = &[
    ".well-known/agent-card.json",
    ".well-known/a2a-agent",
    ".well-known/agent.json",
    "agent/card",
];
/// Protocol version sent in the `A2A-Version` request header and required of
/// the `protocolVersion` declared by agent cards / supported interfaces.
const A2A_PROTOCOL_VERSION: &str = "0.3.0";
/// Transport / protocol-binding name identifying a JSON-RPC endpoint; it is
/// compared ASCII-case-insensitively.
const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";
/// Environment variable read for the push-notification callback URL.
const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
/// Environment variable read for the optional push-notification token.
const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
/// JSON pointers searched, in order, for an actor chain inside an A2A payload.
///
/// The first pointer that resolves wins, so earlier entries take precedence.
/// Both `actor_chain` and `actorChain` spellings are covered at the top level
/// and under `statusUpdate`, `task`, and `message` metadata.
// NOTE(review): the `_harn` namespace is only listed with the camelCase
// spelling, and `/message/actor_chain` is not listed at all — confirm whether
// that asymmetry is intentional.
const A2A_ACTOR_CHAIN_METADATA_POINTERS: &[&str] = &[
    "/actor_chain",
    "/actorChain",
    "/metadata/actor_chain",
    "/metadata/actorChain",
    "/metadata/harn/actor_chain",
    "/metadata/harn/actorChain",
    "/metadata/_harn/actorChain",
    "/statusUpdate/actor_chain",
    "/statusUpdate/actorChain",
    "/statusUpdate/metadata/actor_chain",
    "/statusUpdate/metadata/actorChain",
    "/statusUpdate/metadata/harn/actor_chain",
    "/statusUpdate/metadata/harn/actorChain",
    "/statusUpdate/metadata/_harn/actorChain",
    "/task/actor_chain",
    "/task/actorChain",
    "/task/metadata/actor_chain",
    "/task/metadata/actorChain",
    "/task/metadata/harn/actor_chain",
    "/task/metadata/harn/actorChain",
    "/task/metadata/_harn/actorChain",
    "/message/metadata/actor_chain",
    "/message/metadata/actorChain",
    "/message/metadata/harn/actor_chain",
    "/message/metadata/harn/actorChain",
    "/message/metadata/_harn/actorChain",
];
48
/// Outcome of agent-card discovery: where the card was found and which
/// JSON-RPC endpoint it declares.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedA2aEndpoint {
    /// URL the agent card was successfully fetched from.
    pub card_url: String,
    /// JSON-RPC endpoint URL selected from the card.
    pub rpc_url: String,
    /// The card's `id` field, when it is present and a string.
    pub agent_id: Option<String>,
    /// Path portion of the raw dispatch target (leading `/` removed); may be empty.
    pub target_agent: String,
}
56
/// A resolved endpoint together with the agent card it was derived from.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedA2aAgent {
    /// Endpoint details extracted from `card`.
    pub endpoint: ResolvedA2aEndpoint,
    /// The agent card JSON exactly as it was parsed from the HTTP response.
    pub card: Value,
}
62
/// Acknowledgement produced by a successful trigger dispatch.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum DispatchAck {
    /// The remote task already reported the `completed` state in its response.
    InlineResult {
        /// `result.id` from the JSON-RPC response.
        task_id: String,
        /// Value extracted from the task (see `extract_inline_result`).
        result: Value,
    },
    /// The remote task was accepted but is in some other, non-rejected state.
    PendingTask {
        /// `result.id` from the JSON-RPC response.
        task_id: String,
        /// `result.status.state` as reported by the remote agent.
        state: String,
        /// JSON handle (`kind: "a2a_task_handle"`) carrying the task id, state
        /// and the resolved endpoint fields.
        handle: Value,
    },
}
75
/// Failure categories surfaced by the A2A client.
///
/// Every variant carries a ready-to-display message; `Display` prints that
/// message verbatim without adding a prefix.
#[derive(Debug)]
pub enum A2aClientError {
    /// The raw dispatch target could not be parsed.
    InvalidTarget(String),
    /// No usable agent card / JSON-RPC endpoint could be discovered.
    Discovery(String),
    /// The HTTP or JSON-RPC exchange failed or returned an unexpected shape.
    Protocol(String),
    /// The request was refused (e.g. HTTP 401/403, a rejected task, an
    /// authority mismatch, or disallowed cleartext).
    Denied(String),
    /// An HTTP request timed out.
    Timeout(String),
    /// The cancellation channel fired before the operation finished.
    Cancelled(String),
}

impl std::fmt::Display for A2aClientError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Each variant wraps exactly one message, so a single irrefutable
        // or-pattern pulls it out regardless of which variant this is.
        let (Self::InvalidTarget(text)
        | Self::Discovery(text)
        | Self::Protocol(text)
        | Self::Denied(text)
        | Self::Timeout(text)
        | Self::Cancelled(text)) = self;
        f.write_str(text)
    }
}

impl std::error::Error for A2aClientError {}
100
/// Abstraction over dispatching a trigger event to a remote A2A agent.
#[async_trait]
pub trait A2aClient: Send + Sync + 'static {
    /// Dispatches `event` to `target`.
    ///
    /// * `target` – raw dispatch target string.
    /// * `allow_cleartext` – whether plain `http` may be used.
    /// * `binding_id` / `binding_key` – identify the trigger binding.
    /// * `cancel_rx` – broadcast receiver used to signal cancellation.
    ///
    /// Returns the resolved endpoint together with the dispatch
    /// acknowledgement, or an [`A2aClientError`].
    async fn dispatch(
        &self,
        target: &str,
        allow_cleartext: bool,
        binding_id: &str,
        binding_key: &str,
        event: &TriggerEvent,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
}
114
/// [`A2aClient`] implementation that forwards to [`dispatch_trigger_event`].
pub struct RealA2aClient;

#[async_trait]
impl A2aClient for RealA2aClient {
    /// Delegates to `dispatch_trigger_event` with the arguments unchanged.
    async fn dispatch(
        &self,
        target: &str,
        allow_cleartext: bool,
        binding_id: &str,
        binding_key: &str,
        event: &TriggerEvent,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
        dispatch_trigger_event(
            target,
            allow_cleartext,
            binding_id,
            binding_key,
            event,
            cancel_rx,
        )
        .await
    }
}
140
141pub fn actor_chain_metadata_candidate(value: &Value) -> Option<(&'static str, &Value)> {
146 for pointer in A2A_ACTOR_CHAIN_METADATA_POINTERS {
147 if let Some(candidate) = value.pointer(pointer) {
148 return Some((pointer, candidate));
149 }
150 }
151 None
152}
153
154pub fn actor_chain_from_metadata(value: &Value) -> Option<crate::actor_chain::ActorChain> {
155 actor_chain_metadata_candidate(value)
156 .and_then(|(_, candidate)| crate::actor_chain::ActorChain::from_json_value(candidate).ok())
157}
158
/// Internal classification of a failed agent-card fetch, used to decide
/// whether discovery should keep probing, fall back to cleartext, or abort.
#[derive(Debug)]
enum AgentCardFetchError {
    /// The cancellation channel fired while the request was in flight.
    Cancelled(String),
    /// Any other failure: transport error, non-success status, or unparsable body.
    Discovery(String),
    /// The connection attempt was refused by the peer.
    ConnectRefused(String),
    /// The server answered HTTP 401 or 403.
    Denied(String),
    /// The HTTP request timed out.
    Timeout(String),
}
167
168pub async fn dispatch_trigger_event(
169 raw_target: &str,
170 allow_cleartext: bool,
171 binding_id: &str,
172 binding_key: &str,
173 event: &TriggerEvent,
174 cancel_rx: &mut broadcast::Receiver<()>,
175) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
176 let started = std::time::Instant::now();
177 let target = match parse_target(raw_target) {
178 Ok(target) => target,
179 Err(error) => {
180 record_a2a_metric(raw_target, "failed", started.elapsed());
181 return Err(error);
182 }
183 };
184 let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
185 Ok(endpoint) => endpoint,
186 Err(error) => {
187 record_a2a_metric(raw_target, "failed", started.elapsed());
188 return Err(error);
189 }
190 };
191 let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
192 let actor_chain = crate::agent_sessions::current_actor_chain()
193 .as_ref()
194 .map(crate::actor_chain::ActorChain::to_json_value);
195 let mut envelope = serde_json::json!({
196 "kind": "harn.trigger.dispatch",
197 "message_id": message_id,
198 "trace_id": event.trace_id.0,
199 "event_id": event.id.0,
200 "trigger_id": binding_id,
201 "binding_key": binding_key,
202 "target_agent": endpoint.target_agent,
203 "event": event,
204 });
205 if let Some(chain) = actor_chain.as_ref() {
206 envelope["actor_chain"] = chain.clone();
207 }
208 let text = serde_json::to_string(&envelope)
209 .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
210 let push_config = push_notification_config();
211 let mut metadata = serde_json::json!({
212 "kind": "harn.trigger.dispatch",
213 "trace_id": event.trace_id.0,
214 "event_id": event.id.0,
215 "trigger_id": binding_id,
216 "binding_key": binding_key,
217 "target_agent": endpoint.target_agent,
218 });
219 if let Some(chain) = actor_chain.as_ref() {
220 metadata["actor_chain"] = chain.clone();
221 metadata["harn"] = serde_json::json!({"actor_chain": chain});
222 }
223 let mut params = serde_json::json!({
224 "contextId": event.trace_id.0,
225 "message": {
226 "messageId": message_id,
227 "role": "user",
228 "parts": [{
229 "type": "text",
230 "text": text,
231 }],
232 "metadata": metadata,
233 },
234 });
235 if let Some(config) = push_config.clone() {
236 params["configuration"] = serde_json::json!({
237 "blocking": false,
238 "returnImmediately": true,
239 "pushNotificationConfig": config,
240 });
241 }
242 let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
243
244 let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
245 Ok(body) => body,
246 Err(error) => {
247 record_a2a_metric(raw_target, "failed", started.elapsed());
248 return Err(error);
249 }
250 };
251 let result = match body.get("result").cloned().ok_or_else(|| {
252 if let Some(error) = body.get("error") {
253 let message = error
254 .get("message")
255 .and_then(Value::as_str)
256 .unwrap_or("unknown A2A error");
257 A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
258 } else {
259 A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
260 }
261 }) {
262 Ok(result) => result,
263 Err(error) => {
264 record_a2a_metric(raw_target, "failed", started.elapsed());
265 return Err(error);
266 }
267 };
268
269 let task_id = match result
270 .get("id")
271 .and_then(Value::as_str)
272 .filter(|value| !value.is_empty())
273 .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
274 {
275 Ok(task_id) => task_id.to_string(),
276 Err(error) => {
277 record_a2a_metric(raw_target, "failed", started.elapsed());
278 return Err(error);
279 }
280 };
281 let state = match task_state(&result) {
282 Ok(state) => state.to_string(),
283 Err(error) => {
284 record_a2a_metric(raw_target, "failed", started.elapsed());
285 return Err(error);
286 }
287 };
288
289 if state == "completed" {
290 let inline = extract_inline_result(&result);
291 record_a2a_metric(raw_target, "succeeded", started.elapsed());
292 return Ok((
293 endpoint,
294 DispatchAck::InlineResult {
295 task_id,
296 result: inline,
297 },
298 ));
299 }
300
301 if state == "rejected" {
302 record_a2a_metric(raw_target, "failed", started.elapsed());
303 return Err(A2aClientError::Denied(format!(
304 "A2A task rejected by remote agent: {}",
305 task_status_message(&result).unwrap_or("permission rejected")
306 )));
307 }
308
309 if let Some(config) = push_config {
310 register_push_notification_config(
311 &endpoint.rpc_url,
312 &task_id,
313 config,
314 &event.trace_id.0,
315 cancel_rx,
316 )
317 .await
318 .inspect_err(|_| {
319 record_a2a_metric(raw_target, "failed", started.elapsed());
320 })?;
321 }
322 record_a2a_metric(raw_target, "succeeded", started.elapsed());
323 Ok((
324 endpoint.clone(),
325 DispatchAck::PendingTask {
326 task_id: task_id.clone(),
327 state: state.clone(),
328 handle: serde_json::json!({
329 "kind": "a2a_task_handle",
330 "task_id": task_id,
331 "state": state,
332 "target_agent": endpoint.target_agent,
333 "rpc_url": endpoint.rpc_url,
334 "card_url": endpoint.card_url,
335 "agent_id": endpoint.agent_id,
336 }),
337 },
338 ))
339}
340
341pub async fn resolve_agent(
342 raw_target: &str,
343 allow_cleartext: bool,
344 cancel_rx: &mut broadcast::Receiver<()>,
345) -> Result<ResolvedA2aAgent, A2aClientError> {
346 let target = parse_target(raw_target)?;
347 let resolved = resolve_endpoint_with_card(&target, allow_cleartext, cancel_rx).await?;
348 Ok(ResolvedA2aAgent {
349 endpoint: resolved.0,
350 card: resolved.1,
351 })
352}
353
/// Public entry point for posting an arbitrary JSON-RPC `request` to `rpc_url`.
///
/// Forwards to the private `send_jsonrpc` unchanged and returns the parsed
/// JSON response body, or the [`A2aClientError`] it produced.
pub async fn send_jsonrpc_request(
    rpc_url: &str,
    request: &Value,
    trace_id: &str,
    cancel_rx: &mut broadcast::Receiver<()>,
) -> Result<Value, A2aClientError> {
    send_jsonrpc(rpc_url, request, trace_id, cancel_rx).await
}
362
363fn push_notification_config() -> Option<Value> {
364 let url = std::env::var(A2A_PUSH_URL_ENV)
365 .ok()
366 .map(|value| value.trim().to_string())
367 .filter(|value| !value.is_empty())?;
368 let token = std::env::var(A2A_PUSH_TOKEN_ENV)
369 .ok()
370 .map(|value| value.trim().to_string())
371 .filter(|value| !value.is_empty());
372 let mut config = serde_json::json!({ "url": url });
373 if let Some(token) = token {
374 config["token"] = Value::String(token.clone());
375 config["authentication"] = serde_json::json!({
376 "scheme": "Bearer",
377 "credentials": token,
378 });
379 }
380 Some(config)
381}
382
383async fn register_push_notification_config(
384 rpc_url: &str,
385 task_id: &str,
386 config: Value,
387 trace_id: &str,
388 cancel_rx: &mut broadcast::Receiver<()>,
389) -> Result<(), A2aClientError> {
390 let request = crate::jsonrpc::request(
391 format!("{trace_id}.{task_id}.push-config"),
392 "tasks/pushNotificationConfig/set",
393 serde_json::json!({
394 "taskId": task_id,
395 "pushNotificationConfig": config,
396 }),
397 );
398 let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
399 if response.get("error").is_some() {
400 return Err(A2aClientError::Protocol(format!(
401 "A2A push notification registration failed: {}",
402 response["error"]
403 )));
404 }
405 Ok(())
406}
407
408fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
409 if let Some(metrics) = crate::active_metrics_registry() {
410 metrics.record_a2a_hop(target, outcome, duration);
411 }
412}
413
414pub fn target_agent_label(raw_target: &str) -> String {
415 parse_target(raw_target)
416 .map(|target| target.target_agent_label())
417 .unwrap_or_else(|_| raw_target.to_string())
418}
419
/// A dispatch target split into its network authority and agent path.
#[derive(Clone, Debug)]
struct ParsedTarget {
    /// `host` or `host:port` of the remote agent.
    authority: String,
    /// Path portion of the target without its leading `/`; empty when absent.
    target_agent: String,
}

impl ParsedTarget {
    /// Label for this target: the agent path, or the authority when the path is empty.
    fn target_agent_label(&self) -> String {
        match self.target_agent.as_str() {
            "" => self.authority.clone(),
            agent => agent.to_string(),
        }
    }
}
435
436fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
437 let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
438 A2aClientError::InvalidTarget(format!(
439 "invalid a2a dispatch target '{raw_target}': {error}"
440 ))
441 })?;
442 let host = parsed.host_str().ok_or_else(|| {
443 A2aClientError::InvalidTarget(format!(
444 "invalid a2a dispatch target '{raw_target}': missing host"
445 ))
446 })?;
447 let authority = if let Some(port) = parsed.port() {
448 format!("{host}:{port}")
449 } else {
450 host.to_string()
451 };
452 Ok(ParsedTarget {
453 authority,
454 target_agent: parsed.path().trim_start_matches('/').to_string(),
455 })
456}
457
458async fn resolve_endpoint(
459 target: &ParsedTarget,
460 allow_cleartext: bool,
461 cancel_rx: &mut broadcast::Receiver<()>,
462) -> Result<ResolvedA2aEndpoint, A2aClientError> {
463 Ok(
464 resolve_endpoint_with_card(target, allow_cleartext, cancel_rx)
465 .await?
466 .0,
467 )
468}
469
470async fn resolve_endpoint_with_card(
471 target: &ParsedTarget,
472 allow_cleartext: bool,
473 cancel_rx: &mut broadcast::Receiver<()>,
474) -> Result<(ResolvedA2aEndpoint, Value), A2aClientError> {
475 let mut last_error = None;
476 for scheme in card_resolution_schemes(allow_cleartext) {
477 let mut last_scheme_error = None;
478 for path in A2A_AGENT_CARD_PATHS {
479 let card_url = format!("{scheme}://{}/{path}", target.authority);
480 match fetch_agent_card(&card_url, cancel_rx).await {
481 Ok(card) => {
482 let endpoint = endpoint_from_card(
483 card_url,
484 allow_cleartext,
485 &target.authority,
486 target.target_agent.clone(),
487 &card,
488 )?;
489 return Ok((endpoint, card));
490 }
491 Err(AgentCardFetchError::Cancelled(message)) => {
492 return Err(A2aClientError::Cancelled(message));
493 }
494 Err(AgentCardFetchError::Timeout(message)) => {
495 return Err(A2aClientError::Timeout(message));
496 }
497 Err(AgentCardFetchError::Denied(message)) => {
498 return Err(A2aClientError::Denied(message));
499 }
500 Err(error) => {
501 last_error = Some(agent_card_fetch_error_message(&error));
502 last_scheme_error = Some(error);
503 }
504 }
505 }
506 if last_scheme_error.as_ref().is_some_and(|error| {
507 should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
508 }) {
509 continue;
510 }
511 break;
512 }
513 Err(A2aClientError::Discovery(format!(
514 "could not resolve A2A agent card for '{}': {}",
515 target.authority,
516 last_error.unwrap_or_else(|| "unknown discovery error".to_string())
517 )))
518}
519
520async fn fetch_agent_card(
521 card_url: &str,
522 cancel_rx: &mut broadcast::Receiver<()>,
523) -> Result<Value, AgentCardFetchError> {
524 let response = tokio::select! {
525 response = crate::llm::shared_utility_client().get(card_url).send() => {
526 match response {
527 Ok(response) => Ok(response),
528 Err(error) if error.is_timeout() => Err(AgentCardFetchError::Timeout(
529 format!("A2A HTTP request timed out: {}", crate::egress::redact_reqwest_error(&error))
530 )),
531 Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
532 format!("A2A HTTP request failed: {}", crate::egress::redact_reqwest_error(&error))
533 )),
534 Err(error) => Err(AgentCardFetchError::Discovery(
535 format!("A2A HTTP request failed: {}", crate::egress::redact_reqwest_error(&error))
536 )),
537 }
538 }
539 _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
540 "A2A agent-card fetch cancelled".to_string()
541 )),
542 }?;
543 if matches!(
544 response.status(),
545 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
546 ) {
547 return Err(AgentCardFetchError::Denied(format!(
548 "GET {card_url} returned HTTP {}",
549 response.status()
550 )));
551 }
552 if !response.status().is_success() {
553 let card_url = crate::egress::redact_diagnostic_text(card_url);
554 return Err(AgentCardFetchError::Discovery(format!(
555 "GET {card_url} returned HTTP {}",
556 response.status()
557 )));
558 }
559 response.json::<Value>().await.map_err(|error| {
560 AgentCardFetchError::Discovery(format!(
561 "parse {}: {error}",
562 crate::egress::redact_diagnostic_text(card_url)
563 ))
564 })
565}
566
567fn endpoint_from_card(
568 card_url: String,
569 allow_cleartext: bool,
570 requested_authority: &str,
571 target_agent: String,
572 card: &Value,
573) -> Result<ResolvedA2aEndpoint, A2aClientError> {
574 let rpc_url = if has_current_transport_fields(card) {
575 endpoint_from_current_card(card, allow_cleartext, requested_authority)?
576 } else if let Some(rpc_url) =
577 endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
578 {
579 rpc_url
580 } else {
581 return Err(A2aClientError::Discovery(
582 "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
583 ));
584 };
585
586 Ok(ResolvedA2aEndpoint {
587 card_url,
588 rpc_url: rpc_url.to_string(),
589 agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
590 target_agent,
591 })
592}
593
594fn has_current_transport_fields(card: &Value) -> bool {
595 card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
596}
597
598fn endpoint_from_current_card(
599 card: &Value,
600 allow_cleartext: bool,
601 requested_authority: &str,
602) -> Result<Url, A2aClientError> {
603 let protocol_version = card
604 .get("protocolVersion")
605 .and_then(Value::as_str)
606 .ok_or_else(|| {
607 A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
608 })?;
609 if protocol_version != A2A_PROTOCOL_VERSION {
610 return Err(A2aClientError::Discovery(format!(
611 "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
612 )));
613 }
614
615 let base_url = card
616 .get("url")
617 .and_then(Value::as_str)
618 .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
619 let base_url = resolve_declared_url(
620 base_url,
621 allow_cleartext,
622 requested_authority,
623 "agent card url",
624 )?;
625
626 let preferred_transport = card
627 .get("preferredTransport")
628 .and_then(Value::as_str)
629 .ok_or_else(|| {
630 A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
631 })?;
632 if transport_is_jsonrpc(preferred_transport) {
633 return Ok(base_url);
634 }
635
636 if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
637 return resolve_declared_url(
638 interface_url,
639 allow_cleartext,
640 requested_authority,
641 "JSONRPC additionalInterface url",
642 );
643 }
644
645 Err(A2aClientError::Discovery(
646 "A2A agent card does not expose JSONRPC transport".to_string(),
647 ))
648}
649
650fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
651 let Some(interfaces) = card.get("additionalInterfaces") else {
652 return Ok(None);
653 };
654 let interfaces = interfaces.as_array().ok_or_else(|| {
655 A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
656 })?;
657 for interface in interfaces {
658 if interface
659 .get("transport")
660 .and_then(Value::as_str)
661 .is_some_and(transport_is_jsonrpc)
662 {
663 let interface_url = interface
664 .get("url")
665 .and_then(Value::as_str)
666 .ok_or_else(|| {
667 A2aClientError::Discovery(
668 "A2A JSONRPC additionalInterface missing url".to_string(),
669 )
670 })?;
671 return Ok(Some(interface_url));
672 }
673 }
674 Ok(None)
675}
676
677fn endpoint_from_legacy_supported_interfaces(
678 card: &Value,
679 allow_cleartext: bool,
680 requested_authority: &str,
681) -> Result<Option<Url>, A2aClientError> {
682 let Some(interfaces) = card.get("supportedInterfaces") else {
683 return Ok(None);
684 };
685 let interfaces = interfaces.as_array().ok_or_else(|| {
686 A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
687 })?;
688 let mut saw_jsonrpc = false;
689 for interface in interfaces {
690 if !interface
691 .get("protocolBinding")
692 .and_then(Value::as_str)
693 .is_some_and(transport_is_jsonrpc)
694 {
695 continue;
696 }
697 saw_jsonrpc = true;
698 if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
699 continue;
700 }
701 let interface_url = interface
702 .get("url")
703 .and_then(Value::as_str)
704 .ok_or_else(|| {
705 A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
706 })?;
707 return resolve_declared_url(
708 interface_url,
709 allow_cleartext,
710 requested_authority,
711 "JSONRPC supportedInterface url",
712 )
713 .map(Some);
714 }
715 if saw_jsonrpc {
716 return Err(A2aClientError::Discovery(format!(
717 "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
718 )));
719 }
720 Err(A2aClientError::Discovery(
721 "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
722 ))
723}
724
725fn transport_is_jsonrpc(transport: &str) -> bool {
726 transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
727}
728
729fn resolve_declared_url(
730 raw_url: &str,
731 allow_cleartext: bool,
732 requested_authority: &str,
733 label: &str,
734) -> Result<Url, A2aClientError> {
735 let url = Url::parse(raw_url).map_err(|error| {
736 A2aClientError::Discovery(format!(
737 "invalid A2A {label} '{}': {error}",
738 crate::egress::redact_diagnostic_text(raw_url)
739 ))
740 })?;
741 ensure_cleartext_allowed(&url, allow_cleartext, label)?;
742 let declared_authority = url_authority(&url)?;
743 if !authorities_equivalent(&declared_authority, requested_authority) {
744 return Err(A2aClientError::Denied(format!(
745 "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
746 )));
747 }
748 Ok(url)
749}
750
/// URL schemes to try during card discovery, in order of preference.
///
/// `https` always comes first; `http` is appended only when cleartext is allowed.
fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
    const SECURE_ONLY: &[&str] = &["https"];
    const SECURE_THEN_CLEARTEXT: &[&str] = &["https", "http"];

    if !allow_cleartext {
        return SECURE_ONLY;
    }
    SECURE_THEN_CLEARTEXT
}
758
759fn should_try_cleartext_fallback(
772 scheme: &str,
773 allow_cleartext: bool,
774 error: &AgentCardFetchError,
775 authority: &str,
776) -> bool {
777 if !allow_cleartext || scheme != "https" {
778 return false;
779 }
780 match error {
781 AgentCardFetchError::Cancelled(_)
782 | AgentCardFetchError::Denied(_)
783 | AgentCardFetchError::Timeout(_) => false,
784 AgentCardFetchError::ConnectRefused(_) => true,
785 AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
786 }
787}
788
789fn ensure_cleartext_allowed(
790 url: &Url,
791 allow_cleartext: bool,
792 label: &str,
793) -> Result<(), A2aClientError> {
794 if allow_cleartext || url.scheme() != "http" {
795 return Ok(());
796 }
797 Err(A2aClientError::Denied(format!(
798 "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
799 )))
800}
801
802fn is_loopback_authority(authority: &str) -> bool {
803 let (host, _) = split_authority(authority);
804 if host.eq_ignore_ascii_case("localhost") {
805 return true;
806 }
807 if let Ok(ip) = host.parse::<std::net::IpAddr>() {
808 return ip.is_loopback();
809 }
810 false
811}
812
813fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
824 if card_authority == requested_authority {
825 return true;
826 }
827 let (_, card_port) = split_authority(card_authority);
828 let (_, requested_port) = split_authority(requested_authority);
829 if card_port != requested_port {
830 return false;
831 }
832 is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
833}
834
/// Splits `authority` into `(host, port)` string slices.
///
/// A bracketed IPv6 literal is cut at its last `]`; anything else is cut at
/// its last `:`. The port is empty when absent, and surrounding brackets are
/// stripped from the returned host.
fn split_authority(authority: &str) -> (&str, &str) {
    let (bracketed_host, port) = if !authority.starts_with('[') {
        authority.rsplit_once(':').unwrap_or((authority, ""))
    } else if let Some(close) = authority.rfind(']') {
        // `]` is one byte wide, so `close + 1` is always a valid boundary.
        let (host, rest) = authority.split_at(close + 1);
        (host, rest.strip_prefix(':').unwrap_or(""))
    } else {
        // Opening bracket without a closing one: treat everything as host.
        (authority, "")
    };
    let host = bracketed_host
        .trim_start_matches('[')
        .trim_end_matches(']');
    (host, port)
}
857
858fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
859 match error {
860 AgentCardFetchError::Cancelled(message)
861 | AgentCardFetchError::Discovery(message)
862 | AgentCardFetchError::ConnectRefused(message)
863 | AgentCardFetchError::Denied(message)
864 | AgentCardFetchError::Timeout(message) => message.clone(),
865 }
866}
867
868fn is_connect_refused(error: &reqwest::Error) -> bool {
869 if !error.is_connect() {
870 return false;
871 }
872 let mut source = error.source();
873 while let Some(cause) = source {
874 if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
875 if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
876 return true;
877 }
878 }
879 source = cause.source();
880 }
881 false
882}
883
884fn url_authority(url: &Url) -> Result<String, A2aClientError> {
885 let host = url
886 .host_str()
887 .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
888 Ok(if let Some(port) = url.port() {
889 format!("{host}:{port}")
890 } else {
891 host.to_string()
892 })
893}
894
895async fn send_jsonrpc(
896 rpc_url: &str,
897 request: &Value,
898 trace_id: &str,
899 cancel_rx: &mut broadcast::Receiver<()>,
900) -> Result<Value, A2aClientError> {
901 let response = send_http(
902 crate::llm::shared_blocking_client()
903 .post(rpc_url)
904 .header(reqwest::header::CONTENT_TYPE, "application/json")
905 .header("A2A-Version", A2A_PROTOCOL_VERSION)
906 .header("A2A-Trace-Id", trace_id)
907 .json(request),
908 cancel_rx,
909 "A2A task dispatch cancelled",
910 )
911 .await?;
912 if matches!(
913 response.status(),
914 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
915 ) {
916 return Err(A2aClientError::Denied(format!(
917 "A2A task dispatch returned HTTP {}",
918 response.status()
919 )));
920 }
921 if !response.status().is_success() {
922 return Err(A2aClientError::Protocol(format!(
923 "A2A task dispatch returned HTTP {}",
924 response.status()
925 )));
926 }
927 response
928 .json::<Value>()
929 .await
930 .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
931}
932
933async fn send_http(
934 request: reqwest::RequestBuilder,
935 cancel_rx: &mut broadcast::Receiver<()>,
936 cancelled_message: &'static str,
937) -> Result<reqwest::Response, A2aClientError> {
938 tokio::select! {
939 response = request.send() => response.map_err(|error| {
940 if error.is_timeout() {
941 A2aClientError::Timeout(format!(
942 "A2A HTTP request timed out: {}",
943 crate::egress::redact_reqwest_error(&error)
944 ))
945 } else {
946 A2aClientError::Protocol(format!(
947 "A2A HTTP request failed: {}",
948 crate::egress::redact_reqwest_error(&error)
949 ))
950 }
951 }),
952 _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
953 }
954}
955
956fn task_state(task: &Value) -> Result<&str, A2aClientError> {
957 task.pointer("/status/state")
958 .and_then(Value::as_str)
959 .filter(|value| !value.is_empty())
960 .ok_or_else(|| {
961 A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
962 })
963}
964
965fn task_status_message(task: &Value) -> Option<&str> {
966 task.pointer("/status/message/parts")
967 .and_then(Value::as_array)
968 .and_then(|parts| {
969 parts.iter().find_map(|part| {
970 if part.get("type").and_then(Value::as_str) == Some("text") {
971 part.get("text").and_then(Value::as_str).map(str::trim)
972 } else {
973 None
974 }
975 })
976 })
977 .filter(|message| !message.is_empty())
978}
979
980fn extract_inline_result(task: &Value) -> Value {
981 let text = task
982 .get("history")
983 .and_then(Value::as_array)
984 .and_then(|history| {
985 history.iter().rev().find_map(|message| {
986 let role = message.get("role").and_then(Value::as_str)?;
987 if role != "agent" {
988 return None;
989 }
990 message
991 .get("parts")
992 .and_then(Value::as_array)
993 .and_then(|parts| {
994 parts.iter().find_map(|part| {
995 if part.get("type").and_then(Value::as_str) == Some("text") {
996 part.get("text").and_then(Value::as_str).map(str::trim_end)
997 } else {
998 None
999 }
1000 })
1001 })
1002 })
1003 });
1004 match text {
1005 Some(text) if !text.is_empty() => {
1006 serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
1007 }
1008 _ => task.clone(),
1009 }
1010}
1011
/// Waits until the cancellation receiver yields anything.
///
/// The result of `recv` is discarded, so this future completes on any outcome
/// of the receive — a delivered message as well as a receive error.
// NOTE(review): a closed channel (all senders dropped) presumably also
// resolves here and is therefore treated as cancellation — confirm intent.
async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
    let _ = cancel_rx.recv().await;
}
1015
1016#[cfg(test)]
1017mod tests {
1018 use super::*;
1019
1020 #[test]
1021 fn target_agent_label_prefers_path() {
1022 assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
1023 assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
1024 }
1025
1026 #[test]
1027 fn extract_inline_result_parses_json_text() {
1028 let task = serde_json::json!({
1029 "history": [
1030 {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
1031 {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
1032 ]
1033 });
1034 assert_eq!(
1035 extract_inline_result(&task),
1036 serde_json::json!({"trace_id": "trace_123"})
1037 );
1038 }
1039
1040 #[test]
1041 fn discovery_prefers_https_before_http() {
1042 assert_eq!(card_resolution_schemes(false), ["https"]);
1043 assert_eq!(card_resolution_schemes(true), ["https", "http"]);
1044 }
1045
1046 #[test]
1047 fn endpoint_from_card_accepts_current_preferred_transport() {
1048 let endpoint = endpoint_from_card(
1049 "https://trusted.example/.well-known/agent-card.json".to_string(),
1050 false,
1051 "trusted.example",
1052 "triage".to_string(),
1053 &serde_json::json!({
1054 "name": "trusted",
1055 "protocolVersion": "0.3.0",
1056 "url": "https://trusted.example/rpc",
1057 "preferredTransport": "JSONRPC",
1058 "additionalInterfaces": [{
1059 "url": "https://trusted.example/rpc",
1060 "transport": "JSONRPC"
1061 }]
1062 }),
1063 )
1064 .expect("current A2A card should resolve");
1065 assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1066 assert_eq!(
1067 endpoint.card_url,
1068 "https://trusted.example/.well-known/agent-card.json"
1069 );
1070 assert_eq!(endpoint.target_agent, "triage");
1071 }
1072
1073 #[test]
1074 fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
1075 let endpoint = endpoint_from_card(
1076 "https://trusted.example/.well-known/agent-card.json".to_string(),
1077 false,
1078 "trusted.example",
1079 "triage".to_string(),
1080 &serde_json::json!({
1081 "name": "trusted",
1082 "protocolVersion": "0.3.0",
1083 "url": "https://trusted.example/rest",
1084 "preferredTransport": "HTTP+JSON",
1085 "additionalInterfaces": [{
1086 "url": "https://trusted.example/rpc",
1087 "transport": "JSONRPC"
1088 }]
1089 }),
1090 )
1091 .expect("current A2A card should resolve through additionalInterfaces");
1092 assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1093 }
1094
1095 #[test]
1096 fn endpoint_from_card_accepts_legacy_supported_interfaces() {
1097 let endpoint = endpoint_from_card(
1098 "https://trusted.example/.well-known/agent-card.json".to_string(),
1099 false,
1100 "trusted.example",
1101 "triage".to_string(),
1102 &serde_json::json!({
1103 "name": "trusted",
1104 "supportedInterfaces": [{
1105 "protocolBinding": "JSONRPC",
1106 "protocolVersion": "0.3.0",
1107 "url": "https://trusted.example/rpc"
1108 }],
1109 }),
1110 )
1111 .expect("legacy A2A card should resolve during the transition");
1112 assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1113 }
1114
/// A card that only offers the old `interfaces` array must be rejected with the
/// "missing preferredTransport/additionalInterfaces" error.
#[test]
fn endpoint_from_card_rejects_removed_interfaces_shape() {
    let removed_shape_card = serde_json::json!({
        "url": "https://trusted.example",
        "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
    });
    let card_url = String::from("https://trusted.example/.well-known/agent-card.json");
    let target_agent = String::from("triage");

    let outcome = endpoint_from_card(
        card_url,
        false,
        "trusted.example",
        target_agent,
        &removed_shape_card,
    );
    let error = outcome.expect_err("pre-0.3 Harn discovery shape should be rejected");

    // The exact message is pinned, not just the fact that resolution failed.
    let rendered = error.to_string();
    assert_eq!(
        rendered,
        "A2A agent card missing preferredTransport/additionalInterfaces"
    );
}
1133
/// With opt-in enabled and a non-loopback authority, the fallback is offered
/// only for an `https` attempt that failed with `ConnectRefused`.
#[test]
fn cleartext_fallback_only_after_https_connect_refused() {
    let authority = "reviewer.example:443";
    let refused = AgentCardFetchError::ConnectRefused(String::from("connect refused"));
    let tls_failure = AgentCardFetchError::Discovery(String::from("tls handshake failed"));

    // https + connection refused: fallback is allowed.
    let https_refused = should_try_cleartext_fallback("https", true, &refused, authority);
    assert!(https_refused);

    // Already on http: there is nothing to fall back to.
    let http_refused = should_try_cleartext_fallback("http", true, &refused, authority);
    assert!(!http_refused);

    // https but a discovery (TLS handshake) failure rather than a refused connection.
    let https_tls_failure = should_try_cleartext_fallback("https", true, &tls_failure, authority);
    assert!(!https_tls_failure);
}
1155
/// Without the cleartext opt-in, loopback authorities must not trigger the
/// fallback either.
#[test]
fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
    let tls_failure = AgentCardFetchError::Discovery(String::from("tls handshake failed"));
    let loopback_authorities = [
        "127.0.0.1:8080",
        "localhost:8080",
        "[::1]:8080",
        "127.1.2.3:9000",
    ];

    for authority in loopback_authorities {
        // `false` is the opt-in flag: the caller did not allow cleartext.
        let fallback_allowed =
            should_try_cleartext_fallback("https", false, &tls_failure, authority);
        assert!(
            !fallback_allowed,
            "cleartext fallback must stay disabled without opt-in for '{authority}'"
        );
    }
}
1175
/// With the cleartext opt-in set, a discovery (TLS handshake) failure against a
/// loopback authority is allowed to fall back.
#[test]
fn cleartext_fallback_allows_loopback_after_opt_in() {
    let tls_failure = AgentCardFetchError::Discovery(String::from("tls handshake failed"));
    let loopback_authorities = [
        "127.0.0.1:8080",
        "localhost:8080",
        "[::1]:8080",
        "127.1.2.3:9000",
    ];

    for authority in loopback_authorities {
        // `true` is the opt-in flag: the caller explicitly allowed cleartext.
        let fallback_allowed =
            should_try_cleartext_fallback("https", true, &tls_failure, authority);
        assert!(
            fallback_allowed,
            "expected cleartext fallback for loopback authority '{authority}'"
        );
    }
}
1197
/// Even with opt-in, a discovery (TLS handshake) failure against a non-loopback
/// authority must never fall back to cleartext.
#[test]
fn cleartext_fallback_denies_external_tls_failures() {
    let tls_failure = AgentCardFetchError::Discovery(String::from("tls handshake failed"));
    // Covers a public hostname, a public IP, and two private-range IPs.
    let external_authorities = [
        "reviewer.example:443",
        "8.8.8.8:443",
        "192.168.1.10:8080",
        "10.0.0.5:8443",
    ];

    for authority in external_authorities {
        let fallback_allowed =
            should_try_cleartext_fallback("https", true, &tls_failure, authority);
        assert!(
            !fallback_allowed,
            "cleartext fallback must be denied for external authority '{authority}'"
        );
    }
}
1219
/// `is_loopback_authority` must accept the loopback spellings listed below and
/// reject everything else in the table.
#[test]
fn is_loopback_authority_recognises_loopback_forms() {
    // IPv4 loopback, `localhost` in both cases, bracketed IPv6 loopback, and
    // another address inside 127.0.0.0/8.
    let loopback_forms = [
        "127.0.0.1:8080",
        "localhost:8080",
        "LOCALHOST:9000",
        "[::1]:8080",
        "127.5.5.5:1234",
    ];
    for authority in loopback_forms {
        assert!(
            is_loopback_authority(authority),
            "expected '{authority}' to be treated as loopback"
        );
    }

    // Public IP, private-range IP, a public hostname, and a port-less hostname.
    let non_loopback_forms = [
        "8.8.8.8:443",
        "192.168.1.10:8080",
        "example.com:443",
        "reviewer.prod",
    ];
    for authority in non_loopback_forms {
        assert!(
            !is_loopback_authority(authority),
            "expected '{authority}' not to be treated as loopback"
        );
    }
}
1232
/// A card whose `url` points at a different authority than the one that was
/// requested must be rejected, and the error must name both authorities.
#[test]
fn endpoint_from_card_rejects_card_url_authority_mismatch() {
    let error = endpoint_from_card(
        "https://trusted.example/.well-known/agent-card.json".to_string(),
        false,
        "trusted.example",
        "triage".to_string(),
        &serde_json::json!({
            "protocolVersion": "0.3.0",
            // Deliberately on a different host than the requested authority.
            "url": "https://evil.example",
            "preferredTransport": "JSONRPC",
        }),
    )
    // `expect_err` (as used by the sibling rejection tests) explains the intent
    // if the call unexpectedly succeeds, unlike a bare `unwrap_err`.
    .expect_err("card url on a different authority must be rejected");
    assert_eq!(
        error.to_string(),
        "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
    );
}
1252
/// A card advertising an `http://` URL must be rejected when the boolean flag
/// (presumably `allow_cleartext`, going by the error text) is `false`.
#[test]
fn endpoint_from_card_rejects_cleartext_without_opt_in() {
    let cleartext_card = serde_json::json!({
        "protocolVersion": "0.3.0",
        "url": "http://localhost:8080",
        "preferredTransport": "JSONRPC",
    });
    let card_url = String::from("https://127.0.0.1:8080/.well-known/agent-card.json");
    let target_agent = String::from("triage");

    let outcome = endpoint_from_card(
        card_url,
        false,
        "127.0.0.1:8080",
        target_agent,
        &cleartext_card,
    );
    let error = outcome.expect_err("cleartext card should require explicit opt-in");

    // Only the actionable hint is pinned, not the full message.
    let rendered = error.to_string();
    assert!(rendered.contains("requires `allow_cleartext = true`"));
}
1271
/// With the cleartext flag set, different loopback spellings on the same port
/// are accepted as the same authority, while a port mismatch is still rejected.
#[test]
fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
    // Every scenario uses the same card shape and differs only in where the card
    // was fetched from, the requested authority, and the URL the card advertises.
    let resolve = |card_url: &str, requested_authority: &str, advertised_url: &str| {
        let card = serde_json::json!({
            "protocolVersion": "0.3.0",
            "url": advertised_url,
            "preferredTransport": "JSONRPC",
        });
        endpoint_from_card(
            card_url.to_string(),
            true,
            requested_authority,
            "triage".to_string(),
            &card,
        )
    };

    // Requested 127.0.0.1, card answers with localhost on the same port.
    let endpoint = resolve(
        "http://127.0.0.1:8080/.well-known/agent-card.json",
        "127.0.0.1:8080",
        "http://localhost:8080",
    )
    .expect("loopback alias pair should be accepted");
    assert_eq!(endpoint.rpc_url, "http://localhost:8080/");

    // Requested localhost, card answers with the IPv6 loopback on the same port.
    let endpoint_v6 = resolve(
        "http://localhost:8080/.well-known/agent-card.json",
        "localhost:8080",
        "http://[::1]:8080",
    )
    .expect("IPv6 loopback alias should be accepted");
    assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");

    // Both hosts are loopback, but the ports differ (8080 vs 9000).
    let error = resolve(
        "http://127.0.0.1:8080/.well-known/agent-card.json",
        "127.0.0.1:8080",
        "http://localhost:9000",
    )
    .expect_err("mismatched ports must still be rejected even on loopback");
    let rendered = error.to_string();
    assert!(rendered.contains("A2A agent card url authority mismatch"));
}
1326
/// `authorities_equivalent` must reject differing hosts unless both sides are
/// loopback, and must accept identical authorities.
#[test]
fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
    // (first authority, second authority, expected equivalence)
    let cases = [
        // Two different non-loopback hostnames.
        ("internal.corp.example:443", "trusted.example:443", false),
        // A private-range address paired with loopback on the same port.
        ("10.0.0.5:8080", "127.0.0.1:8080", false),
        // Identical authorities.
        ("trusted.example:443", "trusted.example:443", true),
    ];

    for (first, second, expected) in cases {
        assert_eq!(
            authorities_equivalent(first, second),
            expected,
            "unexpected equivalence result for '{first}' vs '{second}'"
        );
    }
}
1339}