1use std::error::Error as _;
2
3use reqwest::Url;
4use serde_json::Value;
5use tokio::sync::broadcast;
6
7use crate::triggers::TriggerEvent;
8
/// Relative paths probed, in this order, when looking for a remote agent card.
///
/// `resolve_endpoint` appends each entry to `{scheme}://{authority}/` and uses the
/// first one that yields a parseable JSON card.
const A2A_AGENT_CARD_PATHS: &[&str] = &[
    ".well-known/agent-card.json",
    ".well-known/a2a-agent",
    ".well-known/agent.json",
    "agent/card",
];
/// Value sent in the `A2A-Version` header of every JSON-RPC request.
const A2A_PROTOCOL_VERSION: &str = "1.0";
/// Environment variable read for the push-notification callback URL.
const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
/// Environment variable read for the optional push-notification bearer token.
const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
18
/// Outcome of agent-card discovery for a dispatch target.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedA2aEndpoint {
    /// URL the agent card was fetched from.
    pub card_url: String,
    /// JSON-RPC endpoint URL taken from (or resolved against) the agent card.
    pub rpc_url: String,
    /// The card's top-level `id`, when present as a string.
    pub agent_id: Option<String>,
    /// Path portion of the dispatch target with leading slashes removed (may be empty).
    pub target_agent: String,
}
26
/// Acknowledgement returned by `dispatch_trigger_event` after the remote agent accepts a task.
#[derive(Clone, Debug, PartialEq)]
pub enum DispatchAck {
    /// The task was already in state `completed`; `result` is what
    /// `extract_inline_result` produced from the task object.
    InlineResult {
        /// `result.id` from the JSON-RPC response.
        task_id: String,
        /// Inline result extracted from the completed task.
        result: Value,
    },
    /// The task was reported in any state other than `completed`.
    PendingTask {
        /// `result.id` from the JSON-RPC response.
        task_id: String,
        /// `result.status.state` from the JSON-RPC response.
        state: String,
        /// JSON object of kind `a2a_task_handle` describing the task and the endpoint.
        handle: Value,
    },
}
39
/// Failure categories surfaced by the A2A client.
///
/// Every variant wraps a ready-to-display message; the variant only classifies
/// where the failure happened.
#[derive(Debug)]
pub enum A2aClientError {
    /// The dispatch target string could not be parsed.
    InvalidTarget(String),
    /// Agent-card discovery or validation failed.
    Discovery(String),
    /// The JSON-RPC exchange failed or returned an unusable response.
    Protocol(String),
    /// The operation was interrupted through the cancellation channel.
    Cancelled(String),
}

impl std::fmt::Display for A2aClientError {
    /// Writes the wrapped message verbatim, without any variant prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable or-pattern: adding a variant without a message is a compile error here.
        let (Self::InvalidTarget(text)
        | Self::Discovery(text)
        | Self::Protocol(text)
        | Self::Cancelled(text)) = self;
        f.write_str(text)
    }
}

impl std::error::Error for A2aClientError {}
60
/// Internal classification of a failed agent-card fetch.
///
/// `resolve_endpoint` uses the variant to decide between aborting, trying the next
/// path, and falling back to cleartext.
#[derive(Debug)]
enum AgentCardFetchError {
    /// The cancellation channel fired while the request was in flight.
    Cancelled(String),
    /// Any other failure: transport error, non-success status, or unparseable body.
    Discovery(String),
    /// The transport error chain contained an I/O `ConnectionRefused`.
    ConnectRefused(String),
}
67
68pub async fn dispatch_trigger_event(
69 raw_target: &str,
70 allow_cleartext: bool,
71 binding_id: &str,
72 binding_key: &str,
73 event: &TriggerEvent,
74 cancel_rx: &mut broadcast::Receiver<()>,
75) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
76 let started = std::time::Instant::now();
77 let target = match parse_target(raw_target) {
78 Ok(target) => target,
79 Err(error) => {
80 record_a2a_metric(raw_target, "failed", started.elapsed());
81 return Err(error);
82 }
83 };
84 let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
85 Ok(endpoint) => endpoint,
86 Err(error) => {
87 record_a2a_metric(raw_target, "failed", started.elapsed());
88 return Err(error);
89 }
90 };
91 let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
92 let envelope = serde_json::json!({
93 "kind": "harn.trigger.dispatch",
94 "message_id": message_id,
95 "trace_id": event.trace_id.0,
96 "event_id": event.id.0,
97 "trigger_id": binding_id,
98 "binding_key": binding_key,
99 "target_agent": endpoint.target_agent,
100 "event": event,
101 });
102 let text = serde_json::to_string(&envelope)
103 .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
104 let push_config = push_notification_config();
105 let mut params = serde_json::json!({
106 "contextId": event.trace_id.0,
107 "message": {
108 "messageId": message_id,
109 "role": "user",
110 "parts": [{
111 "type": "text",
112 "text": text,
113 }],
114 "metadata": {
115 "kind": "harn.trigger.dispatch",
116 "trace_id": event.trace_id.0,
117 "event_id": event.id.0,
118 "trigger_id": binding_id,
119 "binding_key": binding_key,
120 "target_agent": endpoint.target_agent,
121 },
122 },
123 });
124 if let Some(config) = push_config.clone() {
125 params["configuration"] = serde_json::json!({
126 "blocking": false,
127 "returnImmediately": true,
128 "pushNotificationConfig": config,
129 });
130 }
131 let request = crate::jsonrpc::request(message_id.clone(), "a2a.SendMessage", params);
132
133 let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
134 Ok(body) => body,
135 Err(error) => {
136 record_a2a_metric(raw_target, "failed", started.elapsed());
137 return Err(error);
138 }
139 };
140 let result = match body.get("result").cloned().ok_or_else(|| {
141 if let Some(error) = body.get("error") {
142 let message = error
143 .get("message")
144 .and_then(Value::as_str)
145 .unwrap_or("unknown A2A error");
146 A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
147 } else {
148 A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
149 }
150 }) {
151 Ok(result) => result,
152 Err(error) => {
153 record_a2a_metric(raw_target, "failed", started.elapsed());
154 return Err(error);
155 }
156 };
157
158 let task_id = match result
159 .get("id")
160 .and_then(Value::as_str)
161 .filter(|value| !value.is_empty())
162 .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
163 {
164 Ok(task_id) => task_id.to_string(),
165 Err(error) => {
166 record_a2a_metric(raw_target, "failed", started.elapsed());
167 return Err(error);
168 }
169 };
170 let state = match task_state(&result) {
171 Ok(state) => state.to_string(),
172 Err(error) => {
173 record_a2a_metric(raw_target, "failed", started.elapsed());
174 return Err(error);
175 }
176 };
177
178 if state == "completed" {
179 let inline = extract_inline_result(&result);
180 record_a2a_metric(raw_target, "succeeded", started.elapsed());
181 return Ok((
182 endpoint,
183 DispatchAck::InlineResult {
184 task_id,
185 result: inline,
186 },
187 ));
188 }
189
190 if let Some(config) = push_config {
191 register_push_notification_config(
192 &endpoint.rpc_url,
193 &task_id,
194 config,
195 &event.trace_id.0,
196 cancel_rx,
197 )
198 .await
199 .inspect_err(|_| {
200 record_a2a_metric(raw_target, "failed", started.elapsed());
201 })?;
202 }
203 record_a2a_metric(raw_target, "succeeded", started.elapsed());
204 Ok((
205 endpoint.clone(),
206 DispatchAck::PendingTask {
207 task_id: task_id.clone(),
208 state: state.clone(),
209 handle: serde_json::json!({
210 "kind": "a2a_task_handle",
211 "task_id": task_id,
212 "state": state,
213 "target_agent": endpoint.target_agent,
214 "rpc_url": endpoint.rpc_url,
215 "card_url": endpoint.card_url,
216 "agent_id": endpoint.agent_id,
217 }),
218 },
219 ))
220}
221
222fn push_notification_config() -> Option<Value> {
223 let url = std::env::var(A2A_PUSH_URL_ENV)
224 .ok()
225 .map(|value| value.trim().to_string())
226 .filter(|value| !value.is_empty())?;
227 let token = std::env::var(A2A_PUSH_TOKEN_ENV)
228 .ok()
229 .map(|value| value.trim().to_string())
230 .filter(|value| !value.is_empty());
231 let mut config = serde_json::json!({ "url": url });
232 if let Some(token) = token {
233 config["token"] = Value::String(token.clone());
234 config["authentication"] = serde_json::json!({
235 "scheme": "Bearer",
236 "credentials": token,
237 });
238 }
239 Some(config)
240}
241
242async fn register_push_notification_config(
243 rpc_url: &str,
244 task_id: &str,
245 config: Value,
246 trace_id: &str,
247 cancel_rx: &mut broadcast::Receiver<()>,
248) -> Result<(), A2aClientError> {
249 let request = crate::jsonrpc::request(
250 format!("{trace_id}.{task_id}.push-config"),
251 "CreateTaskPushNotificationConfig",
252 serde_json::json!({
253 "taskId": task_id,
254 "pushNotificationConfig": config,
255 }),
256 );
257 let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
258 if response.get("error").is_some() {
259 return Err(A2aClientError::Protocol(format!(
260 "A2A push notification registration failed: {}",
261 response["error"]
262 )));
263 }
264 Ok(())
265}
266
267fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
268 if let Some(metrics) = crate::active_metrics_registry() {
269 metrics.record_a2a_hop(target, outcome, duration);
270 }
271}
272
273pub fn target_agent_label(raw_target: &str) -> String {
274 parse_target(raw_target)
275 .map(|target| target.target_agent_label())
276 .unwrap_or_else(|_| raw_target.to_string())
277}
278
/// A dispatch target split into its network authority and agent path.
#[derive(Debug, Clone)]
struct ParsedTarget {
    /// `host` or `host:port` of the remote agent.
    authority: String,
    /// Agent path with leading slashes removed; empty when the target has no path.
    target_agent: String,
}

impl ParsedTarget {
    /// Returns the agent path, falling back to the authority when the path is empty.
    fn target_agent_label(&self) -> String {
        let label = if self.target_agent.is_empty() {
            &self.authority
        } else {
            &self.target_agent
        };
        label.clone()
    }
}
294
295fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
296 let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
297 A2aClientError::InvalidTarget(format!(
298 "invalid a2a dispatch target '{raw_target}': {error}"
299 ))
300 })?;
301 let host = parsed.host_str().ok_or_else(|| {
302 A2aClientError::InvalidTarget(format!(
303 "invalid a2a dispatch target '{raw_target}': missing host"
304 ))
305 })?;
306 let authority = if let Some(port) = parsed.port() {
307 format!("{host}:{port}")
308 } else {
309 host.to_string()
310 };
311 Ok(ParsedTarget {
312 authority,
313 target_agent: parsed.path().trim_start_matches('/').to_string(),
314 })
315}
316
317async fn resolve_endpoint(
318 target: &ParsedTarget,
319 allow_cleartext: bool,
320 cancel_rx: &mut broadcast::Receiver<()>,
321) -> Result<ResolvedA2aEndpoint, A2aClientError> {
322 let mut last_error = None;
323 for scheme in card_resolution_schemes(allow_cleartext) {
324 let mut last_scheme_error = None;
325 for path in A2A_AGENT_CARD_PATHS {
326 let card_url = format!("{scheme}://{}/{path}", target.authority);
327 match fetch_agent_card(&card_url, cancel_rx).await {
328 Ok(card) => {
329 return endpoint_from_card(
330 card_url,
331 allow_cleartext,
332 &target.authority,
333 target.target_agent.clone(),
334 &card,
335 );
336 }
337 Err(AgentCardFetchError::Cancelled(message)) => {
338 return Err(A2aClientError::Cancelled(message));
339 }
340 Err(error) => {
341 last_error = Some(agent_card_fetch_error_message(&error));
342 last_scheme_error = Some(error);
343 }
344 }
345 }
346 if last_scheme_error.as_ref().is_some_and(|error| {
347 should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
348 }) {
349 continue;
350 }
351 break;
352 }
353 Err(A2aClientError::Discovery(format!(
354 "could not resolve A2A agent card for '{}': {}",
355 target.authority,
356 last_error.unwrap_or_else(|| "unknown discovery error".to_string())
357 )))
358}
359
360async fn fetch_agent_card(
361 card_url: &str,
362 cancel_rx: &mut broadcast::Receiver<()>,
363) -> Result<Value, AgentCardFetchError> {
364 let response = tokio::select! {
365 response = crate::llm::shared_utility_client().get(card_url).send() => {
366 match response {
367 Ok(response) => Ok(response),
368 Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
369 format!("A2A HTTP request failed: {error}")
370 )),
371 Err(error) => Err(AgentCardFetchError::Discovery(
372 format!("A2A HTTP request failed: {error}")
373 )),
374 }
375 }
376 _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
377 "A2A agent-card fetch cancelled".to_string()
378 )),
379 }?;
380 if !response.status().is_success() {
381 return Err(AgentCardFetchError::Discovery(format!(
382 "GET {card_url} returned HTTP {}",
383 response.status()
384 )));
385 }
386 response
387 .json::<Value>()
388 .await
389 .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
390}
391
392fn endpoint_from_card(
393 card_url: String,
394 allow_cleartext: bool,
395 requested_authority: &str,
396 target_agent: String,
397 card: &Value,
398) -> Result<ResolvedA2aEndpoint, A2aClientError> {
399 if let Some(interfaces) = card.get("supportedInterfaces").and_then(Value::as_array) {
400 let interface = interfaces
401 .iter()
402 .find(|entry| {
403 entry
404 .get("protocolBinding")
405 .and_then(Value::as_str)
406 .is_some_and(|binding| binding.eq_ignore_ascii_case("JSONRPC"))
407 })
408 .ok_or_else(|| {
409 A2aClientError::Discovery(
410 "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
411 )
412 })?;
413 let interface_url = interface
414 .get("url")
415 .and_then(Value::as_str)
416 .ok_or_else(|| {
417 A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
418 })?;
419 let rpc_url = Url::parse(interface_url).map_err(|error| {
420 A2aClientError::Discovery(format!(
421 "invalid A2A JSONRPC supportedInterface url '{interface_url}': {error}"
422 ))
423 })?;
424 ensure_cleartext_allowed(&rpc_url, allow_cleartext, "jsonrpc interface")?;
425 let interface_authority = url_authority(&rpc_url)?;
426 if !authorities_equivalent(&interface_authority, requested_authority) {
427 return Err(A2aClientError::Discovery(format!(
428 "A2A JSONRPC interface authority mismatch: requested '{requested_authority}', card returned '{interface_authority}'"
429 )));
430 }
431 return Ok(ResolvedA2aEndpoint {
432 card_url,
433 rpc_url: rpc_url.to_string(),
434 agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
435 target_agent,
436 });
437 }
438
439 let base_url = card
440 .get("url")
441 .and_then(Value::as_str)
442 .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
443 let base_url = Url::parse(base_url).map_err(|error| {
444 A2aClientError::Discovery(format!("invalid A2A card url '{base_url}': {error}"))
445 })?;
446 ensure_cleartext_allowed(&base_url, allow_cleartext, "agent card")?;
447 let card_authority = url_authority(&base_url)?;
448 if !authorities_equivalent(&card_authority, requested_authority) {
449 return Err(A2aClientError::Discovery(format!(
450 "A2A agent card url authority mismatch: requested '{requested_authority}', card returned '{card_authority}'"
451 )));
452 }
453 let interfaces = card
454 .get("interfaces")
455 .and_then(Value::as_array)
456 .ok_or_else(|| {
457 A2aClientError::Discovery("A2A agent card missing interfaces".to_string())
458 })?;
459 let jsonrpc_interfaces: Vec<&Value> = interfaces
460 .iter()
461 .filter(|entry| {
462 entry
463 .get("protocol")
464 .and_then(Value::as_str)
465 .is_some_and(|protocol| protocol.eq_ignore_ascii_case("jsonrpc"))
466 })
467 .collect();
468 if jsonrpc_interfaces.len() != 1 {
469 return Err(A2aClientError::Discovery(format!(
470 "A2A agent card must expose exactly one jsonrpc interface, found {}",
471 jsonrpc_interfaces.len()
472 )));
473 }
474 let interface_url = jsonrpc_interfaces[0]
475 .get("url")
476 .and_then(Value::as_str)
477 .ok_or_else(|| {
478 A2aClientError::Discovery("A2A jsonrpc interface missing url".to_string())
479 })?;
480 let rpc_url = base_url.join(interface_url).map_err(|error| {
481 A2aClientError::Discovery(format!(
482 "invalid A2A interface url '{interface_url}': {error}"
483 ))
484 })?;
485 ensure_cleartext_allowed(&rpc_url, allow_cleartext, "jsonrpc interface")?;
486 Ok(ResolvedA2aEndpoint {
487 card_url,
488 rpc_url: rpc_url.to_string(),
489 agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
490 target_agent,
491 })
492}
493
/// Lists the URL schemes to try for agent-card discovery, in order of preference.
///
/// `https` always comes first; `http` is included only when `allow_cleartext` is set.
fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
    const BY_PREFERENCE: &[&str] = &["https", "http"];
    let usable = if allow_cleartext {
        BY_PREFERENCE.len()
    } else {
        1
    };
    &BY_PREFERENCE[..usable]
}
501
502fn should_try_cleartext_fallback(
515 scheme: &str,
516 allow_cleartext: bool,
517 error: &AgentCardFetchError,
518 authority: &str,
519) -> bool {
520 if !allow_cleartext || scheme != "https" {
521 return false;
522 }
523 match error {
524 AgentCardFetchError::Cancelled(_) => false,
525 AgentCardFetchError::ConnectRefused(_) => true,
526 AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
527 }
528}
529
530fn ensure_cleartext_allowed(
531 url: &Url,
532 allow_cleartext: bool,
533 label: &str,
534) -> Result<(), A2aClientError> {
535 if allow_cleartext || url.scheme() != "http" {
536 return Ok(());
537 }
538 Err(A2aClientError::Discovery(format!(
539 "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
540 )))
541}
542
543fn is_loopback_authority(authority: &str) -> bool {
544 let (host, _) = split_authority(authority);
545 if host.eq_ignore_ascii_case("localhost") {
546 return true;
547 }
548 if let Ok(ip) = host.parse::<std::net::IpAddr>() {
549 return ip.is_loopback();
550 }
551 false
552}
553
554fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
565 if card_authority == requested_authority {
566 return true;
567 }
568 let (_, card_port) = split_authority(card_authority);
569 let (_, requested_port) = split_authority(requested_authority);
570 if card_port != requested_port {
571 return false;
572 }
573 is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
574}
575
/// Splits an authority into `(host, port)` string slices.
///
/// * A bracketed host (`[::1]:8080`) ends at the last `]`; the port is whatever follows
///   a `:` placed directly after it, otherwise empty.
/// * Any other authority is split at its last `:`.
/// * A missing port is returned as `""`.
///
/// Leading `[` and trailing `]` characters are removed from the returned host.
fn split_authority(authority: &str) -> (&str, &str) {
    let bracketed = authority.starts_with('[');
    let (raw_host, port) = match authority.rfind(']') {
        Some(close) if bracketed => {
            // `]` is one byte wide, so `close + 1` is always a char boundary.
            let (host, rest) = authority.split_at(close + 1);
            (host, rest.strip_prefix(':').unwrap_or(""))
        }
        _ if bracketed => (authority, ""),
        _ => authority.rsplit_once(':').unwrap_or((authority, "")),
    };
    (
        raw_host.trim_start_matches('[').trim_end_matches(']'),
        port,
    )
}
598
599fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
600 match error {
601 AgentCardFetchError::Cancelled(message)
602 | AgentCardFetchError::Discovery(message)
603 | AgentCardFetchError::ConnectRefused(message) => message.clone(),
604 }
605}
606
607fn is_connect_refused(error: &reqwest::Error) -> bool {
608 if !error.is_connect() {
609 return false;
610 }
611 let mut source = error.source();
612 while let Some(cause) = source {
613 if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
614 if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
615 return true;
616 }
617 }
618 source = cause.source();
619 }
620 false
621}
622
623fn url_authority(url: &Url) -> Result<String, A2aClientError> {
624 let host = url
625 .host_str()
626 .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
627 Ok(if let Some(port) = url.port() {
628 format!("{host}:{port}")
629 } else {
630 host.to_string()
631 })
632}
633
634async fn send_jsonrpc(
635 rpc_url: &str,
636 request: &Value,
637 trace_id: &str,
638 cancel_rx: &mut broadcast::Receiver<()>,
639) -> Result<Value, A2aClientError> {
640 let response = send_http(
641 crate::llm::shared_blocking_client()
642 .post(rpc_url)
643 .header(reqwest::header::CONTENT_TYPE, "application/json")
644 .header("A2A-Version", A2A_PROTOCOL_VERSION)
645 .header("A2A-Trace-Id", trace_id)
646 .json(request),
647 cancel_rx,
648 "A2A task dispatch cancelled",
649 )
650 .await?;
651 if !response.status().is_success() {
652 return Err(A2aClientError::Protocol(format!(
653 "A2A task dispatch returned HTTP {}",
654 response.status()
655 )));
656 }
657 response
658 .json::<Value>()
659 .await
660 .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
661}
662
663async fn send_http(
664 request: reqwest::RequestBuilder,
665 cancel_rx: &mut broadcast::Receiver<()>,
666 cancelled_message: &'static str,
667) -> Result<reqwest::Response, A2aClientError> {
668 tokio::select! {
669 response = request.send() => response
670 .map_err(|error| A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))),
671 _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
672 }
673}
674
675fn task_state(task: &Value) -> Result<&str, A2aClientError> {
676 task.pointer("/status/state")
677 .and_then(Value::as_str)
678 .filter(|value| !value.is_empty())
679 .ok_or_else(|| {
680 A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
681 })
682}
683
684fn extract_inline_result(task: &Value) -> Value {
685 let text = task
686 .get("history")
687 .and_then(Value::as_array)
688 .and_then(|history| {
689 history.iter().rev().find_map(|message| {
690 let role = message.get("role").and_then(Value::as_str)?;
691 if role != "agent" {
692 return None;
693 }
694 message
695 .get("parts")
696 .and_then(Value::as_array)
697 .and_then(|parts| {
698 parts.iter().find_map(|part| {
699 if part.get("type").and_then(Value::as_str) == Some("text") {
700 part.get("text").and_then(Value::as_str).map(str::trim_end)
701 } else {
702 None
703 }
704 })
705 })
706 })
707 });
708 match text {
709 Some(text) if !text.is_empty() => {
710 serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
711 }
712 _ => task.clone(),
713 }
714}
715
716async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
717 let _ = cancel_rx.recv().await;
718}
719
/// Unit tests for the pure helpers: target labels, inline-result extraction, scheme
/// selection, cleartext-fallback policy, loopback detection, and agent-card validation.
#[cfg(test)]
mod tests {
    use super::*;

    // The label is the agent path when present, otherwise the authority.
    #[test]
    fn target_agent_label_prefers_path() {
        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
    }

    // Text from the newest agent message is trimmed and parsed as JSON; user messages are skipped.
    #[test]
    fn extract_inline_result_parses_json_text() {
        let task = serde_json::json!({
            "history": [
                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
            ]
        });
        assert_eq!(
            extract_inline_result(&task),
            serde_json::json!({"trace_id": "trace_123"})
        );
    }

    // `https` is always first; `http` appears only with the cleartext opt-in.
    #[test]
    fn discovery_prefers_https_before_http() {
        assert_eq!(card_resolution_schemes(false), ["https"]);
        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
    }

    // A card using `supportedInterfaces` resolves to the absolute JSONRPC interface URL.
    #[test]
    fn endpoint_from_card_accepts_current_supported_interfaces() {
        let endpoint = endpoint_from_card(
            "https://trusted.example/.well-known/agent-card.json".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "name": "trusted",
                "supportedInterfaces": [{
                    "protocolBinding": "JSONRPC",
                    "protocolVersion": "1.0",
                    "url": "https://trusted.example/rpc"
                }],
            }),
        )
        .expect("current A2A card should resolve");
        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
        assert_eq!(
            endpoint.card_url,
            "https://trusted.example/.well-known/agent-card.json"
        );
        assert_eq!(endpoint.target_agent, "triage");
    }

    // For a non-loopback authority, only a refused connection on `https` permits fallback.
    #[test]
    fn cleartext_fallback_only_after_https_connect_refused() {
        assert!(should_try_cleartext_fallback(
            "https",
            true,
            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
            "reviewer.example:443",
        ));
        assert!(!should_try_cleartext_fallback(
            "http",
            true,
            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
            "reviewer.example:443",
        ));
        assert!(!should_try_cleartext_fallback(
            "https",
            true,
            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
            "reviewer.example:443",
        ));
    }

    // Without the opt-in, even loopback authorities never fall back to cleartext.
    #[test]
    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
        for authority in [
            "127.0.0.1:8080",
            "localhost:8080",
            "[::1]:8080",
            "127.1.2.3:9000",
        ] {
            assert!(
                !should_try_cleartext_fallback(
                    "https",
                    false,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "cleartext fallback must stay disabled without opt-in for '{authority}'"
            );
        }
    }

    // With the opt-in, a discovery failure on a loopback authority permits fallback.
    #[test]
    fn cleartext_fallback_allows_loopback_after_opt_in() {
        for authority in [
            "127.0.0.1:8080",
            "localhost:8080",
            "[::1]:8080",
            "127.1.2.3:9000",
        ] {
            assert!(
                should_try_cleartext_fallback(
                    "https",
                    true,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "expected cleartext fallback for loopback authority '{authority}'"
            );
        }
    }

    // Discovery failures on non-loopback authorities, private ranges included, never fall back.
    #[test]
    fn cleartext_fallback_denies_external_tls_failures() {
        for authority in [
            "reviewer.example:443",
            "8.8.8.8:443",
            "192.168.1.10:8080",
            "10.0.0.5:8443",
        ] {
            assert!(
                !should_try_cleartext_fallback(
                    "https",
                    true,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "cleartext fallback must be denied for external authority '{authority}'"
            );
        }
    }

    // Loopback means `localhost` in any case or a loopback IP literal; nothing else qualifies.
    #[test]
    fn is_loopback_authority_recognises_loopback_forms() {
        assert!(is_loopback_authority("127.0.0.1:8080"));
        assert!(is_loopback_authority("localhost:8080"));
        assert!(is_loopback_authority("LOCALHOST:9000"));
        assert!(is_loopback_authority("[::1]:8080"));
        assert!(is_loopback_authority("127.5.5.5:1234"));
        assert!(!is_loopback_authority("8.8.8.8:443"));
        assert!(!is_loopback_authority("192.168.1.10:8080"));
        assert!(!is_loopback_authority("example.com:443"));
        assert!(!is_loopback_authority("reviewer.prod"));
    }

    // A card whose `url` points at a different host than the requested one is rejected.
    #[test]
    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
        let error = endpoint_from_card(
            "https://trusted.example/.well-known/agent-card.json".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "url": "https://evil.example",
                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
            }),
        )
        .unwrap_err();
        assert_eq!(
            error.to_string(),
            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
        );
    }

    // An `http` card url is refused when the binding did not opt in to cleartext.
    #[test]
    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
        let error = endpoint_from_card(
            "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
            false,
            "127.0.0.1:8080",
            "triage".to_string(),
            &serde_json::json!({
                "url": "http://localhost:8080",
                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
            }),
        )
        .expect_err("cleartext card should require explicit opt-in");
        assert!(error
            .to_string()
            .contains("requires `allow_cleartext = true`"));
    }

    // Loopback aliases on the same port are interchangeable; a different port is still a mismatch.
    #[test]
    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
        let card = serde_json::json!({
            "url": "http://localhost:8080",
            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
        });
        let endpoint = endpoint_from_card(
            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
            true,
            "127.0.0.1:8080",
            "triage".to_string(),
            &card,
        )
        .expect("loopback alias pair should be accepted");
        assert_eq!(endpoint.rpc_url, "http://localhost:8080/rpc");

        let card_v6 = serde_json::json!({
            "url": "http://[::1]:8080",
            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
        });
        let endpoint_v6 = endpoint_from_card(
            "http://localhost:8080/.well-known/agent-card.json".to_string(),
            true,
            "localhost:8080",
            "triage".to_string(),
            &card_v6,
        )
        .expect("IPv6 loopback alias should be accepted");
        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/rpc");

        let card_wrong_port = serde_json::json!({
            "url": "http://localhost:9000",
            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
        });
        let error = endpoint_from_card(
            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
            true,
            "127.0.0.1:8080",
            "triage".to_string(),
            &card_wrong_port,
        )
        .expect_err("mismatched ports must still be rejected even on loopback");
        assert!(error
            .to_string()
            .contains("A2A agent card url authority mismatch"));
    }

    // Host differences are tolerated only between loopback hosts; identical authorities match.
    #[test]
    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
        assert!(!authorities_equivalent(
            "internal.corp.example:443",
            "trusted.example:443",
        ));
        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
        assert!(authorities_equivalent(
            "trusted.example:443",
            "trusted.example:443",
        ));
    }
}