Skip to main content

harn_vm/a2a/
mod.rs

1use std::error::Error as _;
2
3use reqwest::Url;
4use serde_json::Value;
5use tokio::sync::broadcast;
6
7use crate::triggers::TriggerEvent;
8
9const A2A_AGENT_CARD_PATHS: &[&str] = &[
10    ".well-known/agent-card.json",
11    ".well-known/a2a-agent",
12    ".well-known/agent.json",
13    "agent/card",
14];
15const A2A_PROTOCOL_VERSION: &str = "1.0";
16const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
17const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
18
19#[derive(Clone, Debug, PartialEq, Eq)]
20pub struct ResolvedA2aEndpoint {
21    pub card_url: String,
22    pub rpc_url: String,
23    pub agent_id: Option<String>,
24    pub target_agent: String,
25}
26
27#[derive(Clone, Debug, PartialEq)]
28pub enum DispatchAck {
29    InlineResult {
30        task_id: String,
31        result: Value,
32    },
33    PendingTask {
34        task_id: String,
35        state: String,
36        handle: Value,
37    },
38}
39
40#[derive(Debug)]
41pub enum A2aClientError {
42    InvalidTarget(String),
43    Discovery(String),
44    Protocol(String),
45    Cancelled(String),
46}
47
48impl std::fmt::Display for A2aClientError {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Self::InvalidTarget(message)
52            | Self::Discovery(message)
53            | Self::Protocol(message)
54            | Self::Cancelled(message) => f.write_str(message),
55        }
56    }
57}
58
59impl std::error::Error for A2aClientError {}
60
61#[derive(Debug)]
62enum AgentCardFetchError {
63    Cancelled(String),
64    Discovery(String),
65    ConnectRefused(String),
66}
67
68pub async fn dispatch_trigger_event(
69    raw_target: &str,
70    allow_cleartext: bool,
71    binding_id: &str,
72    binding_key: &str,
73    event: &TriggerEvent,
74    cancel_rx: &mut broadcast::Receiver<()>,
75) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
76    let started = std::time::Instant::now();
77    let target = match parse_target(raw_target) {
78        Ok(target) => target,
79        Err(error) => {
80            record_a2a_metric(raw_target, "failed", started.elapsed());
81            return Err(error);
82        }
83    };
84    let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
85        Ok(endpoint) => endpoint,
86        Err(error) => {
87            record_a2a_metric(raw_target, "failed", started.elapsed());
88            return Err(error);
89        }
90    };
91    let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
92    let envelope = serde_json::json!({
93        "kind": "harn.trigger.dispatch",
94        "message_id": message_id,
95        "trace_id": event.trace_id.0,
96        "event_id": event.id.0,
97        "trigger_id": binding_id,
98        "binding_key": binding_key,
99        "target_agent": endpoint.target_agent,
100        "event": event,
101    });
102    let text = serde_json::to_string(&envelope)
103        .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
104    let push_config = push_notification_config();
105    let mut params = serde_json::json!({
106        "contextId": event.trace_id.0,
107        "message": {
108            "messageId": message_id,
109            "role": "user",
110            "parts": [{
111                "type": "text",
112                "text": text,
113            }],
114            "metadata": {
115                "kind": "harn.trigger.dispatch",
116                "trace_id": event.trace_id.0,
117                "event_id": event.id.0,
118                "trigger_id": binding_id,
119                "binding_key": binding_key,
120                "target_agent": endpoint.target_agent,
121            },
122        },
123    });
124    if let Some(config) = push_config.clone() {
125        params["configuration"] = serde_json::json!({
126            "blocking": false,
127            "returnImmediately": true,
128            "pushNotificationConfig": config,
129        });
130    }
131    let request = crate::jsonrpc::request(message_id.clone(), "a2a.SendMessage", params);
132
133    let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
134        Ok(body) => body,
135        Err(error) => {
136            record_a2a_metric(raw_target, "failed", started.elapsed());
137            return Err(error);
138        }
139    };
140    let result = match body.get("result").cloned().ok_or_else(|| {
141        if let Some(error) = body.get("error") {
142            let message = error
143                .get("message")
144                .and_then(Value::as_str)
145                .unwrap_or("unknown A2A error");
146            A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
147        } else {
148            A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
149        }
150    }) {
151        Ok(result) => result,
152        Err(error) => {
153            record_a2a_metric(raw_target, "failed", started.elapsed());
154            return Err(error);
155        }
156    };
157
158    let task_id = match result
159        .get("id")
160        .and_then(Value::as_str)
161        .filter(|value| !value.is_empty())
162        .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
163    {
164        Ok(task_id) => task_id.to_string(),
165        Err(error) => {
166            record_a2a_metric(raw_target, "failed", started.elapsed());
167            return Err(error);
168        }
169    };
170    let state = match task_state(&result) {
171        Ok(state) => state.to_string(),
172        Err(error) => {
173            record_a2a_metric(raw_target, "failed", started.elapsed());
174            return Err(error);
175        }
176    };
177
178    if state == "completed" {
179        let inline = extract_inline_result(&result);
180        record_a2a_metric(raw_target, "succeeded", started.elapsed());
181        return Ok((
182            endpoint,
183            DispatchAck::InlineResult {
184                task_id,
185                result: inline,
186            },
187        ));
188    }
189
190    if let Some(config) = push_config {
191        register_push_notification_config(
192            &endpoint.rpc_url,
193            &task_id,
194            config,
195            &event.trace_id.0,
196            cancel_rx,
197        )
198        .await
199        .inspect_err(|_| {
200            record_a2a_metric(raw_target, "failed", started.elapsed());
201        })?;
202    }
203    record_a2a_metric(raw_target, "succeeded", started.elapsed());
204    Ok((
205        endpoint.clone(),
206        DispatchAck::PendingTask {
207            task_id: task_id.clone(),
208            state: state.clone(),
209            handle: serde_json::json!({
210                "kind": "a2a_task_handle",
211                "task_id": task_id,
212                "state": state,
213                "target_agent": endpoint.target_agent,
214                "rpc_url": endpoint.rpc_url,
215                "card_url": endpoint.card_url,
216                "agent_id": endpoint.agent_id,
217            }),
218        },
219    ))
220}
221
222fn push_notification_config() -> Option<Value> {
223    let url = std::env::var(A2A_PUSH_URL_ENV)
224        .ok()
225        .map(|value| value.trim().to_string())
226        .filter(|value| !value.is_empty())?;
227    let token = std::env::var(A2A_PUSH_TOKEN_ENV)
228        .ok()
229        .map(|value| value.trim().to_string())
230        .filter(|value| !value.is_empty());
231    let mut config = serde_json::json!({ "url": url });
232    if let Some(token) = token {
233        config["token"] = Value::String(token.clone());
234        config["authentication"] = serde_json::json!({
235            "scheme": "Bearer",
236            "credentials": token,
237        });
238    }
239    Some(config)
240}
241
242async fn register_push_notification_config(
243    rpc_url: &str,
244    task_id: &str,
245    config: Value,
246    trace_id: &str,
247    cancel_rx: &mut broadcast::Receiver<()>,
248) -> Result<(), A2aClientError> {
249    let request = crate::jsonrpc::request(
250        format!("{trace_id}.{task_id}.push-config"),
251        "CreateTaskPushNotificationConfig",
252        serde_json::json!({
253            "taskId": task_id,
254            "pushNotificationConfig": config,
255        }),
256    );
257    let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
258    if response.get("error").is_some() {
259        return Err(A2aClientError::Protocol(format!(
260            "A2A push notification registration failed: {}",
261            response["error"]
262        )));
263    }
264    Ok(())
265}
266
267fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
268    if let Some(metrics) = crate::active_metrics_registry() {
269        metrics.record_a2a_hop(target, outcome, duration);
270    }
271}
272
273pub fn target_agent_label(raw_target: &str) -> String {
274    parse_target(raw_target)
275        .map(|target| target.target_agent_label())
276        .unwrap_or_else(|_| raw_target.to_string())
277}
278
279#[derive(Clone, Debug)]
280struct ParsedTarget {
281    authority: String,
282    target_agent: String,
283}
284
285impl ParsedTarget {
286    fn target_agent_label(&self) -> String {
287        if self.target_agent.is_empty() {
288            self.authority.clone()
289        } else {
290            self.target_agent.clone()
291        }
292    }
293}
294
295fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
296    let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
297        A2aClientError::InvalidTarget(format!(
298            "invalid a2a dispatch target '{raw_target}': {error}"
299        ))
300    })?;
301    let host = parsed.host_str().ok_or_else(|| {
302        A2aClientError::InvalidTarget(format!(
303            "invalid a2a dispatch target '{raw_target}': missing host"
304        ))
305    })?;
306    let authority = if let Some(port) = parsed.port() {
307        format!("{host}:{port}")
308    } else {
309        host.to_string()
310    };
311    Ok(ParsedTarget {
312        authority,
313        target_agent: parsed.path().trim_start_matches('/').to_string(),
314    })
315}
316
317async fn resolve_endpoint(
318    target: &ParsedTarget,
319    allow_cleartext: bool,
320    cancel_rx: &mut broadcast::Receiver<()>,
321) -> Result<ResolvedA2aEndpoint, A2aClientError> {
322    let mut last_error = None;
323    for scheme in card_resolution_schemes(allow_cleartext) {
324        let mut last_scheme_error = None;
325        for path in A2A_AGENT_CARD_PATHS {
326            let card_url = format!("{scheme}://{}/{path}", target.authority);
327            match fetch_agent_card(&card_url, cancel_rx).await {
328                Ok(card) => {
329                    return endpoint_from_card(
330                        card_url,
331                        allow_cleartext,
332                        &target.authority,
333                        target.target_agent.clone(),
334                        &card,
335                    );
336                }
337                Err(AgentCardFetchError::Cancelled(message)) => {
338                    return Err(A2aClientError::Cancelled(message));
339                }
340                Err(error) => {
341                    last_error = Some(agent_card_fetch_error_message(&error));
342                    last_scheme_error = Some(error);
343                }
344            }
345        }
346        if last_scheme_error.as_ref().is_some_and(|error| {
347            should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
348        }) {
349            continue;
350        }
351        break;
352    }
353    Err(A2aClientError::Discovery(format!(
354        "could not resolve A2A agent card for '{}': {}",
355        target.authority,
356        last_error.unwrap_or_else(|| "unknown discovery error".to_string())
357    )))
358}
359
360async fn fetch_agent_card(
361    card_url: &str,
362    cancel_rx: &mut broadcast::Receiver<()>,
363) -> Result<Value, AgentCardFetchError> {
364    let response = tokio::select! {
365        response = crate::llm::shared_utility_client().get(card_url).send() => {
366            match response {
367                Ok(response) => Ok(response),
368                Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
369                    format!("A2A HTTP request failed: {error}")
370                )),
371                Err(error) => Err(AgentCardFetchError::Discovery(
372                    format!("A2A HTTP request failed: {error}")
373                )),
374            }
375        }
376        _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
377            "A2A agent-card fetch cancelled".to_string()
378        )),
379    }?;
380    if !response.status().is_success() {
381        return Err(AgentCardFetchError::Discovery(format!(
382            "GET {card_url} returned HTTP {}",
383            response.status()
384        )));
385    }
386    response
387        .json::<Value>()
388        .await
389        .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
390}
391
392fn endpoint_from_card(
393    card_url: String,
394    allow_cleartext: bool,
395    requested_authority: &str,
396    target_agent: String,
397    card: &Value,
398) -> Result<ResolvedA2aEndpoint, A2aClientError> {
399    if let Some(interfaces) = card.get("supportedInterfaces").and_then(Value::as_array) {
400        let interface = interfaces
401            .iter()
402            .find(|entry| {
403                entry
404                    .get("protocolBinding")
405                    .and_then(Value::as_str)
406                    .is_some_and(|binding| binding.eq_ignore_ascii_case("JSONRPC"))
407            })
408            .ok_or_else(|| {
409                A2aClientError::Discovery(
410                    "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
411                )
412            })?;
413        let interface_url = interface
414            .get("url")
415            .and_then(Value::as_str)
416            .ok_or_else(|| {
417                A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
418            })?;
419        let rpc_url = Url::parse(interface_url).map_err(|error| {
420            A2aClientError::Discovery(format!(
421                "invalid A2A JSONRPC supportedInterface url '{interface_url}': {error}"
422            ))
423        })?;
424        ensure_cleartext_allowed(&rpc_url, allow_cleartext, "jsonrpc interface")?;
425        let interface_authority = url_authority(&rpc_url)?;
426        if !authorities_equivalent(&interface_authority, requested_authority) {
427            return Err(A2aClientError::Discovery(format!(
428                "A2A JSONRPC interface authority mismatch: requested '{requested_authority}', card returned '{interface_authority}'"
429            )));
430        }
431        return Ok(ResolvedA2aEndpoint {
432            card_url,
433            rpc_url: rpc_url.to_string(),
434            agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
435            target_agent,
436        });
437    }
438
439    let base_url = card
440        .get("url")
441        .and_then(Value::as_str)
442        .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
443    let base_url = Url::parse(base_url).map_err(|error| {
444        A2aClientError::Discovery(format!("invalid A2A card url '{base_url}': {error}"))
445    })?;
446    ensure_cleartext_allowed(&base_url, allow_cleartext, "agent card")?;
447    let card_authority = url_authority(&base_url)?;
448    if !authorities_equivalent(&card_authority, requested_authority) {
449        return Err(A2aClientError::Discovery(format!(
450            "A2A agent card url authority mismatch: requested '{requested_authority}', card returned '{card_authority}'"
451        )));
452    }
453    let interfaces = card
454        .get("interfaces")
455        .and_then(Value::as_array)
456        .ok_or_else(|| {
457            A2aClientError::Discovery("A2A agent card missing interfaces".to_string())
458        })?;
459    let jsonrpc_interfaces: Vec<&Value> = interfaces
460        .iter()
461        .filter(|entry| {
462            entry
463                .get("protocol")
464                .and_then(Value::as_str)
465                .is_some_and(|protocol| protocol.eq_ignore_ascii_case("jsonrpc"))
466        })
467        .collect();
468    if jsonrpc_interfaces.len() != 1 {
469        return Err(A2aClientError::Discovery(format!(
470            "A2A agent card must expose exactly one jsonrpc interface, found {}",
471            jsonrpc_interfaces.len()
472        )));
473    }
474    let interface_url = jsonrpc_interfaces[0]
475        .get("url")
476        .and_then(Value::as_str)
477        .ok_or_else(|| {
478            A2aClientError::Discovery("A2A jsonrpc interface missing url".to_string())
479        })?;
480    let rpc_url = base_url.join(interface_url).map_err(|error| {
481        A2aClientError::Discovery(format!(
482            "invalid A2A interface url '{interface_url}': {error}"
483        ))
484    })?;
485    ensure_cleartext_allowed(&rpc_url, allow_cleartext, "jsonrpc interface")?;
486    Ok(ResolvedA2aEndpoint {
487        card_url,
488        rpc_url: rpc_url.to_string(),
489        agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
490        target_agent,
491    })
492}
493
494fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
495    if allow_cleartext {
496        &["https", "http"]
497    } else {
498        &["https"]
499    }
500}
501
502/// Decide whether an HTTPS discovery failure should fall through to cleartext.
503///
504/// External targets only fall back on `ConnectionRefused` — the common "HTTPS
505/// port isn't listening" case. TLS handshake failures to an external host MUST
506/// NOT silently downgrade to HTTP, because an active network attacker can
507/// forge TLS errors to trigger a downgrade.
508///
509/// Loopback targets (`127.0.0.0/8`, `::1`, `localhost`) fall back on any
510/// discovery-style error. They cover the standard local-dev case where
511/// `harn serve` binds HTTP-only on `127.0.0.1:PORT`, and the SSRF threat
512/// model for loopback is already bounded — any attacker who can reach the
513/// local loopback already has code execution on the box.
514fn should_try_cleartext_fallback(
515    scheme: &str,
516    allow_cleartext: bool,
517    error: &AgentCardFetchError,
518    authority: &str,
519) -> bool {
520    if !allow_cleartext || scheme != "https" {
521        return false;
522    }
523    match error {
524        AgentCardFetchError::Cancelled(_) => false,
525        AgentCardFetchError::ConnectRefused(_) => true,
526        AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
527    }
528}
529
530fn ensure_cleartext_allowed(
531    url: &Url,
532    allow_cleartext: bool,
533    label: &str,
534) -> Result<(), A2aClientError> {
535    if allow_cleartext || url.scheme() != "http" {
536        return Ok(());
537    }
538    Err(A2aClientError::Discovery(format!(
539        "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
540    )))
541}
542
543fn is_loopback_authority(authority: &str) -> bool {
544    let (host, _) = split_authority(authority);
545    if host.eq_ignore_ascii_case("localhost") {
546        return true;
547    }
548    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
549        return ip.is_loopback();
550    }
551    false
552}
553
554/// Return true when two authority strings refer to the same A2A endpoint.
555///
556/// Exact string equality is the default — an agent card that reports a
557/// different host than the one the client asked for is a security-relevant
558/// discrepancy (see harn#248 SSRF hardening). The one well-defined exception
559/// is loopback: `localhost`, `127.0.0.1`, `::1`, and the rest of
560/// `127.0.0.0/8` are all the same socket on this machine, and `harn serve`
561/// hardcodes `http://localhost:PORT` in its agent card even when a caller
562/// dials `127.0.0.1:PORT`. Treating both sides as loopback avoids a spurious
563/// mismatch in that case without widening the external-host trust boundary.
564fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
565    if card_authority == requested_authority {
566        return true;
567    }
568    let (_, card_port) = split_authority(card_authority);
569    let (_, requested_port) = split_authority(requested_authority);
570    if card_port != requested_port {
571        return false;
572    }
573    is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
574}
575
576/// Split an authority into `(host, port_or_empty)`. Strips IPv6 brackets so
577/// `[::1]:8080` becomes `("::1", "8080")`.
578fn split_authority(authority: &str) -> (&str, &str) {
579    let (host_raw, port) = if authority.starts_with('[') {
580        // IPv6 bracketed form: "[addr]:port" or "[addr]".
581        if let Some(end) = authority.rfind(']') {
582            let host = &authority[..=end];
583            let rest = &authority[end + 1..];
584            let port = rest.strip_prefix(':').unwrap_or("");
585            (host, port)
586        } else {
587            (authority, "")
588        }
589    } else {
590        match authority.rsplit_once(':') {
591            Some((host, port)) => (host, port),
592            None => (authority, ""),
593        }
594    };
595    let host = host_raw.trim_start_matches('[').trim_end_matches(']');
596    (host, port)
597}
598
599fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
600    match error {
601        AgentCardFetchError::Cancelled(message)
602        | AgentCardFetchError::Discovery(message)
603        | AgentCardFetchError::ConnectRefused(message) => message.clone(),
604    }
605}
606
607fn is_connect_refused(error: &reqwest::Error) -> bool {
608    if !error.is_connect() {
609        return false;
610    }
611    let mut source = error.source();
612    while let Some(cause) = source {
613        if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
614            if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
615                return true;
616            }
617        }
618        source = cause.source();
619    }
620    false
621}
622
623fn url_authority(url: &Url) -> Result<String, A2aClientError> {
624    let host = url
625        .host_str()
626        .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
627    Ok(if let Some(port) = url.port() {
628        format!("{host}:{port}")
629    } else {
630        host.to_string()
631    })
632}
633
634async fn send_jsonrpc(
635    rpc_url: &str,
636    request: &Value,
637    trace_id: &str,
638    cancel_rx: &mut broadcast::Receiver<()>,
639) -> Result<Value, A2aClientError> {
640    let response = send_http(
641        crate::llm::shared_blocking_client()
642            .post(rpc_url)
643            .header(reqwest::header::CONTENT_TYPE, "application/json")
644            .header("A2A-Version", A2A_PROTOCOL_VERSION)
645            .header("A2A-Trace-Id", trace_id)
646            .json(request),
647        cancel_rx,
648        "A2A task dispatch cancelled",
649    )
650    .await?;
651    if !response.status().is_success() {
652        return Err(A2aClientError::Protocol(format!(
653            "A2A task dispatch returned HTTP {}",
654            response.status()
655        )));
656    }
657    response
658        .json::<Value>()
659        .await
660        .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
661}
662
663async fn send_http(
664    request: reqwest::RequestBuilder,
665    cancel_rx: &mut broadcast::Receiver<()>,
666    cancelled_message: &'static str,
667) -> Result<reqwest::Response, A2aClientError> {
668    tokio::select! {
669        response = request.send() => response
670            .map_err(|error| A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))),
671        _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
672    }
673}
674
675fn task_state(task: &Value) -> Result<&str, A2aClientError> {
676    task.pointer("/status/state")
677        .and_then(Value::as_str)
678        .filter(|value| !value.is_empty())
679        .ok_or_else(|| {
680            A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
681        })
682}
683
684fn extract_inline_result(task: &Value) -> Value {
685    let text = task
686        .get("history")
687        .and_then(Value::as_array)
688        .and_then(|history| {
689            history.iter().rev().find_map(|message| {
690                let role = message.get("role").and_then(Value::as_str)?;
691                if role != "agent" {
692                    return None;
693                }
694                message
695                    .get("parts")
696                    .and_then(Value::as_array)
697                    .and_then(|parts| {
698                        parts.iter().find_map(|part| {
699                            if part.get("type").and_then(Value::as_str) == Some("text") {
700                                part.get("text").and_then(Value::as_str).map(str::trim_end)
701                            } else {
702                                None
703                            }
704                        })
705                    })
706            })
707        });
708    match text {
709        Some(text) if !text.is_empty() => {
710            serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
711        }
712        _ => task.clone(),
713    }
714}
715
716async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
717    let _ = cancel_rx.recv().await;
718}
719
720#[cfg(test)]
721mod tests {
722    use super::*;
723
724    #[test]
725    fn target_agent_label_prefers_path() {
726        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
727        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
728    }
729
730    #[test]
731    fn extract_inline_result_parses_json_text() {
732        let task = serde_json::json!({
733            "history": [
734                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
735                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
736            ]
737        });
738        assert_eq!(
739            extract_inline_result(&task),
740            serde_json::json!({"trace_id": "trace_123"})
741        );
742    }
743
744    #[test]
745    fn discovery_prefers_https_before_http() {
746        assert_eq!(card_resolution_schemes(false), ["https"]);
747        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
748    }
749
750    #[test]
751    fn endpoint_from_card_accepts_current_supported_interfaces() {
752        let endpoint = endpoint_from_card(
753            "https://trusted.example/.well-known/agent-card.json".to_string(),
754            false,
755            "trusted.example",
756            "triage".to_string(),
757            &serde_json::json!({
758                "name": "trusted",
759                "supportedInterfaces": [{
760                    "protocolBinding": "JSONRPC",
761                    "protocolVersion": "1.0",
762                    "url": "https://trusted.example/rpc"
763                }],
764            }),
765        )
766        .expect("current A2A card should resolve");
767        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
768        assert_eq!(
769            endpoint.card_url,
770            "https://trusted.example/.well-known/agent-card.json"
771        );
772        assert_eq!(endpoint.target_agent, "triage");
773    }
774
775    #[test]
776    fn cleartext_fallback_only_after_https_connect_refused() {
777        assert!(should_try_cleartext_fallback(
778            "https",
779            true,
780            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
781            "reviewer.example:443",
782        ));
783        assert!(!should_try_cleartext_fallback(
784            "http",
785            true,
786            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
787            "reviewer.example:443",
788        ));
789        assert!(!should_try_cleartext_fallback(
790            "https",
791            true,
792            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
793            "reviewer.example:443",
794        ));
795    }
796
797    #[test]
798    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
799        for authority in [
800            "127.0.0.1:8080",
801            "localhost:8080",
802            "[::1]:8080",
803            "127.1.2.3:9000",
804        ] {
805            assert!(
806                !should_try_cleartext_fallback(
807                    "https",
808                    false,
809                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
810                    authority,
811                ),
812                "cleartext fallback must stay disabled without opt-in for '{authority}'"
813            );
814        }
815    }
816
817    #[test]
818    fn cleartext_fallback_allows_loopback_after_opt_in() {
819        // Local dev: harn serve is HTTP-only, so TLS handshake fails but we
820        // still need the HTTP fallback to succeed.
821        for authority in [
822            "127.0.0.1:8080",
823            "localhost:8080",
824            "[::1]:8080",
825            "127.1.2.3:9000",
826        ] {
827            assert!(
828                should_try_cleartext_fallback(
829                    "https",
830                    true,
831                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
832                    authority,
833                ),
834                "expected cleartext fallback for loopback authority '{authority}'"
835            );
836        }
837    }
838
839    #[test]
840    fn cleartext_fallback_denies_external_tls_failures() {
841        // External target + TLS handshake failure must not downgrade — an
842        // attacker able to forge TLS errors shouldn't force cleartext.
843        for authority in [
844            "reviewer.example:443",
845            "8.8.8.8:443",
846            "192.168.1.10:8080",
847            "10.0.0.5:8443",
848        ] {
849            assert!(
850                !should_try_cleartext_fallback(
851                    "https",
852                    true,
853                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
854                    authority,
855                ),
856                "cleartext fallback must be denied for external authority '{authority}'"
857            );
858        }
859    }
860
861    #[test]
862    fn is_loopback_authority_recognises_loopback_forms() {
863        assert!(is_loopback_authority("127.0.0.1:8080"));
864        assert!(is_loopback_authority("localhost:8080"));
865        assert!(is_loopback_authority("LOCALHOST:9000"));
866        assert!(is_loopback_authority("[::1]:8080"));
867        assert!(is_loopback_authority("127.5.5.5:1234"));
868        assert!(!is_loopback_authority("8.8.8.8:443"));
869        assert!(!is_loopback_authority("192.168.1.10:8080"));
870        assert!(!is_loopback_authority("example.com:443"));
871        assert!(!is_loopback_authority("reviewer.prod"));
872    }
873
874    #[test]
875    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
876        let error = endpoint_from_card(
877            "https://trusted.example/.well-known/agent-card.json".to_string(),
878            false,
879            "trusted.example",
880            "triage".to_string(),
881            &serde_json::json!({
882                "url": "https://evil.example",
883                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
884            }),
885        )
886        .unwrap_err();
887        assert_eq!(
888            error.to_string(),
889            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
890        );
891    }
892
893    #[test]
894    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
895        let error = endpoint_from_card(
896            "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
897            false,
898            "127.0.0.1:8080",
899            "triage".to_string(),
900            &serde_json::json!({
901                "url": "http://localhost:8080",
902                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
903            }),
904        )
905        .expect_err("cleartext card should require explicit opt-in");
906        assert!(error
907            .to_string()
908            .contains("requires `allow_cleartext = true`"));
909    }
910
911    #[test]
912    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
913        // harn serve reports `http://localhost:PORT` in its card, but clients
914        // commonly dial `127.0.0.1:PORT`. Both refer to the same socket, so
915        // the authority check must not spuriously reject the pair.
916        let card = serde_json::json!({
917            "url": "http://localhost:8080",
918            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
919        });
920        let endpoint = endpoint_from_card(
921            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
922            true,
923            "127.0.0.1:8080",
924            "triage".to_string(),
925            &card,
926        )
927        .expect("loopback alias pair should be accepted");
928        assert_eq!(endpoint.rpc_url, "http://localhost:8080/rpc");
929
930        // IPv6 loopback `[::1]` also aliases to `127.0.0.1` / `localhost`.
931        let card_v6 = serde_json::json!({
932            "url": "http://[::1]:8080",
933            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
934        });
935        let endpoint_v6 = endpoint_from_card(
936            "http://localhost:8080/.well-known/agent-card.json".to_string(),
937            true,
938            "localhost:8080",
939            "triage".to_string(),
940            &card_v6,
941        )
942        .expect("IPv6 loopback alias should be accepted");
943        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/rpc");
944
945        // Port mismatch is still rejected even on loopback.
946        let card_wrong_port = serde_json::json!({
947            "url": "http://localhost:9000",
948            "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
949        });
950        let error = endpoint_from_card(
951            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
952            true,
953            "127.0.0.1:8080",
954            "triage".to_string(),
955            &card_wrong_port,
956        )
957        .expect_err("mismatched ports must still be rejected even on loopback");
958        assert!(error
959            .to_string()
960            .contains("A2A agent card url authority mismatch"));
961    }
962
963    #[test]
964    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
965        assert!(!authorities_equivalent(
966            "internal.corp.example:443",
967            "trusted.example:443",
968        ));
969        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
970        assert!(authorities_equivalent(
971            "trusted.example:443",
972            "trusted.example:443",
973        ));
974    }
975}