1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
/// Relative paths probed, in this order, when looking for an agent card on a
/// target authority.
const A2A_AGENT_CARD_PATHS: &[&str] = &[
    ".well-known/agent-card.json",
    ".well-known/a2a-agent",
    ".well-known/agent.json",
    "agent/card",
];
/// Protocol version that agent cards must declare; also sent as the
/// `A2A-Version` request header.
const A2A_PROTOCOL_VERSION: &str = "0.3.0";
/// Transport / protocol-binding name identifying a JSON-RPC interface
/// (compared case-insensitively).
const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";
/// Environment variable that supplies the push-notification callback URL.
const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
/// Environment variable that supplies the optional push-notification token.
const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
20
/// Result of agent-card discovery for a dispatch target.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedA2aEndpoint {
    /// URL from which the agent card was fetched.
    pub card_url: String,
    /// JSON-RPC endpoint URL selected from the agent card.
    pub rpc_url: String,
    /// Value of the card's `id` field, when it is present as a string.
    pub agent_id: Option<String>,
    /// Agent name taken from the path portion of the dispatch target
    /// (empty when the target had no path).
    pub target_agent: String,
}
28
/// Acknowledgement returned by a successful dispatch.
#[derive(Clone, Debug, PartialEq)]
pub enum DispatchAck {
    /// The remote task was already in the `completed` state; `result` is the
    /// value extracted from the task response.
    InlineResult {
        task_id: String,
        result: Value,
    },
    /// The remote task was accepted but is not completed; `handle` is a JSON
    /// object describing the task and the endpoint it was sent to.
    PendingTask {
        task_id: String,
        state: String,
        handle: Value,
    },
}
41
/// Failure categories reported by the A2A client.
///
/// Every variant carries a human-readable message; `Display` prints that
/// message verbatim without any variant prefix.
#[derive(Debug)]
pub enum A2aClientError {
    /// The dispatch target string could not be parsed.
    InvalidTarget(String),
    /// Agent-card discovery failed or the card was unusable.
    Discovery(String),
    /// The remote side answered with an unexpected or failing response.
    Protocol(String),
    /// The request was refused (authorization, policy, or rejection).
    Denied(String),
    /// An HTTP request timed out.
    Timeout(String),
    /// The operation was cancelled through the cancel channel.
    Cancelled(String),
}

impl std::fmt::Display for A2aClientError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable or-pattern: adding a variant later makes this fail to
        // compile instead of silently missing a case.
        let (Self::InvalidTarget(text)
        | Self::Discovery(text)
        | Self::Protocol(text)
        | Self::Denied(text)
        | Self::Timeout(text)
        | Self::Cancelled(text)) = self;
        f.write_str(text)
    }
}

impl std::error::Error for A2aClientError {}
66
/// Abstraction over dispatching a trigger event to a remote A2A agent.
///
/// NOTE(review): presumably exists so callers can substitute a different
/// implementation (e.g. in tests) — confirm against the call sites.
#[async_trait]
pub trait A2aClient: Send + Sync + 'static {
    /// Sends `event` to `target` on behalf of the trigger binding identified
    /// by `binding_id` / `binding_key`.
    ///
    /// `allow_cleartext` opts in to plain-HTTP endpoints; `cancel_rx` is the
    /// channel used to abort the operation. On success returns the resolved
    /// endpoint together with the dispatch acknowledgement.
    async fn dispatch(
        &self,
        target: &str,
        allow_cleartext: bool,
        binding_id: &str,
        binding_key: &str,
        event: &TriggerEvent,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
}
80
/// [`A2aClient`] implementation that performs real network dispatch.
pub struct RealA2aClient;

#[async_trait]
impl A2aClient for RealA2aClient {
    /// Forwards every argument unchanged to [`dispatch_trigger_event`].
    async fn dispatch(
        &self,
        target: &str,
        allow_cleartext: bool,
        binding_id: &str,
        binding_key: &str,
        event: &TriggerEvent,
        cancel_rx: &mut broadcast::Receiver<()>,
    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
        dispatch_trigger_event(
            target,
            allow_cleartext,
            binding_id,
            binding_key,
            event,
            cancel_rx,
        )
        .await
    }
}
106
/// Internal failure categories for a single agent-card fetch attempt.
///
/// Kept separate from [`A2aClientError`] so discovery can distinguish a
/// refused connection from other failures when deciding on cleartext fallback.
#[derive(Debug)]
enum AgentCardFetchError {
    /// The cancel channel fired while the request was in flight.
    Cancelled(String),
    /// Any other failure: transport error, non-success status, or bad JSON.
    Discovery(String),
    /// The connection attempt was refused by the peer.
    ConnectRefused(String),
    /// The server answered 401 Unauthorized or 403 Forbidden.
    Denied(String),
    /// The request timed out.
    Timeout(String),
}
115
116pub async fn dispatch_trigger_event(
117 raw_target: &str,
118 allow_cleartext: bool,
119 binding_id: &str,
120 binding_key: &str,
121 event: &TriggerEvent,
122 cancel_rx: &mut broadcast::Receiver<()>,
123) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
124 let started = std::time::Instant::now();
125 let target = match parse_target(raw_target) {
126 Ok(target) => target,
127 Err(error) => {
128 record_a2a_metric(raw_target, "failed", started.elapsed());
129 return Err(error);
130 }
131 };
132 let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
133 Ok(endpoint) => endpoint,
134 Err(error) => {
135 record_a2a_metric(raw_target, "failed", started.elapsed());
136 return Err(error);
137 }
138 };
139 let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
140 let envelope = serde_json::json!({
141 "kind": "harn.trigger.dispatch",
142 "message_id": message_id,
143 "trace_id": event.trace_id.0,
144 "event_id": event.id.0,
145 "trigger_id": binding_id,
146 "binding_key": binding_key,
147 "target_agent": endpoint.target_agent,
148 "event": event,
149 });
150 let text = serde_json::to_string(&envelope)
151 .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
152 let push_config = push_notification_config();
153 let mut params = serde_json::json!({
154 "contextId": event.trace_id.0,
155 "message": {
156 "messageId": message_id,
157 "role": "user",
158 "parts": [{
159 "type": "text",
160 "text": text,
161 }],
162 "metadata": {
163 "kind": "harn.trigger.dispatch",
164 "trace_id": event.trace_id.0,
165 "event_id": event.id.0,
166 "trigger_id": binding_id,
167 "binding_key": binding_key,
168 "target_agent": endpoint.target_agent,
169 },
170 },
171 });
172 if let Some(config) = push_config.clone() {
173 params["configuration"] = serde_json::json!({
174 "blocking": false,
175 "returnImmediately": true,
176 "pushNotificationConfig": config,
177 });
178 }
179 let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
180
181 let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
182 Ok(body) => body,
183 Err(error) => {
184 record_a2a_metric(raw_target, "failed", started.elapsed());
185 return Err(error);
186 }
187 };
188 let result = match body.get("result").cloned().ok_or_else(|| {
189 if let Some(error) = body.get("error") {
190 let message = error
191 .get("message")
192 .and_then(Value::as_str)
193 .unwrap_or("unknown A2A error");
194 A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
195 } else {
196 A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
197 }
198 }) {
199 Ok(result) => result,
200 Err(error) => {
201 record_a2a_metric(raw_target, "failed", started.elapsed());
202 return Err(error);
203 }
204 };
205
206 let task_id = match result
207 .get("id")
208 .and_then(Value::as_str)
209 .filter(|value| !value.is_empty())
210 .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
211 {
212 Ok(task_id) => task_id.to_string(),
213 Err(error) => {
214 record_a2a_metric(raw_target, "failed", started.elapsed());
215 return Err(error);
216 }
217 };
218 let state = match task_state(&result) {
219 Ok(state) => state.to_string(),
220 Err(error) => {
221 record_a2a_metric(raw_target, "failed", started.elapsed());
222 return Err(error);
223 }
224 };
225
226 if state == "completed" {
227 let inline = extract_inline_result(&result);
228 record_a2a_metric(raw_target, "succeeded", started.elapsed());
229 return Ok((
230 endpoint,
231 DispatchAck::InlineResult {
232 task_id,
233 result: inline,
234 },
235 ));
236 }
237
238 if state == "rejected" {
239 record_a2a_metric(raw_target, "failed", started.elapsed());
240 return Err(A2aClientError::Denied(format!(
241 "A2A task rejected by remote agent: {}",
242 task_status_message(&result).unwrap_or("permission rejected")
243 )));
244 }
245
246 if let Some(config) = push_config {
247 register_push_notification_config(
248 &endpoint.rpc_url,
249 &task_id,
250 config,
251 &event.trace_id.0,
252 cancel_rx,
253 )
254 .await
255 .inspect_err(|_| {
256 record_a2a_metric(raw_target, "failed", started.elapsed());
257 })?;
258 }
259 record_a2a_metric(raw_target, "succeeded", started.elapsed());
260 Ok((
261 endpoint.clone(),
262 DispatchAck::PendingTask {
263 task_id: task_id.clone(),
264 state: state.clone(),
265 handle: serde_json::json!({
266 "kind": "a2a_task_handle",
267 "task_id": task_id,
268 "state": state,
269 "target_agent": endpoint.target_agent,
270 "rpc_url": endpoint.rpc_url,
271 "card_url": endpoint.card_url,
272 "agent_id": endpoint.agent_id,
273 }),
274 },
275 ))
276}
277
278fn push_notification_config() -> Option<Value> {
279 let url = std::env::var(A2A_PUSH_URL_ENV)
280 .ok()
281 .map(|value| value.trim().to_string())
282 .filter(|value| !value.is_empty())?;
283 let token = std::env::var(A2A_PUSH_TOKEN_ENV)
284 .ok()
285 .map(|value| value.trim().to_string())
286 .filter(|value| !value.is_empty());
287 let mut config = serde_json::json!({ "url": url });
288 if let Some(token) = token {
289 config["token"] = Value::String(token.clone());
290 config["authentication"] = serde_json::json!({
291 "scheme": "Bearer",
292 "credentials": token,
293 });
294 }
295 Some(config)
296}
297
298async fn register_push_notification_config(
299 rpc_url: &str,
300 task_id: &str,
301 config: Value,
302 trace_id: &str,
303 cancel_rx: &mut broadcast::Receiver<()>,
304) -> Result<(), A2aClientError> {
305 let request = crate::jsonrpc::request(
306 format!("{trace_id}.{task_id}.push-config"),
307 "tasks/pushNotificationConfig/set",
308 serde_json::json!({
309 "taskId": task_id,
310 "pushNotificationConfig": config,
311 }),
312 );
313 let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
314 if response.get("error").is_some() {
315 return Err(A2aClientError::Protocol(format!(
316 "A2A push notification registration failed: {}",
317 response["error"]
318 )));
319 }
320 Ok(())
321}
322
323fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
324 if let Some(metrics) = crate::active_metrics_registry() {
325 metrics.record_a2a_hop(target, outcome, duration);
326 }
327}
328
329pub fn target_agent_label(raw_target: &str) -> String {
330 parse_target(raw_target)
331 .map(|target| target.target_agent_label())
332 .unwrap_or_else(|_| raw_target.to_string())
333}
334
/// A dispatch target split into its network authority and agent path.
#[derive(Clone, Debug)]
struct ParsedTarget {
    /// `host` or `host:port` of the target.
    authority: String,
    /// Path portion of the target with leading slashes removed; may be empty.
    target_agent: String,
}

impl ParsedTarget {
    /// Label for this target: the agent name, or the authority when no agent
    /// name was given.
    fn target_agent_label(&self) -> String {
        let label = if self.target_agent.is_empty() {
            &self.authority
        } else {
            &self.target_agent
        };
        label.clone()
    }
}
350
351fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
352 let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
353 A2aClientError::InvalidTarget(format!(
354 "invalid a2a dispatch target '{raw_target}': {error}"
355 ))
356 })?;
357 let host = parsed.host_str().ok_or_else(|| {
358 A2aClientError::InvalidTarget(format!(
359 "invalid a2a dispatch target '{raw_target}': missing host"
360 ))
361 })?;
362 let authority = if let Some(port) = parsed.port() {
363 format!("{host}:{port}")
364 } else {
365 host.to_string()
366 };
367 Ok(ParsedTarget {
368 authority,
369 target_agent: parsed.path().trim_start_matches('/').to_string(),
370 })
371}
372
373async fn resolve_endpoint(
374 target: &ParsedTarget,
375 allow_cleartext: bool,
376 cancel_rx: &mut broadcast::Receiver<()>,
377) -> Result<ResolvedA2aEndpoint, A2aClientError> {
378 let mut last_error = None;
379 for scheme in card_resolution_schemes(allow_cleartext) {
380 let mut last_scheme_error = None;
381 for path in A2A_AGENT_CARD_PATHS {
382 let card_url = format!("{scheme}://{}/{path}", target.authority);
383 match fetch_agent_card(&card_url, cancel_rx).await {
384 Ok(card) => {
385 return endpoint_from_card(
386 card_url,
387 allow_cleartext,
388 &target.authority,
389 target.target_agent.clone(),
390 &card,
391 );
392 }
393 Err(AgentCardFetchError::Cancelled(message)) => {
394 return Err(A2aClientError::Cancelled(message));
395 }
396 Err(AgentCardFetchError::Timeout(message)) => {
397 return Err(A2aClientError::Timeout(message));
398 }
399 Err(AgentCardFetchError::Denied(message)) => {
400 return Err(A2aClientError::Denied(message));
401 }
402 Err(error) => {
403 last_error = Some(agent_card_fetch_error_message(&error));
404 last_scheme_error = Some(error);
405 }
406 }
407 }
408 if last_scheme_error.as_ref().is_some_and(|error| {
409 should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
410 }) {
411 continue;
412 }
413 break;
414 }
415 Err(A2aClientError::Discovery(format!(
416 "could not resolve A2A agent card for '{}': {}",
417 target.authority,
418 last_error.unwrap_or_else(|| "unknown discovery error".to_string())
419 )))
420}
421
422async fn fetch_agent_card(
423 card_url: &str,
424 cancel_rx: &mut broadcast::Receiver<()>,
425) -> Result<Value, AgentCardFetchError> {
426 let response = tokio::select! {
427 response = crate::llm::shared_utility_client().get(card_url).send() => {
428 match response {
429 Ok(response) => Ok(response),
430 Err(error) if error.is_timeout() => Err(AgentCardFetchError::Timeout(
431 format!("A2A HTTP request timed out: {error}")
432 )),
433 Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
434 format!("A2A HTTP request failed: {error}")
435 )),
436 Err(error) => Err(AgentCardFetchError::Discovery(
437 format!("A2A HTTP request failed: {error}")
438 )),
439 }
440 }
441 _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
442 "A2A agent-card fetch cancelled".to_string()
443 )),
444 }?;
445 if matches!(
446 response.status(),
447 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
448 ) {
449 return Err(AgentCardFetchError::Denied(format!(
450 "GET {card_url} returned HTTP {}",
451 response.status()
452 )));
453 }
454 if !response.status().is_success() {
455 return Err(AgentCardFetchError::Discovery(format!(
456 "GET {card_url} returned HTTP {}",
457 response.status()
458 )));
459 }
460 response
461 .json::<Value>()
462 .await
463 .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
464}
465
466fn endpoint_from_card(
467 card_url: String,
468 allow_cleartext: bool,
469 requested_authority: &str,
470 target_agent: String,
471 card: &Value,
472) -> Result<ResolvedA2aEndpoint, A2aClientError> {
473 let rpc_url = if has_current_transport_fields(card) {
474 endpoint_from_current_card(card, allow_cleartext, requested_authority)?
475 } else if let Some(rpc_url) =
476 endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
477 {
478 rpc_url
479 } else {
480 return Err(A2aClientError::Discovery(
481 "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
482 ));
483 };
484
485 Ok(ResolvedA2aEndpoint {
486 card_url,
487 rpc_url: rpc_url.to_string(),
488 agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
489 target_agent,
490 })
491}
492
493fn has_current_transport_fields(card: &Value) -> bool {
494 card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
495}
496
497fn endpoint_from_current_card(
498 card: &Value,
499 allow_cleartext: bool,
500 requested_authority: &str,
501) -> Result<Url, A2aClientError> {
502 let protocol_version = card
503 .get("protocolVersion")
504 .and_then(Value::as_str)
505 .ok_or_else(|| {
506 A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
507 })?;
508 if protocol_version != A2A_PROTOCOL_VERSION {
509 return Err(A2aClientError::Discovery(format!(
510 "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
511 )));
512 }
513
514 let base_url = card
515 .get("url")
516 .and_then(Value::as_str)
517 .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
518 let base_url = resolve_declared_url(
519 base_url,
520 allow_cleartext,
521 requested_authority,
522 "agent card url",
523 )?;
524
525 let preferred_transport = card
526 .get("preferredTransport")
527 .and_then(Value::as_str)
528 .ok_or_else(|| {
529 A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
530 })?;
531 if transport_is_jsonrpc(preferred_transport) {
532 return Ok(base_url);
533 }
534
535 if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
536 return resolve_declared_url(
537 interface_url,
538 allow_cleartext,
539 requested_authority,
540 "JSONRPC additionalInterface url",
541 );
542 }
543
544 Err(A2aClientError::Discovery(
545 "A2A agent card does not expose JSONRPC transport".to_string(),
546 ))
547}
548
549fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
550 let Some(interfaces) = card.get("additionalInterfaces") else {
551 return Ok(None);
552 };
553 let interfaces = interfaces.as_array().ok_or_else(|| {
554 A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
555 })?;
556 for interface in interfaces {
557 if interface
558 .get("transport")
559 .and_then(Value::as_str)
560 .is_some_and(transport_is_jsonrpc)
561 {
562 let interface_url = interface
563 .get("url")
564 .and_then(Value::as_str)
565 .ok_or_else(|| {
566 A2aClientError::Discovery(
567 "A2A JSONRPC additionalInterface missing url".to_string(),
568 )
569 })?;
570 return Ok(Some(interface_url));
571 }
572 }
573 Ok(None)
574}
575
576fn endpoint_from_legacy_supported_interfaces(
577 card: &Value,
578 allow_cleartext: bool,
579 requested_authority: &str,
580) -> Result<Option<Url>, A2aClientError> {
581 let Some(interfaces) = card.get("supportedInterfaces") else {
582 return Ok(None);
583 };
584 let interfaces = interfaces.as_array().ok_or_else(|| {
585 A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
586 })?;
587 let mut saw_jsonrpc = false;
588 for interface in interfaces {
589 if !interface
590 .get("protocolBinding")
591 .and_then(Value::as_str)
592 .is_some_and(transport_is_jsonrpc)
593 {
594 continue;
595 }
596 saw_jsonrpc = true;
597 if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
598 continue;
599 }
600 let interface_url = interface
601 .get("url")
602 .and_then(Value::as_str)
603 .ok_or_else(|| {
604 A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
605 })?;
606 return resolve_declared_url(
607 interface_url,
608 allow_cleartext,
609 requested_authority,
610 "JSONRPC supportedInterface url",
611 )
612 .map(Some);
613 }
614 if saw_jsonrpc {
615 return Err(A2aClientError::Discovery(format!(
616 "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
617 )));
618 }
619 Err(A2aClientError::Discovery(
620 "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
621 ))
622}
623
624fn transport_is_jsonrpc(transport: &str) -> bool {
625 transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
626}
627
628fn resolve_declared_url(
629 raw_url: &str,
630 allow_cleartext: bool,
631 requested_authority: &str,
632 label: &str,
633) -> Result<Url, A2aClientError> {
634 let url = Url::parse(raw_url).map_err(|error| {
635 A2aClientError::Discovery(format!("invalid A2A {label} '{raw_url}': {error}"))
636 })?;
637 ensure_cleartext_allowed(&url, allow_cleartext, label)?;
638 let declared_authority = url_authority(&url)?;
639 if !authorities_equivalent(&declared_authority, requested_authority) {
640 return Err(A2aClientError::Denied(format!(
641 "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
642 )));
643 }
644 Ok(url)
645}
646
/// URL schemes to try for agent-card discovery, in order of preference.
///
/// HTTPS always comes first; plain HTTP is listed only when cleartext has
/// been opted into.
fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
    match allow_cleartext {
        true => &["https", "http"],
        false => &["https"],
    }
}
654
655fn should_try_cleartext_fallback(
668 scheme: &str,
669 allow_cleartext: bool,
670 error: &AgentCardFetchError,
671 authority: &str,
672) -> bool {
673 if !allow_cleartext || scheme != "https" {
674 return false;
675 }
676 match error {
677 AgentCardFetchError::Cancelled(_)
678 | AgentCardFetchError::Denied(_)
679 | AgentCardFetchError::Timeout(_) => false,
680 AgentCardFetchError::ConnectRefused(_) => true,
681 AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
682 }
683}
684
685fn ensure_cleartext_allowed(
686 url: &Url,
687 allow_cleartext: bool,
688 label: &str,
689) -> Result<(), A2aClientError> {
690 if allow_cleartext || url.scheme() != "http" {
691 return Ok(());
692 }
693 Err(A2aClientError::Denied(format!(
694 "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
695 )))
696}
697
698fn is_loopback_authority(authority: &str) -> bool {
699 let (host, _) = split_authority(authority);
700 if host.eq_ignore_ascii_case("localhost") {
701 return true;
702 }
703 if let Ok(ip) = host.parse::<std::net::IpAddr>() {
704 return ip.is_loopback();
705 }
706 false
707}
708
709fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
720 if card_authority == requested_authority {
721 return true;
722 }
723 let (_, card_port) = split_authority(card_authority);
724 let (_, requested_port) = split_authority(requested_authority);
725 if card_port != requested_port {
726 return false;
727 }
728 is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
729}
730
/// Splits an authority into `(host, port)`; the port is `""` when absent.
///
/// A bracketed IPv6 host (`[::1]:8080`) is split after the closing bracket;
/// anything else is split at the last `:`. Leading `[` and trailing `]` are
/// stripped from the returned host.
fn split_authority(authority: &str) -> (&str, &str) {
    let bracketed = authority.starts_with('[');
    let closing = if bracketed { authority.rfind(']') } else { None };

    let (host_raw, port) = match closing {
        Some(end) => {
            // `]` is one byte, so `end + 1` is always a char boundary.
            let (host, rest) = authority.split_at(end + 1);
            (host, rest.strip_prefix(':').unwrap_or(""))
        }
        // Opening bracket without a closing one: treat it all as the host.
        None if bracketed => (authority, ""),
        None => authority.rsplit_once(':').unwrap_or((authority, "")),
    };

    (
        host_raw.trim_start_matches('[').trim_end_matches(']'),
        port,
    )
}
753
754fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
755 match error {
756 AgentCardFetchError::Cancelled(message)
757 | AgentCardFetchError::Discovery(message)
758 | AgentCardFetchError::ConnectRefused(message)
759 | AgentCardFetchError::Denied(message)
760 | AgentCardFetchError::Timeout(message) => message.clone(),
761 }
762}
763
764fn is_connect_refused(error: &reqwest::Error) -> bool {
765 if !error.is_connect() {
766 return false;
767 }
768 let mut source = error.source();
769 while let Some(cause) = source {
770 if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
771 if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
772 return true;
773 }
774 }
775 source = cause.source();
776 }
777 false
778}
779
780fn url_authority(url: &Url) -> Result<String, A2aClientError> {
781 let host = url
782 .host_str()
783 .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
784 Ok(if let Some(port) = url.port() {
785 format!("{host}:{port}")
786 } else {
787 host.to_string()
788 })
789}
790
791async fn send_jsonrpc(
792 rpc_url: &str,
793 request: &Value,
794 trace_id: &str,
795 cancel_rx: &mut broadcast::Receiver<()>,
796) -> Result<Value, A2aClientError> {
797 let response = send_http(
798 crate::llm::shared_blocking_client()
799 .post(rpc_url)
800 .header(reqwest::header::CONTENT_TYPE, "application/json")
801 .header("A2A-Version", A2A_PROTOCOL_VERSION)
802 .header("A2A-Trace-Id", trace_id)
803 .json(request),
804 cancel_rx,
805 "A2A task dispatch cancelled",
806 )
807 .await?;
808 if matches!(
809 response.status(),
810 reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
811 ) {
812 return Err(A2aClientError::Denied(format!(
813 "A2A task dispatch returned HTTP {}",
814 response.status()
815 )));
816 }
817 if !response.status().is_success() {
818 return Err(A2aClientError::Protocol(format!(
819 "A2A task dispatch returned HTTP {}",
820 response.status()
821 )));
822 }
823 response
824 .json::<Value>()
825 .await
826 .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
827}
828
829async fn send_http(
830 request: reqwest::RequestBuilder,
831 cancel_rx: &mut broadcast::Receiver<()>,
832 cancelled_message: &'static str,
833) -> Result<reqwest::Response, A2aClientError> {
834 tokio::select! {
835 response = request.send() => response.map_err(|error| {
836 if error.is_timeout() {
837 A2aClientError::Timeout(format!("A2A HTTP request timed out: {error}"))
838 } else {
839 A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))
840 }
841 }),
842 _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
843 }
844}
845
846fn task_state(task: &Value) -> Result<&str, A2aClientError> {
847 task.pointer("/status/state")
848 .and_then(Value::as_str)
849 .filter(|value| !value.is_empty())
850 .ok_or_else(|| {
851 A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
852 })
853}
854
855fn task_status_message(task: &Value) -> Option<&str> {
856 task.pointer("/status/message/parts")
857 .and_then(Value::as_array)
858 .and_then(|parts| {
859 parts.iter().find_map(|part| {
860 if part.get("type").and_then(Value::as_str) == Some("text") {
861 part.get("text").and_then(Value::as_str).map(str::trim)
862 } else {
863 None
864 }
865 })
866 })
867 .filter(|message| !message.is_empty())
868}
869
870fn extract_inline_result(task: &Value) -> Value {
871 let text = task
872 .get("history")
873 .and_then(Value::as_array)
874 .and_then(|history| {
875 history.iter().rev().find_map(|message| {
876 let role = message.get("role").and_then(Value::as_str)?;
877 if role != "agent" {
878 return None;
879 }
880 message
881 .get("parts")
882 .and_then(Value::as_array)
883 .and_then(|parts| {
884 parts.iter().find_map(|part| {
885 if part.get("type").and_then(Value::as_str) == Some("text") {
886 part.get("text").and_then(Value::as_str).map(str::trim_end)
887 } else {
888 None
889 }
890 })
891 })
892 })
893 });
894 match text {
895 Some(text) if !text.is_empty() => {
896 serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
897 }
898 _ => task.clone(),
899 }
900}
901
902async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
903 let _ = cancel_rx.recv().await;
904}
905
// Unit tests for target parsing, card interpretation, cleartext fallback
// policy and authority matching. None of them perform network I/O.
#[cfg(test)]
mod tests {
    use super::*;

    // The label is the path when present, otherwise the authority.
    #[test]
    fn target_agent_label_prefers_path() {
        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
    }

    // Agent text that is valid JSON is returned parsed; user messages are ignored.
    #[test]
    fn extract_inline_result_parses_json_text() {
        let task = serde_json::json!({
            "history": [
                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
            ]
        });
        assert_eq!(
            extract_inline_result(&task),
            serde_json::json!({"trace_id": "trace_123"})
        );
    }

    // HTTPS is always first; HTTP appears only with cleartext opt-in.
    #[test]
    fn discovery_prefers_https_before_http() {
        assert_eq!(card_resolution_schemes(false), ["https"]);
        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
    }

    // Current card shape with JSONRPC as the preferred transport uses the card url.
    #[test]
    fn endpoint_from_card_accepts_current_preferred_transport() {
        let endpoint = endpoint_from_card(
            "https://trusted.example/.well-known/agent-card.json".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "name": "trusted",
                "protocolVersion": "0.3.0",
                "url": "https://trusted.example/rpc",
                "preferredTransport": "JSONRPC",
                "additionalInterfaces": [{
                    "url": "https://trusted.example/rpc",
                    "transport": "JSONRPC"
                }]
            }),
        )
        .expect("current A2A card should resolve");
        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
        assert_eq!(
            endpoint.card_url,
            "https://trusted.example/.well-known/agent-card.json"
        );
        assert_eq!(endpoint.target_agent, "triage");
    }

    // A non-JSONRPC preferred transport falls back to additionalInterfaces.
    #[test]
    fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
        let endpoint = endpoint_from_card(
            "https://trusted.example/.well-known/agent-card.json".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "name": "trusted",
                "protocolVersion": "0.3.0",
                "url": "https://trusted.example/rest",
                "preferredTransport": "HTTP+JSON",
                "additionalInterfaces": [{
                    "url": "https://trusted.example/rpc",
                    "transport": "JSONRPC"
                }]
            }),
        )
        .expect("current A2A card should resolve through additionalInterfaces");
        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
    }

    // Legacy cards that only carry supportedInterfaces still resolve.
    #[test]
    fn endpoint_from_card_accepts_legacy_supported_interfaces() {
        let endpoint = endpoint_from_card(
            "https://trusted.example/.well-known/agent-card.json".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "name": "trusted",
                "supportedInterfaces": [{
                    "protocolBinding": "JSONRPC",
                    "protocolVersion": "0.3.0",
                    "url": "https://trusted.example/rpc"
                }],
            }),
        )
        .expect("legacy A2A card should resolve during the transition");
        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
    }

    // The older `interfaces` shape is neither current nor legacy and is rejected.
    #[test]
    fn endpoint_from_card_rejects_removed_interfaces_shape() {
        let error = endpoint_from_card(
            "https://trusted.example/.well-known/agent-card.json".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "url": "https://trusted.example",
                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
            }),
        )
        .expect_err("pre-0.3 Harn discovery shape should be rejected");
        assert_eq!(
            error.to_string(),
            "A2A agent card missing preferredTransport/additionalInterfaces"
        );
    }

    // For a non-loopback host only a refused HTTPS connection permits fallback.
    #[test]
    fn cleartext_fallback_only_after_https_connect_refused() {
        assert!(should_try_cleartext_fallback(
            "https",
            true,
            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
            "reviewer.example:443",
        ));
        assert!(!should_try_cleartext_fallback(
            "http",
            true,
            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
            "reviewer.example:443",
        ));
        assert!(!should_try_cleartext_fallback(
            "https",
            true,
            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
            "reviewer.example:443",
        ));
    }

    // Without opt-in, even loopback authorities never fall back.
    #[test]
    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
        for authority in [
            "127.0.0.1:8080",
            "localhost:8080",
            "[::1]:8080",
            "127.1.2.3:9000",
        ] {
            assert!(
                !should_try_cleartext_fallback(
                    "https",
                    false,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "cleartext fallback must stay disabled without opt-in for '{authority}'"
            );
        }
    }

    // With opt-in, discovery failures on loopback authorities may fall back.
    #[test]
    fn cleartext_fallback_allows_loopback_after_opt_in() {
        for authority in [
            "127.0.0.1:8080",
            "localhost:8080",
            "[::1]:8080",
            "127.1.2.3:9000",
        ] {
            assert!(
                should_try_cleartext_fallback(
                    "https",
                    true,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "expected cleartext fallback for loopback authority '{authority}'"
            );
        }
    }

    // Discovery failures on non-loopback authorities never fall back.
    #[test]
    fn cleartext_fallback_denies_external_tls_failures() {
        for authority in [
            "reviewer.example:443",
            "8.8.8.8:443",
            "192.168.1.10:8080",
            "10.0.0.5:8443",
        ] {
            assert!(
                !should_try_cleartext_fallback(
                    "https",
                    true,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "cleartext fallback must be denied for external authority '{authority}'"
            );
        }
    }

    // Loopback detection covers localhost (any case), 127/8 and ::1 only.
    #[test]
    fn is_loopback_authority_recognises_loopback_forms() {
        assert!(is_loopback_authority("127.0.0.1:8080"));
        assert!(is_loopback_authority("localhost:8080"));
        assert!(is_loopback_authority("LOCALHOST:9000"));
        assert!(is_loopback_authority("[::1]:8080"));
        assert!(is_loopback_authority("127.5.5.5:1234"));
        assert!(!is_loopback_authority("8.8.8.8:443"));
        assert!(!is_loopback_authority("192.168.1.10:8080"));
        assert!(!is_loopback_authority("example.com:443"));
        assert!(!is_loopback_authority("reviewer.prod"));
    }

    // A card pointing at a different host than the one requested is denied.
    #[test]
    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
        let error = endpoint_from_card(
            "https://trusted.example/.well-known/agent-card.json".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "protocolVersion": "0.3.0",
                "url": "https://evil.example",
                "preferredTransport": "JSONRPC",
            }),
        )
        .unwrap_err();
        assert_eq!(
            error.to_string(),
            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
        );
    }

    // An http card url is refused when cleartext was not opted into.
    #[test]
    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
        let error = endpoint_from_card(
            "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
            false,
            "127.0.0.1:8080",
            "triage".to_string(),
            &serde_json::json!({
                "protocolVersion": "0.3.0",
                "url": "http://localhost:8080",
                "preferredTransport": "JSONRPC",
            }),
        )
        .expect_err("cleartext card should require explicit opt-in");
        assert!(error
            .to_string()
            .contains("requires `allow_cleartext = true`"));
    }

    // Loopback aliases are interchangeable on the same port, but not across ports.
    #[test]
    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
        let card = serde_json::json!({
            "protocolVersion": "0.3.0",
            "url": "http://localhost:8080",
            "preferredTransport": "JSONRPC",
        });
        let endpoint = endpoint_from_card(
            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
            true,
            "127.0.0.1:8080",
            "triage".to_string(),
            &card,
        )
        .expect("loopback alias pair should be accepted");
        assert_eq!(endpoint.rpc_url, "http://localhost:8080/");

        let card_v6 = serde_json::json!({
            "protocolVersion": "0.3.0",
            "url": "http://[::1]:8080",
            "preferredTransport": "JSONRPC",
        });
        let endpoint_v6 = endpoint_from_card(
            "http://localhost:8080/.well-known/agent-card.json".to_string(),
            true,
            "localhost:8080",
            "triage".to_string(),
            &card_v6,
        )
        .expect("IPv6 loopback alias should be accepted");
        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");

        let card_wrong_port = serde_json::json!({
            "protocolVersion": "0.3.0",
            "url": "http://localhost:9000",
            "preferredTransport": "JSONRPC",
        });
        let error = endpoint_from_card(
            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
            true,
            "127.0.0.1:8080",
            "triage".to_string(),
            &card_wrong_port,
        )
        .expect_err("mismatched ports must still be rejected even on loopback");
        assert!(error
            .to_string()
            .contains("A2A agent card url authority mismatch"));
    }

    // Host aliasing applies to loopback only; other hosts must match exactly.
    #[test]
    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
        assert!(!authorities_equivalent(
            "internal.corp.example:443",
            "trusted.example:443",
        ));
        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
        assert!(authorities_equivalent(
            "trusted.example:443",
            "trusted.example:443",
        ));
    }
}