1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
/// Relative paths probed, in this order, when looking for a remote agent card.
const A2A_AGENT_CARD_PATHS: &[&str] = &[
    ".well-known/agent-card.json",
    ".well-known/a2a-agent",
    ".well-known/agent.json",
    "agent/card",
];

/// Protocol version sent in the `A2A-Version` header and required of discovered cards.
const A2A_PROTOCOL_VERSION: &str = "0.3.0";

/// Transport / protocol-binding label (compared ASCII case-insensitively) that selects JSON-RPC.
const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";

/// Environment variable holding the push-notification callback URL.
const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";

/// Environment variable holding the optional push-notification token.
const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
20
/// Result of agent-card discovery for one dispatch target.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResolvedA2aEndpoint {
    /// URL the agent card was successfully fetched from.
    pub card_url: String,
    /// JSON-RPC endpoint URL selected from the card.
    pub rpc_url: String,
    /// The card's `id` field, when it is present as a string.
    pub agent_id: Option<String>,
    /// Path portion of the dispatch target with leading `/` removed; may be empty.
    pub target_agent: String,
}
28
29#[derive(Clone, Debug, PartialEq)]
30pub enum DispatchAck {
31 InlineResult {
32 task_id: String,
33 result: Value,
34 },
35 PendingTask {
36 task_id: String,
37 state: String,
38 handle: Value,
39 },
40}
41
/// Failures surfaced by the A2A client; each variant carries a human-readable message.
#[derive(Debug)]
pub enum A2aClientError {
    /// The raw dispatch target could not be parsed into a host (and optional path).
    InvalidTarget(String),
    /// Agent-card discovery or validation failed.
    Discovery(String),
    /// An HTTP or JSON-RPC exchange with the resolved endpoint failed.
    Protocol(String),
    /// The operation stopped because the cancel receiver resolved first.
    Cancelled(String),
}
49
50impl std::fmt::Display for A2aClientError {
51 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52 match self {
53 Self::InvalidTarget(message)
54 | Self::Discovery(message)
55 | Self::Protocol(message)
56 | Self::Cancelled(message) => f.write_str(message),
57 }
58 }
59}
60
61impl std::error::Error for A2aClientError {}
62
63#[async_trait]
65pub trait A2aClient: Send + Sync + 'static {
66 async fn dispatch(
67 &self,
68 target: &str,
69 allow_cleartext: bool,
70 binding_id: &str,
71 binding_key: &str,
72 event: &TriggerEvent,
73 cancel_rx: &mut broadcast::Receiver<()>,
74 ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
75}
76
77pub struct RealA2aClient;
79
80#[async_trait]
81impl A2aClient for RealA2aClient {
82 async fn dispatch(
83 &self,
84 target: &str,
85 allow_cleartext: bool,
86 binding_id: &str,
87 binding_key: &str,
88 event: &TriggerEvent,
89 cancel_rx: &mut broadcast::Receiver<()>,
90 ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
91 dispatch_trigger_event(
92 target,
93 allow_cleartext,
94 binding_id,
95 binding_key,
96 event,
97 cancel_rx,
98 )
99 .await
100 }
101}
102
/// Internal classification of a failed agent-card fetch.
#[derive(Debug)]
enum AgentCardFetchError {
    /// The cancel receiver resolved before the HTTP request finished.
    Cancelled(String),
    /// Any other failure: transport error, non-success status, or unparsable body.
    Discovery(String),
    /// The connection attempt was refused by the peer.
    ConnectRefused(String),
}
109
110pub async fn dispatch_trigger_event(
111 raw_target: &str,
112 allow_cleartext: bool,
113 binding_id: &str,
114 binding_key: &str,
115 event: &TriggerEvent,
116 cancel_rx: &mut broadcast::Receiver<()>,
117) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
118 let started = std::time::Instant::now();
119 let target = match parse_target(raw_target) {
120 Ok(target) => target,
121 Err(error) => {
122 record_a2a_metric(raw_target, "failed", started.elapsed());
123 return Err(error);
124 }
125 };
126 let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
127 Ok(endpoint) => endpoint,
128 Err(error) => {
129 record_a2a_metric(raw_target, "failed", started.elapsed());
130 return Err(error);
131 }
132 };
133 let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
134 let envelope = serde_json::json!({
135 "kind": "harn.trigger.dispatch",
136 "message_id": message_id,
137 "trace_id": event.trace_id.0,
138 "event_id": event.id.0,
139 "trigger_id": binding_id,
140 "binding_key": binding_key,
141 "target_agent": endpoint.target_agent,
142 "event": event,
143 });
144 let text = serde_json::to_string(&envelope)
145 .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
146 let push_config = push_notification_config();
147 let mut params = serde_json::json!({
148 "contextId": event.trace_id.0,
149 "message": {
150 "messageId": message_id,
151 "role": "user",
152 "parts": [{
153 "type": "text",
154 "text": text,
155 }],
156 "metadata": {
157 "kind": "harn.trigger.dispatch",
158 "trace_id": event.trace_id.0,
159 "event_id": event.id.0,
160 "trigger_id": binding_id,
161 "binding_key": binding_key,
162 "target_agent": endpoint.target_agent,
163 },
164 },
165 });
166 if let Some(config) = push_config.clone() {
167 params["configuration"] = serde_json::json!({
168 "blocking": false,
169 "returnImmediately": true,
170 "pushNotificationConfig": config,
171 });
172 }
173 let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
174
175 let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
176 Ok(body) => body,
177 Err(error) => {
178 record_a2a_metric(raw_target, "failed", started.elapsed());
179 return Err(error);
180 }
181 };
182 let result = match body.get("result").cloned().ok_or_else(|| {
183 if let Some(error) = body.get("error") {
184 let message = error
185 .get("message")
186 .and_then(Value::as_str)
187 .unwrap_or("unknown A2A error");
188 A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
189 } else {
190 A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
191 }
192 }) {
193 Ok(result) => result,
194 Err(error) => {
195 record_a2a_metric(raw_target, "failed", started.elapsed());
196 return Err(error);
197 }
198 };
199
200 let task_id = match result
201 .get("id")
202 .and_then(Value::as_str)
203 .filter(|value| !value.is_empty())
204 .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
205 {
206 Ok(task_id) => task_id.to_string(),
207 Err(error) => {
208 record_a2a_metric(raw_target, "failed", started.elapsed());
209 return Err(error);
210 }
211 };
212 let state = match task_state(&result) {
213 Ok(state) => state.to_string(),
214 Err(error) => {
215 record_a2a_metric(raw_target, "failed", started.elapsed());
216 return Err(error);
217 }
218 };
219
220 if state == "completed" {
221 let inline = extract_inline_result(&result);
222 record_a2a_metric(raw_target, "succeeded", started.elapsed());
223 return Ok((
224 endpoint,
225 DispatchAck::InlineResult {
226 task_id,
227 result: inline,
228 },
229 ));
230 }
231
232 if let Some(config) = push_config {
233 register_push_notification_config(
234 &endpoint.rpc_url,
235 &task_id,
236 config,
237 &event.trace_id.0,
238 cancel_rx,
239 )
240 .await
241 .inspect_err(|_| {
242 record_a2a_metric(raw_target, "failed", started.elapsed());
243 })?;
244 }
245 record_a2a_metric(raw_target, "succeeded", started.elapsed());
246 Ok((
247 endpoint.clone(),
248 DispatchAck::PendingTask {
249 task_id: task_id.clone(),
250 state: state.clone(),
251 handle: serde_json::json!({
252 "kind": "a2a_task_handle",
253 "task_id": task_id,
254 "state": state,
255 "target_agent": endpoint.target_agent,
256 "rpc_url": endpoint.rpc_url,
257 "card_url": endpoint.card_url,
258 "agent_id": endpoint.agent_id,
259 }),
260 },
261 ))
262}
263
264fn push_notification_config() -> Option<Value> {
265 let url = std::env::var(A2A_PUSH_URL_ENV)
266 .ok()
267 .map(|value| value.trim().to_string())
268 .filter(|value| !value.is_empty())?;
269 let token = std::env::var(A2A_PUSH_TOKEN_ENV)
270 .ok()
271 .map(|value| value.trim().to_string())
272 .filter(|value| !value.is_empty());
273 let mut config = serde_json::json!({ "url": url });
274 if let Some(token) = token {
275 config["token"] = Value::String(token.clone());
276 config["authentication"] = serde_json::json!({
277 "scheme": "Bearer",
278 "credentials": token,
279 });
280 }
281 Some(config)
282}
283
284async fn register_push_notification_config(
285 rpc_url: &str,
286 task_id: &str,
287 config: Value,
288 trace_id: &str,
289 cancel_rx: &mut broadcast::Receiver<()>,
290) -> Result<(), A2aClientError> {
291 let request = crate::jsonrpc::request(
292 format!("{trace_id}.{task_id}.push-config"),
293 "tasks/pushNotificationConfig/set",
294 serde_json::json!({
295 "taskId": task_id,
296 "pushNotificationConfig": config,
297 }),
298 );
299 let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
300 if response.get("error").is_some() {
301 return Err(A2aClientError::Protocol(format!(
302 "A2A push notification registration failed: {}",
303 response["error"]
304 )));
305 }
306 Ok(())
307}
308
309fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
310 if let Some(metrics) = crate::active_metrics_registry() {
311 metrics.record_a2a_hop(target, outcome, duration);
312 }
313}
314
315pub fn target_agent_label(raw_target: &str) -> String {
316 parse_target(raw_target)
317 .map(|target| target.target_agent_label())
318 .unwrap_or_else(|_| raw_target.to_string())
319}
320
/// A dispatch target split into its network authority and agent path.
#[derive(Debug, Clone)]
struct ParsedTarget {
    /// `host` or `host:port`.
    authority: String,
    /// Path after the authority with leading `/` removed; may be empty.
    target_agent: String,
}

impl ParsedTarget {
    /// Returns the agent path, falling back to the authority when the path is empty.
    fn target_agent_label(&self) -> String {
        let label = if self.target_agent.is_empty() {
            &self.authority
        } else {
            &self.target_agent
        };
        label.clone()
    }
}
336
337fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
338 let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
339 A2aClientError::InvalidTarget(format!(
340 "invalid a2a dispatch target '{raw_target}': {error}"
341 ))
342 })?;
343 let host = parsed.host_str().ok_or_else(|| {
344 A2aClientError::InvalidTarget(format!(
345 "invalid a2a dispatch target '{raw_target}': missing host"
346 ))
347 })?;
348 let authority = if let Some(port) = parsed.port() {
349 format!("{host}:{port}")
350 } else {
351 host.to_string()
352 };
353 Ok(ParsedTarget {
354 authority,
355 target_agent: parsed.path().trim_start_matches('/').to_string(),
356 })
357}
358
359async fn resolve_endpoint(
360 target: &ParsedTarget,
361 allow_cleartext: bool,
362 cancel_rx: &mut broadcast::Receiver<()>,
363) -> Result<ResolvedA2aEndpoint, A2aClientError> {
364 let mut last_error = None;
365 for scheme in card_resolution_schemes(allow_cleartext) {
366 let mut last_scheme_error = None;
367 for path in A2A_AGENT_CARD_PATHS {
368 let card_url = format!("{scheme}://{}/{path}", target.authority);
369 match fetch_agent_card(&card_url, cancel_rx).await {
370 Ok(card) => {
371 return endpoint_from_card(
372 card_url,
373 allow_cleartext,
374 &target.authority,
375 target.target_agent.clone(),
376 &card,
377 );
378 }
379 Err(AgentCardFetchError::Cancelled(message)) => {
380 return Err(A2aClientError::Cancelled(message));
381 }
382 Err(error) => {
383 last_error = Some(agent_card_fetch_error_message(&error));
384 last_scheme_error = Some(error);
385 }
386 }
387 }
388 if last_scheme_error.as_ref().is_some_and(|error| {
389 should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
390 }) {
391 continue;
392 }
393 break;
394 }
395 Err(A2aClientError::Discovery(format!(
396 "could not resolve A2A agent card for '{}': {}",
397 target.authority,
398 last_error.unwrap_or_else(|| "unknown discovery error".to_string())
399 )))
400}
401
402async fn fetch_agent_card(
403 card_url: &str,
404 cancel_rx: &mut broadcast::Receiver<()>,
405) -> Result<Value, AgentCardFetchError> {
406 let response = tokio::select! {
407 response = crate::llm::shared_utility_client().get(card_url).send() => {
408 match response {
409 Ok(response) => Ok(response),
410 Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
411 format!("A2A HTTP request failed: {error}")
412 )),
413 Err(error) => Err(AgentCardFetchError::Discovery(
414 format!("A2A HTTP request failed: {error}")
415 )),
416 }
417 }
418 _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
419 "A2A agent-card fetch cancelled".to_string()
420 )),
421 }?;
422 if !response.status().is_success() {
423 return Err(AgentCardFetchError::Discovery(format!(
424 "GET {card_url} returned HTTP {}",
425 response.status()
426 )));
427 }
428 response
429 .json::<Value>()
430 .await
431 .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
432}
433
434fn endpoint_from_card(
435 card_url: String,
436 allow_cleartext: bool,
437 requested_authority: &str,
438 target_agent: String,
439 card: &Value,
440) -> Result<ResolvedA2aEndpoint, A2aClientError> {
441 let rpc_url = if has_current_transport_fields(card) {
442 endpoint_from_current_card(card, allow_cleartext, requested_authority)?
443 } else if let Some(rpc_url) =
444 endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
445 {
446 rpc_url
447 } else {
448 return Err(A2aClientError::Discovery(
449 "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
450 ));
451 };
452
453 Ok(ResolvedA2aEndpoint {
454 card_url,
455 rpc_url: rpc_url.to_string(),
456 agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
457 target_agent,
458 })
459}
460
461fn has_current_transport_fields(card: &Value) -> bool {
462 card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
463}
464
465fn endpoint_from_current_card(
466 card: &Value,
467 allow_cleartext: bool,
468 requested_authority: &str,
469) -> Result<Url, A2aClientError> {
470 let protocol_version = card
471 .get("protocolVersion")
472 .and_then(Value::as_str)
473 .ok_or_else(|| {
474 A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
475 })?;
476 if protocol_version != A2A_PROTOCOL_VERSION {
477 return Err(A2aClientError::Discovery(format!(
478 "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
479 )));
480 }
481
482 let base_url = card
483 .get("url")
484 .and_then(Value::as_str)
485 .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
486 let base_url = resolve_declared_url(
487 base_url,
488 allow_cleartext,
489 requested_authority,
490 "agent card url",
491 )?;
492
493 let preferred_transport = card
494 .get("preferredTransport")
495 .and_then(Value::as_str)
496 .ok_or_else(|| {
497 A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
498 })?;
499 if transport_is_jsonrpc(preferred_transport) {
500 return Ok(base_url);
501 }
502
503 if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
504 return resolve_declared_url(
505 interface_url,
506 allow_cleartext,
507 requested_authority,
508 "JSONRPC additionalInterface url",
509 );
510 }
511
512 Err(A2aClientError::Discovery(
513 "A2A agent card does not expose JSONRPC transport".to_string(),
514 ))
515}
516
517fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
518 let Some(interfaces) = card.get("additionalInterfaces") else {
519 return Ok(None);
520 };
521 let interfaces = interfaces.as_array().ok_or_else(|| {
522 A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
523 })?;
524 for interface in interfaces {
525 if interface
526 .get("transport")
527 .and_then(Value::as_str)
528 .is_some_and(transport_is_jsonrpc)
529 {
530 let interface_url = interface
531 .get("url")
532 .and_then(Value::as_str)
533 .ok_or_else(|| {
534 A2aClientError::Discovery(
535 "A2A JSONRPC additionalInterface missing url".to_string(),
536 )
537 })?;
538 return Ok(Some(interface_url));
539 }
540 }
541 Ok(None)
542}
543
544fn endpoint_from_legacy_supported_interfaces(
545 card: &Value,
546 allow_cleartext: bool,
547 requested_authority: &str,
548) -> Result<Option<Url>, A2aClientError> {
549 let Some(interfaces) = card.get("supportedInterfaces") else {
550 return Ok(None);
551 };
552 let interfaces = interfaces.as_array().ok_or_else(|| {
553 A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
554 })?;
555 let mut saw_jsonrpc = false;
556 for interface in interfaces {
557 if !interface
558 .get("protocolBinding")
559 .and_then(Value::as_str)
560 .is_some_and(transport_is_jsonrpc)
561 {
562 continue;
563 }
564 saw_jsonrpc = true;
565 if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
566 continue;
567 }
568 let interface_url = interface
569 .get("url")
570 .and_then(Value::as_str)
571 .ok_or_else(|| {
572 A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
573 })?;
574 return resolve_declared_url(
575 interface_url,
576 allow_cleartext,
577 requested_authority,
578 "JSONRPC supportedInterface url",
579 )
580 .map(Some);
581 }
582 if saw_jsonrpc {
583 return Err(A2aClientError::Discovery(format!(
584 "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
585 )));
586 }
587 Err(A2aClientError::Discovery(
588 "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
589 ))
590}
591
592fn transport_is_jsonrpc(transport: &str) -> bool {
593 transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
594}
595
596fn resolve_declared_url(
597 raw_url: &str,
598 allow_cleartext: bool,
599 requested_authority: &str,
600 label: &str,
601) -> Result<Url, A2aClientError> {
602 let url = Url::parse(raw_url).map_err(|error| {
603 A2aClientError::Discovery(format!("invalid A2A {label} '{raw_url}': {error}"))
604 })?;
605 ensure_cleartext_allowed(&url, allow_cleartext, label)?;
606 let declared_authority = url_authority(&url)?;
607 if !authorities_equivalent(&declared_authority, requested_authority) {
608 return Err(A2aClientError::Discovery(format!(
609 "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
610 )));
611 }
612 Ok(url)
613}
614
/// Lists the URL schemes to try for card discovery, in order of preference.
///
/// `https` always comes first; `http` is added only when cleartext is allowed.
fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
    const SECURE_ONLY: &[&str] = &["https"];
    const SECURE_THEN_CLEARTEXT: &[&str] = &["https", "http"];

    match allow_cleartext {
        true => SECURE_THEN_CLEARTEXT,
        false => SECURE_ONLY,
    }
}
622
623fn should_try_cleartext_fallback(
636 scheme: &str,
637 allow_cleartext: bool,
638 error: &AgentCardFetchError,
639 authority: &str,
640) -> bool {
641 if !allow_cleartext || scheme != "https" {
642 return false;
643 }
644 match error {
645 AgentCardFetchError::Cancelled(_) => false,
646 AgentCardFetchError::ConnectRefused(_) => true,
647 AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
648 }
649}
650
651fn ensure_cleartext_allowed(
652 url: &Url,
653 allow_cleartext: bool,
654 label: &str,
655) -> Result<(), A2aClientError> {
656 if allow_cleartext || url.scheme() != "http" {
657 return Ok(());
658 }
659 Err(A2aClientError::Discovery(format!(
660 "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
661 )))
662}
663
664fn is_loopback_authority(authority: &str) -> bool {
665 let (host, _) = split_authority(authority);
666 if host.eq_ignore_ascii_case("localhost") {
667 return true;
668 }
669 if let Ok(ip) = host.parse::<std::net::IpAddr>() {
670 return ip.is_loopback();
671 }
672 false
673}
674
675fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
686 if card_authority == requested_authority {
687 return true;
688 }
689 let (_, card_port) = split_authority(card_authority);
690 let (_, requested_port) = split_authority(requested_authority);
691 if card_port != requested_port {
692 return false;
693 }
694 is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
695}
696
/// Splits an authority into `(host, port)`, returning `""` for a missing port.
///
/// * Bracketed IPv6 (`[::1]:8080`) is split after the closing bracket and the brackets are
///   removed from the returned host.
/// * Otherwise the text after the last `:` is the port — unless the remainder still contains
///   a `:`. A hostname or IPv4 address cannot contain a colon, so such input is an
///   unbracketed IPv6 literal (e.g. `::1`) and is returned whole as the host instead of being
///   cut at its final colon.
fn split_authority(authority: &str) -> (&str, &str) {
    let (host_raw, port) = if authority.starts_with('[') {
        match authority.rfind(']') {
            Some(end) => {
                let (host, rest) = authority.split_at(end + 1);
                (host, rest.strip_prefix(':').unwrap_or(""))
            }
            // No closing bracket: treat everything as the host.
            None => (authority, ""),
        }
    } else {
        match authority.rsplit_once(':') {
            Some((host, port)) if !host.contains(':') => (host, port),
            _ => (authority, ""),
        }
    };
    let host = host_raw.trim_start_matches('[').trim_end_matches(']');
    (host, port)
}
719
720fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
721 match error {
722 AgentCardFetchError::Cancelled(message)
723 | AgentCardFetchError::Discovery(message)
724 | AgentCardFetchError::ConnectRefused(message) => message.clone(),
725 }
726}
727
728fn is_connect_refused(error: &reqwest::Error) -> bool {
729 if !error.is_connect() {
730 return false;
731 }
732 let mut source = error.source();
733 while let Some(cause) = source {
734 if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
735 if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
736 return true;
737 }
738 }
739 source = cause.source();
740 }
741 false
742}
743
744fn url_authority(url: &Url) -> Result<String, A2aClientError> {
745 let host = url
746 .host_str()
747 .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
748 Ok(if let Some(port) = url.port() {
749 format!("{host}:{port}")
750 } else {
751 host.to_string()
752 })
753}
754
755async fn send_jsonrpc(
756 rpc_url: &str,
757 request: &Value,
758 trace_id: &str,
759 cancel_rx: &mut broadcast::Receiver<()>,
760) -> Result<Value, A2aClientError> {
761 let response = send_http(
762 crate::llm::shared_blocking_client()
763 .post(rpc_url)
764 .header(reqwest::header::CONTENT_TYPE, "application/json")
765 .header("A2A-Version", A2A_PROTOCOL_VERSION)
766 .header("A2A-Trace-Id", trace_id)
767 .json(request),
768 cancel_rx,
769 "A2A task dispatch cancelled",
770 )
771 .await?;
772 if !response.status().is_success() {
773 return Err(A2aClientError::Protocol(format!(
774 "A2A task dispatch returned HTTP {}",
775 response.status()
776 )));
777 }
778 response
779 .json::<Value>()
780 .await
781 .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
782}
783
784async fn send_http(
785 request: reqwest::RequestBuilder,
786 cancel_rx: &mut broadcast::Receiver<()>,
787 cancelled_message: &'static str,
788) -> Result<reqwest::Response, A2aClientError> {
789 tokio::select! {
790 response = request.send() => response
791 .map_err(|error| A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))),
792 _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
793 }
794}
795
796fn task_state(task: &Value) -> Result<&str, A2aClientError> {
797 task.pointer("/status/state")
798 .and_then(Value::as_str)
799 .filter(|value| !value.is_empty())
800 .ok_or_else(|| {
801 A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
802 })
803}
804
805fn extract_inline_result(task: &Value) -> Value {
806 let text = task
807 .get("history")
808 .and_then(Value::as_array)
809 .and_then(|history| {
810 history.iter().rev().find_map(|message| {
811 let role = message.get("role").and_then(Value::as_str)?;
812 if role != "agent" {
813 return None;
814 }
815 message
816 .get("parts")
817 .and_then(Value::as_array)
818 .and_then(|parts| {
819 parts.iter().find_map(|part| {
820 if part.get("type").and_then(Value::as_str) == Some("text") {
821 part.get("text").and_then(Value::as_str).map(str::trim_end)
822 } else {
823 None
824 }
825 })
826 })
827 })
828 });
829 match text {
830 Some(text) if !text.is_empty() => {
831 serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
832 }
833 _ => task.clone(),
834 }
835}
836
837async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
838 let _ = cancel_rx.recv().await;
839}
840
#[cfg(test)]
mod tests {
    use super::*;

    /// Card location used by the tests that resolve against `trusted.example`.
    const TRUSTED_CARD_URL: &str = "https://trusted.example/.well-known/agent-card.json";
    /// Authority requested by those tests.
    const TRUSTED_AUTHORITY: &str = "trusted.example";
    /// Authorities that must all be classified as loopback.
    const LOOPBACK_AUTHORITIES: [&str; 4] = [
        "127.0.0.1:8080",
        "localhost:8080",
        "[::1]:8080",
        "127.1.2.3:9000",
    ];

    /// Runs `endpoint_from_card` with the fixed `triage` agent path.
    fn resolve(
        card_url: &str,
        allow_cleartext: bool,
        authority: &str,
        card: Value,
    ) -> Result<ResolvedA2aEndpoint, A2aClientError> {
        endpoint_from_card(
            card_url.to_string(),
            allow_cleartext,
            authority,
            "triage".to_string(),
            &card,
        )
    }

    /// Resolves `card` as if fetched over HTTPS from `trusted.example` without cleartext opt-in.
    fn resolve_trusted(card: Value) -> Result<ResolvedA2aEndpoint, A2aClientError> {
        resolve(TRUSTED_CARD_URL, false, TRUSTED_AUTHORITY, card)
    }

    fn tls_failure() -> AgentCardFetchError {
        AgentCardFetchError::Discovery("tls handshake failed".to_string())
    }

    fn connect_refused() -> AgentCardFetchError {
        AgentCardFetchError::ConnectRefused("connect refused".to_string())
    }

    #[test]
    fn target_agent_label_prefers_path() {
        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
    }

    #[test]
    fn extract_inline_result_parses_json_text() {
        let task = serde_json::json!({
            "history": [
                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
            ]
        });
        let expected = serde_json::json!({"trace_id": "trace_123"});
        assert_eq!(extract_inline_result(&task), expected);
    }

    #[test]
    fn discovery_prefers_https_before_http() {
        assert_eq!(card_resolution_schemes(false), ["https"]);
        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
    }

    #[test]
    fn endpoint_from_card_accepts_current_preferred_transport() {
        let endpoint = resolve_trusted(serde_json::json!({
            "name": "trusted",
            "protocolVersion": "0.3.0",
            "url": "https://trusted.example/rpc",
            "preferredTransport": "JSONRPC",
            "additionalInterfaces": [{
                "url": "https://trusted.example/rpc",
                "transport": "JSONRPC"
            }]
        }))
        .expect("current A2A card should resolve");
        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
        assert_eq!(
            endpoint.card_url,
            "https://trusted.example/.well-known/agent-card.json"
        );
        assert_eq!(endpoint.target_agent, "triage");
    }

    #[test]
    fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
        let endpoint = resolve_trusted(serde_json::json!({
            "name": "trusted",
            "protocolVersion": "0.3.0",
            "url": "https://trusted.example/rest",
            "preferredTransport": "HTTP+JSON",
            "additionalInterfaces": [{
                "url": "https://trusted.example/rpc",
                "transport": "JSONRPC"
            }]
        }))
        .expect("current A2A card should resolve through additionalInterfaces");
        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
    }

    #[test]
    fn endpoint_from_card_accepts_legacy_supported_interfaces() {
        let endpoint = resolve_trusted(serde_json::json!({
            "name": "trusted",
            "supportedInterfaces": [{
                "protocolBinding": "JSONRPC",
                "protocolVersion": "0.3.0",
                "url": "https://trusted.example/rpc"
            }],
        }))
        .expect("legacy A2A card should resolve during the transition");
        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
    }

    #[test]
    fn endpoint_from_card_rejects_removed_interfaces_shape() {
        let error = resolve_trusted(serde_json::json!({
            "url": "https://trusted.example",
            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
        }))
        .expect_err("pre-0.3 Harn discovery shape should be rejected");
        assert_eq!(
            error.to_string(),
            "A2A agent card missing preferredTransport/additionalInterfaces"
        );
    }

    #[test]
    fn cleartext_fallback_only_after_https_connect_refused() {
        let authority = "reviewer.example:443";
        assert!(should_try_cleartext_fallback(
            "https",
            true,
            &connect_refused(),
            authority,
        ));
        assert!(!should_try_cleartext_fallback(
            "http",
            true,
            &connect_refused(),
            authority,
        ));
        assert!(!should_try_cleartext_fallback(
            "https",
            true,
            &tls_failure(),
            authority,
        ));
    }

    #[test]
    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
        for authority in LOOPBACK_AUTHORITIES {
            let allowed = should_try_cleartext_fallback("https", false, &tls_failure(), authority);
            assert!(
                !allowed,
                "cleartext fallback must stay disabled without opt-in for '{authority}'"
            );
        }
    }

    #[test]
    fn cleartext_fallback_allows_loopback_after_opt_in() {
        for authority in LOOPBACK_AUTHORITIES {
            let allowed = should_try_cleartext_fallback("https", true, &tls_failure(), authority);
            assert!(
                allowed,
                "expected cleartext fallback for loopback authority '{authority}'"
            );
        }
    }

    #[test]
    fn cleartext_fallback_denies_external_tls_failures() {
        let external = [
            "reviewer.example:443",
            "8.8.8.8:443",
            "192.168.1.10:8080",
            "10.0.0.5:8443",
        ];
        for authority in external {
            let allowed = should_try_cleartext_fallback("https", true, &tls_failure(), authority);
            assert!(
                !allowed,
                "cleartext fallback must be denied for external authority '{authority}'"
            );
        }
    }

    #[test]
    fn is_loopback_authority_recognises_loopback_forms() {
        assert!(is_loopback_authority("127.0.0.1:8080"));
        assert!(is_loopback_authority("localhost:8080"));
        assert!(is_loopback_authority("LOCALHOST:9000"));
        assert!(is_loopback_authority("[::1]:8080"));
        assert!(is_loopback_authority("127.5.5.5:1234"));
        assert!(!is_loopback_authority("8.8.8.8:443"));
        assert!(!is_loopback_authority("192.168.1.10:8080"));
        assert!(!is_loopback_authority("example.com:443"));
        assert!(!is_loopback_authority("reviewer.prod"));
    }

    #[test]
    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
        let error = resolve_trusted(serde_json::json!({
            "protocolVersion": "0.3.0",
            "url": "https://evil.example",
            "preferredTransport": "JSONRPC",
        }))
        .unwrap_err();
        assert_eq!(
            error.to_string(),
            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
        );
    }

    #[test]
    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
        let error = resolve(
            "https://127.0.0.1:8080/.well-known/agent-card.json",
            false,
            "127.0.0.1:8080",
            serde_json::json!({
                "protocolVersion": "0.3.0",
                "url": "http://localhost:8080",
                "preferredTransport": "JSONRPC",
            }),
        )
        .expect_err("cleartext card should require explicit opt-in");
        assert!(error
            .to_string()
            .contains("requires `allow_cleartext = true`"));
    }

    #[test]
    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
        let endpoint = resolve(
            "http://127.0.0.1:8080/.well-known/agent-card.json",
            true,
            "127.0.0.1:8080",
            serde_json::json!({
                "protocolVersion": "0.3.0",
                "url": "http://localhost:8080",
                "preferredTransport": "JSONRPC",
            }),
        )
        .expect("loopback alias pair should be accepted");
        assert_eq!(endpoint.rpc_url, "http://localhost:8080/");

        let endpoint_v6 = resolve(
            "http://localhost:8080/.well-known/agent-card.json",
            true,
            "localhost:8080",
            serde_json::json!({
                "protocolVersion": "0.3.0",
                "url": "http://[::1]:8080",
                "preferredTransport": "JSONRPC",
            }),
        )
        .expect("IPv6 loopback alias should be accepted");
        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");

        let error = resolve(
            "http://127.0.0.1:8080/.well-known/agent-card.json",
            true,
            "127.0.0.1:8080",
            serde_json::json!({
                "protocolVersion": "0.3.0",
                "url": "http://localhost:9000",
                "preferredTransport": "JSONRPC",
            }),
        )
        .expect_err("mismatched ports must still be rejected even on loopback");
        assert!(error
            .to_string()
            .contains("A2A agent card url authority mismatch"));
    }

    #[test]
    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
        assert!(!authorities_equivalent(
            "internal.corp.example:443",
            "trusted.example:443",
        ));
        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080"));
        assert!(authorities_equivalent(
            "trusted.example:443",
            "trusted.example:443",
        ));
    }
}