1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
10const A2A_AGENT_CARD_PATHS: &[&str] = &[
11 ".well-known/agent-card.json",
12 ".well-known/a2a-agent",
13 ".well-known/agent.json",
14 "agent/card",
15];
16const A2A_PROTOCOL_VERSION: &str = "0.3.0";
17const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";
18const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
19const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
20
21#[derive(Clone, Debug, PartialEq, Eq)]
22pub struct ResolvedA2aEndpoint {
23 pub card_url: String,
24 pub rpc_url: String,
25 pub agent_id: Option<String>,
26 pub target_agent: String,
27}
28
29#[derive(Clone, Debug, PartialEq, Eq)]
30pub struct ResolvedA2aAgent {
31 pub endpoint: ResolvedA2aEndpoint,
32 pub card: Value,
33}
34
35#[derive(Clone, Debug, PartialEq, Eq)]
36pub enum DispatchAck {
37 InlineResult {
38 task_id: String,
39 result: Value,
40 },
41 PendingTask {
42 task_id: String,
43 state: String,
44 handle: Value,
45 },
46}
47
48#[derive(Debug)]
49pub enum A2aClientError {
50 InvalidTarget(String),
51 Discovery(String),
52 Protocol(String),
53 Denied(String),
54 Timeout(String),
55 Cancelled(String),
56}
57
58impl std::fmt::Display for A2aClientError {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 match self {
61 Self::InvalidTarget(message)
62 | Self::Discovery(message)
63 | Self::Protocol(message)
64 | Self::Denied(message)
65 | Self::Timeout(message)
66 | Self::Cancelled(message) => f.write_str(message),
67 }
68 }
69}
70
71impl std::error::Error for A2aClientError {}
72
73#[async_trait]
75pub trait A2aClient: Send + Sync + 'static {
76 async fn dispatch(
77 &self,
78 target: &str,
79 allow_cleartext: bool,
80 binding_id: &str,
81 binding_key: &str,
82 event: &TriggerEvent,
83 cancel_rx: &mut broadcast::Receiver<()>,
84 ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
85}
86
87pub struct RealA2aClient;
89
90#[async_trait]
91impl A2aClient for RealA2aClient {
92 async fn dispatch(
93 &self,
94 target: &str,
95 allow_cleartext: bool,
96 binding_id: &str,
97 binding_key: &str,
98 event: &TriggerEvent,
99 cancel_rx: &mut broadcast::Receiver<()>,
100 ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
101 dispatch_trigger_event(
102 target,
103 allow_cleartext,
104 binding_id,
105 binding_key,
106 event,
107 cancel_rx,
108 )
109 .await
110 }
111}
112
113#[derive(Debug)]
114enum AgentCardFetchError {
115 Cancelled(String),
116 Discovery(String),
117 ConnectRefused(String),
118 Denied(String),
119 Timeout(String),
120}
121
122pub async fn dispatch_trigger_event(
123 raw_target: &str,
124 allow_cleartext: bool,
125 binding_id: &str,
126 binding_key: &str,
127 event: &TriggerEvent,
128 cancel_rx: &mut broadcast::Receiver<()>,
129) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
130 let started = std::time::Instant::now();
131 let target = match parse_target(raw_target) {
132 Ok(target) => target,
133 Err(error) => {
134 record_a2a_metric(raw_target, "failed", started.elapsed());
135 return Err(error);
136 }
137 };
138 let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
139 Ok(endpoint) => endpoint,
140 Err(error) => {
141 record_a2a_metric(raw_target, "failed", started.elapsed());
142 return Err(error);
143 }
144 };
145 let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
146 let envelope = serde_json::json!({
147 "kind": "harn.trigger.dispatch",
148 "message_id": message_id,
149 "trace_id": event.trace_id.0,
150 "event_id": event.id.0,
151 "trigger_id": binding_id,
152 "binding_key": binding_key,
153 "target_agent": endpoint.target_agent,
154 "event": event,
155 });
156 let text = serde_json::to_string(&envelope)
157 .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
158 let push_config = push_notification_config();
159 let mut params = serde_json::json!({
160 "contextId": event.trace_id.0,
161 "message": {
162 "messageId": message_id,
163 "role": "user",
164 "parts": [{
165 "type": "text",
166 "text": text,
167 }],
168 "metadata": {
169 "kind": "harn.trigger.dispatch",
170 "trace_id": event.trace_id.0,
171 "event_id": event.id.0,
172 "trigger_id": binding_id,
173 "binding_key": binding_key,
174 "target_agent": endpoint.target_agent,
175 },
176 },
177 });
178 if let Some(config) = push_config.clone() {
179 params["configuration"] = serde_json::json!({
180 "blocking": false,
181 "returnImmediately": true,
182 "pushNotificationConfig": config,
183 });
184 }
185 let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
186
187 let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
188 Ok(body) => body,
189 Err(error) => {
190 record_a2a_metric(raw_target, "failed", started.elapsed());
191 return Err(error);
192 }
193 };
194 let result = match body.get("result").cloned().ok_or_else(|| {
195 if let Some(error) = body.get("error") {
196 let message = error
197 .get("message")
198 .and_then(Value::as_str)
199 .unwrap_or("unknown A2A error");
200 A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
201 } else {
202 A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
203 }
204 }) {
205 Ok(result) => result,
206 Err(error) => {
207 record_a2a_metric(raw_target, "failed", started.elapsed());
208 return Err(error);
209 }
210 };
211
212 let task_id = match result
213 .get("id")
214 .and_then(Value::as_str)
215 .filter(|value| !value.is_empty())
216 .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
217 {
218 Ok(task_id) => task_id.to_string(),
219 Err(error) => {
220 record_a2a_metric(raw_target, "failed", started.elapsed());
221 return Err(error);
222 }
223 };
224 let state = match task_state(&result) {
225 Ok(state) => state.to_string(),
226 Err(error) => {
227 record_a2a_metric(raw_target, "failed", started.elapsed());
228 return Err(error);
229 }
230 };
231
232 if state == "completed" {
233 let inline = extract_inline_result(&result);
234 record_a2a_metric(raw_target, "succeeded", started.elapsed());
235 return Ok((
236 endpoint,
237 DispatchAck::InlineResult {
238 task_id,
239 result: inline,
240 },
241 ));
242 }
243
244 if state == "rejected" {
245 record_a2a_metric(raw_target, "failed", started.elapsed());
246 return Err(A2aClientError::Denied(format!(
247 "A2A task rejected by remote agent: {}",
248 task_status_message(&result).unwrap_or("permission rejected")
249 )));
250 }
251
252 if let Some(config) = push_config {
253 register_push_notification_config(
254 &endpoint.rpc_url,
255 &task_id,
256 config,
257 &event.trace_id.0,
258 cancel_rx,
259 )
260 .await
261 .inspect_err(|_| {
262 record_a2a_metric(raw_target, "failed", started.elapsed());
263 })?;
264 }
265 record_a2a_metric(raw_target, "succeeded", started.elapsed());
266 Ok((
267 endpoint.clone(),
268 DispatchAck::PendingTask {
269 task_id: task_id.clone(),
270 state: state.clone(),
271 handle: serde_json::json!({
272 "kind": "a2a_task_handle",
273 "task_id": task_id,
274 "state": state,
275 "target_agent": endpoint.target_agent,
276 "rpc_url": endpoint.rpc_url,
277 "card_url": endpoint.card_url,
278 "agent_id": endpoint.agent_id,
279 }),
280 },
281 ))
282}
283
284pub async fn resolve_agent(
285 raw_target: &str,
286 allow_cleartext: bool,
287 cancel_rx: &mut broadcast::Receiver<()>,
288) -> Result<ResolvedA2aAgent, A2aClientError> {
289 let target = parse_target(raw_target)?;
290 let resolved = resolve_endpoint_with_card(&target, allow_cleartext, cancel_rx).await?;
291 Ok(ResolvedA2aAgent {
292 endpoint: resolved.0,
293 card: resolved.1,
294 })
295}
296
297pub async fn send_jsonrpc_request(
298 rpc_url: &str,
299 request: &Value,
300 trace_id: &str,
301 cancel_rx: &mut broadcast::Receiver<()>,
302) -> Result<Value, A2aClientError> {
303 send_jsonrpc(rpc_url, request, trace_id, cancel_rx).await
304}
305
306fn push_notification_config() -> Option<Value> {
307 let url = std::env::var(A2A_PUSH_URL_ENV)
308 .ok()
309 .map(|value| value.trim().to_string())
310 .filter(|value| !value.is_empty())?;
311 let token = std::env::var(A2A_PUSH_TOKEN_ENV)
312 .ok()
313 .map(|value| value.trim().to_string())
314 .filter(|value| !value.is_empty());
315 let mut config = serde_json::json!({ "url": url });
316 if let Some(token) = token {
317 config["token"] = Value::String(token.clone());
318 config["authentication"] = serde_json::json!({
319 "scheme": "Bearer",
320 "credentials": token,
321 });
322 }
323 Some(config)
324}
325
326async fn register_push_notification_config(
327 rpc_url: &str,
328 task_id: &str,
329 config: Value,
330 trace_id: &str,
331 cancel_rx: &mut broadcast::Receiver<()>,
332) -> Result<(), A2aClientError> {
333 let request = crate::jsonrpc::request(
334 format!("{trace_id}.{task_id}.push-config"),
335 "tasks/pushNotificationConfig/set",
336 serde_json::json!({
337 "taskId": task_id,
338 "pushNotificationConfig": config,
339 }),
340 );
341 let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
342 if response.get("error").is_some() {
343 return Err(A2aClientError::Protocol(format!(
344 "A2A push notification registration failed: {}",
345 response["error"]
346 )));
347 }
348 Ok(())
349}
350
351fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
352 if let Some(metrics) = crate::active_metrics_registry() {
353 metrics.record_a2a_hop(target, outcome, duration);
354 }
355}
356
357pub fn target_agent_label(raw_target: &str) -> String {
358 parse_target(raw_target)
359 .map(|target| target.target_agent_label())
360 .unwrap_or_else(|_| raw_target.to_string())
361}
362
363#[derive(Clone, Debug)]
364struct ParsedTarget {
365 authority: String,
366 target_agent: String,
367}
368
369impl ParsedTarget {
370 fn target_agent_label(&self) -> String {
371 if self.target_agent.is_empty() {
372 self.authority.clone()
373 } else {
374 self.target_agent.clone()
375 }
376 }
377}
378
379fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
380 let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
381 A2aClientError::InvalidTarget(format!(
382 "invalid a2a dispatch target '{raw_target}': {error}"
383 ))
384 })?;
385 let host = parsed.host_str().ok_or_else(|| {
386 A2aClientError::InvalidTarget(format!(
387 "invalid a2a dispatch target '{raw_target}': missing host"
388 ))
389 })?;
390 let authority = if let Some(port) = parsed.port() {
391 format!("{host}:{port}")
392 } else {
393 host.to_string()
394 };
395 Ok(ParsedTarget {
396 authority,
397 target_agent: parsed.path().trim_start_matches('/').to_string(),
398 })
399}
400
401async fn resolve_endpoint(
402 target: &ParsedTarget,
403 allow_cleartext: bool,
404 cancel_rx: &mut broadcast::Receiver<()>,
405) -> Result<ResolvedA2aEndpoint, A2aClientError> {
406 Ok(
407 resolve_endpoint_with_card(target, allow_cleartext, cancel_rx)
408 .await?
409 .0,
410 )
411}
412
413async fn resolve_endpoint_with_card(
414 target: &ParsedTarget,
415 allow_cleartext: bool,
416 cancel_rx: &mut broadcast::Receiver<()>,
417) -> Result<(ResolvedA2aEndpoint, Value), A2aClientError> {
418 let mut last_error = None;
419 for scheme in card_resolution_schemes(allow_cleartext) {
420 let mut last_scheme_error = None;
421 for path in A2A_AGENT_CARD_PATHS {
422 let card_url = format!("{scheme}://{}/{path}", target.authority);
423 match fetch_agent_card(&card_url, cancel_rx).await {
424 Ok(card) => {
425 let endpoint = endpoint_from_card(
426 card_url,
427 allow_cleartext,
428 &target.authority,
429 target.target_agent.clone(),
430 &card,
431 )?;
432 return Ok((endpoint, card));
433 }
434 Err(AgentCardFetchError::Cancelled(message)) => {
435 return Err(A2aClientError::Cancelled(message));
436 }
437 Err(AgentCardFetchError::Timeout(message)) => {
438 return Err(A2aClientError::Timeout(message));
439 }
440 Err(AgentCardFetchError::Denied(message)) => {
441 return Err(A2aClientError::Denied(message));
442 }
443 Err(error) => {
444 last_error = Some(agent_card_fetch_error_message(&error));
445 last_scheme_error = Some(error);
446 }
447 }
448 }
449 if last_scheme_error.as_ref().is_some_and(|error| {
450 should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
451 }) {
452 continue;
453 }
454 break;
455 }
456 Err(A2aClientError::Discovery(format!(
457 "could not resolve A2A agent card for '{}': {}",
458 target.authority,
459 last_error.unwrap_or_else(|| "unknown discovery error".to_string())
460 )))
461}
462
463async fn fetch_agent_card(
464 card_url: &str,
465 cancel_rx: &mut broadcast::Receiver<()>,
466) -> Result<Value, AgentCardFetchError> {
467 let response = tokio::select! {
468 response = crate::llm::shared_utility_client().get(card_url).send() => {
469 match response {
470 Ok(response) => Ok(response),
471 Err(error) if error.is_timeout() => Err(AgentCardFetchError::Timeout(
472 format!("A2A HTTP request timed out: {error}")
473 )),
474 Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
475 format!("A2A HTTP request failed: {error}")
476 )),
477 Err(error) => Err(AgentCardFetchError::Discovery(
478 format!("A2A HTTP request failed: {error}")
479 )),
480 }
481 }
482 _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
483 "A2A agent-card fetch cancelled".to_string()
484 )),
485 }?;
486 if matches!(
487 response.status(),
488 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
489 ) {
490 return Err(AgentCardFetchError::Denied(format!(
491 "GET {card_url} returned HTTP {}",
492 response.status()
493 )));
494 }
495 if !response.status().is_success() {
496 return Err(AgentCardFetchError::Discovery(format!(
497 "GET {card_url} returned HTTP {}",
498 response.status()
499 )));
500 }
501 response
502 .json::<Value>()
503 .await
504 .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
505}
506
507fn endpoint_from_card(
508 card_url: String,
509 allow_cleartext: bool,
510 requested_authority: &str,
511 target_agent: String,
512 card: &Value,
513) -> Result<ResolvedA2aEndpoint, A2aClientError> {
514 let rpc_url = if has_current_transport_fields(card) {
515 endpoint_from_current_card(card, allow_cleartext, requested_authority)?
516 } else if let Some(rpc_url) =
517 endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
518 {
519 rpc_url
520 } else {
521 return Err(A2aClientError::Discovery(
522 "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
523 ));
524 };
525
526 Ok(ResolvedA2aEndpoint {
527 card_url,
528 rpc_url: rpc_url.to_string(),
529 agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
530 target_agent,
531 })
532}
533
534fn has_current_transport_fields(card: &Value) -> bool {
535 card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
536}
537
538fn endpoint_from_current_card(
539 card: &Value,
540 allow_cleartext: bool,
541 requested_authority: &str,
542) -> Result<Url, A2aClientError> {
543 let protocol_version = card
544 .get("protocolVersion")
545 .and_then(Value::as_str)
546 .ok_or_else(|| {
547 A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
548 })?;
549 if protocol_version != A2A_PROTOCOL_VERSION {
550 return Err(A2aClientError::Discovery(format!(
551 "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
552 )));
553 }
554
555 let base_url = card
556 .get("url")
557 .and_then(Value::as_str)
558 .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
559 let base_url = resolve_declared_url(
560 base_url,
561 allow_cleartext,
562 requested_authority,
563 "agent card url",
564 )?;
565
566 let preferred_transport = card
567 .get("preferredTransport")
568 .and_then(Value::as_str)
569 .ok_or_else(|| {
570 A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
571 })?;
572 if transport_is_jsonrpc(preferred_transport) {
573 return Ok(base_url);
574 }
575
576 if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
577 return resolve_declared_url(
578 interface_url,
579 allow_cleartext,
580 requested_authority,
581 "JSONRPC additionalInterface url",
582 );
583 }
584
585 Err(A2aClientError::Discovery(
586 "A2A agent card does not expose JSONRPC transport".to_string(),
587 ))
588}
589
590fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
591 let Some(interfaces) = card.get("additionalInterfaces") else {
592 return Ok(None);
593 };
594 let interfaces = interfaces.as_array().ok_or_else(|| {
595 A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
596 })?;
597 for interface in interfaces {
598 if interface
599 .get("transport")
600 .and_then(Value::as_str)
601 .is_some_and(transport_is_jsonrpc)
602 {
603 let interface_url = interface
604 .get("url")
605 .and_then(Value::as_str)
606 .ok_or_else(|| {
607 A2aClientError::Discovery(
608 "A2A JSONRPC additionalInterface missing url".to_string(),
609 )
610 })?;
611 return Ok(Some(interface_url));
612 }
613 }
614 Ok(None)
615}
616
617fn endpoint_from_legacy_supported_interfaces(
618 card: &Value,
619 allow_cleartext: bool,
620 requested_authority: &str,
621) -> Result<Option<Url>, A2aClientError> {
622 let Some(interfaces) = card.get("supportedInterfaces") else {
623 return Ok(None);
624 };
625 let interfaces = interfaces.as_array().ok_or_else(|| {
626 A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
627 })?;
628 let mut saw_jsonrpc = false;
629 for interface in interfaces {
630 if !interface
631 .get("protocolBinding")
632 .and_then(Value::as_str)
633 .is_some_and(transport_is_jsonrpc)
634 {
635 continue;
636 }
637 saw_jsonrpc = true;
638 if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
639 continue;
640 }
641 let interface_url = interface
642 .get("url")
643 .and_then(Value::as_str)
644 .ok_or_else(|| {
645 A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
646 })?;
647 return resolve_declared_url(
648 interface_url,
649 allow_cleartext,
650 requested_authority,
651 "JSONRPC supportedInterface url",
652 )
653 .map(Some);
654 }
655 if saw_jsonrpc {
656 return Err(A2aClientError::Discovery(format!(
657 "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
658 )));
659 }
660 Err(A2aClientError::Discovery(
661 "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
662 ))
663}
664
665fn transport_is_jsonrpc(transport: &str) -> bool {
666 transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
667}
668
669fn resolve_declared_url(
670 raw_url: &str,
671 allow_cleartext: bool,
672 requested_authority: &str,
673 label: &str,
674) -> Result<Url, A2aClientError> {
675 let url = Url::parse(raw_url).map_err(|error| {
676 A2aClientError::Discovery(format!("invalid A2A {label} '{raw_url}': {error}"))
677 })?;
678 ensure_cleartext_allowed(&url, allow_cleartext, label)?;
679 let declared_authority = url_authority(&url)?;
680 if !authorities_equivalent(&declared_authority, requested_authority) {
681 return Err(A2aClientError::Denied(format!(
682 "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
683 )));
684 }
685 Ok(url)
686}
687
688fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
689 if allow_cleartext {
690 &["https", "http"]
691 } else {
692 &["https"]
693 }
694}
695
696fn should_try_cleartext_fallback(
709 scheme: &str,
710 allow_cleartext: bool,
711 error: &AgentCardFetchError,
712 authority: &str,
713) -> bool {
714 if !allow_cleartext || scheme != "https" {
715 return false;
716 }
717 match error {
718 AgentCardFetchError::Cancelled(_)
719 | AgentCardFetchError::Denied(_)
720 | AgentCardFetchError::Timeout(_) => false,
721 AgentCardFetchError::ConnectRefused(_) => true,
722 AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
723 }
724}
725
726fn ensure_cleartext_allowed(
727 url: &Url,
728 allow_cleartext: bool,
729 label: &str,
730) -> Result<(), A2aClientError> {
731 if allow_cleartext || url.scheme() != "http" {
732 return Ok(());
733 }
734 Err(A2aClientError::Denied(format!(
735 "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
736 )))
737}
738
739fn is_loopback_authority(authority: &str) -> bool {
740 let (host, _) = split_authority(authority);
741 if host.eq_ignore_ascii_case("localhost") {
742 return true;
743 }
744 if let Ok(ip) = host.parse::<std::net::IpAddr>() {
745 return ip.is_loopback();
746 }
747 false
748}
749
750fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
761 if card_authority == requested_authority {
762 return true;
763 }
764 let (_, card_port) = split_authority(card_authority);
765 let (_, requested_port) = split_authority(requested_authority);
766 if card_port != requested_port {
767 return false;
768 }
769 is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
770}
771
772fn split_authority(authority: &str) -> (&str, &str) {
775 let (host_raw, port) = if authority.starts_with('[') {
776 if let Some(end) = authority.rfind(']') {
778 let host = &authority[..=end];
779 let rest = &authority[end + 1..];
780 let port = rest.strip_prefix(':').unwrap_or("");
781 (host, port)
782 } else {
783 (authority, "")
784 }
785 } else {
786 match authority.rsplit_once(':') {
787 Some((host, port)) => (host, port),
788 None => (authority, ""),
789 }
790 };
791 let host = host_raw.trim_start_matches('[').trim_end_matches(']');
792 (host, port)
793}
794
795fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
796 match error {
797 AgentCardFetchError::Cancelled(message)
798 | AgentCardFetchError::Discovery(message)
799 | AgentCardFetchError::ConnectRefused(message)
800 | AgentCardFetchError::Denied(message)
801 | AgentCardFetchError::Timeout(message) => message.clone(),
802 }
803}
804
805fn is_connect_refused(error: &reqwest::Error) -> bool {
806 if !error.is_connect() {
807 return false;
808 }
809 let mut source = error.source();
810 while let Some(cause) = source {
811 if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
812 if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
813 return true;
814 }
815 }
816 source = cause.source();
817 }
818 false
819}
820
821fn url_authority(url: &Url) -> Result<String, A2aClientError> {
822 let host = url
823 .host_str()
824 .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
825 Ok(if let Some(port) = url.port() {
826 format!("{host}:{port}")
827 } else {
828 host.to_string()
829 })
830}
831
832async fn send_jsonrpc(
833 rpc_url: &str,
834 request: &Value,
835 trace_id: &str,
836 cancel_rx: &mut broadcast::Receiver<()>,
837) -> Result<Value, A2aClientError> {
838 let response = send_http(
839 crate::llm::shared_blocking_client()
840 .post(rpc_url)
841 .header(reqwest::header::CONTENT_TYPE, "application/json")
842 .header("A2A-Version", A2A_PROTOCOL_VERSION)
843 .header("A2A-Trace-Id", trace_id)
844 .json(request),
845 cancel_rx,
846 "A2A task dispatch cancelled",
847 )
848 .await?;
849 if matches!(
850 response.status(),
851 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
852 ) {
853 return Err(A2aClientError::Denied(format!(
854 "A2A task dispatch returned HTTP {}",
855 response.status()
856 )));
857 }
858 if !response.status().is_success() {
859 return Err(A2aClientError::Protocol(format!(
860 "A2A task dispatch returned HTTP {}",
861 response.status()
862 )));
863 }
864 response
865 .json::<Value>()
866 .await
867 .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
868}
869
870async fn send_http(
871 request: reqwest::RequestBuilder,
872 cancel_rx: &mut broadcast::Receiver<()>,
873 cancelled_message: &'static str,
874) -> Result<reqwest::Response, A2aClientError> {
875 tokio::select! {
876 response = request.send() => response.map_err(|error| {
877 if error.is_timeout() {
878 A2aClientError::Timeout(format!("A2A HTTP request timed out: {error}"))
879 } else {
880 A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))
881 }
882 }),
883 _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
884 }
885}
886
887fn task_state(task: &Value) -> Result<&str, A2aClientError> {
888 task.pointer("/status/state")
889 .and_then(Value::as_str)
890 .filter(|value| !value.is_empty())
891 .ok_or_else(|| {
892 A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
893 })
894}
895
896fn task_status_message(task: &Value) -> Option<&str> {
897 task.pointer("/status/message/parts")
898 .and_then(Value::as_array)
899 .and_then(|parts| {
900 parts.iter().find_map(|part| {
901 if part.get("type").and_then(Value::as_str) == Some("text") {
902 part.get("text").and_then(Value::as_str).map(str::trim)
903 } else {
904 None
905 }
906 })
907 })
908 .filter(|message| !message.is_empty())
909}
910
911fn extract_inline_result(task: &Value) -> Value {
912 let text = task
913 .get("history")
914 .and_then(Value::as_array)
915 .and_then(|history| {
916 history.iter().rev().find_map(|message| {
917 let role = message.get("role").and_then(Value::as_str)?;
918 if role != "agent" {
919 return None;
920 }
921 message
922 .get("parts")
923 .and_then(Value::as_array)
924 .and_then(|parts| {
925 parts.iter().find_map(|part| {
926 if part.get("type").and_then(Value::as_str) == Some("text") {
927 part.get("text").and_then(Value::as_str).map(str::trim_end)
928 } else {
929 None
930 }
931 })
932 })
933 })
934 });
935 match text {
936 Some(text) if !text.is_empty() => {
937 serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
938 }
939 _ => task.clone(),
940 }
941}
942
943async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
944 let _ = cancel_rx.recv().await;
945}
946
947#[cfg(test)]
948mod tests {
949 use super::*;
950
951 #[test]
952 fn target_agent_label_prefers_path() {
953 assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
954 assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
955 }
956
957 #[test]
958 fn extract_inline_result_parses_json_text() {
959 let task = serde_json::json!({
960 "history": [
961 {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
962 {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
963 ]
964 });
965 assert_eq!(
966 extract_inline_result(&task),
967 serde_json::json!({"trace_id": "trace_123"})
968 );
969 }
970
971 #[test]
972 fn discovery_prefers_https_before_http() {
973 assert_eq!(card_resolution_schemes(false), ["https"]);
974 assert_eq!(card_resolution_schemes(true), ["https", "http"]);
975 }
976
977 #[test]
978 fn endpoint_from_card_accepts_current_preferred_transport() {
979 let endpoint = endpoint_from_card(
980 "https://trusted.example/.well-known/agent-card.json".to_string(),
981 false,
982 "trusted.example",
983 "triage".to_string(),
984 &serde_json::json!({
985 "name": "trusted",
986 "protocolVersion": "0.3.0",
987 "url": "https://trusted.example/rpc",
988 "preferredTransport": "JSONRPC",
989 "additionalInterfaces": [{
990 "url": "https://trusted.example/rpc",
991 "transport": "JSONRPC"
992 }]
993 }),
994 )
995 .expect("current A2A card should resolve");
996 assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
997 assert_eq!(
998 endpoint.card_url,
999 "https://trusted.example/.well-known/agent-card.json"
1000 );
1001 assert_eq!(endpoint.target_agent, "triage");
1002 }
1003
1004 #[test]
1005 fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
1006 let endpoint = endpoint_from_card(
1007 "https://trusted.example/.well-known/agent-card.json".to_string(),
1008 false,
1009 "trusted.example",
1010 "triage".to_string(),
1011 &serde_json::json!({
1012 "name": "trusted",
1013 "protocolVersion": "0.3.0",
1014 "url": "https://trusted.example/rest",
1015 "preferredTransport": "HTTP+JSON",
1016 "additionalInterfaces": [{
1017 "url": "https://trusted.example/rpc",
1018 "transport": "JSONRPC"
1019 }]
1020 }),
1021 )
1022 .expect("current A2A card should resolve through additionalInterfaces");
1023 assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1024 }
1025
1026 #[test]
1027 fn endpoint_from_card_accepts_legacy_supported_interfaces() {
1028 let endpoint = endpoint_from_card(
1029 "https://trusted.example/.well-known/agent-card.json".to_string(),
1030 false,
1031 "trusted.example",
1032 "triage".to_string(),
1033 &serde_json::json!({
1034 "name": "trusted",
1035 "supportedInterfaces": [{
1036 "protocolBinding": "JSONRPC",
1037 "protocolVersion": "0.3.0",
1038 "url": "https://trusted.example/rpc"
1039 }],
1040 }),
1041 )
1042 .expect("legacy A2A card should resolve during the transition");
1043 assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1044 }
1045
1046 #[test]
1047 fn endpoint_from_card_rejects_removed_interfaces_shape() {
1048 let error = endpoint_from_card(
1049 "https://trusted.example/.well-known/agent-card.json".to_string(),
1050 false,
1051 "trusted.example",
1052 "triage".to_string(),
1053 &serde_json::json!({
1054 "url": "https://trusted.example",
1055 "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
1056 }),
1057 )
1058 .expect_err("pre-0.3 Harn discovery shape should be rejected");
1059 assert_eq!(
1060 error.to_string(),
1061 "A2A agent card missing preferredTransport/additionalInterfaces"
1062 );
1063 }
1064
1065 #[test]
1066 fn cleartext_fallback_only_after_https_connect_refused() {
1067 assert!(should_try_cleartext_fallback(
1068 "https",
1069 true,
1070 &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1071 "reviewer.example:443",
1072 ));
1073 assert!(!should_try_cleartext_fallback(
1074 "http",
1075 true,
1076 &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1077 "reviewer.example:443",
1078 ));
1079 assert!(!should_try_cleartext_fallback(
1080 "https",
1081 true,
1082 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1083 "reviewer.example:443",
1084 ));
1085 }
1086
1087 #[test]
1088 fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
1089 for authority in [
1090 "127.0.0.1:8080",
1091 "localhost:8080",
1092 "[::1]:8080",
1093 "127.1.2.3:9000",
1094 ] {
1095 assert!(
1096 !should_try_cleartext_fallback(
1097 "https",
1098 false,
1099 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1100 authority,
1101 ),
1102 "cleartext fallback must stay disabled without opt-in for '{authority}'"
1103 );
1104 }
1105 }
1106
1107 #[test]
1108 fn cleartext_fallback_allows_loopback_after_opt_in() {
1109 for authority in [
1112 "127.0.0.1:8080",
1113 "localhost:8080",
1114 "[::1]:8080",
1115 "127.1.2.3:9000",
1116 ] {
1117 assert!(
1118 should_try_cleartext_fallback(
1119 "https",
1120 true,
1121 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1122 authority,
1123 ),
1124 "expected cleartext fallback for loopback authority '{authority}'"
1125 );
1126 }
1127 }
1128
1129 #[test]
1130 fn cleartext_fallback_denies_external_tls_failures() {
1131 for authority in [
1134 "reviewer.example:443",
1135 "8.8.8.8:443",
1136 "192.168.1.10:8080",
1137 "10.0.0.5:8443",
1138 ] {
1139 assert!(
1140 !should_try_cleartext_fallback(
1141 "https",
1142 true,
1143 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1144 authority,
1145 ),
1146 "cleartext fallback must be denied for external authority '{authority}'"
1147 );
1148 }
1149 }
1150
1151 #[test]
1152 fn is_loopback_authority_recognises_loopback_forms() {
1153 assert!(is_loopback_authority("127.0.0.1:8080"));
1154 assert!(is_loopback_authority("localhost:8080"));
1155 assert!(is_loopback_authority("LOCALHOST:9000"));
1156 assert!(is_loopback_authority("[::1]:8080"));
1157 assert!(is_loopback_authority("127.5.5.5:1234"));
1158 assert!(!is_loopback_authority("8.8.8.8:443"));
1159 assert!(!is_loopback_authority("192.168.1.10:8080"));
1160 assert!(!is_loopback_authority("example.com:443"));
1161 assert!(!is_loopback_authority("reviewer.prod"));
1162 }
1163
1164 #[test]
1165 fn endpoint_from_card_rejects_card_url_authority_mismatch() {
1166 let error = endpoint_from_card(
1167 "https://trusted.example/.well-known/agent-card.json".to_string(),
1168 false,
1169 "trusted.example",
1170 "triage".to_string(),
1171 &serde_json::json!({
1172 "protocolVersion": "0.3.0",
1173 "url": "https://evil.example",
1174 "preferredTransport": "JSONRPC",
1175 }),
1176 )
1177 .unwrap_err();
1178 assert_eq!(
1179 error.to_string(),
1180 "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
1181 );
1182 }
1183
1184 #[test]
1185 fn endpoint_from_card_rejects_cleartext_without_opt_in() {
1186 let error = endpoint_from_card(
1187 "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1188 false,
1189 "127.0.0.1:8080",
1190 "triage".to_string(),
1191 &serde_json::json!({
1192 "protocolVersion": "0.3.0",
1193 "url": "http://localhost:8080",
1194 "preferredTransport": "JSONRPC",
1195 }),
1196 )
1197 .expect_err("cleartext card should require explicit opt-in");
1198 assert!(error
1199 .to_string()
1200 .contains("requires `allow_cleartext = true`"));
1201 }
1202
1203 #[test]
1204 fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
1205 let card = serde_json::json!({
1209 "protocolVersion": "0.3.0",
1210 "url": "http://localhost:8080",
1211 "preferredTransport": "JSONRPC",
1212 });
1213 let endpoint = endpoint_from_card(
1214 "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1215 true,
1216 "127.0.0.1:8080",
1217 "triage".to_string(),
1218 &card,
1219 )
1220 .expect("loopback alias pair should be accepted");
1221 assert_eq!(endpoint.rpc_url, "http://localhost:8080/");
1222
1223 let card_v6 = serde_json::json!({
1225 "protocolVersion": "0.3.0",
1226 "url": "http://[::1]:8080",
1227 "preferredTransport": "JSONRPC",
1228 });
1229 let endpoint_v6 = endpoint_from_card(
1230 "http://localhost:8080/.well-known/agent-card.json".to_string(),
1231 true,
1232 "localhost:8080",
1233 "triage".to_string(),
1234 &card_v6,
1235 )
1236 .expect("IPv6 loopback alias should be accepted");
1237 assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");
1238
1239 let card_wrong_port = serde_json::json!({
1241 "protocolVersion": "0.3.0",
1242 "url": "http://localhost:9000",
1243 "preferredTransport": "JSONRPC",
1244 });
1245 let error = endpoint_from_card(
1246 "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1247 true,
1248 "127.0.0.1:8080",
1249 "triage".to_string(),
1250 &card_wrong_port,
1251 )
1252 .expect_err("mismatched ports must still be rejected even on loopback");
1253 assert!(error
1254 .to_string()
1255 .contains("A2A agent card url authority mismatch"));
1256 }
1257
1258 #[test]
1259 fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
1260 assert!(!authorities_equivalent(
1261 "internal.corp.example:443",
1262 "trusted.example:443",
1263 ));
1264 assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
1265 assert!(authorities_equivalent(
1266 "trusted.example:443",
1267 "trusted.example:443",
1268 ));
1269 }
1270}