Skip to main content

harn_vm/a2a/
mod.rs

1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
10const A2A_AGENT_CARD_PATHS: &[&str] = &[
11    ".well-known/agent-card.json",
12    ".well-known/a2a-agent",
13    ".well-known/agent.json",
14    "agent/card",
15];
16const A2A_PROTOCOL_VERSION: &str = "0.3.0";
17const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";
18const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
19const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
20
21#[derive(Clone, Debug, PartialEq, Eq)]
22pub struct ResolvedA2aEndpoint {
23    pub card_url: String,
24    pub rpc_url: String,
25    pub agent_id: Option<String>,
26    pub target_agent: String,
27}
28
29#[derive(Clone, Debug, PartialEq)]
30pub enum DispatchAck {
31    InlineResult {
32        task_id: String,
33        result: Value,
34    },
35    PendingTask {
36        task_id: String,
37        state: String,
38        handle: Value,
39    },
40}
41
42#[derive(Debug)]
43pub enum A2aClientError {
44    InvalidTarget(String),
45    Discovery(String),
46    Protocol(String),
47    Denied(String),
48    Timeout(String),
49    Cancelled(String),
50}
51
52impl std::fmt::Display for A2aClientError {
53    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
54        match self {
55            Self::InvalidTarget(message)
56            | Self::Discovery(message)
57            | Self::Protocol(message)
58            | Self::Denied(message)
59            | Self::Timeout(message)
60            | Self::Cancelled(message) => f.write_str(message),
61        }
62    }
63}
64
65impl std::error::Error for A2aClientError {}
66
67/// Abstraction over outbound A2A dispatch, injectable for tests.
68#[async_trait]
69pub trait A2aClient: Send + Sync + 'static {
70    async fn dispatch(
71        &self,
72        target: &str,
73        allow_cleartext: bool,
74        binding_id: &str,
75        binding_key: &str,
76        event: &TriggerEvent,
77        cancel_rx: &mut broadcast::Receiver<()>,
78    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
79}
80
81/// Production implementation that performs real HTTP A2A calls.
82pub struct RealA2aClient;
83
84#[async_trait]
85impl A2aClient for RealA2aClient {
86    async fn dispatch(
87        &self,
88        target: &str,
89        allow_cleartext: bool,
90        binding_id: &str,
91        binding_key: &str,
92        event: &TriggerEvent,
93        cancel_rx: &mut broadcast::Receiver<()>,
94    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
95        dispatch_trigger_event(
96            target,
97            allow_cleartext,
98            binding_id,
99            binding_key,
100            event,
101            cancel_rx,
102        )
103        .await
104    }
105}
106
107#[derive(Debug)]
108enum AgentCardFetchError {
109    Cancelled(String),
110    Discovery(String),
111    ConnectRefused(String),
112    Denied(String),
113    Timeout(String),
114}
115
116pub async fn dispatch_trigger_event(
117    raw_target: &str,
118    allow_cleartext: bool,
119    binding_id: &str,
120    binding_key: &str,
121    event: &TriggerEvent,
122    cancel_rx: &mut broadcast::Receiver<()>,
123) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
124    let started = std::time::Instant::now();
125    let target = match parse_target(raw_target) {
126        Ok(target) => target,
127        Err(error) => {
128            record_a2a_metric(raw_target, "failed", started.elapsed());
129            return Err(error);
130        }
131    };
132    let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
133        Ok(endpoint) => endpoint,
134        Err(error) => {
135            record_a2a_metric(raw_target, "failed", started.elapsed());
136            return Err(error);
137        }
138    };
139    let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
140    let envelope = serde_json::json!({
141        "kind": "harn.trigger.dispatch",
142        "message_id": message_id,
143        "trace_id": event.trace_id.0,
144        "event_id": event.id.0,
145        "trigger_id": binding_id,
146        "binding_key": binding_key,
147        "target_agent": endpoint.target_agent,
148        "event": event,
149    });
150    let text = serde_json::to_string(&envelope)
151        .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
152    let push_config = push_notification_config();
153    let mut params = serde_json::json!({
154        "contextId": event.trace_id.0,
155        "message": {
156            "messageId": message_id,
157            "role": "user",
158            "parts": [{
159                "type": "text",
160                "text": text,
161            }],
162            "metadata": {
163                "kind": "harn.trigger.dispatch",
164                "trace_id": event.trace_id.0,
165                "event_id": event.id.0,
166                "trigger_id": binding_id,
167                "binding_key": binding_key,
168                "target_agent": endpoint.target_agent,
169            },
170        },
171    });
172    if let Some(config) = push_config.clone() {
173        params["configuration"] = serde_json::json!({
174            "blocking": false,
175            "returnImmediately": true,
176            "pushNotificationConfig": config,
177        });
178    }
179    let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
180
181    let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
182        Ok(body) => body,
183        Err(error) => {
184            record_a2a_metric(raw_target, "failed", started.elapsed());
185            return Err(error);
186        }
187    };
188    let result = match body.get("result").cloned().ok_or_else(|| {
189        if let Some(error) = body.get("error") {
190            let message = error
191                .get("message")
192                .and_then(Value::as_str)
193                .unwrap_or("unknown A2A error");
194            A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
195        } else {
196            A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
197        }
198    }) {
199        Ok(result) => result,
200        Err(error) => {
201            record_a2a_metric(raw_target, "failed", started.elapsed());
202            return Err(error);
203        }
204    };
205
206    let task_id = match result
207        .get("id")
208        .and_then(Value::as_str)
209        .filter(|value| !value.is_empty())
210        .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
211    {
212        Ok(task_id) => task_id.to_string(),
213        Err(error) => {
214            record_a2a_metric(raw_target, "failed", started.elapsed());
215            return Err(error);
216        }
217    };
218    let state = match task_state(&result) {
219        Ok(state) => state.to_string(),
220        Err(error) => {
221            record_a2a_metric(raw_target, "failed", started.elapsed());
222            return Err(error);
223        }
224    };
225
226    if state == "completed" {
227        let inline = extract_inline_result(&result);
228        record_a2a_metric(raw_target, "succeeded", started.elapsed());
229        return Ok((
230            endpoint,
231            DispatchAck::InlineResult {
232                task_id,
233                result: inline,
234            },
235        ));
236    }
237
238    if state == "rejected" {
239        record_a2a_metric(raw_target, "failed", started.elapsed());
240        return Err(A2aClientError::Denied(format!(
241            "A2A task rejected by remote agent: {}",
242            task_status_message(&result).unwrap_or("permission rejected")
243        )));
244    }
245
246    if let Some(config) = push_config {
247        register_push_notification_config(
248            &endpoint.rpc_url,
249            &task_id,
250            config,
251            &event.trace_id.0,
252            cancel_rx,
253        )
254        .await
255        .inspect_err(|_| {
256            record_a2a_metric(raw_target, "failed", started.elapsed());
257        })?;
258    }
259    record_a2a_metric(raw_target, "succeeded", started.elapsed());
260    Ok((
261        endpoint.clone(),
262        DispatchAck::PendingTask {
263            task_id: task_id.clone(),
264            state: state.clone(),
265            handle: serde_json::json!({
266                "kind": "a2a_task_handle",
267                "task_id": task_id,
268                "state": state,
269                "target_agent": endpoint.target_agent,
270                "rpc_url": endpoint.rpc_url,
271                "card_url": endpoint.card_url,
272                "agent_id": endpoint.agent_id,
273            }),
274        },
275    ))
276}
277
278fn push_notification_config() -> Option<Value> {
279    let url = std::env::var(A2A_PUSH_URL_ENV)
280        .ok()
281        .map(|value| value.trim().to_string())
282        .filter(|value| !value.is_empty())?;
283    let token = std::env::var(A2A_PUSH_TOKEN_ENV)
284        .ok()
285        .map(|value| value.trim().to_string())
286        .filter(|value| !value.is_empty());
287    let mut config = serde_json::json!({ "url": url });
288    if let Some(token) = token {
289        config["token"] = Value::String(token.clone());
290        config["authentication"] = serde_json::json!({
291            "scheme": "Bearer",
292            "credentials": token,
293        });
294    }
295    Some(config)
296}
297
298async fn register_push_notification_config(
299    rpc_url: &str,
300    task_id: &str,
301    config: Value,
302    trace_id: &str,
303    cancel_rx: &mut broadcast::Receiver<()>,
304) -> Result<(), A2aClientError> {
305    let request = crate::jsonrpc::request(
306        format!("{trace_id}.{task_id}.push-config"),
307        "tasks/pushNotificationConfig/set",
308        serde_json::json!({
309            "taskId": task_id,
310            "pushNotificationConfig": config,
311        }),
312    );
313    let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
314    if response.get("error").is_some() {
315        return Err(A2aClientError::Protocol(format!(
316            "A2A push notification registration failed: {}",
317            response["error"]
318        )));
319    }
320    Ok(())
321}
322
323fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
324    if let Some(metrics) = crate::active_metrics_registry() {
325        metrics.record_a2a_hop(target, outcome, duration);
326    }
327}
328
329pub fn target_agent_label(raw_target: &str) -> String {
330    parse_target(raw_target)
331        .map(|target| target.target_agent_label())
332        .unwrap_or_else(|_| raw_target.to_string())
333}
334
335#[derive(Clone, Debug)]
336struct ParsedTarget {
337    authority: String,
338    target_agent: String,
339}
340
341impl ParsedTarget {
342    fn target_agent_label(&self) -> String {
343        if self.target_agent.is_empty() {
344            self.authority.clone()
345        } else {
346            self.target_agent.clone()
347        }
348    }
349}
350
351fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
352    let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
353        A2aClientError::InvalidTarget(format!(
354            "invalid a2a dispatch target '{raw_target}': {error}"
355        ))
356    })?;
357    let host = parsed.host_str().ok_or_else(|| {
358        A2aClientError::InvalidTarget(format!(
359            "invalid a2a dispatch target '{raw_target}': missing host"
360        ))
361    })?;
362    let authority = if let Some(port) = parsed.port() {
363        format!("{host}:{port}")
364    } else {
365        host.to_string()
366    };
367    Ok(ParsedTarget {
368        authority,
369        target_agent: parsed.path().trim_start_matches('/').to_string(),
370    })
371}
372
373async fn resolve_endpoint(
374    target: &ParsedTarget,
375    allow_cleartext: bool,
376    cancel_rx: &mut broadcast::Receiver<()>,
377) -> Result<ResolvedA2aEndpoint, A2aClientError> {
378    let mut last_error = None;
379    for scheme in card_resolution_schemes(allow_cleartext) {
380        let mut last_scheme_error = None;
381        for path in A2A_AGENT_CARD_PATHS {
382            let card_url = format!("{scheme}://{}/{path}", target.authority);
383            match fetch_agent_card(&card_url, cancel_rx).await {
384                Ok(card) => {
385                    return endpoint_from_card(
386                        card_url,
387                        allow_cleartext,
388                        &target.authority,
389                        target.target_agent.clone(),
390                        &card,
391                    );
392                }
393                Err(AgentCardFetchError::Cancelled(message)) => {
394                    return Err(A2aClientError::Cancelled(message));
395                }
396                Err(AgentCardFetchError::Timeout(message)) => {
397                    return Err(A2aClientError::Timeout(message));
398                }
399                Err(AgentCardFetchError::Denied(message)) => {
400                    return Err(A2aClientError::Denied(message));
401                }
402                Err(error) => {
403                    last_error = Some(agent_card_fetch_error_message(&error));
404                    last_scheme_error = Some(error);
405                }
406            }
407        }
408        if last_scheme_error.as_ref().is_some_and(|error| {
409            should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
410        }) {
411            continue;
412        }
413        break;
414    }
415    Err(A2aClientError::Discovery(format!(
416        "could not resolve A2A agent card for '{}': {}",
417        target.authority,
418        last_error.unwrap_or_else(|| "unknown discovery error".to_string())
419    )))
420}
421
422async fn fetch_agent_card(
423    card_url: &str,
424    cancel_rx: &mut broadcast::Receiver<()>,
425) -> Result<Value, AgentCardFetchError> {
426    let response = tokio::select! {
427        response = crate::llm::shared_utility_client().get(card_url).send() => {
428            match response {
429                Ok(response) => Ok(response),
430                Err(error) if error.is_timeout() => Err(AgentCardFetchError::Timeout(
431                    format!("A2A HTTP request timed out: {error}")
432                )),
433                Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
434                    format!("A2A HTTP request failed: {error}")
435                )),
436                Err(error) => Err(AgentCardFetchError::Discovery(
437                    format!("A2A HTTP request failed: {error}")
438                )),
439            }
440        }
441        _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
442            "A2A agent-card fetch cancelled".to_string()
443        )),
444    }?;
445    if matches!(
446        response.status(),
447        reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
448    ) {
449        return Err(AgentCardFetchError::Denied(format!(
450            "GET {card_url} returned HTTP {}",
451            response.status()
452        )));
453    }
454    if !response.status().is_success() {
455        return Err(AgentCardFetchError::Discovery(format!(
456            "GET {card_url} returned HTTP {}",
457            response.status()
458        )));
459    }
460    response
461        .json::<Value>()
462        .await
463        .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
464}
465
466fn endpoint_from_card(
467    card_url: String,
468    allow_cleartext: bool,
469    requested_authority: &str,
470    target_agent: String,
471    card: &Value,
472) -> Result<ResolvedA2aEndpoint, A2aClientError> {
473    let rpc_url = if has_current_transport_fields(card) {
474        endpoint_from_current_card(card, allow_cleartext, requested_authority)?
475    } else if let Some(rpc_url) =
476        endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
477    {
478        rpc_url
479    } else {
480        return Err(A2aClientError::Discovery(
481            "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
482        ));
483    };
484
485    Ok(ResolvedA2aEndpoint {
486        card_url,
487        rpc_url: rpc_url.to_string(),
488        agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
489        target_agent,
490    })
491}
492
493fn has_current_transport_fields(card: &Value) -> bool {
494    card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
495}
496
497fn endpoint_from_current_card(
498    card: &Value,
499    allow_cleartext: bool,
500    requested_authority: &str,
501) -> Result<Url, A2aClientError> {
502    let protocol_version = card
503        .get("protocolVersion")
504        .and_then(Value::as_str)
505        .ok_or_else(|| {
506            A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
507        })?;
508    if protocol_version != A2A_PROTOCOL_VERSION {
509        return Err(A2aClientError::Discovery(format!(
510            "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
511        )));
512    }
513
514    let base_url = card
515        .get("url")
516        .and_then(Value::as_str)
517        .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
518    let base_url = resolve_declared_url(
519        base_url,
520        allow_cleartext,
521        requested_authority,
522        "agent card url",
523    )?;
524
525    let preferred_transport = card
526        .get("preferredTransport")
527        .and_then(Value::as_str)
528        .ok_or_else(|| {
529            A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
530        })?;
531    if transport_is_jsonrpc(preferred_transport) {
532        return Ok(base_url);
533    }
534
535    if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
536        return resolve_declared_url(
537            interface_url,
538            allow_cleartext,
539            requested_authority,
540            "JSONRPC additionalInterface url",
541        );
542    }
543
544    Err(A2aClientError::Discovery(
545        "A2A agent card does not expose JSONRPC transport".to_string(),
546    ))
547}
548
549fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
550    let Some(interfaces) = card.get("additionalInterfaces") else {
551        return Ok(None);
552    };
553    let interfaces = interfaces.as_array().ok_or_else(|| {
554        A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
555    })?;
556    for interface in interfaces {
557        if interface
558            .get("transport")
559            .and_then(Value::as_str)
560            .is_some_and(transport_is_jsonrpc)
561        {
562            let interface_url = interface
563                .get("url")
564                .and_then(Value::as_str)
565                .ok_or_else(|| {
566                    A2aClientError::Discovery(
567                        "A2A JSONRPC additionalInterface missing url".to_string(),
568                    )
569                })?;
570            return Ok(Some(interface_url));
571        }
572    }
573    Ok(None)
574}
575
576fn endpoint_from_legacy_supported_interfaces(
577    card: &Value,
578    allow_cleartext: bool,
579    requested_authority: &str,
580) -> Result<Option<Url>, A2aClientError> {
581    let Some(interfaces) = card.get("supportedInterfaces") else {
582        return Ok(None);
583    };
584    let interfaces = interfaces.as_array().ok_or_else(|| {
585        A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
586    })?;
587    let mut saw_jsonrpc = false;
588    for interface in interfaces {
589        if !interface
590            .get("protocolBinding")
591            .and_then(Value::as_str)
592            .is_some_and(transport_is_jsonrpc)
593        {
594            continue;
595        }
596        saw_jsonrpc = true;
597        if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
598            continue;
599        }
600        let interface_url = interface
601            .get("url")
602            .and_then(Value::as_str)
603            .ok_or_else(|| {
604                A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
605            })?;
606        return resolve_declared_url(
607            interface_url,
608            allow_cleartext,
609            requested_authority,
610            "JSONRPC supportedInterface url",
611        )
612        .map(Some);
613    }
614    if saw_jsonrpc {
615        return Err(A2aClientError::Discovery(format!(
616            "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
617        )));
618    }
619    Err(A2aClientError::Discovery(
620        "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
621    ))
622}
623
624fn transport_is_jsonrpc(transport: &str) -> bool {
625    transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
626}
627
628fn resolve_declared_url(
629    raw_url: &str,
630    allow_cleartext: bool,
631    requested_authority: &str,
632    label: &str,
633) -> Result<Url, A2aClientError> {
634    let url = Url::parse(raw_url).map_err(|error| {
635        A2aClientError::Discovery(format!("invalid A2A {label} '{raw_url}': {error}"))
636    })?;
637    ensure_cleartext_allowed(&url, allow_cleartext, label)?;
638    let declared_authority = url_authority(&url)?;
639    if !authorities_equivalent(&declared_authority, requested_authority) {
640        return Err(A2aClientError::Denied(format!(
641            "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
642        )));
643    }
644    Ok(url)
645}
646
647fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
648    if allow_cleartext {
649        &["https", "http"]
650    } else {
651        &["https"]
652    }
653}
654
655/// Decide whether an HTTPS discovery failure should fall through to cleartext.
656///
657/// External targets only fall back on `ConnectionRefused` — the common "HTTPS
658/// port isn't listening" case. TLS handshake failures to an external host MUST
659/// NOT silently downgrade to HTTP, because an active network attacker can
660/// forge TLS errors to trigger a downgrade.
661///
662/// Loopback targets (`127.0.0.0/8`, `::1`, `localhost`) fall back on any
663/// discovery-style error. They cover the standard local-dev case where
664/// `harn serve` binds HTTP-only on `127.0.0.1:PORT`, and the SSRF threat
665/// model for loopback is already bounded — any attacker who can reach the
666/// local loopback already has code execution on the box.
667fn should_try_cleartext_fallback(
668    scheme: &str,
669    allow_cleartext: bool,
670    error: &AgentCardFetchError,
671    authority: &str,
672) -> bool {
673    if !allow_cleartext || scheme != "https" {
674        return false;
675    }
676    match error {
677        AgentCardFetchError::Cancelled(_)
678        | AgentCardFetchError::Denied(_)
679        | AgentCardFetchError::Timeout(_) => false,
680        AgentCardFetchError::ConnectRefused(_) => true,
681        AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
682    }
683}
684
685fn ensure_cleartext_allowed(
686    url: &Url,
687    allow_cleartext: bool,
688    label: &str,
689) -> Result<(), A2aClientError> {
690    if allow_cleartext || url.scheme() != "http" {
691        return Ok(());
692    }
693    Err(A2aClientError::Denied(format!(
694        "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
695    )))
696}
697
698fn is_loopback_authority(authority: &str) -> bool {
699    let (host, _) = split_authority(authority);
700    if host.eq_ignore_ascii_case("localhost") {
701        return true;
702    }
703    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
704        return ip.is_loopback();
705    }
706    false
707}
708
709/// Return true when two authority strings refer to the same A2A endpoint.
710///
711/// Exact string equality is the default — an agent card that reports a
712/// different host than the one the client asked for is a security-relevant
713/// discrepancy (see harn#248 SSRF hardening). The one well-defined exception
714/// is loopback: `localhost`, `127.0.0.1`, `::1`, and the rest of
715/// `127.0.0.0/8` are all the same socket on this machine, and `harn serve`
716/// hardcodes `http://localhost:PORT` in its agent card even when a caller
717/// dials `127.0.0.1:PORT`. Treating both sides as loopback avoids a spurious
718/// mismatch in that case without widening the external-host trust boundary.
719fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
720    if card_authority == requested_authority {
721        return true;
722    }
723    let (_, card_port) = split_authority(card_authority);
724    let (_, requested_port) = split_authority(requested_authority);
725    if card_port != requested_port {
726        return false;
727    }
728    is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
729}
730
731/// Split an authority into `(host, port_or_empty)`. Strips IPv6 brackets so
732/// `[::1]:8080` becomes `("::1", "8080")`.
733fn split_authority(authority: &str) -> (&str, &str) {
734    let (host_raw, port) = if authority.starts_with('[') {
735        // IPv6 bracketed form: "[addr]:port" or "[addr]".
736        if let Some(end) = authority.rfind(']') {
737            let host = &authority[..=end];
738            let rest = &authority[end + 1..];
739            let port = rest.strip_prefix(':').unwrap_or("");
740            (host, port)
741        } else {
742            (authority, "")
743        }
744    } else {
745        match authority.rsplit_once(':') {
746            Some((host, port)) => (host, port),
747            None => (authority, ""),
748        }
749    };
750    let host = host_raw.trim_start_matches('[').trim_end_matches(']');
751    (host, port)
752}
753
754fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
755    match error {
756        AgentCardFetchError::Cancelled(message)
757        | AgentCardFetchError::Discovery(message)
758        | AgentCardFetchError::ConnectRefused(message)
759        | AgentCardFetchError::Denied(message)
760        | AgentCardFetchError::Timeout(message) => message.clone(),
761    }
762}
763
764fn is_connect_refused(error: &reqwest::Error) -> bool {
765    if !error.is_connect() {
766        return false;
767    }
768    let mut source = error.source();
769    while let Some(cause) = source {
770        if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
771            if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
772                return true;
773            }
774        }
775        source = cause.source();
776    }
777    false
778}
779
780fn url_authority(url: &Url) -> Result<String, A2aClientError> {
781    let host = url
782        .host_str()
783        .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
784    Ok(if let Some(port) = url.port() {
785        format!("{host}:{port}")
786    } else {
787        host.to_string()
788    })
789}
790
791async fn send_jsonrpc(
792    rpc_url: &str,
793    request: &Value,
794    trace_id: &str,
795    cancel_rx: &mut broadcast::Receiver<()>,
796) -> Result<Value, A2aClientError> {
797    let response = send_http(
798        crate::llm::shared_blocking_client()
799            .post(rpc_url)
800            .header(reqwest::header::CONTENT_TYPE, "application/json")
801            .header("A2A-Version", A2A_PROTOCOL_VERSION)
802            .header("A2A-Trace-Id", trace_id)
803            .json(request),
804        cancel_rx,
805        "A2A task dispatch cancelled",
806    )
807    .await?;
808    if matches!(
809        response.status(),
810        reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
811    ) {
812        return Err(A2aClientError::Denied(format!(
813            "A2A task dispatch returned HTTP {}",
814            response.status()
815        )));
816    }
817    if !response.status().is_success() {
818        return Err(A2aClientError::Protocol(format!(
819            "A2A task dispatch returned HTTP {}",
820            response.status()
821        )));
822    }
823    response
824        .json::<Value>()
825        .await
826        .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
827}
828
829async fn send_http(
830    request: reqwest::RequestBuilder,
831    cancel_rx: &mut broadcast::Receiver<()>,
832    cancelled_message: &'static str,
833) -> Result<reqwest::Response, A2aClientError> {
834    tokio::select! {
835        response = request.send() => response.map_err(|error| {
836            if error.is_timeout() {
837                A2aClientError::Timeout(format!("A2A HTTP request timed out: {error}"))
838            } else {
839                A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))
840            }
841        }),
842        _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
843    }
844}
845
846fn task_state(task: &Value) -> Result<&str, A2aClientError> {
847    task.pointer("/status/state")
848        .and_then(Value::as_str)
849        .filter(|value| !value.is_empty())
850        .ok_or_else(|| {
851            A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
852        })
853}
854
855fn task_status_message(task: &Value) -> Option<&str> {
856    task.pointer("/status/message/parts")
857        .and_then(Value::as_array)
858        .and_then(|parts| {
859            parts.iter().find_map(|part| {
860                if part.get("type").and_then(Value::as_str) == Some("text") {
861                    part.get("text").and_then(Value::as_str).map(str::trim)
862                } else {
863                    None
864                }
865            })
866        })
867        .filter(|message| !message.is_empty())
868}
869
870fn extract_inline_result(task: &Value) -> Value {
871    let text = task
872        .get("history")
873        .and_then(Value::as_array)
874        .and_then(|history| {
875            history.iter().rev().find_map(|message| {
876                let role = message.get("role").and_then(Value::as_str)?;
877                if role != "agent" {
878                    return None;
879                }
880                message
881                    .get("parts")
882                    .and_then(Value::as_array)
883                    .and_then(|parts| {
884                        parts.iter().find_map(|part| {
885                            if part.get("type").and_then(Value::as_str) == Some("text") {
886                                part.get("text").and_then(Value::as_str).map(str::trim_end)
887                            } else {
888                                None
889                            }
890                        })
891                    })
892            })
893        });
894    match text {
895        Some(text) if !text.is_empty() => {
896            serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
897        }
898        _ => task.clone(),
899    }
900}
901
902async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
903    let _ = cancel_rx.recv().await;
904}
905
906#[cfg(test)]
907mod tests {
908    use super::*;
909
910    #[test]
911    fn target_agent_label_prefers_path() {
912        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
913        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
914    }
915
916    #[test]
917    fn extract_inline_result_parses_json_text() {
918        let task = serde_json::json!({
919            "history": [
920                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
921                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
922            ]
923        });
924        assert_eq!(
925            extract_inline_result(&task),
926            serde_json::json!({"trace_id": "trace_123"})
927        );
928    }
929
930    #[test]
931    fn discovery_prefers_https_before_http() {
932        assert_eq!(card_resolution_schemes(false), ["https"]);
933        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
934    }
935
936    #[test]
937    fn endpoint_from_card_accepts_current_preferred_transport() {
938        let endpoint = endpoint_from_card(
939            "https://trusted.example/.well-known/agent-card.json".to_string(),
940            false,
941            "trusted.example",
942            "triage".to_string(),
943            &serde_json::json!({
944                "name": "trusted",
945                "protocolVersion": "0.3.0",
946                "url": "https://trusted.example/rpc",
947                "preferredTransport": "JSONRPC",
948                "additionalInterfaces": [{
949                    "url": "https://trusted.example/rpc",
950                    "transport": "JSONRPC"
951                }]
952            }),
953        )
954        .expect("current A2A card should resolve");
955        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
956        assert_eq!(
957            endpoint.card_url,
958            "https://trusted.example/.well-known/agent-card.json"
959        );
960        assert_eq!(endpoint.target_agent, "triage");
961    }
962
963    #[test]
964    fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
965        let endpoint = endpoint_from_card(
966            "https://trusted.example/.well-known/agent-card.json".to_string(),
967            false,
968            "trusted.example",
969            "triage".to_string(),
970            &serde_json::json!({
971                "name": "trusted",
972                "protocolVersion": "0.3.0",
973                "url": "https://trusted.example/rest",
974                "preferredTransport": "HTTP+JSON",
975                "additionalInterfaces": [{
976                    "url": "https://trusted.example/rpc",
977                    "transport": "JSONRPC"
978                }]
979            }),
980        )
981        .expect("current A2A card should resolve through additionalInterfaces");
982        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
983    }
984
985    #[test]
986    fn endpoint_from_card_accepts_legacy_supported_interfaces() {
987        let endpoint = endpoint_from_card(
988            "https://trusted.example/.well-known/agent-card.json".to_string(),
989            false,
990            "trusted.example",
991            "triage".to_string(),
992            &serde_json::json!({
993                "name": "trusted",
994                "supportedInterfaces": [{
995                    "protocolBinding": "JSONRPC",
996                    "protocolVersion": "0.3.0",
997                    "url": "https://trusted.example/rpc"
998                }],
999            }),
1000        )
1001        .expect("legacy A2A card should resolve during the transition");
1002        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1003    }
1004
1005    #[test]
1006    fn endpoint_from_card_rejects_removed_interfaces_shape() {
1007        let error = endpoint_from_card(
1008            "https://trusted.example/.well-known/agent-card.json".to_string(),
1009            false,
1010            "trusted.example",
1011            "triage".to_string(),
1012            &serde_json::json!({
1013                "url": "https://trusted.example",
1014                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
1015            }),
1016        )
1017        .expect_err("pre-0.3 Harn discovery shape should be rejected");
1018        assert_eq!(
1019            error.to_string(),
1020            "A2A agent card missing preferredTransport/additionalInterfaces"
1021        );
1022    }
1023
1024    #[test]
1025    fn cleartext_fallback_only_after_https_connect_refused() {
1026        assert!(should_try_cleartext_fallback(
1027            "https",
1028            true,
1029            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1030            "reviewer.example:443",
1031        ));
1032        assert!(!should_try_cleartext_fallback(
1033            "http",
1034            true,
1035            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1036            "reviewer.example:443",
1037        ));
1038        assert!(!should_try_cleartext_fallback(
1039            "https",
1040            true,
1041            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1042            "reviewer.example:443",
1043        ));
1044    }
1045
1046    #[test]
1047    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
1048        for authority in [
1049            "127.0.0.1:8080",
1050            "localhost:8080",
1051            "[::1]:8080",
1052            "127.1.2.3:9000",
1053        ] {
1054            assert!(
1055                !should_try_cleartext_fallback(
1056                    "https",
1057                    false,
1058                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1059                    authority,
1060                ),
1061                "cleartext fallback must stay disabled without opt-in for '{authority}'"
1062            );
1063        }
1064    }
1065
1066    #[test]
1067    fn cleartext_fallback_allows_loopback_after_opt_in() {
1068        // Local dev: harn serve is HTTP-only, so TLS handshake fails but we
1069        // still need the HTTP fallback to succeed.
1070        for authority in [
1071            "127.0.0.1:8080",
1072            "localhost:8080",
1073            "[::1]:8080",
1074            "127.1.2.3:9000",
1075        ] {
1076            assert!(
1077                should_try_cleartext_fallback(
1078                    "https",
1079                    true,
1080                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1081                    authority,
1082                ),
1083                "expected cleartext fallback for loopback authority '{authority}'"
1084            );
1085        }
1086    }
1087
1088    #[test]
1089    fn cleartext_fallback_denies_external_tls_failures() {
1090        // External target + TLS handshake failure must not downgrade — an
1091        // attacker able to forge TLS errors shouldn't force cleartext.
1092        for authority in [
1093            "reviewer.example:443",
1094            "8.8.8.8:443",
1095            "192.168.1.10:8080",
1096            "10.0.0.5:8443",
1097        ] {
1098            assert!(
1099                !should_try_cleartext_fallback(
1100                    "https",
1101                    true,
1102                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1103                    authority,
1104                ),
1105                "cleartext fallback must be denied for external authority '{authority}'"
1106            );
1107        }
1108    }
1109
1110    #[test]
1111    fn is_loopback_authority_recognises_loopback_forms() {
1112        assert!(is_loopback_authority("127.0.0.1:8080"));
1113        assert!(is_loopback_authority("localhost:8080"));
1114        assert!(is_loopback_authority("LOCALHOST:9000"));
1115        assert!(is_loopback_authority("[::1]:8080"));
1116        assert!(is_loopback_authority("127.5.5.5:1234"));
1117        assert!(!is_loopback_authority("8.8.8.8:443"));
1118        assert!(!is_loopback_authority("192.168.1.10:8080"));
1119        assert!(!is_loopback_authority("example.com:443"));
1120        assert!(!is_loopback_authority("reviewer.prod"));
1121    }
1122
1123    #[test]
1124    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
1125        let error = endpoint_from_card(
1126            "https://trusted.example/.well-known/agent-card.json".to_string(),
1127            false,
1128            "trusted.example",
1129            "triage".to_string(),
1130            &serde_json::json!({
1131                "protocolVersion": "0.3.0",
1132                "url": "https://evil.example",
1133                "preferredTransport": "JSONRPC",
1134            }),
1135        )
1136        .unwrap_err();
1137        assert_eq!(
1138            error.to_string(),
1139            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
1140        );
1141    }
1142
1143    #[test]
1144    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
1145        let error = endpoint_from_card(
1146            "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1147            false,
1148            "127.0.0.1:8080",
1149            "triage".to_string(),
1150            &serde_json::json!({
1151                "protocolVersion": "0.3.0",
1152                "url": "http://localhost:8080",
1153                "preferredTransport": "JSONRPC",
1154            }),
1155        )
1156        .expect_err("cleartext card should require explicit opt-in");
1157        assert!(error
1158            .to_string()
1159            .contains("requires `allow_cleartext = true`"));
1160    }
1161
1162    #[test]
1163    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
1164        // harn serve reports `http://localhost:PORT` in its card, but clients
1165        // commonly dial `127.0.0.1:PORT`. Both refer to the same socket, so
1166        // the authority check must not spuriously reject the pair.
1167        let card = serde_json::json!({
1168            "protocolVersion": "0.3.0",
1169            "url": "http://localhost:8080",
1170            "preferredTransport": "JSONRPC",
1171        });
1172        let endpoint = endpoint_from_card(
1173            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1174            true,
1175            "127.0.0.1:8080",
1176            "triage".to_string(),
1177            &card,
1178        )
1179        .expect("loopback alias pair should be accepted");
1180        assert_eq!(endpoint.rpc_url, "http://localhost:8080/");
1181
1182        // IPv6 loopback `[::1]` also aliases to `127.0.0.1` / `localhost`.
1183        let card_v6 = serde_json::json!({
1184            "protocolVersion": "0.3.0",
1185            "url": "http://[::1]:8080",
1186            "preferredTransport": "JSONRPC",
1187        });
1188        let endpoint_v6 = endpoint_from_card(
1189            "http://localhost:8080/.well-known/agent-card.json".to_string(),
1190            true,
1191            "localhost:8080",
1192            "triage".to_string(),
1193            &card_v6,
1194        )
1195        .expect("IPv6 loopback alias should be accepted");
1196        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");
1197
1198        // Port mismatch is still rejected even on loopback.
1199        let card_wrong_port = serde_json::json!({
1200            "protocolVersion": "0.3.0",
1201            "url": "http://localhost:9000",
1202            "preferredTransport": "JSONRPC",
1203        });
1204        let error = endpoint_from_card(
1205            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1206            true,
1207            "127.0.0.1:8080",
1208            "triage".to_string(),
1209            &card_wrong_port,
1210        )
1211        .expect_err("mismatched ports must still be rejected even on loopback");
1212        assert!(error
1213            .to_string()
1214            .contains("A2A agent card url authority mismatch"));
1215    }
1216
1217    #[test]
1218    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
1219        assert!(!authorities_equivalent(
1220            "internal.corp.example:443",
1221            "trusted.example:443",
1222        ));
1223        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
1224        assert!(authorities_equivalent(
1225            "trusted.example:443",
1226            "trusted.example:443",
1227        ));
1228    }
1229}