Skip to main content

harn_vm/a2a/
mod.rs

1use std::error::Error as _;
2
3use reqwest::Url;
4use serde_json::Value;
5use tokio::sync::broadcast;
6
7use crate::triggers::TriggerEvent;
8
/// Path appended to `<scheme>://<authority>/` when fetching the agent card
/// during endpoint discovery.
const A2A_AGENT_CARD_PATH: &str = ".well-known/a2a-agent";
/// Value sent in the `A2A-Version` header on every JSON-RPC request.
const A2A_PROTOCOL_VERSION: &str = "1.0.0";
/// Environment variable holding the push-notification callback URL. When it
/// is unset or blank, no push-notification configuration is sent.
const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
/// Environment variable holding an optional token that is attached to the
/// push-notification configuration as a `Bearer` credential.
const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
13
/// Result of agent-card discovery: where the card was found and where
/// JSON-RPC requests for the agent must be sent.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct ResolvedA2aEndpoint {
    /// URL the agent card was fetched from.
    pub card_url: String,
    /// JSON-RPC interface URL, obtained by joining the card's `jsonrpc`
    /// interface `url` onto the card's top-level `url`.
    pub rpc_url: String,
    /// The card's `id` field, when present and a string.
    pub agent_id: Option<String>,
    /// Path portion of the dispatch target with leading slashes removed;
    /// empty when the target had no path.
    pub target_agent: String,
}
21
/// How the remote agent acknowledged a dispatched trigger event.
#[derive(Clone, Debug, PartialEq)]
pub enum DispatchAck {
    /// The task was already in the `completed` state in the dispatch
    /// response, so its result is available immediately.
    InlineResult {
        /// `result.id` from the dispatch response.
        task_id: String,
        /// Value extracted from the task by `extract_inline_result`.
        result: Value,
    },
    /// The task was in any state other than `completed`.
    PendingTask {
        /// `result.id` from the dispatch response.
        task_id: String,
        /// `result.status.state` from the dispatch response.
        state: String,
        /// JSON object of kind `a2a_task_handle` carrying the task id, state
        /// and the resolved endpoint fields.
        handle: Value,
    },
}
34
/// Errors surfaced by the A2A client. Every variant carries a complete,
/// human-readable message.
#[derive(Debug)]
pub enum A2aClientError {
    /// The raw dispatch target could not be parsed into a host (and path).
    InvalidTarget(String),
    /// The agent card could not be fetched or did not describe a usable endpoint.
    Discovery(String),
    /// An HTTP or JSON-RPC exchange with the agent failed or was malformed.
    Protocol(String),
    /// The operation was interrupted through the cancellation channel.
    Cancelled(String),
}

impl std::fmt::Display for A2aClientError {
    /// Writes the carried message verbatim; the variant adds no prefix.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Irrefutable or-pattern: adding a variant forces this line to be revisited.
        let (Self::InvalidTarget(text)
        | Self::Discovery(text)
        | Self::Protocol(text)
        | Self::Cancelled(text)) = self;
        f.write_str(text)
    }
}

// No underlying source error is exposed; the defaults of `Error` suffice.
impl std::error::Error for A2aClientError {}
55
/// Internal classification of agent-card fetch failures. The distinction
/// drives the HTTPS-to-HTTP fallback decision during discovery.
#[derive(Debug)]
enum AgentCardFetchError {
    /// The cancellation receiver resolved before the HTTP request finished.
    Cancelled(String),
    /// Any failure that is not a refused connection: other request errors,
    /// a non-success HTTP status, or a body that is not valid JSON.
    Discovery(String),
    /// The request failed to connect and an `io::ErrorKind::ConnectionRefused`
    /// was found in the error's source chain.
    ConnectRefused(String),
}
62
63pub async fn dispatch_trigger_event(
64    raw_target: &str,
65    allow_cleartext: bool,
66    binding_id: &str,
67    binding_key: &str,
68    event: &TriggerEvent,
69    cancel_rx: &mut broadcast::Receiver<()>,
70) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
71    let started = std::time::Instant::now();
72    let target = match parse_target(raw_target) {
73        Ok(target) => target,
74        Err(error) => {
75            record_a2a_metric(raw_target, "failed", started.elapsed());
76            return Err(error);
77        }
78    };
79    let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
80        Ok(endpoint) => endpoint,
81        Err(error) => {
82            record_a2a_metric(raw_target, "failed", started.elapsed());
83            return Err(error);
84        }
85    };
86    let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
87    let envelope = serde_json::json!({
88        "kind": "harn.trigger.dispatch",
89        "message_id": message_id,
90        "trace_id": event.trace_id.0,
91        "event_id": event.id.0,
92        "trigger_id": binding_id,
93        "binding_key": binding_key,
94        "target_agent": endpoint.target_agent,
95        "event": event,
96    });
97    let text = serde_json::to_string(&envelope)
98        .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
99    let push_config = push_notification_config();
100    let mut params = serde_json::json!({
101        "contextId": event.trace_id.0,
102        "message": {
103            "messageId": message_id,
104            "role": "user",
105            "parts": [{
106                "type": "text",
107                "text": text,
108            }],
109            "metadata": {
110                "kind": "harn.trigger.dispatch",
111                "trace_id": event.trace_id.0,
112                "event_id": event.id.0,
113                "trigger_id": binding_id,
114                "binding_key": binding_key,
115                "target_agent": endpoint.target_agent,
116            },
117        },
118    });
119    if let Some(config) = push_config.clone() {
120        params["configuration"] = serde_json::json!({
121            "blocking": false,
122            "returnImmediately": true,
123            "pushNotificationConfig": config,
124        });
125    }
126    let request = crate::jsonrpc::request(message_id.clone(), "a2a.SendMessage", params);
127
128    let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
129        Ok(body) => body,
130        Err(error) => {
131            record_a2a_metric(raw_target, "failed", started.elapsed());
132            return Err(error);
133        }
134    };
135    let result = match body.get("result").cloned().ok_or_else(|| {
136        if let Some(error) = body.get("error") {
137            let message = error
138                .get("message")
139                .and_then(Value::as_str)
140                .unwrap_or("unknown A2A error");
141            A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
142        } else {
143            A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
144        }
145    }) {
146        Ok(result) => result,
147        Err(error) => {
148            record_a2a_metric(raw_target, "failed", started.elapsed());
149            return Err(error);
150        }
151    };
152
153    let task_id = match result
154        .get("id")
155        .and_then(Value::as_str)
156        .filter(|value| !value.is_empty())
157        .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
158    {
159        Ok(task_id) => task_id.to_string(),
160        Err(error) => {
161            record_a2a_metric(raw_target, "failed", started.elapsed());
162            return Err(error);
163        }
164    };
165    let state = match task_state(&result) {
166        Ok(state) => state.to_string(),
167        Err(error) => {
168            record_a2a_metric(raw_target, "failed", started.elapsed());
169            return Err(error);
170        }
171    };
172
173    if state == "completed" {
174        let inline = extract_inline_result(&result);
175        record_a2a_metric(raw_target, "succeeded", started.elapsed());
176        return Ok((
177            endpoint,
178            DispatchAck::InlineResult {
179                task_id,
180                result: inline,
181            },
182        ));
183    }
184
185    if let Some(config) = push_config {
186        register_push_notification_config(
187            &endpoint.rpc_url,
188            &task_id,
189            config,
190            &event.trace_id.0,
191            cancel_rx,
192        )
193        .await
194        .inspect_err(|_| {
195            record_a2a_metric(raw_target, "failed", started.elapsed());
196        })?;
197    }
198    record_a2a_metric(raw_target, "succeeded", started.elapsed());
199    Ok((
200        endpoint.clone(),
201        DispatchAck::PendingTask {
202            task_id: task_id.clone(),
203            state: state.clone(),
204            handle: serde_json::json!({
205                "kind": "a2a_task_handle",
206                "task_id": task_id,
207                "state": state,
208                "target_agent": endpoint.target_agent,
209                "rpc_url": endpoint.rpc_url,
210                "card_url": endpoint.card_url,
211                "agent_id": endpoint.agent_id,
212            }),
213        },
214    ))
215}
216
217fn push_notification_config() -> Option<Value> {
218    let url = std::env::var(A2A_PUSH_URL_ENV)
219        .ok()
220        .map(|value| value.trim().to_string())
221        .filter(|value| !value.is_empty())?;
222    let token = std::env::var(A2A_PUSH_TOKEN_ENV)
223        .ok()
224        .map(|value| value.trim().to_string())
225        .filter(|value| !value.is_empty());
226    let mut config = serde_json::json!({ "url": url });
227    if let Some(token) = token {
228        config["token"] = Value::String(token.clone());
229        config["authentication"] = serde_json::json!({
230            "scheme": "Bearer",
231            "credentials": token,
232        });
233    }
234    Some(config)
235}
236
237async fn register_push_notification_config(
238    rpc_url: &str,
239    task_id: &str,
240    config: Value,
241    trace_id: &str,
242    cancel_rx: &mut broadcast::Receiver<()>,
243) -> Result<(), A2aClientError> {
244    let request = crate::jsonrpc::request(
245        format!("{trace_id}.{task_id}.push-config"),
246        "CreateTaskPushNotificationConfig",
247        serde_json::json!({
248            "taskId": task_id,
249            "pushNotificationConfig": config,
250        }),
251    );
252    let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
253    if response.get("error").is_some() {
254        return Err(A2aClientError::Protocol(format!(
255            "A2A push notification registration failed: {}",
256            response["error"]
257        )));
258    }
259    Ok(())
260}
261
262fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
263    if let Some(metrics) = crate::active_metrics_registry() {
264        metrics.record_a2a_hop(target, outcome, duration);
265    }
266}
267
268pub fn target_agent_label(raw_target: &str) -> String {
269    parse_target(raw_target)
270        .map(|target| target.target_agent_label())
271        .unwrap_or_else(|_| raw_target.to_string())
272}
273
/// A dispatch target split into the network authority and the agent path.
#[derive(Clone, Debug)]
struct ParsedTarget {
    /// `host` or `host:port` of the remote agent.
    authority: String,
    /// Agent path without leading slashes; empty when the target had none.
    target_agent: String,
}

impl ParsedTarget {
    /// Label used for display and metrics: the agent path, falling back to
    /// the authority when the path is empty.
    fn target_agent_label(&self) -> String {
        let label = match self.target_agent.as_str() {
            "" => self.authority.as_str(),
            agent => agent,
        };
        label.to_string()
    }
}
289
290fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
291    let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
292        A2aClientError::InvalidTarget(format!(
293            "invalid a2a dispatch target '{raw_target}': {error}"
294        ))
295    })?;
296    let host = parsed.host_str().ok_or_else(|| {
297        A2aClientError::InvalidTarget(format!(
298            "invalid a2a dispatch target '{raw_target}': missing host"
299        ))
300    })?;
301    let authority = if let Some(port) = parsed.port() {
302        format!("{host}:{port}")
303    } else {
304        host.to_string()
305    };
306    Ok(ParsedTarget {
307        authority,
308        target_agent: parsed.path().trim_start_matches('/').to_string(),
309    })
310}
311
312async fn resolve_endpoint(
313    target: &ParsedTarget,
314    allow_cleartext: bool,
315    cancel_rx: &mut broadcast::Receiver<()>,
316) -> Result<ResolvedA2aEndpoint, A2aClientError> {
317    let mut last_error = None;
318    for scheme in card_resolution_schemes(allow_cleartext) {
319        let card_url = format!("{scheme}://{}/{A2A_AGENT_CARD_PATH}", target.authority);
320        match fetch_agent_card(&card_url, cancel_rx).await {
321            Ok(card) => {
322                return endpoint_from_card(
323                    card_url,
324                    allow_cleartext,
325                    &target.authority,
326                    target.target_agent.clone(),
327                    &card,
328                );
329            }
330            Err(AgentCardFetchError::Cancelled(message)) => {
331                return Err(A2aClientError::Cancelled(message));
332            }
333            Err(error) => {
334                let message = agent_card_fetch_error_message(&error);
335                last_error = Some(message);
336                if should_try_cleartext_fallback(scheme, allow_cleartext, &error, &target.authority)
337                {
338                    continue;
339                }
340                break;
341            }
342        }
343    }
344    Err(A2aClientError::Discovery(format!(
345        "could not resolve A2A agent card for '{}': {}",
346        target.authority,
347        last_error.unwrap_or_else(|| "unknown discovery error".to_string())
348    )))
349}
350
351async fn fetch_agent_card(
352    card_url: &str,
353    cancel_rx: &mut broadcast::Receiver<()>,
354) -> Result<Value, AgentCardFetchError> {
355    let response = tokio::select! {
356        response = crate::llm::shared_utility_client().get(card_url).send() => {
357            match response {
358                Ok(response) => Ok(response),
359                Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
360                    format!("A2A HTTP request failed: {error}")
361                )),
362                Err(error) => Err(AgentCardFetchError::Discovery(
363                    format!("A2A HTTP request failed: {error}")
364                )),
365            }
366        }
367        _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
368            "A2A agent-card fetch cancelled".to_string()
369        )),
370    }?;
371    if !response.status().is_success() {
372        return Err(AgentCardFetchError::Discovery(format!(
373            "GET {card_url} returned HTTP {}",
374            response.status()
375        )));
376    }
377    response
378        .json::<Value>()
379        .await
380        .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
381}
382
383fn endpoint_from_card(
384    card_url: String,
385    allow_cleartext: bool,
386    requested_authority: &str,
387    target_agent: String,
388    card: &Value,
389) -> Result<ResolvedA2aEndpoint, A2aClientError> {
390    let base_url = card
391        .get("url")
392        .and_then(Value::as_str)
393        .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
394    let base_url = Url::parse(base_url).map_err(|error| {
395        A2aClientError::Discovery(format!("invalid A2A card url '{base_url}': {error}"))
396    })?;
397    ensure_cleartext_allowed(&base_url, allow_cleartext, "agent card")?;
398    let card_authority = url_authority(&base_url)?;
399    if !authorities_equivalent(&card_authority, requested_authority) {
400        return Err(A2aClientError::Discovery(format!(
401            "A2A agent card url authority mismatch: requested '{requested_authority}', card returned '{card_authority}'"
402        )));
403    }
404    let interfaces = card
405        .get("interfaces")
406        .and_then(Value::as_array)
407        .ok_or_else(|| {
408            A2aClientError::Discovery("A2A agent card missing interfaces".to_string())
409        })?;
410    let jsonrpc_interfaces: Vec<&Value> = interfaces
411        .iter()
412        .filter(|entry| entry.get("protocol").and_then(Value::as_str) == Some("jsonrpc"))
413        .collect();
414    if jsonrpc_interfaces.len() != 1 {
415        return Err(A2aClientError::Discovery(format!(
416            "A2A agent card must expose exactly one jsonrpc interface, found {}",
417            jsonrpc_interfaces.len()
418        )));
419    }
420    let interface_url = jsonrpc_interfaces[0]
421        .get("url")
422        .and_then(Value::as_str)
423        .ok_or_else(|| {
424            A2aClientError::Discovery("A2A jsonrpc interface missing url".to_string())
425        })?;
426    let rpc_url = base_url.join(interface_url).map_err(|error| {
427        A2aClientError::Discovery(format!(
428            "invalid A2A interface url '{interface_url}': {error}"
429        ))
430    })?;
431    ensure_cleartext_allowed(&rpc_url, allow_cleartext, "jsonrpc interface")?;
432    Ok(ResolvedA2aEndpoint {
433        card_url,
434        rpc_url: rpc_url.to_string(),
435        agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
436        target_agent,
437    })
438}
439
/// URL schemes to try for agent-card discovery, in order of preference.
/// HTTPS always comes first; HTTP is offered only when cleartext is allowed.
fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
    const SECURE_ONLY: &[&str] = &["https"];
    const SECURE_THEN_CLEARTEXT: &[&str] = &["https", "http"];

    match allow_cleartext {
        true => SECURE_THEN_CLEARTEXT,
        false => SECURE_ONLY,
    }
}
447
448/// Decide whether an HTTPS discovery failure should fall through to cleartext.
449///
450/// External targets only fall back on `ConnectionRefused` — the common "HTTPS
451/// port isn't listening" case. TLS handshake failures to an external host MUST
452/// NOT silently downgrade to HTTP, because an active network attacker can
453/// forge TLS errors to trigger a downgrade.
454///
455/// Loopback targets (`127.0.0.0/8`, `::1`, `localhost`) fall back on any
456/// discovery-style error. They cover the standard local-dev case where
457/// `harn serve` binds HTTP-only on `127.0.0.1:PORT`, and the SSRF threat
458/// model for loopback is already bounded — any attacker who can reach the
459/// local loopback already has code execution on the box.
460fn should_try_cleartext_fallback(
461    scheme: &str,
462    allow_cleartext: bool,
463    error: &AgentCardFetchError,
464    authority: &str,
465) -> bool {
466    if !allow_cleartext || scheme != "https" {
467        return false;
468    }
469    match error {
470        AgentCardFetchError::Cancelled(_) => false,
471        AgentCardFetchError::ConnectRefused(_) => true,
472        AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
473    }
474}
475
476fn ensure_cleartext_allowed(
477    url: &Url,
478    allow_cleartext: bool,
479    label: &str,
480) -> Result<(), A2aClientError> {
481    if allow_cleartext || url.scheme() != "http" {
482        return Ok(());
483    }
484    Err(A2aClientError::Discovery(format!(
485        "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
486    )))
487}
488
489fn is_loopback_authority(authority: &str) -> bool {
490    let (host, _) = split_authority(authority);
491    if host.eq_ignore_ascii_case("localhost") {
492        return true;
493    }
494    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
495        return ip.is_loopback();
496    }
497    false
498}
499
500/// Return true when two authority strings refer to the same A2A endpoint.
501///
502/// Exact string equality is the default — an agent card that reports a
503/// different host than the one the client asked for is a security-relevant
504/// discrepancy (see harn#248 SSRF hardening). The one well-defined exception
505/// is loopback: `localhost`, `127.0.0.1`, `::1`, and the rest of
506/// `127.0.0.0/8` are all the same socket on this machine, and `harn serve`
507/// hardcodes `http://localhost:PORT` in its agent card even when a caller
508/// dials `127.0.0.1:PORT`. Treating both sides as loopback avoids a spurious
509/// mismatch in that case without widening the external-host trust boundary.
510fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
511    if card_authority == requested_authority {
512        return true;
513    }
514    let (_, card_port) = split_authority(card_authority);
515    let (_, requested_port) = split_authority(requested_authority);
516    if card_port != requested_port {
517        return false;
518    }
519    is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
520}
521
/// Break an authority into `(host, port)`, where `port` is empty when none
/// is present. Brackets around an IPv6 literal are removed from the host, so
/// `[::1]:8080` yields `("::1", "8080")`.
fn split_authority(authority: &str) -> (&str, &str) {
    // Outer `Some` means "starts with a bracket"; the inner option is the
    // byte position of the last closing bracket, if there is one.
    let closing_bracket = authority
        .starts_with('[')
        .then(|| authority.rfind(']'));

    let (host_raw, port) = match closing_bracket {
        // "[addr]:port" or "[addr]".
        Some(Some(end)) => {
            let (host, rest) = authority.split_at(end + 1);
            (host, rest.strip_prefix(':').unwrap_or(""))
        }
        // Opening bracket without a closing one: treat it all as the host.
        Some(None) => (authority, ""),
        // Plain "host:port" or "host".
        None => authority.rsplit_once(':').unwrap_or((authority, "")),
    };

    (
        host_raw.trim_start_matches('[').trim_end_matches(']'),
        port,
    )
}
544
545fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
546    match error {
547        AgentCardFetchError::Cancelled(message)
548        | AgentCardFetchError::Discovery(message)
549        | AgentCardFetchError::ConnectRefused(message) => message.clone(),
550    }
551}
552
553fn is_connect_refused(error: &reqwest::Error) -> bool {
554    if !error.is_connect() {
555        return false;
556    }
557    let mut source = error.source();
558    while let Some(cause) = source {
559        if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
560            if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
561                return true;
562            }
563        }
564        source = cause.source();
565    }
566    false
567}
568
569fn url_authority(url: &Url) -> Result<String, A2aClientError> {
570    let host = url
571        .host_str()
572        .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
573    Ok(if let Some(port) = url.port() {
574        format!("{host}:{port}")
575    } else {
576        host.to_string()
577    })
578}
579
580async fn send_jsonrpc(
581    rpc_url: &str,
582    request: &Value,
583    trace_id: &str,
584    cancel_rx: &mut broadcast::Receiver<()>,
585) -> Result<Value, A2aClientError> {
586    let response = send_http(
587        crate::llm::shared_blocking_client()
588            .post(rpc_url)
589            .header(reqwest::header::CONTENT_TYPE, "application/json")
590            .header("A2A-Version", A2A_PROTOCOL_VERSION)
591            .header("A2A-Trace-Id", trace_id)
592            .json(request),
593        cancel_rx,
594        "A2A task dispatch cancelled",
595    )
596    .await?;
597    if !response.status().is_success() {
598        return Err(A2aClientError::Protocol(format!(
599            "A2A task dispatch returned HTTP {}",
600            response.status()
601        )));
602    }
603    response
604        .json::<Value>()
605        .await
606        .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
607}
608
609async fn send_http(
610    request: reqwest::RequestBuilder,
611    cancel_rx: &mut broadcast::Receiver<()>,
612    cancelled_message: &'static str,
613) -> Result<reqwest::Response, A2aClientError> {
614    tokio::select! {
615        response = request.send() => response
616            .map_err(|error| A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))),
617        _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
618    }
619}
620
621fn task_state(task: &Value) -> Result<&str, A2aClientError> {
622    task.pointer("/status/state")
623        .and_then(Value::as_str)
624        .filter(|value| !value.is_empty())
625        .ok_or_else(|| {
626            A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
627        })
628}
629
630fn extract_inline_result(task: &Value) -> Value {
631    let text = task
632        .get("history")
633        .and_then(Value::as_array)
634        .and_then(|history| {
635            history.iter().rev().find_map(|message| {
636                let role = message.get("role").and_then(Value::as_str)?;
637                if role != "agent" {
638                    return None;
639                }
640                message
641                    .get("parts")
642                    .and_then(Value::as_array)
643                    .and_then(|parts| {
644                        parts.iter().find_map(|part| {
645                            if part.get("type").and_then(Value::as_str) == Some("text") {
646                                part.get("text").and_then(Value::as_str).map(str::trim_end)
647                            } else {
648                                None
649                            }
650                        })
651                    })
652            })
653        });
654    match text {
655        Some(text) if !text.is_empty() => {
656            serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
657        }
658        _ => task.clone(),
659    }
660}
661
662async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
663    let _ = cancel_rx.recv().await;
664}
665
// Unit tests for the pure helpers of the A2A client; none of them touch the
// network.
#[cfg(test)]
mod tests {
    use super::*;

    // The label is the agent path when present, otherwise the authority.
    #[test]
    fn target_agent_label_prefers_path() {
        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
    }

    // The newest agent text part is trimmed at the end and parsed as JSON;
    // user messages are skipped.
    #[test]
    fn extract_inline_result_parses_json_text() {
        let task = serde_json::json!({
            "history": [
                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
            ]
        });
        assert_eq!(
            extract_inline_result(&task),
            serde_json::json!({"trace_id": "trace_123"})
        );
    }

    // HTTPS is always tried first; HTTP is only offered with the opt-in.
    #[test]
    fn discovery_prefers_https_before_http() {
        assert_eq!(card_resolution_schemes(false), ["https"]);
        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
    }

    // For an external host, only a refused HTTPS connection permits the
    // fallback; a failed HTTP attempt or a TLS-style failure does not.
    #[test]
    fn cleartext_fallback_only_after_https_connect_refused() {
        assert!(should_try_cleartext_fallback(
            "https",
            true,
            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
            "reviewer.example:443",
        ));
        assert!(!should_try_cleartext_fallback(
            "http",
            true,
            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
            "reviewer.example:443",
        ));
        assert!(!should_try_cleartext_fallback(
            "https",
            true,
            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
            "reviewer.example:443",
        ));
    }

    // Without `allow_cleartext`, even loopback targets never fall back.
    #[test]
    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
        for authority in [
            "127.0.0.1:8080",
            "localhost:8080",
            "[::1]:8080",
            "127.1.2.3:9000",
        ] {
            assert!(
                !should_try_cleartext_fallback(
                    "https",
                    false,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "cleartext fallback must stay disabled without opt-in for '{authority}'"
            );
        }
    }

    // With the opt-in, loopback targets fall back on any discovery failure.
    #[test]
    fn cleartext_fallback_allows_loopback_after_opt_in() {
        // Local dev: harn serve is HTTP-only, so TLS handshake fails but we
        // still need the HTTP fallback to succeed.
        for authority in [
            "127.0.0.1:8080",
            "localhost:8080",
            "[::1]:8080",
            "127.1.2.3:9000",
        ] {
            assert!(
                should_try_cleartext_fallback(
                    "https",
                    true,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "expected cleartext fallback for loopback authority '{authority}'"
            );
        }
    }

    // Non-loopback addresses, private ranges included, must not downgrade on
    // a discovery failure.
    #[test]
    fn cleartext_fallback_denies_external_tls_failures() {
        // External target + TLS handshake failure must not downgrade — an
        // attacker able to forge TLS errors shouldn't force cleartext.
        for authority in [
            "reviewer.example:443",
            "8.8.8.8:443",
            "192.168.1.10:8080",
            "10.0.0.5:8443",
        ] {
            assert!(
                !should_try_cleartext_fallback(
                    "https",
                    true,
                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
                    authority,
                ),
                "cleartext fallback must be denied for external authority '{authority}'"
            );
        }
    }

    // Covers IPv4, bracketed IPv6 and case-insensitive `localhost`, plus
    // several non-loopback counterexamples.
    #[test]
    fn is_loopback_authority_recognises_loopback_forms() {
        assert!(is_loopback_authority("127.0.0.1:8080"));
        assert!(is_loopback_authority("localhost:8080"));
        assert!(is_loopback_authority("LOCALHOST:9000"));
        assert!(is_loopback_authority("[::1]:8080"));
        assert!(is_loopback_authority("127.5.5.5:1234"));
        assert!(!is_loopback_authority("8.8.8.8:443"));
        assert!(!is_loopback_authority("192.168.1.10:8080"));
        assert!(!is_loopback_authority("example.com:443"));
        assert!(!is_loopback_authority("reviewer.prod"));
    }

    // A card that points at a different host than the one requested is
    // rejected with the exact mismatch message.
    #[test]
    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
        let error = endpoint_from_card(
            "https://trusted.example/.well-known/a2a-agent".to_string(),
            false,
            "trusted.example",
            "triage".to_string(),
            &serde_json::json!({
                "url": "https://evil.example",
                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
            }),
        )
        .unwrap_err();
        assert_eq!(
            error.to_string(),
            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
        );
    }

    // An `http` card URL is refused when the binding did not opt in, even
    // for a loopback target.
    #[test]
    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
        let error = endpoint_from_card(
            "https://127.0.0.1:8080/.well-known/a2a-agent".to_string(),
            false,
            "127.0.0.1:8080",
            "triage".to_string(),
            &serde_json::json!({
                "url": "http://localhost:8080",
                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
            }),
        )
        .expect_err("cleartext card should require explicit opt-in");
        assert!(error
            .to_string()
            .contains("requires `allow_cleartext = true`"));
    }

    // Loopback aliases with equal ports are accepted; a port mismatch is not.
    #[test]
    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
        // harn serve reports `http://localhost:PORT` in its card, but clients
        // commonly dial `127.0.0.1:PORT`. Both refer to the same socket, so
        // the authority check must not spuriously reject the pair.
        let card = serde_json::json!({
            "url": "http://localhost:8080",
            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
        });
        let endpoint = endpoint_from_card(
            "http://127.0.0.1:8080/.well-known/a2a-agent".to_string(),
            true,
            "127.0.0.1:8080",
            "triage".to_string(),
            &card,
        )
        .expect("loopback alias pair should be accepted");
        assert_eq!(endpoint.rpc_url, "http://localhost:8080/rpc");

        // IPv6 loopback `[::1]` also aliases to `127.0.0.1` / `localhost`.
        let card_v6 = serde_json::json!({
            "url": "http://[::1]:8080",
            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
        });
        let endpoint_v6 = endpoint_from_card(
            "http://localhost:8080/.well-known/a2a-agent".to_string(),
            true,
            "localhost:8080",
            "triage".to_string(),
            &card_v6,
        )
        .expect("IPv6 loopback alias should be accepted");
        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/rpc");

        // Port mismatch is still rejected even on loopback.
        let card_wrong_port = serde_json::json!({
            "url": "http://localhost:9000",
            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
        });
        let error = endpoint_from_card(
            "http://127.0.0.1:8080/.well-known/a2a-agent".to_string(),
            true,
            "127.0.0.1:8080",
            "triage".to_string(),
            &card_wrong_port,
        )
        .expect_err("mismatched ports must still be rejected even on loopback");
        assert!(error
            .to_string()
            .contains("A2A agent card url authority mismatch"));
    }

    // Different external hosts never match, and a loopback/non-loopback pair
    // does not match either; identical authorities do.
    #[test]
    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
        assert!(!authorities_equivalent(
            "internal.corp.example:443",
            "trusted.example:443",
        ));
        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
        assert!(authorities_equivalent(
            "trusted.example:443",
            "trusted.example:443",
        ));
    }
}
896}