1use std::error::Error as _;
2
3use reqwest::Url;
4use serde_json::Value;
5use tokio::sync::broadcast;
6
7use crate::triggers::TriggerEvent;
8
9const A2A_AGENT_CARD_PATH: &str = ".well-known/a2a-agent";
10const A2A_PROTOCOL_VERSION: &str = "1.0.0";
11const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
12const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
13
14#[derive(Clone, Debug, PartialEq, Eq)]
15pub struct ResolvedA2aEndpoint {
16 pub card_url: String,
17 pub rpc_url: String,
18 pub agent_id: Option<String>,
19 pub target_agent: String,
20}
21
22#[derive(Clone, Debug, PartialEq)]
23pub enum DispatchAck {
24 InlineResult {
25 task_id: String,
26 result: Value,
27 },
28 PendingTask {
29 task_id: String,
30 state: String,
31 handle: Value,
32 },
33}
34
35#[derive(Debug)]
36pub enum A2aClientError {
37 InvalidTarget(String),
38 Discovery(String),
39 Protocol(String),
40 Cancelled(String),
41}
42
43impl std::fmt::Display for A2aClientError {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 match self {
46 Self::InvalidTarget(message)
47 | Self::Discovery(message)
48 | Self::Protocol(message)
49 | Self::Cancelled(message) => f.write_str(message),
50 }
51 }
52}
53
54impl std::error::Error for A2aClientError {}
55
56#[derive(Debug)]
57enum AgentCardFetchError {
58 Cancelled(String),
59 Discovery(String),
60 ConnectRefused(String),
61}
62
63pub async fn dispatch_trigger_event(
64 raw_target: &str,
65 allow_cleartext: bool,
66 binding_id: &str,
67 binding_key: &str,
68 event: &TriggerEvent,
69 cancel_rx: &mut broadcast::Receiver<()>,
70) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
71 let started = std::time::Instant::now();
72 let target = match parse_target(raw_target) {
73 Ok(target) => target,
74 Err(error) => {
75 record_a2a_metric(raw_target, "failed", started.elapsed());
76 return Err(error);
77 }
78 };
79 let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
80 Ok(endpoint) => endpoint,
81 Err(error) => {
82 record_a2a_metric(raw_target, "failed", started.elapsed());
83 return Err(error);
84 }
85 };
86 let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
87 let envelope = serde_json::json!({
88 "kind": "harn.trigger.dispatch",
89 "message_id": message_id,
90 "trace_id": event.trace_id.0,
91 "event_id": event.id.0,
92 "trigger_id": binding_id,
93 "binding_key": binding_key,
94 "target_agent": endpoint.target_agent,
95 "event": event,
96 });
97 let text = serde_json::to_string(&envelope)
98 .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
99 let push_config = push_notification_config();
100 let mut params = serde_json::json!({
101 "contextId": event.trace_id.0,
102 "message": {
103 "messageId": message_id,
104 "role": "user",
105 "parts": [{
106 "type": "text",
107 "text": text,
108 }],
109 "metadata": {
110 "kind": "harn.trigger.dispatch",
111 "trace_id": event.trace_id.0,
112 "event_id": event.id.0,
113 "trigger_id": binding_id,
114 "binding_key": binding_key,
115 "target_agent": endpoint.target_agent,
116 },
117 },
118 });
119 if let Some(config) = push_config.clone() {
120 params["configuration"] = serde_json::json!({
121 "blocking": false,
122 "returnImmediately": true,
123 "pushNotificationConfig": config,
124 });
125 }
126 let request = crate::jsonrpc::request(message_id.clone(), "a2a.SendMessage", params);
127
128 let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
129 Ok(body) => body,
130 Err(error) => {
131 record_a2a_metric(raw_target, "failed", started.elapsed());
132 return Err(error);
133 }
134 };
135 let result = match body.get("result").cloned().ok_or_else(|| {
136 if let Some(error) = body.get("error") {
137 let message = error
138 .get("message")
139 .and_then(Value::as_str)
140 .unwrap_or("unknown A2A error");
141 A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
142 } else {
143 A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
144 }
145 }) {
146 Ok(result) => result,
147 Err(error) => {
148 record_a2a_metric(raw_target, "failed", started.elapsed());
149 return Err(error);
150 }
151 };
152
153 let task_id = match result
154 .get("id")
155 .and_then(Value::as_str)
156 .filter(|value| !value.is_empty())
157 .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
158 {
159 Ok(task_id) => task_id.to_string(),
160 Err(error) => {
161 record_a2a_metric(raw_target, "failed", started.elapsed());
162 return Err(error);
163 }
164 };
165 let state = match task_state(&result) {
166 Ok(state) => state.to_string(),
167 Err(error) => {
168 record_a2a_metric(raw_target, "failed", started.elapsed());
169 return Err(error);
170 }
171 };
172
173 if state == "completed" {
174 let inline = extract_inline_result(&result);
175 record_a2a_metric(raw_target, "succeeded", started.elapsed());
176 return Ok((
177 endpoint,
178 DispatchAck::InlineResult {
179 task_id,
180 result: inline,
181 },
182 ));
183 }
184
185 if let Some(config) = push_config {
186 register_push_notification_config(
187 &endpoint.rpc_url,
188 &task_id,
189 config,
190 &event.trace_id.0,
191 cancel_rx,
192 )
193 .await
194 .inspect_err(|_| {
195 record_a2a_metric(raw_target, "failed", started.elapsed());
196 })?;
197 }
198 record_a2a_metric(raw_target, "succeeded", started.elapsed());
199 Ok((
200 endpoint.clone(),
201 DispatchAck::PendingTask {
202 task_id: task_id.clone(),
203 state: state.clone(),
204 handle: serde_json::json!({
205 "kind": "a2a_task_handle",
206 "task_id": task_id,
207 "state": state,
208 "target_agent": endpoint.target_agent,
209 "rpc_url": endpoint.rpc_url,
210 "card_url": endpoint.card_url,
211 "agent_id": endpoint.agent_id,
212 }),
213 },
214 ))
215}
216
217fn push_notification_config() -> Option<Value> {
218 let url = std::env::var(A2A_PUSH_URL_ENV)
219 .ok()
220 .map(|value| value.trim().to_string())
221 .filter(|value| !value.is_empty())?;
222 let token = std::env::var(A2A_PUSH_TOKEN_ENV)
223 .ok()
224 .map(|value| value.trim().to_string())
225 .filter(|value| !value.is_empty());
226 let mut config = serde_json::json!({ "url": url });
227 if let Some(token) = token {
228 config["token"] = Value::String(token.clone());
229 config["authentication"] = serde_json::json!({
230 "scheme": "Bearer",
231 "credentials": token,
232 });
233 }
234 Some(config)
235}
236
237async fn register_push_notification_config(
238 rpc_url: &str,
239 task_id: &str,
240 config: Value,
241 trace_id: &str,
242 cancel_rx: &mut broadcast::Receiver<()>,
243) -> Result<(), A2aClientError> {
244 let request = crate::jsonrpc::request(
245 format!("{trace_id}.{task_id}.push-config"),
246 "CreateTaskPushNotificationConfig",
247 serde_json::json!({
248 "taskId": task_id,
249 "pushNotificationConfig": config,
250 }),
251 );
252 let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
253 if response.get("error").is_some() {
254 return Err(A2aClientError::Protocol(format!(
255 "A2A push notification registration failed: {}",
256 response["error"]
257 )));
258 }
259 Ok(())
260}
261
262fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
263 if let Some(metrics) = crate::active_metrics_registry() {
264 metrics.record_a2a_hop(target, outcome, duration);
265 }
266}
267
268pub fn target_agent_label(raw_target: &str) -> String {
269 parse_target(raw_target)
270 .map(|target| target.target_agent_label())
271 .unwrap_or_else(|_| raw_target.to_string())
272}
273
274#[derive(Clone, Debug)]
275struct ParsedTarget {
276 authority: String,
277 target_agent: String,
278}
279
280impl ParsedTarget {
281 fn target_agent_label(&self) -> String {
282 if self.target_agent.is_empty() {
283 self.authority.clone()
284 } else {
285 self.target_agent.clone()
286 }
287 }
288}
289
290fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
291 let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
292 A2aClientError::InvalidTarget(format!(
293 "invalid a2a dispatch target '{raw_target}': {error}"
294 ))
295 })?;
296 let host = parsed.host_str().ok_or_else(|| {
297 A2aClientError::InvalidTarget(format!(
298 "invalid a2a dispatch target '{raw_target}': missing host"
299 ))
300 })?;
301 let authority = if let Some(port) = parsed.port() {
302 format!("{host}:{port}")
303 } else {
304 host.to_string()
305 };
306 Ok(ParsedTarget {
307 authority,
308 target_agent: parsed.path().trim_start_matches('/').to_string(),
309 })
310}
311
312async fn resolve_endpoint(
313 target: &ParsedTarget,
314 allow_cleartext: bool,
315 cancel_rx: &mut broadcast::Receiver<()>,
316) -> Result<ResolvedA2aEndpoint, A2aClientError> {
317 let mut last_error = None;
318 for scheme in card_resolution_schemes(allow_cleartext) {
319 let card_url = format!("{scheme}://{}/{A2A_AGENT_CARD_PATH}", target.authority);
320 match fetch_agent_card(&card_url, cancel_rx).await {
321 Ok(card) => {
322 return endpoint_from_card(
323 card_url,
324 allow_cleartext,
325 &target.authority,
326 target.target_agent.clone(),
327 &card,
328 );
329 }
330 Err(AgentCardFetchError::Cancelled(message)) => {
331 return Err(A2aClientError::Cancelled(message));
332 }
333 Err(error) => {
334 let message = agent_card_fetch_error_message(&error);
335 last_error = Some(message);
336 if should_try_cleartext_fallback(scheme, allow_cleartext, &error, &target.authority)
337 {
338 continue;
339 }
340 break;
341 }
342 }
343 }
344 Err(A2aClientError::Discovery(format!(
345 "could not resolve A2A agent card for '{}': {}",
346 target.authority,
347 last_error.unwrap_or_else(|| "unknown discovery error".to_string())
348 )))
349}
350
351async fn fetch_agent_card(
352 card_url: &str,
353 cancel_rx: &mut broadcast::Receiver<()>,
354) -> Result<Value, AgentCardFetchError> {
355 let response = tokio::select! {
356 response = crate::llm::shared_utility_client().get(card_url).send() => {
357 match response {
358 Ok(response) => Ok(response),
359 Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
360 format!("A2A HTTP request failed: {error}")
361 )),
362 Err(error) => Err(AgentCardFetchError::Discovery(
363 format!("A2A HTTP request failed: {error}")
364 )),
365 }
366 }
367 _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
368 "A2A agent-card fetch cancelled".to_string()
369 )),
370 }?;
371 if !response.status().is_success() {
372 return Err(AgentCardFetchError::Discovery(format!(
373 "GET {card_url} returned HTTP {}",
374 response.status()
375 )));
376 }
377 response
378 .json::<Value>()
379 .await
380 .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
381}
382
383fn endpoint_from_card(
384 card_url: String,
385 allow_cleartext: bool,
386 requested_authority: &str,
387 target_agent: String,
388 card: &Value,
389) -> Result<ResolvedA2aEndpoint, A2aClientError> {
390 let base_url = card
391 .get("url")
392 .and_then(Value::as_str)
393 .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
394 let base_url = Url::parse(base_url).map_err(|error| {
395 A2aClientError::Discovery(format!("invalid A2A card url '{base_url}': {error}"))
396 })?;
397 ensure_cleartext_allowed(&base_url, allow_cleartext, "agent card")?;
398 let card_authority = url_authority(&base_url)?;
399 if !authorities_equivalent(&card_authority, requested_authority) {
400 return Err(A2aClientError::Discovery(format!(
401 "A2A agent card url authority mismatch: requested '{requested_authority}', card returned '{card_authority}'"
402 )));
403 }
404 let interfaces = card
405 .get("interfaces")
406 .and_then(Value::as_array)
407 .ok_or_else(|| {
408 A2aClientError::Discovery("A2A agent card missing interfaces".to_string())
409 })?;
410 let jsonrpc_interfaces: Vec<&Value> = interfaces
411 .iter()
412 .filter(|entry| entry.get("protocol").and_then(Value::as_str) == Some("jsonrpc"))
413 .collect();
414 if jsonrpc_interfaces.len() != 1 {
415 return Err(A2aClientError::Discovery(format!(
416 "A2A agent card must expose exactly one jsonrpc interface, found {}",
417 jsonrpc_interfaces.len()
418 )));
419 }
420 let interface_url = jsonrpc_interfaces[0]
421 .get("url")
422 .and_then(Value::as_str)
423 .ok_or_else(|| {
424 A2aClientError::Discovery("A2A jsonrpc interface missing url".to_string())
425 })?;
426 let rpc_url = base_url.join(interface_url).map_err(|error| {
427 A2aClientError::Discovery(format!(
428 "invalid A2A interface url '{interface_url}': {error}"
429 ))
430 })?;
431 ensure_cleartext_allowed(&rpc_url, allow_cleartext, "jsonrpc interface")?;
432 Ok(ResolvedA2aEndpoint {
433 card_url,
434 rpc_url: rpc_url.to_string(),
435 agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
436 target_agent,
437 })
438}
439
440fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
441 if allow_cleartext {
442 &["https", "http"]
443 } else {
444 &["https"]
445 }
446}
447
448fn should_try_cleartext_fallback(
461 scheme: &str,
462 allow_cleartext: bool,
463 error: &AgentCardFetchError,
464 authority: &str,
465) -> bool {
466 if !allow_cleartext || scheme != "https" {
467 return false;
468 }
469 match error {
470 AgentCardFetchError::Cancelled(_) => false,
471 AgentCardFetchError::ConnectRefused(_) => true,
472 AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
473 }
474}
475
476fn ensure_cleartext_allowed(
477 url: &Url,
478 allow_cleartext: bool,
479 label: &str,
480) -> Result<(), A2aClientError> {
481 if allow_cleartext || url.scheme() != "http" {
482 return Ok(());
483 }
484 Err(A2aClientError::Discovery(format!(
485 "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
486 )))
487}
488
489fn is_loopback_authority(authority: &str) -> bool {
490 let (host, _) = split_authority(authority);
491 if host.eq_ignore_ascii_case("localhost") {
492 return true;
493 }
494 if let Ok(ip) = host.parse::<std::net::IpAddr>() {
495 return ip.is_loopback();
496 }
497 false
498}
499
500fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
511 if card_authority == requested_authority {
512 return true;
513 }
514 let (_, card_port) = split_authority(card_authority);
515 let (_, requested_port) = split_authority(requested_authority);
516 if card_port != requested_port {
517 return false;
518 }
519 is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
520}
521
522fn split_authority(authority: &str) -> (&str, &str) {
525 let (host_raw, port) = if authority.starts_with('[') {
526 if let Some(end) = authority.rfind(']') {
528 let host = &authority[..=end];
529 let rest = &authority[end + 1..];
530 let port = rest.strip_prefix(':').unwrap_or("");
531 (host, port)
532 } else {
533 (authority, "")
534 }
535 } else {
536 match authority.rsplit_once(':') {
537 Some((host, port)) => (host, port),
538 None => (authority, ""),
539 }
540 };
541 let host = host_raw.trim_start_matches('[').trim_end_matches(']');
542 (host, port)
543}
544
545fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
546 match error {
547 AgentCardFetchError::Cancelled(message)
548 | AgentCardFetchError::Discovery(message)
549 | AgentCardFetchError::ConnectRefused(message) => message.clone(),
550 }
551}
552
553fn is_connect_refused(error: &reqwest::Error) -> bool {
554 if !error.is_connect() {
555 return false;
556 }
557 let mut source = error.source();
558 while let Some(cause) = source {
559 if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
560 if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
561 return true;
562 }
563 }
564 source = cause.source();
565 }
566 false
567}
568
569fn url_authority(url: &Url) -> Result<String, A2aClientError> {
570 let host = url
571 .host_str()
572 .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
573 Ok(if let Some(port) = url.port() {
574 format!("{host}:{port}")
575 } else {
576 host.to_string()
577 })
578}
579
580async fn send_jsonrpc(
581 rpc_url: &str,
582 request: &Value,
583 trace_id: &str,
584 cancel_rx: &mut broadcast::Receiver<()>,
585) -> Result<Value, A2aClientError> {
586 let response = send_http(
587 crate::llm::shared_blocking_client()
588 .post(rpc_url)
589 .header(reqwest::header::CONTENT_TYPE, "application/json")
590 .header("A2A-Version", A2A_PROTOCOL_VERSION)
591 .header("A2A-Trace-Id", trace_id)
592 .json(request),
593 cancel_rx,
594 "A2A task dispatch cancelled",
595 )
596 .await?;
597 if !response.status().is_success() {
598 return Err(A2aClientError::Protocol(format!(
599 "A2A task dispatch returned HTTP {}",
600 response.status()
601 )));
602 }
603 response
604 .json::<Value>()
605 .await
606 .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
607}
608
609async fn send_http(
610 request: reqwest::RequestBuilder,
611 cancel_rx: &mut broadcast::Receiver<()>,
612 cancelled_message: &'static str,
613) -> Result<reqwest::Response, A2aClientError> {
614 tokio::select! {
615 response = request.send() => response
616 .map_err(|error| A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))),
617 _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
618 }
619}
620
621fn task_state(task: &Value) -> Result<&str, A2aClientError> {
622 task.pointer("/status/state")
623 .and_then(Value::as_str)
624 .filter(|value| !value.is_empty())
625 .ok_or_else(|| {
626 A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
627 })
628}
629
630fn extract_inline_result(task: &Value) -> Value {
631 let text = task
632 .get("history")
633 .and_then(Value::as_array)
634 .and_then(|history| {
635 history.iter().rev().find_map(|message| {
636 let role = message.get("role").and_then(Value::as_str)?;
637 if role != "agent" {
638 return None;
639 }
640 message
641 .get("parts")
642 .and_then(Value::as_array)
643 .and_then(|parts| {
644 parts.iter().find_map(|part| {
645 if part.get("type").and_then(Value::as_str) == Some("text") {
646 part.get("text").and_then(Value::as_str).map(str::trim_end)
647 } else {
648 None
649 }
650 })
651 })
652 })
653 });
654 match text {
655 Some(text) if !text.is_empty() => {
656 serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
657 }
658 _ => task.clone(),
659 }
660}
661
662async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
663 let _ = cancel_rx.recv().await;
664}
665
666#[cfg(test)]
667mod tests {
668 use super::*;
669
670 #[test]
671 fn target_agent_label_prefers_path() {
672 assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
673 assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
674 }
675
676 #[test]
677 fn extract_inline_result_parses_json_text() {
678 let task = serde_json::json!({
679 "history": [
680 {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
681 {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
682 ]
683 });
684 assert_eq!(
685 extract_inline_result(&task),
686 serde_json::json!({"trace_id": "trace_123"})
687 );
688 }
689
690 #[test]
691 fn discovery_prefers_https_before_http() {
692 assert_eq!(card_resolution_schemes(false), ["https"]);
693 assert_eq!(card_resolution_schemes(true), ["https", "http"]);
694 }
695
696 #[test]
697 fn cleartext_fallback_only_after_https_connect_refused() {
698 assert!(should_try_cleartext_fallback(
699 "https",
700 true,
701 &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
702 "reviewer.example:443",
703 ));
704 assert!(!should_try_cleartext_fallback(
705 "http",
706 true,
707 &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
708 "reviewer.example:443",
709 ));
710 assert!(!should_try_cleartext_fallback(
711 "https",
712 true,
713 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
714 "reviewer.example:443",
715 ));
716 }
717
718 #[test]
719 fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
720 for authority in [
721 "127.0.0.1:8080",
722 "localhost:8080",
723 "[::1]:8080",
724 "127.1.2.3:9000",
725 ] {
726 assert!(
727 !should_try_cleartext_fallback(
728 "https",
729 false,
730 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
731 authority,
732 ),
733 "cleartext fallback must stay disabled without opt-in for '{authority}'"
734 );
735 }
736 }
737
738 #[test]
739 fn cleartext_fallback_allows_loopback_after_opt_in() {
740 for authority in [
743 "127.0.0.1:8080",
744 "localhost:8080",
745 "[::1]:8080",
746 "127.1.2.3:9000",
747 ] {
748 assert!(
749 should_try_cleartext_fallback(
750 "https",
751 true,
752 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
753 authority,
754 ),
755 "expected cleartext fallback for loopback authority '{authority}'"
756 );
757 }
758 }
759
760 #[test]
761 fn cleartext_fallback_denies_external_tls_failures() {
762 for authority in [
765 "reviewer.example:443",
766 "8.8.8.8:443",
767 "192.168.1.10:8080",
768 "10.0.0.5:8443",
769 ] {
770 assert!(
771 !should_try_cleartext_fallback(
772 "https",
773 true,
774 &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
775 authority,
776 ),
777 "cleartext fallback must be denied for external authority '{authority}'"
778 );
779 }
780 }
781
782 #[test]
783 fn is_loopback_authority_recognises_loopback_forms() {
784 assert!(is_loopback_authority("127.0.0.1:8080"));
785 assert!(is_loopback_authority("localhost:8080"));
786 assert!(is_loopback_authority("LOCALHOST:9000"));
787 assert!(is_loopback_authority("[::1]:8080"));
788 assert!(is_loopback_authority("127.5.5.5:1234"));
789 assert!(!is_loopback_authority("8.8.8.8:443"));
790 assert!(!is_loopback_authority("192.168.1.10:8080"));
791 assert!(!is_loopback_authority("example.com:443"));
792 assert!(!is_loopback_authority("reviewer.prod"));
793 }
794
795 #[test]
796 fn endpoint_from_card_rejects_card_url_authority_mismatch() {
797 let error = endpoint_from_card(
798 "https://trusted.example/.well-known/a2a-agent".to_string(),
799 false,
800 "trusted.example",
801 "triage".to_string(),
802 &serde_json::json!({
803 "url": "https://evil.example",
804 "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
805 }),
806 )
807 .unwrap_err();
808 assert_eq!(
809 error.to_string(),
810 "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
811 );
812 }
813
814 #[test]
815 fn endpoint_from_card_rejects_cleartext_without_opt_in() {
816 let error = endpoint_from_card(
817 "https://127.0.0.1:8080/.well-known/a2a-agent".to_string(),
818 false,
819 "127.0.0.1:8080",
820 "triage".to_string(),
821 &serde_json::json!({
822 "url": "http://localhost:8080",
823 "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
824 }),
825 )
826 .expect_err("cleartext card should require explicit opt-in");
827 assert!(error
828 .to_string()
829 .contains("requires `allow_cleartext = true`"));
830 }
831
832 #[test]
833 fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
834 let card = serde_json::json!({
838 "url": "http://localhost:8080",
839 "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
840 });
841 let endpoint = endpoint_from_card(
842 "http://127.0.0.1:8080/.well-known/a2a-agent".to_string(),
843 true,
844 "127.0.0.1:8080",
845 "triage".to_string(),
846 &card,
847 )
848 .expect("loopback alias pair should be accepted");
849 assert_eq!(endpoint.rpc_url, "http://localhost:8080/rpc");
850
851 let card_v6 = serde_json::json!({
853 "url": "http://[::1]:8080",
854 "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
855 });
856 let endpoint_v6 = endpoint_from_card(
857 "http://localhost:8080/.well-known/a2a-agent".to_string(),
858 true,
859 "localhost:8080",
860 "triage".to_string(),
861 &card_v6,
862 )
863 .expect("IPv6 loopback alias should be accepted");
864 assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/rpc");
865
866 let card_wrong_port = serde_json::json!({
868 "url": "http://localhost:9000",
869 "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
870 });
871 let error = endpoint_from_card(
872 "http://127.0.0.1:8080/.well-known/a2a-agent".to_string(),
873 true,
874 "127.0.0.1:8080",
875 "triage".to_string(),
876 &card_wrong_port,
877 )
878 .expect_err("mismatched ports must still be rejected even on loopback");
879 assert!(error
880 .to_string()
881 .contains("A2A agent card url authority mismatch"));
882 }
883
884 #[test]
885 fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
886 assert!(!authorities_equivalent(
887 "internal.corp.example:443",
888 "trusted.example:443",
889 ));
890 assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
891 assert!(authorities_equivalent(
892 "trusted.example:443",
893 "trusted.example:443",
894 ));
895 }
896}