Skip to main content

harn_vm/a2a/
mod.rs

1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
10const A2A_AGENT_CARD_PATHS: &[&str] = &[
11    ".well-known/agent-card.json",
12    ".well-known/a2a-agent",
13    ".well-known/agent.json",
14    "agent/card",
15];
16const A2A_PROTOCOL_VERSION: &str = "0.3.0";
17const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";
18const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
19const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
20
21#[derive(Clone, Debug, PartialEq, Eq)]
22pub struct ResolvedA2aEndpoint {
23    pub card_url: String,
24    pub rpc_url: String,
25    pub agent_id: Option<String>,
26    pub target_agent: String,
27}
28
29#[derive(Clone, Debug, PartialEq, Eq)]
30pub struct ResolvedA2aAgent {
31    pub endpoint: ResolvedA2aEndpoint,
32    pub card: Value,
33}
34
35#[derive(Clone, Debug, PartialEq, Eq)]
36pub enum DispatchAck {
37    InlineResult {
38        task_id: String,
39        result: Value,
40    },
41    PendingTask {
42        task_id: String,
43        state: String,
44        handle: Value,
45    },
46}
47
48#[derive(Debug)]
49pub enum A2aClientError {
50    InvalidTarget(String),
51    Discovery(String),
52    Protocol(String),
53    Denied(String),
54    Timeout(String),
55    Cancelled(String),
56}
57
58impl std::fmt::Display for A2aClientError {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        match self {
61            Self::InvalidTarget(message)
62            | Self::Discovery(message)
63            | Self::Protocol(message)
64            | Self::Denied(message)
65            | Self::Timeout(message)
66            | Self::Cancelled(message) => f.write_str(message),
67        }
68    }
69}
70
71impl std::error::Error for A2aClientError {}
72
73/// Abstraction over outbound A2A dispatch, injectable for tests.
74#[async_trait]
75pub trait A2aClient: Send + Sync + 'static {
76    async fn dispatch(
77        &self,
78        target: &str,
79        allow_cleartext: bool,
80        binding_id: &str,
81        binding_key: &str,
82        event: &TriggerEvent,
83        cancel_rx: &mut broadcast::Receiver<()>,
84    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
85}
86
87/// Production implementation that performs real HTTP A2A calls.
88pub struct RealA2aClient;
89
90#[async_trait]
91impl A2aClient for RealA2aClient {
92    async fn dispatch(
93        &self,
94        target: &str,
95        allow_cleartext: bool,
96        binding_id: &str,
97        binding_key: &str,
98        event: &TriggerEvent,
99        cancel_rx: &mut broadcast::Receiver<()>,
100    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
101        dispatch_trigger_event(
102            target,
103            allow_cleartext,
104            binding_id,
105            binding_key,
106            event,
107            cancel_rx,
108        )
109        .await
110    }
111}
112
113#[derive(Debug)]
114enum AgentCardFetchError {
115    Cancelled(String),
116    Discovery(String),
117    ConnectRefused(String),
118    Denied(String),
119    Timeout(String),
120}
121
122pub async fn dispatch_trigger_event(
123    raw_target: &str,
124    allow_cleartext: bool,
125    binding_id: &str,
126    binding_key: &str,
127    event: &TriggerEvent,
128    cancel_rx: &mut broadcast::Receiver<()>,
129) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
130    let started = std::time::Instant::now();
131    let target = match parse_target(raw_target) {
132        Ok(target) => target,
133        Err(error) => {
134            record_a2a_metric(raw_target, "failed", started.elapsed());
135            return Err(error);
136        }
137    };
138    let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
139        Ok(endpoint) => endpoint,
140        Err(error) => {
141            record_a2a_metric(raw_target, "failed", started.elapsed());
142            return Err(error);
143        }
144    };
145    let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
146    let envelope = serde_json::json!({
147        "kind": "harn.trigger.dispatch",
148        "message_id": message_id,
149        "trace_id": event.trace_id.0,
150        "event_id": event.id.0,
151        "trigger_id": binding_id,
152        "binding_key": binding_key,
153        "target_agent": endpoint.target_agent,
154        "event": event,
155    });
156    let text = serde_json::to_string(&envelope)
157        .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
158    let push_config = push_notification_config();
159    let mut params = serde_json::json!({
160        "contextId": event.trace_id.0,
161        "message": {
162            "messageId": message_id,
163            "role": "user",
164            "parts": [{
165                "type": "text",
166                "text": text,
167            }],
168            "metadata": {
169                "kind": "harn.trigger.dispatch",
170                "trace_id": event.trace_id.0,
171                "event_id": event.id.0,
172                "trigger_id": binding_id,
173                "binding_key": binding_key,
174                "target_agent": endpoint.target_agent,
175            },
176        },
177    });
178    if let Some(config) = push_config.clone() {
179        params["configuration"] = serde_json::json!({
180            "blocking": false,
181            "returnImmediately": true,
182            "pushNotificationConfig": config,
183        });
184    }
185    let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
186
187    let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
188        Ok(body) => body,
189        Err(error) => {
190            record_a2a_metric(raw_target, "failed", started.elapsed());
191            return Err(error);
192        }
193    };
194    let result = match body.get("result").cloned().ok_or_else(|| {
195        if let Some(error) = body.get("error") {
196            let message = error
197                .get("message")
198                .and_then(Value::as_str)
199                .unwrap_or("unknown A2A error");
200            A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
201        } else {
202            A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
203        }
204    }) {
205        Ok(result) => result,
206        Err(error) => {
207            record_a2a_metric(raw_target, "failed", started.elapsed());
208            return Err(error);
209        }
210    };
211
212    let task_id = match result
213        .get("id")
214        .and_then(Value::as_str)
215        .filter(|value| !value.is_empty())
216        .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
217    {
218        Ok(task_id) => task_id.to_string(),
219        Err(error) => {
220            record_a2a_metric(raw_target, "failed", started.elapsed());
221            return Err(error);
222        }
223    };
224    let state = match task_state(&result) {
225        Ok(state) => state.to_string(),
226        Err(error) => {
227            record_a2a_metric(raw_target, "failed", started.elapsed());
228            return Err(error);
229        }
230    };
231
232    if state == "completed" {
233        let inline = extract_inline_result(&result);
234        record_a2a_metric(raw_target, "succeeded", started.elapsed());
235        return Ok((
236            endpoint,
237            DispatchAck::InlineResult {
238                task_id,
239                result: inline,
240            },
241        ));
242    }
243
244    if state == "rejected" {
245        record_a2a_metric(raw_target, "failed", started.elapsed());
246        return Err(A2aClientError::Denied(format!(
247            "A2A task rejected by remote agent: {}",
248            task_status_message(&result).unwrap_or("permission rejected")
249        )));
250    }
251
252    if let Some(config) = push_config {
253        register_push_notification_config(
254            &endpoint.rpc_url,
255            &task_id,
256            config,
257            &event.trace_id.0,
258            cancel_rx,
259        )
260        .await
261        .inspect_err(|_| {
262            record_a2a_metric(raw_target, "failed", started.elapsed());
263        })?;
264    }
265    record_a2a_metric(raw_target, "succeeded", started.elapsed());
266    Ok((
267        endpoint.clone(),
268        DispatchAck::PendingTask {
269            task_id: task_id.clone(),
270            state: state.clone(),
271            handle: serde_json::json!({
272                "kind": "a2a_task_handle",
273                "task_id": task_id,
274                "state": state,
275                "target_agent": endpoint.target_agent,
276                "rpc_url": endpoint.rpc_url,
277                "card_url": endpoint.card_url,
278                "agent_id": endpoint.agent_id,
279            }),
280        },
281    ))
282}
283
284pub async fn resolve_agent(
285    raw_target: &str,
286    allow_cleartext: bool,
287    cancel_rx: &mut broadcast::Receiver<()>,
288) -> Result<ResolvedA2aAgent, A2aClientError> {
289    let target = parse_target(raw_target)?;
290    let resolved = resolve_endpoint_with_card(&target, allow_cleartext, cancel_rx).await?;
291    Ok(ResolvedA2aAgent {
292        endpoint: resolved.0,
293        card: resolved.1,
294    })
295}
296
297pub async fn send_jsonrpc_request(
298    rpc_url: &str,
299    request: &Value,
300    trace_id: &str,
301    cancel_rx: &mut broadcast::Receiver<()>,
302) -> Result<Value, A2aClientError> {
303    send_jsonrpc(rpc_url, request, trace_id, cancel_rx).await
304}
305
306fn push_notification_config() -> Option<Value> {
307    let url = std::env::var(A2A_PUSH_URL_ENV)
308        .ok()
309        .map(|value| value.trim().to_string())
310        .filter(|value| !value.is_empty())?;
311    let token = std::env::var(A2A_PUSH_TOKEN_ENV)
312        .ok()
313        .map(|value| value.trim().to_string())
314        .filter(|value| !value.is_empty());
315    let mut config = serde_json::json!({ "url": url });
316    if let Some(token) = token {
317        config["token"] = Value::String(token.clone());
318        config["authentication"] = serde_json::json!({
319            "scheme": "Bearer",
320            "credentials": token,
321        });
322    }
323    Some(config)
324}
325
326async fn register_push_notification_config(
327    rpc_url: &str,
328    task_id: &str,
329    config: Value,
330    trace_id: &str,
331    cancel_rx: &mut broadcast::Receiver<()>,
332) -> Result<(), A2aClientError> {
333    let request = crate::jsonrpc::request(
334        format!("{trace_id}.{task_id}.push-config"),
335        "tasks/pushNotificationConfig/set",
336        serde_json::json!({
337            "taskId": task_id,
338            "pushNotificationConfig": config,
339        }),
340    );
341    let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
342    if response.get("error").is_some() {
343        return Err(A2aClientError::Protocol(format!(
344            "A2A push notification registration failed: {}",
345            response["error"]
346        )));
347    }
348    Ok(())
349}
350
351fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
352    if let Some(metrics) = crate::active_metrics_registry() {
353        metrics.record_a2a_hop(target, outcome, duration);
354    }
355}
356
357pub fn target_agent_label(raw_target: &str) -> String {
358    parse_target(raw_target)
359        .map(|target| target.target_agent_label())
360        .unwrap_or_else(|_| raw_target.to_string())
361}
362
363#[derive(Clone, Debug)]
364struct ParsedTarget {
365    authority: String,
366    target_agent: String,
367}
368
369impl ParsedTarget {
370    fn target_agent_label(&self) -> String {
371        if self.target_agent.is_empty() {
372            self.authority.clone()
373        } else {
374            self.target_agent.clone()
375        }
376    }
377}
378
379fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
380    let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
381        A2aClientError::InvalidTarget(format!(
382            "invalid a2a dispatch target '{raw_target}': {error}"
383        ))
384    })?;
385    let host = parsed.host_str().ok_or_else(|| {
386        A2aClientError::InvalidTarget(format!(
387            "invalid a2a dispatch target '{raw_target}': missing host"
388        ))
389    })?;
390    let authority = if let Some(port) = parsed.port() {
391        format!("{host}:{port}")
392    } else {
393        host.to_string()
394    };
395    Ok(ParsedTarget {
396        authority,
397        target_agent: parsed.path().trim_start_matches('/').to_string(),
398    })
399}
400
401async fn resolve_endpoint(
402    target: &ParsedTarget,
403    allow_cleartext: bool,
404    cancel_rx: &mut broadcast::Receiver<()>,
405) -> Result<ResolvedA2aEndpoint, A2aClientError> {
406    Ok(
407        resolve_endpoint_with_card(target, allow_cleartext, cancel_rx)
408            .await?
409            .0,
410    )
411}
412
413async fn resolve_endpoint_with_card(
414    target: &ParsedTarget,
415    allow_cleartext: bool,
416    cancel_rx: &mut broadcast::Receiver<()>,
417) -> Result<(ResolvedA2aEndpoint, Value), A2aClientError> {
418    let mut last_error = None;
419    for scheme in card_resolution_schemes(allow_cleartext) {
420        let mut last_scheme_error = None;
421        for path in A2A_AGENT_CARD_PATHS {
422            let card_url = format!("{scheme}://{}/{path}", target.authority);
423            match fetch_agent_card(&card_url, cancel_rx).await {
424                Ok(card) => {
425                    let endpoint = endpoint_from_card(
426                        card_url,
427                        allow_cleartext,
428                        &target.authority,
429                        target.target_agent.clone(),
430                        &card,
431                    )?;
432                    return Ok((endpoint, card));
433                }
434                Err(AgentCardFetchError::Cancelled(message)) => {
435                    return Err(A2aClientError::Cancelled(message));
436                }
437                Err(AgentCardFetchError::Timeout(message)) => {
438                    return Err(A2aClientError::Timeout(message));
439                }
440                Err(AgentCardFetchError::Denied(message)) => {
441                    return Err(A2aClientError::Denied(message));
442                }
443                Err(error) => {
444                    last_error = Some(agent_card_fetch_error_message(&error));
445                    last_scheme_error = Some(error);
446                }
447            }
448        }
449        if last_scheme_error.as_ref().is_some_and(|error| {
450            should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
451        }) {
452            continue;
453        }
454        break;
455    }
456    Err(A2aClientError::Discovery(format!(
457        "could not resolve A2A agent card for '{}': {}",
458        target.authority,
459        last_error.unwrap_or_else(|| "unknown discovery error".to_string())
460    )))
461}
462
463async fn fetch_agent_card(
464    card_url: &str,
465    cancel_rx: &mut broadcast::Receiver<()>,
466) -> Result<Value, AgentCardFetchError> {
467    let response = tokio::select! {
468        response = crate::llm::shared_utility_client().get(card_url).send() => {
469            match response {
470                Ok(response) => Ok(response),
471                Err(error) if error.is_timeout() => Err(AgentCardFetchError::Timeout(
472                    format!("A2A HTTP request timed out: {error}")
473                )),
474                Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
475                    format!("A2A HTTP request failed: {error}")
476                )),
477                Err(error) => Err(AgentCardFetchError::Discovery(
478                    format!("A2A HTTP request failed: {error}")
479                )),
480            }
481        }
482        _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
483            "A2A agent-card fetch cancelled".to_string()
484        )),
485    }?;
486    if matches!(
487        response.status(),
488        reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
489    ) {
490        return Err(AgentCardFetchError::Denied(format!(
491            "GET {card_url} returned HTTP {}",
492            response.status()
493        )));
494    }
495    if !response.status().is_success() {
496        return Err(AgentCardFetchError::Discovery(format!(
497            "GET {card_url} returned HTTP {}",
498            response.status()
499        )));
500    }
501    response
502        .json::<Value>()
503        .await
504        .map_err(|error| AgentCardFetchError::Discovery(format!("parse {card_url}: {error}")))
505}
506
507fn endpoint_from_card(
508    card_url: String,
509    allow_cleartext: bool,
510    requested_authority: &str,
511    target_agent: String,
512    card: &Value,
513) -> Result<ResolvedA2aEndpoint, A2aClientError> {
514    let rpc_url = if has_current_transport_fields(card) {
515        endpoint_from_current_card(card, allow_cleartext, requested_authority)?
516    } else if let Some(rpc_url) =
517        endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
518    {
519        rpc_url
520    } else {
521        return Err(A2aClientError::Discovery(
522            "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
523        ));
524    };
525
526    Ok(ResolvedA2aEndpoint {
527        card_url,
528        rpc_url: rpc_url.to_string(),
529        agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
530        target_agent,
531    })
532}
533
534fn has_current_transport_fields(card: &Value) -> bool {
535    card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
536}
537
538fn endpoint_from_current_card(
539    card: &Value,
540    allow_cleartext: bool,
541    requested_authority: &str,
542) -> Result<Url, A2aClientError> {
543    let protocol_version = card
544        .get("protocolVersion")
545        .and_then(Value::as_str)
546        .ok_or_else(|| {
547            A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
548        })?;
549    if protocol_version != A2A_PROTOCOL_VERSION {
550        return Err(A2aClientError::Discovery(format!(
551            "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
552        )));
553    }
554
555    let base_url = card
556        .get("url")
557        .and_then(Value::as_str)
558        .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
559    let base_url = resolve_declared_url(
560        base_url,
561        allow_cleartext,
562        requested_authority,
563        "agent card url",
564    )?;
565
566    let preferred_transport = card
567        .get("preferredTransport")
568        .and_then(Value::as_str)
569        .ok_or_else(|| {
570            A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
571        })?;
572    if transport_is_jsonrpc(preferred_transport) {
573        return Ok(base_url);
574    }
575
576    if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
577        return resolve_declared_url(
578            interface_url,
579            allow_cleartext,
580            requested_authority,
581            "JSONRPC additionalInterface url",
582        );
583    }
584
585    Err(A2aClientError::Discovery(
586        "A2A agent card does not expose JSONRPC transport".to_string(),
587    ))
588}
589
590fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
591    let Some(interfaces) = card.get("additionalInterfaces") else {
592        return Ok(None);
593    };
594    let interfaces = interfaces.as_array().ok_or_else(|| {
595        A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
596    })?;
597    for interface in interfaces {
598        if interface
599            .get("transport")
600            .and_then(Value::as_str)
601            .is_some_and(transport_is_jsonrpc)
602        {
603            let interface_url = interface
604                .get("url")
605                .and_then(Value::as_str)
606                .ok_or_else(|| {
607                    A2aClientError::Discovery(
608                        "A2A JSONRPC additionalInterface missing url".to_string(),
609                    )
610                })?;
611            return Ok(Some(interface_url));
612        }
613    }
614    Ok(None)
615}
616
617fn endpoint_from_legacy_supported_interfaces(
618    card: &Value,
619    allow_cleartext: bool,
620    requested_authority: &str,
621) -> Result<Option<Url>, A2aClientError> {
622    let Some(interfaces) = card.get("supportedInterfaces") else {
623        return Ok(None);
624    };
625    let interfaces = interfaces.as_array().ok_or_else(|| {
626        A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
627    })?;
628    let mut saw_jsonrpc = false;
629    for interface in interfaces {
630        if !interface
631            .get("protocolBinding")
632            .and_then(Value::as_str)
633            .is_some_and(transport_is_jsonrpc)
634        {
635            continue;
636        }
637        saw_jsonrpc = true;
638        if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
639            continue;
640        }
641        let interface_url = interface
642            .get("url")
643            .and_then(Value::as_str)
644            .ok_or_else(|| {
645                A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
646            })?;
647        return resolve_declared_url(
648            interface_url,
649            allow_cleartext,
650            requested_authority,
651            "JSONRPC supportedInterface url",
652        )
653        .map(Some);
654    }
655    if saw_jsonrpc {
656        return Err(A2aClientError::Discovery(format!(
657            "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
658        )));
659    }
660    Err(A2aClientError::Discovery(
661        "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
662    ))
663}
664
665fn transport_is_jsonrpc(transport: &str) -> bool {
666    transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
667}
668
669fn resolve_declared_url(
670    raw_url: &str,
671    allow_cleartext: bool,
672    requested_authority: &str,
673    label: &str,
674) -> Result<Url, A2aClientError> {
675    let url = Url::parse(raw_url).map_err(|error| {
676        A2aClientError::Discovery(format!("invalid A2A {label} '{raw_url}': {error}"))
677    })?;
678    ensure_cleartext_allowed(&url, allow_cleartext, label)?;
679    let declared_authority = url_authority(&url)?;
680    if !authorities_equivalent(&declared_authority, requested_authority) {
681        return Err(A2aClientError::Denied(format!(
682            "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
683        )));
684    }
685    Ok(url)
686}
687
688fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
689    if allow_cleartext {
690        &["https", "http"]
691    } else {
692        &["https"]
693    }
694}
695
696/// Decide whether an HTTPS discovery failure should fall through to cleartext.
697///
698/// External targets only fall back on `ConnectionRefused` — the common "HTTPS
699/// port isn't listening" case. TLS handshake failures to an external host MUST
700/// NOT silently downgrade to HTTP, because an active network attacker can
701/// forge TLS errors to trigger a downgrade.
702///
703/// Loopback targets (`127.0.0.0/8`, `::1`, `localhost`) fall back on any
704/// discovery-style error. They cover the standard local-dev case where
705/// `harn serve` binds HTTP-only on `127.0.0.1:PORT`, and the SSRF threat
706/// model for loopback is already bounded — any attacker who can reach the
707/// local loopback already has code execution on the box.
708fn should_try_cleartext_fallback(
709    scheme: &str,
710    allow_cleartext: bool,
711    error: &AgentCardFetchError,
712    authority: &str,
713) -> bool {
714    if !allow_cleartext || scheme != "https" {
715        return false;
716    }
717    match error {
718        AgentCardFetchError::Cancelled(_)
719        | AgentCardFetchError::Denied(_)
720        | AgentCardFetchError::Timeout(_) => false,
721        AgentCardFetchError::ConnectRefused(_) => true,
722        AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
723    }
724}
725
726fn ensure_cleartext_allowed(
727    url: &Url,
728    allow_cleartext: bool,
729    label: &str,
730) -> Result<(), A2aClientError> {
731    if allow_cleartext || url.scheme() != "http" {
732        return Ok(());
733    }
734    Err(A2aClientError::Denied(format!(
735        "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
736    )))
737}
738
739fn is_loopback_authority(authority: &str) -> bool {
740    let (host, _) = split_authority(authority);
741    if host.eq_ignore_ascii_case("localhost") {
742        return true;
743    }
744    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
745        return ip.is_loopback();
746    }
747    false
748}
749
750/// Return true when two authority strings refer to the same A2A endpoint.
751///
752/// Exact string equality is the default — an agent card that reports a
753/// different host than the one the client asked for is a security-relevant
754/// discrepancy (see harn#248 SSRF hardening). The one well-defined exception
755/// is loopback: `localhost`, `127.0.0.1`, `::1`, and the rest of
756/// `127.0.0.0/8` are all the same socket on this machine, and `harn serve`
757/// hardcodes `http://localhost:PORT` in its agent card even when a caller
758/// dials `127.0.0.1:PORT`. Treating both sides as loopback avoids a spurious
759/// mismatch in that case without widening the external-host trust boundary.
760fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
761    if card_authority == requested_authority {
762        return true;
763    }
764    let (_, card_port) = split_authority(card_authority);
765    let (_, requested_port) = split_authority(requested_authority);
766    if card_port != requested_port {
767        return false;
768    }
769    is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
770}
771
772/// Split an authority into `(host, port_or_empty)`. Strips IPv6 brackets so
773/// `[::1]:8080` becomes `("::1", "8080")`.
774fn split_authority(authority: &str) -> (&str, &str) {
775    let (host_raw, port) = if authority.starts_with('[') {
776        // IPv6 bracketed form: "[addr]:port" or "[addr]".
777        if let Some(end) = authority.rfind(']') {
778            let host = &authority[..=end];
779            let rest = &authority[end + 1..];
780            let port = rest.strip_prefix(':').unwrap_or("");
781            (host, port)
782        } else {
783            (authority, "")
784        }
785    } else {
786        match authority.rsplit_once(':') {
787            Some((host, port)) => (host, port),
788            None => (authority, ""),
789        }
790    };
791    let host = host_raw.trim_start_matches('[').trim_end_matches(']');
792    (host, port)
793}
794
795fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
796    match error {
797        AgentCardFetchError::Cancelled(message)
798        | AgentCardFetchError::Discovery(message)
799        | AgentCardFetchError::ConnectRefused(message)
800        | AgentCardFetchError::Denied(message)
801        | AgentCardFetchError::Timeout(message) => message.clone(),
802    }
803}
804
805fn is_connect_refused(error: &reqwest::Error) -> bool {
806    if !error.is_connect() {
807        return false;
808    }
809    let mut source = error.source();
810    while let Some(cause) = source {
811        if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
812            if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
813                return true;
814            }
815        }
816        source = cause.source();
817    }
818    false
819}
820
821fn url_authority(url: &Url) -> Result<String, A2aClientError> {
822    let host = url
823        .host_str()
824        .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
825    Ok(if let Some(port) = url.port() {
826        format!("{host}:{port}")
827    } else {
828        host.to_string()
829    })
830}
831
832async fn send_jsonrpc(
833    rpc_url: &str,
834    request: &Value,
835    trace_id: &str,
836    cancel_rx: &mut broadcast::Receiver<()>,
837) -> Result<Value, A2aClientError> {
838    let response = send_http(
839        crate::llm::shared_blocking_client()
840            .post(rpc_url)
841            .header(reqwest::header::CONTENT_TYPE, "application/json")
842            .header("A2A-Version", A2A_PROTOCOL_VERSION)
843            .header("A2A-Trace-Id", trace_id)
844            .json(request),
845        cancel_rx,
846        "A2A task dispatch cancelled",
847    )
848    .await?;
849    if matches!(
850        response.status(),
851        reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
852    ) {
853        return Err(A2aClientError::Denied(format!(
854            "A2A task dispatch returned HTTP {}",
855            response.status()
856        )));
857    }
858    if !response.status().is_success() {
859        return Err(A2aClientError::Protocol(format!(
860            "A2A task dispatch returned HTTP {}",
861            response.status()
862        )));
863    }
864    response
865        .json::<Value>()
866        .await
867        .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
868}
869
870async fn send_http(
871    request: reqwest::RequestBuilder,
872    cancel_rx: &mut broadcast::Receiver<()>,
873    cancelled_message: &'static str,
874) -> Result<reqwest::Response, A2aClientError> {
875    tokio::select! {
876        response = request.send() => response.map_err(|error| {
877            if error.is_timeout() {
878                A2aClientError::Timeout(format!("A2A HTTP request timed out: {error}"))
879            } else {
880                A2aClientError::Protocol(format!("A2A HTTP request failed: {error}"))
881            }
882        }),
883        _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
884    }
885}
886
887fn task_state(task: &Value) -> Result<&str, A2aClientError> {
888    task.pointer("/status/state")
889        .and_then(Value::as_str)
890        .filter(|value| !value.is_empty())
891        .ok_or_else(|| {
892            A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
893        })
894}
895
896fn task_status_message(task: &Value) -> Option<&str> {
897    task.pointer("/status/message/parts")
898        .and_then(Value::as_array)
899        .and_then(|parts| {
900            parts.iter().find_map(|part| {
901                if part.get("type").and_then(Value::as_str) == Some("text") {
902                    part.get("text").and_then(Value::as_str).map(str::trim)
903                } else {
904                    None
905                }
906            })
907        })
908        .filter(|message| !message.is_empty())
909}
910
911fn extract_inline_result(task: &Value) -> Value {
912    let text = task
913        .get("history")
914        .and_then(Value::as_array)
915        .and_then(|history| {
916            history.iter().rev().find_map(|message| {
917                let role = message.get("role").and_then(Value::as_str)?;
918                if role != "agent" {
919                    return None;
920                }
921                message
922                    .get("parts")
923                    .and_then(Value::as_array)
924                    .and_then(|parts| {
925                        parts.iter().find_map(|part| {
926                            if part.get("type").and_then(Value::as_str) == Some("text") {
927                                part.get("text").and_then(Value::as_str).map(str::trim_end)
928                            } else {
929                                None
930                            }
931                        })
932                    })
933            })
934        });
935    match text {
936        Some(text) if !text.is_empty() => {
937            serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
938        }
939        _ => task.clone(),
940    }
941}
942
943async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
944    let _ = cancel_rx.recv().await;
945}
946
947#[cfg(test)]
948mod tests {
949    use super::*;
950
951    #[test]
952    fn target_agent_label_prefers_path() {
953        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
954        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
955    }
956
957    #[test]
958    fn extract_inline_result_parses_json_text() {
959        let task = serde_json::json!({
960            "history": [
961                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
962                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
963            ]
964        });
965        assert_eq!(
966            extract_inline_result(&task),
967            serde_json::json!({"trace_id": "trace_123"})
968        );
969    }
970
971    #[test]
972    fn discovery_prefers_https_before_http() {
973        assert_eq!(card_resolution_schemes(false), ["https"]);
974        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
975    }
976
977    #[test]
978    fn endpoint_from_card_accepts_current_preferred_transport() {
979        let endpoint = endpoint_from_card(
980            "https://trusted.example/.well-known/agent-card.json".to_string(),
981            false,
982            "trusted.example",
983            "triage".to_string(),
984            &serde_json::json!({
985                "name": "trusted",
986                "protocolVersion": "0.3.0",
987                "url": "https://trusted.example/rpc",
988                "preferredTransport": "JSONRPC",
989                "additionalInterfaces": [{
990                    "url": "https://trusted.example/rpc",
991                    "transport": "JSONRPC"
992                }]
993            }),
994        )
995        .expect("current A2A card should resolve");
996        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
997        assert_eq!(
998            endpoint.card_url,
999            "https://trusted.example/.well-known/agent-card.json"
1000        );
1001        assert_eq!(endpoint.target_agent, "triage");
1002    }
1003
1004    #[test]
1005    fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
1006        let endpoint = endpoint_from_card(
1007            "https://trusted.example/.well-known/agent-card.json".to_string(),
1008            false,
1009            "trusted.example",
1010            "triage".to_string(),
1011            &serde_json::json!({
1012                "name": "trusted",
1013                "protocolVersion": "0.3.0",
1014                "url": "https://trusted.example/rest",
1015                "preferredTransport": "HTTP+JSON",
1016                "additionalInterfaces": [{
1017                    "url": "https://trusted.example/rpc",
1018                    "transport": "JSONRPC"
1019                }]
1020            }),
1021        )
1022        .expect("current A2A card should resolve through additionalInterfaces");
1023        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1024    }
1025
1026    #[test]
1027    fn endpoint_from_card_accepts_legacy_supported_interfaces() {
1028        let endpoint = endpoint_from_card(
1029            "https://trusted.example/.well-known/agent-card.json".to_string(),
1030            false,
1031            "trusted.example",
1032            "triage".to_string(),
1033            &serde_json::json!({
1034                "name": "trusted",
1035                "supportedInterfaces": [{
1036                    "protocolBinding": "JSONRPC",
1037                    "protocolVersion": "0.3.0",
1038                    "url": "https://trusted.example/rpc"
1039                }],
1040            }),
1041        )
1042        .expect("legacy A2A card should resolve during the transition");
1043        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1044    }
1045
1046    #[test]
1047    fn endpoint_from_card_rejects_removed_interfaces_shape() {
1048        let error = endpoint_from_card(
1049            "https://trusted.example/.well-known/agent-card.json".to_string(),
1050            false,
1051            "trusted.example",
1052            "triage".to_string(),
1053            &serde_json::json!({
1054                "url": "https://trusted.example",
1055                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
1056            }),
1057        )
1058        .expect_err("pre-0.3 Harn discovery shape should be rejected");
1059        assert_eq!(
1060            error.to_string(),
1061            "A2A agent card missing preferredTransport/additionalInterfaces"
1062        );
1063    }
1064
1065    #[test]
1066    fn cleartext_fallback_only_after_https_connect_refused() {
1067        assert!(should_try_cleartext_fallback(
1068            "https",
1069            true,
1070            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1071            "reviewer.example:443",
1072        ));
1073        assert!(!should_try_cleartext_fallback(
1074            "http",
1075            true,
1076            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1077            "reviewer.example:443",
1078        ));
1079        assert!(!should_try_cleartext_fallback(
1080            "https",
1081            true,
1082            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1083            "reviewer.example:443",
1084        ));
1085    }
1086
1087    #[test]
1088    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
1089        for authority in [
1090            "127.0.0.1:8080",
1091            "localhost:8080",
1092            "[::1]:8080",
1093            "127.1.2.3:9000",
1094        ] {
1095            assert!(
1096                !should_try_cleartext_fallback(
1097                    "https",
1098                    false,
1099                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1100                    authority,
1101                ),
1102                "cleartext fallback must stay disabled without opt-in for '{authority}'"
1103            );
1104        }
1105    }
1106
1107    #[test]
1108    fn cleartext_fallback_allows_loopback_after_opt_in() {
1109        // Local dev: harn serve is HTTP-only, so TLS handshake fails but we
1110        // still need the HTTP fallback to succeed.
1111        for authority in [
1112            "127.0.0.1:8080",
1113            "localhost:8080",
1114            "[::1]:8080",
1115            "127.1.2.3:9000",
1116        ] {
1117            assert!(
1118                should_try_cleartext_fallback(
1119                    "https",
1120                    true,
1121                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1122                    authority,
1123                ),
1124                "expected cleartext fallback for loopback authority '{authority}'"
1125            );
1126        }
1127    }
1128
1129    #[test]
1130    fn cleartext_fallback_denies_external_tls_failures() {
1131        // External target + TLS handshake failure must not downgrade — an
1132        // attacker able to forge TLS errors shouldn't force cleartext.
1133        for authority in [
1134            "reviewer.example:443",
1135            "8.8.8.8:443",
1136            "192.168.1.10:8080",
1137            "10.0.0.5:8443",
1138        ] {
1139            assert!(
1140                !should_try_cleartext_fallback(
1141                    "https",
1142                    true,
1143                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1144                    authority,
1145                ),
1146                "cleartext fallback must be denied for external authority '{authority}'"
1147            );
1148        }
1149    }
1150
1151    #[test]
1152    fn is_loopback_authority_recognises_loopback_forms() {
1153        assert!(is_loopback_authority("127.0.0.1:8080"));
1154        assert!(is_loopback_authority("localhost:8080"));
1155        assert!(is_loopback_authority("LOCALHOST:9000"));
1156        assert!(is_loopback_authority("[::1]:8080"));
1157        assert!(is_loopback_authority("127.5.5.5:1234"));
1158        assert!(!is_loopback_authority("8.8.8.8:443"));
1159        assert!(!is_loopback_authority("192.168.1.10:8080"));
1160        assert!(!is_loopback_authority("example.com:443"));
1161        assert!(!is_loopback_authority("reviewer.prod"));
1162    }
1163
1164    #[test]
1165    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
1166        let error = endpoint_from_card(
1167            "https://trusted.example/.well-known/agent-card.json".to_string(),
1168            false,
1169            "trusted.example",
1170            "triage".to_string(),
1171            &serde_json::json!({
1172                "protocolVersion": "0.3.0",
1173                "url": "https://evil.example",
1174                "preferredTransport": "JSONRPC",
1175            }),
1176        )
1177        .unwrap_err();
1178        assert_eq!(
1179            error.to_string(),
1180            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
1181        );
1182    }
1183
1184    #[test]
1185    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
1186        let error = endpoint_from_card(
1187            "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1188            false,
1189            "127.0.0.1:8080",
1190            "triage".to_string(),
1191            &serde_json::json!({
1192                "protocolVersion": "0.3.0",
1193                "url": "http://localhost:8080",
1194                "preferredTransport": "JSONRPC",
1195            }),
1196        )
1197        .expect_err("cleartext card should require explicit opt-in");
1198        assert!(error
1199            .to_string()
1200            .contains("requires `allow_cleartext = true`"));
1201    }
1202
1203    #[test]
1204    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
1205        // harn serve reports `http://localhost:PORT` in its card, but clients
1206        // commonly dial `127.0.0.1:PORT`. Both refer to the same socket, so
1207        // the authority check must not spuriously reject the pair.
1208        let card = serde_json::json!({
1209            "protocolVersion": "0.3.0",
1210            "url": "http://localhost:8080",
1211            "preferredTransport": "JSONRPC",
1212        });
1213        let endpoint = endpoint_from_card(
1214            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1215            true,
1216            "127.0.0.1:8080",
1217            "triage".to_string(),
1218            &card,
1219        )
1220        .expect("loopback alias pair should be accepted");
1221        assert_eq!(endpoint.rpc_url, "http://localhost:8080/");
1222
1223        // IPv6 loopback `[::1]` also aliases to `127.0.0.1` / `localhost`.
1224        let card_v6 = serde_json::json!({
1225            "protocolVersion": "0.3.0",
1226            "url": "http://[::1]:8080",
1227            "preferredTransport": "JSONRPC",
1228        });
1229        let endpoint_v6 = endpoint_from_card(
1230            "http://localhost:8080/.well-known/agent-card.json".to_string(),
1231            true,
1232            "localhost:8080",
1233            "triage".to_string(),
1234            &card_v6,
1235        )
1236        .expect("IPv6 loopback alias should be accepted");
1237        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");
1238
1239        // Port mismatch is still rejected even on loopback.
1240        let card_wrong_port = serde_json::json!({
1241            "protocolVersion": "0.3.0",
1242            "url": "http://localhost:9000",
1243            "preferredTransport": "JSONRPC",
1244        });
1245        let error = endpoint_from_card(
1246            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1247            true,
1248            "127.0.0.1:8080",
1249            "triage".to_string(),
1250            &card_wrong_port,
1251        )
1252        .expect_err("mismatched ports must still be rejected even on loopback");
1253        assert!(error
1254            .to_string()
1255            .contains("A2A agent card url authority mismatch"));
1256    }
1257
1258    #[test]
1259    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
1260        assert!(!authorities_equivalent(
1261            "internal.corp.example:443",
1262            "trusted.example:443",
1263        ));
1264        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
1265        assert!(authorities_equivalent(
1266            "trusted.example:443",
1267            "trusted.example:443",
1268        ));
1269    }
1270}