Skip to main content

harn_vm/a2a/
mod.rs

1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
10const A2A_AGENT_CARD_PATHS: &[&str] = &[
11    ".well-known/agent-card.json",
12    ".well-known/a2a-agent",
13    ".well-known/agent.json",
14    "agent/card",
15];
16const A2A_PROTOCOL_VERSION: &str = "0.3.0";
17const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";
18const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
19const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
20
21#[derive(Clone, Debug, PartialEq, Eq)]
22pub struct ResolvedA2aEndpoint {
23    pub card_url: String,
24    pub rpc_url: String,
25    pub agent_id: Option<String>,
26    pub target_agent: String,
27}
28
29#[derive(Clone, Debug, PartialEq)]
30pub enum DispatchAck {
31    InlineResult {
32        task_id: String,
33        result: Value,
34    },
35    PendingTask {
36        task_id: String,
37        state: String,
38        handle: Value,
39    },
40}
41
42#[derive(Debug)]
43pub enum A2aClientError {
44    InvalidTarget(String),
45    Discovery(String),
46    Protocol(String),
47    Cancelled(String),
48}
49
50impl std::fmt::Display for A2aClientError {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        match self {
53            Self::InvalidTarget(message)
54            | Self::Discovery(message)
55            | Self::Protocol(message)
56            | Self::Cancelled(message) => f.write_str(message),
57        }
58    }
59}
60
61impl std::error::Error for A2aClientError {}
62
63/// Abstraction over outbound A2A dispatch, injectable for tests.
64#[async_trait]
65pub trait A2aClient: Send + Sync + 'static {
66    async fn dispatch(
67        &self,
68        target: &str,
69        allow_cleartext: bool,
70        binding_id: &str,
71        binding_key: &str,
72        event: &TriggerEvent,
73        cancel_rx: &mut broadcast::Receiver<()>,
74    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
75}
76
77/// Production implementation that performs real HTTP A2A calls.
78pub struct RealA2aClient;
79
80#[async_trait]
81impl A2aClient for RealA2aClient {
82    async fn dispatch(
83        &self,
84        target: &str,
85        allow_cleartext: bool,
86        binding_id: &str,
87        binding_key: &str,
88        event: &TriggerEvent,
89        cancel_rx: &mut broadcast::Receiver<()>,
90    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
91        dispatch_trigger_event(
92            target,
93            allow_cleartext,
94            binding_id,
95            binding_key,
96            event,
97            cancel_rx,
98        )
99        .await
100    }
101}
102
103#[derive(Debug)]
104enum AgentCardFetchError {
105    Cancelled(String),
106    Discovery(String),
107    ConnectRefused(String),
108}
109
110pub async fn dispatch_trigger_event(
111    raw_target: &str,
112    allow_cleartext: bool,
113    binding_id: &str,
114    binding_key: &str,
115    event: &TriggerEvent,
116    cancel_rx: &mut broadcast::Receiver<()>,
117) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
118    let started = std::time::Instant::now();
119    let target = match parse_target(raw_target) {
120        Ok(target) => target,
121        Err(error) => {
122            record_a2a_metric(raw_target, "failed", started.elapsed());
123            return Err(error);
124        }
125    };
126    let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
127        Ok(endpoint) => endpoint,
128        Err(error) => {
129            record_a2a_metric(raw_target, "failed", started.elapsed());
130            return Err(error);
131        }
132    };
133    let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
134    let envelope = serde_json::json!({
135        "kind": "harn.trigger.dispatch",
136        "message_id": message_id,
137        "trace_id": event.trace_id.0,
138        "event_id": event.id.0,
139        "trigger_id": binding_id,
140        "binding_key": binding_key,
141        "target_agent": endpoint.target_agent,
142        "event": event,
143    });
144    let text = serde_json::to_string(&envelope)
145        .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
146    let push_config = push_notification_config();
147    let mut params = serde_json::json!({
148        "contextId": event.trace_id.0,
149        "message": {
150            "messageId": message_id,
151            "role": "user",
152            "parts": [{
153                "type": "text",
154                "text": text,
155            }],
156            "metadata": {
157                "kind": "harn.trigger.dispatch",
158                "trace_id": event.trace_id.0,
159                "event_id": event.id.0,
160                "trigger_id": binding_id,
161                "binding_key": binding_key,
162                "target_agent": endpoint.target_agent,
163            },
164        },
165    });
166    if let Some(config) = push_config.clone() {
167        params["configuration"] = serde_json::json!({
168            "blocking": false,
169            "returnImmediately": true,
170            "pushNotificationConfig": config,
171        });
172    }
173    let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
174
175    let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
176        Ok(body) => body,
177        Err(error) => {
178            record_a2a_metric(raw_target, "failed", started.elapsed());
179            return Err(error);
180        }
181    };
182    let result = match body.get("result").cloned().ok_or_else(|| {
183        if let Some(error) = body.get("error") {
184            let message = error
185                .get("message")
186                .and_then(Value::as_str)
187                .unwrap_or("unknown A2A error");
188            A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
189        } else {
190            A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
191        }
192    }) {
193        Ok(result) => result,
194        Err(error) => {
195            record_a2a_metric(raw_target, "failed", started.elapsed());
196            return Err(error);
197        }
198    };
199
200    let task_id = match result
201        .get("id")
202        .and_then(Value::as_str)
203        .filter(|value| !value.is_empty())
204        .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
205    {
206        Ok(task_id) => task_id.to_string(),
207        Err(error) => {
208            record_a2a_metric(raw_target, "failed", started.elapsed());
209            return Err(error);
210        }
211    };
212    let state = match task_state(&result) {
213        Ok(state) => state.to_string(),
214        Err(error) => {
215            record_a2a_metric(raw_target, "failed", started.elapsed());
216            return Err(error);
217        }
218    };
219
220    if state == "completed" {
221        let inline = extract_inline_result(&result);
222        record_a2a_metric(raw_target, "succeeded", started.elapsed());
223        return Ok((
224            endpoint,
225            DispatchAck::InlineResult {
226                task_id,
227                result: inline,
228            },
229        ));
230    }
231
232    if let Some(config) = push_config {
233        register_push_notification_config(
234            &endpoint.rpc_url,
235            &task_id,
236            config,
237            &event.trace_id.0,
238            cancel_rx,
239        )
240        .await
241        .inspect_err(|_| {
242            record_a2a_metric(raw_target, "failed", started.elapsed());
243        })?;
244    }
245    record_a2a_metric(raw_target, "succeeded", started.elapsed());
246    Ok((
247        endpoint.clone(),
248        DispatchAck::PendingTask {
249            task_id: task_id.clone(),
250            state: state.clone(),
251            handle: serde_json::json!({
252                "kind": "a2a_task_handle",
253                "task_id": task_id,
254                "state": state,
255                "target_agent": endpoint.target_agent,
256                "rpc_url": endpoint.rpc_url,
257                "card_url": endpoint.card_url,
258                "agent_id": endpoint.agent_id,
259            }),
260        },
261    ))
262}
263
264fn push_notification_config() -> Option<Value> {
265    let url = std::env::var(A2A_PUSH_URL_ENV)
266        .ok()
267        .map(|value| value.trim().to_string())
268        .filter(|value| !value.is_empty())?;
269    let token = std::env::var(A2A_PUSH_TOKEN_ENV)
270        .ok()
271        .map(|value| value.trim().to_string())
272        .filter(|value| !value.is_empty());
273    let mut config = serde_json::json!({ "url": url });
274    if let Some(token) = token {
275        config["token"] = Value::String(token.clone());
276        config["authentication"] = serde_json::json!({
277            "scheme": "Bearer",
278            "credentials": token,
279        });
280    }
281    Some(config)
282}
283
284async fn register_push_notification_config(
285    rpc_url: &str,
286    task_id: &str,
287    config: Value,
288    trace_id: &str,
289    cancel_rx: &mut broadcast::Receiver<()>,
290) -> Result<(), A2aClientError> {
291    let request = crate::jsonrpc::request(
292        format!("{trace_id}.{task_id}.push-config"),
293        "tasks/pushNotificationConfig/set",
294        serde_json::json!({
295            "taskId": task_id,
296            "pushNotificationConfig": config,
297        }),
298    );
299    let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
300    if response.get("error").is_some() {
301        return Err(A2aClientError::Protocol(format!(
302            "A2A push notification registration failed: {}",
303            response["error"]
304        )));
305    }
306    Ok(())
307}
308
309fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
310    if let Some(metrics) = crate::active_metrics_registry() {
311        metrics.record_a2a_hop(target, outcome, duration);
312    }
313}
314
315pub fn target_agent_label(raw_target: &str) -> String {
316    parse_target(raw_target)
317        .map(|target| target.target_agent_label())
318        .unwrap_or_else(|_| raw_target.to_string())
319}
320
321#[derive(Clone, Debug)]
322struct ParsedTarget {
323    authority: String,
324    target_agent: String,
325}
326
327impl ParsedTarget {
328    fn target_agent_label(&self) -> String {
329        if self.target_agent.is_empty() {
330            self.authority.clone()
331        } else {
332            self.target_agent.clone()
333        }
334    }
335}
336
337fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
338    let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
339        A2aClientError::InvalidTarget(format!(
340            "invalid a2a dispatch target '{raw_target}': {error}"
341        ))
342    })?;
343    let host = parsed.host_str().ok_or_else(|| {
344        A2aClientError::InvalidTarget(format!(
345            "invalid a2a dispatch target '{raw_target}': missing host"
346        ))
347    })?;
348    let authority = if let Some(port) = parsed.port() {
349        format!("{host}:{port}")
350    } else {
351        host.to_string()
352    };
353    Ok(ParsedTarget {
354        authority,
355        target_agent: parsed.path().trim_start_matches('/').to_string(),
356    })
357}
358
359async fn resolve_endpoint(
360    target: &ParsedTarget,
361    allow_cleartext: bool,
362    cancel_rx: &mut broadcast::Receiver<()>,
363) -> Result<ResolvedA2aEndpoint, A2aClientError> {
364    let mut last_error = None;
365    for scheme in card_resolution_schemes(allow_cleartext) {
366        let mut last_scheme_error = None;
367        for path in A2A_AGENT_CARD_PATHS {
368            let card_url = format!("{scheme}://{}/{path}", target.authority);
369            match fetch_agent_card(&card_url, cancel_rx).await {
370                Ok(card) => {
371                    return endpoint_from_card(
372                        card_url,
373                        allow_cleartext,
374                        &target.authority,
375                        target.target_agent.clone(),
376                        &card,
377                    );
378                }
379                Err(AgentCardFetchError::Cancelled(message)) => {
380                    return Err(A2aClientError::Cancelled(message));
381                }
382                Err(error) => {
383                    last_error = Some(agent_card_fetch_error_message(&error));
384                    last_scheme_error = Some(error);
385                }
386            }
387        }
388        if last_scheme_error.as_ref().is_some_and(|error| {
389            should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
390        }) {
391            continue;
392        }
393        break;
394    }
395    Err(A2aClientError::Discovery(format!(
396        "could not resolve A2A agent card for '{}': {}",
397        target.authority,
398        last_error.unwrap_or_else(|| "unknown discovery error".to_string())
399    )))
400}
401
402async fn fetch_agent_card(
403    card_url: &str,
404    cancel_rx: &mut broadcast::Receiver<()>,
405) -> Result<Value, AgentCardFetchError> {
406    let response = tokio::select! {
407        response = crate::llm::shared_utility_client().get(card_url).send() => {
408            match response {
409                Ok(response) => Ok(response),
410                Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
411                    format!("A2A HTTP request failed: {error}")
412                )),
413                Err(error) => Err(AgentCardFetchError::Discovery(
414                    format!("A2A HTTP request failed: {error}")
415                )),
416            }
417        }
418        _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
419            "A2A agent-card fetch cancelled".to_string()
420        )),
421    }?;
422    if !response.status().is_success() {
423        return Err(AgentCardFetchError::Discovery(format!(
424            "GET {card_url} returned HTTP {}",
425            response.status()
426        )));
427    }
428    response
429        .json::<Value>()
430        .await
431        .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
432}
433
434fn endpoint_from_card(
435    card_url: String,
436    allow_cleartext: bool,
437    requested_authority: &str,
438    target_agent: String,
439    card: &Value,
440) -> Result<ResolvedA2aEndpoint, A2aClientError> {
441    let rpc_url = if has_current_transport_fields(card) {
442        endpoint_from_current_card(card, allow_cleartext, requested_authority)?
443    } else if let Some(rpc_url) =
444        endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
445    {
446        rpc_url
447    } else {
448        return Err(A2aClientError::Discovery(
449            "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
450        ));
451    };
452
453    Ok(ResolvedA2aEndpoint {
454        card_url,
455        rpc_url: rpc_url.to_string(),
456        agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
457        target_agent,
458    })
459}
460
461fn has_current_transport_fields(card: &Value) -> bool {
462    card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
463}
464
465fn endpoint_from_current_card(
466    card: &Value,
467    allow_cleartext: bool,
468    requested_authority: &str,
469) -> Result<Url, A2aClientError> {
470    let protocol_version = card
471        .get("protocolVersion")
472        .and_then(Value::as_str)
473        .ok_or_else(|| {
474            A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
475        })?;
476    if protocol_version != A2A_PROTOCOL_VERSION {
477        return Err(A2aClientError::Discovery(format!(
478            "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
479        )));
480    }
481
482    let base_url = card
483        .get("url")
484        .and_then(Value::as_str)
485        .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
486    let base_url = resolve_declared_url(
487        base_url,
488        allow_cleartext,
489        requested_authority,
490        "agent card url",
491    )?;
492
493    let preferred_transport = card
494        .get("preferredTransport")
495        .and_then(Value::as_str)
496        .ok_or_else(|| {
497            A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
498        })?;
499    if transport_is_jsonrpc(preferred_transport) {
500        return Ok(base_url);
501    }
502
503    if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
504        return resolve_declared_url(
505            interface_url,
506            allow_cleartext,
507            requested_authority,
508            "JSONRPC additionalInterface url",
509        );
510    }
511
512    Err(A2aClientError::Discovery(
513        "A2A agent card does not expose JSONRPC transport".to_string(),
514    ))
515}
516
517fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
518    let Some(interfaces) = card.get("additionalInterfaces") else {
519        return Ok(None);
520    };
521    let interfaces = interfaces.as_array().ok_or_else(|| {
522        A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
523    })?;
524    for interface in interfaces {
525        if interface
526            .get("transport")
527            .and_then(Value::as_str)
528            .is_some_and(transport_is_jsonrpc)
529        {
530            let interface_url = interface
531                .get("url")
532                .and_then(Value::as_str)
533                .ok_or_else(|| {
534                    A2aClientError::Discovery(
535                        "A2A JSONRPC additionalInterface missing url".to_string(),
536                    )
537                })?;
538            return Ok(Some(interface_url));
539        }
540    }
541    Ok(None)
542}
543
544fn endpoint_from_legacy_supported_interfaces(
545    card: &Value,
546    allow_cleartext: bool,
547    requested_authority: &str,
548) -> Result<Option<Url>, A2aClientError> {
549    let Some(interfaces) = card.get("supportedInterfaces") else {
550        return Ok(None);
551    };
552    let interfaces = interfaces.as_array().ok_or_else(|| {
553        A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
554    })?;
555    let mut saw_jsonrpc = false;
556    for interface in interfaces {
557        if !interface
558            .get("protocolBinding")
559            .and_then(Value::as_str)
560            .is_some_and(transport_is_jsonrpc)
561        {
562            continue;
563        }
564        saw_jsonrpc = true;
565        if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
566            continue;
567        }
568        let interface_url = interface
569            .get("url")
570            .and_then(Value::as_str)
571            .ok_or_else(|| {
572                A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
573            })?;
574        return resolve_declared_url(
575            interface_url,
576            allow_cleartext,
577            requested_authority,
578            "JSONRPC supportedInterface url",
579        )
580        .map(Some);
581    }
582    if saw_jsonrpc {
583        return Err(A2aClientError::Discovery(format!(
584            "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
585        )));
586    }
587    Err(A2aClientError::Discovery(
588        "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
589    ))
590}
591
592fn transport_is_jsonrpc(transport: &str) -> bool {
593    transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
594}
595
596fn resolve_declared_url(
597    raw_url: &str,
598    allow_cleartext: bool,
599    requested_authority: &str,
600    label: &str,
601) -> Result<Url, A2aClientError> {
602    let url = Url::parse(raw_url).map_err(|error| {
603        A2aClientError::Discovery(format!("invalid A2A {label} '{raw_url}': {error}"))
604    })?;
605    ensure_cleartext_allowed(&url, allow_cleartext, label)?;
606    let declared_authority = url_authority(&url)?;
607    if !authorities_equivalent(&declared_authority, requested_authority) {
608        return Err(A2aClientError::Discovery(format!(
609            "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
610        )));
611    }
612    Ok(url)
613}
614
615fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
616    if allow_cleartext {
617        &["https", "http"]
618    } else {
619        &["https"]
620    }
621}
622
623/// Decide whether an HTTPS discovery failure should fall through to cleartext.
624///
625/// External targets only fall back on `ConnectionRefused` — the common "HTTPS
626/// port isn't listening" case. TLS handshake failures to an external host MUST
627/// NOT silently downgrade to HTTP, because an active network attacker can
628/// forge TLS errors to trigger a downgrade.
629///
630/// Loopback targets (`127.0.0.0/8`, `::1`, `localhost`) fall back on any
631/// discovery-style error. They cover the standard local-dev case where
632/// `harn serve` binds HTTP-only on `127.0.0.1:PORT`, and the SSRF threat
633/// model for loopback is already bounded — any attacker who can reach the
634/// local loopback already has code execution on the box.
635fn should_try_cleartext_fallback(
636    scheme: &str,
637    allow_cleartext: bool,
638    error: &AgentCardFetchError,
639    authority: &str,
640) -> bool {
641    if !allow_cleartext || scheme != "https" {
642        return false;
643    }
644    match error {
645        AgentCardFetchError::Cancelled(_) => false,
646        AgentCardFetchError::ConnectRefused(_) => true,
647        AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
648    }
649}
650
651fn ensure_cleartext_allowed(
652    url: &Url,
653    allow_cleartext: bool,
654    label: &str,
655) -> Result<(), A2aClientError> {
656    if allow_cleartext || url.scheme() != "http" {
657        return Ok(());
658    }
659    Err(A2aClientError::Discovery(format!(
660        "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
661    )))
662}
663
664fn is_loopback_authority(authority: &str) -> bool {
665    let (host, _) = split_authority(authority);
666    if host.eq_ignore_ascii_case("localhost") {
667        return true;
668    }
669    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
670        return ip.is_loopback();
671    }
672    false
673}
674
675/// Return true when two authority strings refer to the same A2A endpoint.
676///
677/// Exact string equality is the default — an agent card that reports a
678/// different host than the one the client asked for is a security-relevant
679/// discrepancy (see harn#248 SSRF hardening). The one well-defined exception
680/// is loopback: `localhost`, `127.0.0.1`, `::1`, and the rest of
681/// `127.0.0.0/8` are all the same socket on this machine, and `harn serve`
682/// hardcodes `http://localhost:PORT` in its agent card even when a caller
683/// dials `127.0.0.1:PORT`. Treating both sides as loopback avoids a spurious
684/// mismatch in that case without widening the external-host trust boundary.
685fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
686    if card_authority == requested_authority {
687        return true;
688    }
689    let (_, card_port) = split_authority(card_authority);
690    let (_, requested_port) = split_authority(requested_authority);
691    if card_port != requested_port {
692        return false;
693    }
694    is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
695}
696
697/// Split an authority into `(host, port_or_empty)`. Strips IPv6 brackets so
698/// `[::1]:8080` becomes `("::1", "8080")`.
699fn split_authority(authority: &str) -> (&str, &str) {
700    let (host_raw, port) = if authority.starts_with('[') {
701        // IPv6 bracketed form: "[addr]:port" or "[addr]".
702        if let Some(end) = authority.rfind(']') {
703            let host = &authority[..=end];
704            let rest = &authority[end + 1..];
705            let port = rest.strip_prefix(':').unwrap_or("");
706            (host, port)
707        } else {
708            (authority, "")
709        }
710    } else {
711        match authority.rsplit_once(':') {
712            Some((host, port)) => (host, port),
713            None => (authority, ""),
714        }
715    };
716    let host = host_raw.trim_start_matches('[').trim_end_matches(']');
717    (host, port)
718}
719
720fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
721    match error {
722        AgentCardFetchError::Cancelled(message)
723        | AgentCardFetchError::Discovery(message)
724        | AgentCardFetchError::ConnectRefused(message) => message.clone(),
725    }
726}
727
728fn is_connect_refused(error: &reqwest::Error) -> bool {
729    if !error.is_connect() {
730        return false;
731    }
732    let mut source = error.source();
733    while let Some(cause) = source {
734        if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
735            if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
736                return true;
737            }
738        }
739        source = cause.source();
740    }
741    false
742}
743
744fn url_authority(url: &Url) -> Result<String, A2aClientError> {
745    let host = url
746        .host_str()
747        .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
748    Ok(if let Some(port) = url.port() {
749        format!("{host}:{port}")
750    } else {
751        host.to_string()
752    })
753}
754
755async fn send_jsonrpc(
756    rpc_url: &str,
757    request: &Value,
758    trace_id: &str,
759    cancel_rx: &mut broadcast::Receiver<()>,
760) -> Result<Value, A2aClientError> {
761    let response = send_http(
762        crate::llm::shared_blocking_client()
763            .post(rpc_url)
764            .header(reqwest::header::CONTENT_TYPE, "application/json")
765            .header("A2A-Version", A2A_PROTOCOL_VERSION)
766            .header("A2A-Trace-Id", trace_id)
767            .json(request),
768        cancel_rx,
769        "A2A task dispatch cancelled",
770    )
771    .await?;
772    if !response.status().is_success() {
773        return Err(A2aClientError::Protocol(format!(
774            "A2A task dispatch returned HTTP {}",
775            response.status()
776        )));
777    }
778    response
779        .json::<Value>()
780        .await
781        .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
782}
783
784async fn send_http(
785    request: reqwest::RequestBuilder,
786    cancel_rx: &mut broadcast::Receiver<()>,
787    cancelled_message: &'static str,
788) -> Result<reqwest::Response, A2aClientError> {
789    tokio::select! {
790        response = request.send() => response
791            .map_err(|error| A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))),
792        _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
793    }
794}
795
796fn task_state(task: &Value) -> Result<&str, A2aClientError> {
797    task.pointer("/status/state")
798        .and_then(Value::as_str)
799        .filter(|value| !value.is_empty())
800        .ok_or_else(|| {
801            A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
802        })
803}
804
805fn extract_inline_result(task: &Value) -> Value {
806    let text = task
807        .get("history")
808        .and_then(Value::as_array)
809        .and_then(|history| {
810            history.iter().rev().find_map(|message| {
811                let role = message.get("role").and_then(Value::as_str)?;
812                if role != "agent" {
813                    return None;
814                }
815                message
816                    .get("parts")
817                    .and_then(Value::as_array)
818                    .and_then(|parts| {
819                        parts.iter().find_map(|part| {
820                            if part.get("type").and_then(Value::as_str) == Some("text") {
821                                part.get("text").and_then(Value::as_str).map(str::trim_end)
822                            } else {
823                                None
824                            }
825                        })
826                    })
827            })
828        });
829    match text {
830        Some(text) if !text.is_empty() => {
831            serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
832        }
833        _ => task.clone(),
834    }
835}
836
837async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
838    let _ = cancel_rx.recv().await;
839}
840
841#[cfg(test)]
842mod tests {
843    use super::*;
844
845    #[test]
846    fn target_agent_label_prefers_path() {
847        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
848        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
849    }
850
851    #[test]
852    fn extract_inline_result_parses_json_text() {
853        let task = serde_json::json!({
854            "history": [
855                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
856                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
857            ]
858        });
859        assert_eq!(
860            extract_inline_result(&task),
861            serde_json::json!({"trace_id": "trace_123"})
862        );
863    }
864
865    #[test]
866    fn discovery_prefers_https_before_http() {
867        assert_eq!(card_resolution_schemes(false), ["https"]);
868        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
869    }
870
871    #[test]
872    fn endpoint_from_card_accepts_current_preferred_transport() {
873        let endpoint = endpoint_from_card(
874            "https://trusted.example/.well-known/agent-card.json".to_string(),
875            false,
876            "trusted.example",
877            "triage".to_string(),
878            &serde_json::json!({
879                "name": "trusted",
880                "protocolVersion": "0.3.0",
881                "url": "https://trusted.example/rpc",
882                "preferredTransport": "JSONRPC",
883                "additionalInterfaces": [{
884                    "url": "https://trusted.example/rpc",
885                    "transport": "JSONRPC"
886                }]
887            }),
888        )
889        .expect("current A2A card should resolve");
890        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
891        assert_eq!(
892            endpoint.card_url,
893            "https://trusted.example/.well-known/agent-card.json"
894        );
895        assert_eq!(endpoint.target_agent, "triage");
896    }
897
898    #[test]
899    fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
900        let endpoint = endpoint_from_card(
901            "https://trusted.example/.well-known/agent-card.json".to_string(),
902            false,
903            "trusted.example",
904            "triage".to_string(),
905            &serde_json::json!({
906                "name": "trusted",
907                "protocolVersion": "0.3.0",
908                "url": "https://trusted.example/rest",
909                "preferredTransport": "HTTP+JSON",
910                "additionalInterfaces": [{
911                    "url": "https://trusted.example/rpc",
912                    "transport": "JSONRPC"
913                }]
914            }),
915        )
916        .expect("current A2A card should resolve through additionalInterfaces");
917        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
918    }
919
920    #[test]
921    fn endpoint_from_card_accepts_legacy_supported_interfaces() {
922        let endpoint = endpoint_from_card(
923            "https://trusted.example/.well-known/agent-card.json".to_string(),
924            false,
925            "trusted.example",
926            "triage".to_string(),
927            &serde_json::json!({
928                "name": "trusted",
929                "supportedInterfaces": [{
930                    "protocolBinding": "JSONRPC",
931                    "protocolVersion": "0.3.0",
932                    "url": "https://trusted.example/rpc"
933                }],
934            }),
935        )
936        .expect("legacy A2A card should resolve during the transition");
937        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
938    }
939
940    #[test]
941    fn endpoint_from_card_rejects_removed_interfaces_shape() {
942        let error = endpoint_from_card(
943            "https://trusted.example/.well-known/agent-card.json".to_string(),
944            false,
945            "trusted.example",
946            "triage".to_string(),
947            &serde_json::json!({
948                "url": "https://trusted.example",
949                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
950            }),
951        )
952        .expect_err("pre-0.3 Harn discovery shape should be rejected");
953        assert_eq!(
954            error.to_string(),
955            "A2A agent card missing preferredTransport/additionalInterfaces"
956        );
957    }
958
959    #[test]
960    fn cleartext_fallback_only_after_https_connect_refused() {
961        assert!(should_try_cleartext_fallback(
962            "https",
963            true,
964            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
965            "reviewer.example:443",
966        ));
967        assert!(!should_try_cleartext_fallback(
968            "http",
969            true,
970            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
971            "reviewer.example:443",
972        ));
973        assert!(!should_try_cleartext_fallback(
974            "https",
975            true,
976            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
977            "reviewer.example:443",
978        ));
979    }
980
981    #[test]
982    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
983        for authority in [
984            "127.0.0.1:8080",
985            "localhost:8080",
986            "[::1]:8080",
987            "127.1.2.3:9000",
988        ] {
989            assert!(
990                !should_try_cleartext_fallback(
991                    "https",
992                    false,
993                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
994                    authority,
995                ),
996                "cleartext fallback must stay disabled without opt-in for '{authority}'"
997            );
998        }
999    }
1000
1001    #[test]
1002    fn cleartext_fallback_allows_loopback_after_opt_in() {
1003        // Local dev: harn serve is HTTP-only, so TLS handshake fails but we
1004        // still need the HTTP fallback to succeed.
1005        for authority in [
1006            "127.0.0.1:8080",
1007            "localhost:8080",
1008            "[::1]:8080",
1009            "127.1.2.3:9000",
1010        ] {
1011            assert!(
1012                should_try_cleartext_fallback(
1013                    "https",
1014                    true,
1015                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1016                    authority,
1017                ),
1018                "expected cleartext fallback for loopback authority '{authority}'"
1019            );
1020        }
1021    }
1022
1023    #[test]
1024    fn cleartext_fallback_denies_external_tls_failures() {
1025        // External target + TLS handshake failure must not downgrade — an
1026        // attacker able to forge TLS errors shouldn't force cleartext.
1027        for authority in [
1028            "reviewer.example:443",
1029            "8.8.8.8:443",
1030            "192.168.1.10:8080",
1031            "10.0.0.5:8443",
1032        ] {
1033            assert!(
1034                !should_try_cleartext_fallback(
1035                    "https",
1036                    true,
1037                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1038                    authority,
1039                ),
1040                "cleartext fallback must be denied for external authority '{authority}'"
1041            );
1042        }
1043    }
1044
1045    #[test]
1046    fn is_loopback_authority_recognises_loopback_forms() {
1047        assert!(is_loopback_authority("127.0.0.1:8080"));
1048        assert!(is_loopback_authority("localhost:8080"));
1049        assert!(is_loopback_authority("LOCALHOST:9000"));
1050        assert!(is_loopback_authority("[::1]:8080"));
1051        assert!(is_loopback_authority("127.5.5.5:1234"));
1052        assert!(!is_loopback_authority("8.8.8.8:443"));
1053        assert!(!is_loopback_authority("192.168.1.10:8080"));
1054        assert!(!is_loopback_authority("example.com:443"));
1055        assert!(!is_loopback_authority("reviewer.prod"));
1056    }
1057
1058    #[test]
1059    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
1060        let error = endpoint_from_card(
1061            "https://trusted.example/.well-known/agent-card.json".to_string(),
1062            false,
1063            "trusted.example",
1064            "triage".to_string(),
1065            &serde_json::json!({
1066                "protocolVersion": "0.3.0",
1067                "url": "https://evil.example",
1068                "preferredTransport": "JSONRPC",
1069            }),
1070        )
1071        .unwrap_err();
1072        assert_eq!(
1073            error.to_string(),
1074            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
1075        );
1076    }
1077
1078    #[test]
1079    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
1080        let error = endpoint_from_card(
1081            "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1082            false,
1083            "127.0.0.1:8080",
1084            "triage".to_string(),
1085            &serde_json::json!({
1086                "protocolVersion": "0.3.0",
1087                "url": "http://localhost:8080",
1088                "preferredTransport": "JSONRPC",
1089            }),
1090        )
1091        .expect_err("cleartext card should require explicit opt-in");
1092        assert!(error
1093            .to_string()
1094            .contains("requires `allow_cleartext = true`"));
1095    }
1096
1097    #[test]
1098    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
1099        // harn serve reports `http://localhost:PORT` in its card, but clients
1100        // commonly dial `127.0.0.1:PORT`. Both refer to the same socket, so
1101        // the authority check must not spuriously reject the pair.
1102        let card = serde_json::json!({
1103            "protocolVersion": "0.3.0",
1104            "url": "http://localhost:8080",
1105            "preferredTransport": "JSONRPC",
1106        });
1107        let endpoint = endpoint_from_card(
1108            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1109            true,
1110            "127.0.0.1:8080",
1111            "triage".to_string(),
1112            &card,
1113        )
1114        .expect("loopback alias pair should be accepted");
1115        assert_eq!(endpoint.rpc_url, "http://localhost:8080/");
1116
1117        // IPv6 loopback `[::1]` also aliases to `127.0.0.1` / `localhost`.
1118        let card_v6 = serde_json::json!({
1119            "protocolVersion": "0.3.0",
1120            "url": "http://[::1]:8080",
1121            "preferredTransport": "JSONRPC",
1122        });
1123        let endpoint_v6 = endpoint_from_card(
1124            "http://localhost:8080/.well-known/agent-card.json".to_string(),
1125            true,
1126            "localhost:8080",
1127            "triage".to_string(),
1128            &card_v6,
1129        )
1130        .expect("IPv6 loopback alias should be accepted");
1131        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");
1132
1133        // Port mismatch is still rejected even on loopback.
1134        let card_wrong_port = serde_json::json!({
1135            "protocolVersion": "0.3.0",
1136            "url": "http://localhost:9000",
1137            "preferredTransport": "JSONRPC",
1138        });
1139        let error = endpoint_from_card(
1140            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1141            true,
1142            "127.0.0.1:8080",
1143            "triage".to_string(),
1144            &card_wrong_port,
1145        )
1146        .expect_err("mismatched ports must still be rejected even on loopback");
1147        assert!(error
1148            .to_string()
1149            .contains("A2A agent card url authority mismatch"));
1150    }
1151
1152    #[test]
1153    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
1154        assert!(!authorities_equivalent(
1155            "internal.corp.example:443",
1156            "trusted.example:443",
1157        ));
1158        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
1159        assert!(authorities_equivalent(
1160            "trusted.example:443",
1161            "trusted.example:443",
1162        ));
1163    }
1164}