Skip to main content

harn_vm/a2a/
mod.rs

1use std::error::Error as _;
2
3use async_trait::async_trait;
4use reqwest::Url;
5use serde_json::Value;
6use tokio::sync::broadcast;
7
8use crate::triggers::TriggerEvent;
9
10const A2A_AGENT_CARD_PATHS: &[&str] = &[
11    ".well-known/agent-card.json",
12    ".well-known/a2a-agent",
13    ".well-known/agent.json",
14    "agent/card",
15];
16const A2A_PROTOCOL_VERSION: &str = "0.3.0";
17const A2A_JSONRPC_TRANSPORT: &str = "JSONRPC";
18const A2A_PUSH_URL_ENV: &str = "HARN_A2A_PUSH_URL";
19const A2A_PUSH_TOKEN_ENV: &str = "HARN_A2A_PUSH_TOKEN";
20const A2A_ACTOR_CHAIN_METADATA_POINTERS: &[&str] = &[
21    "/actor_chain",
22    "/actorChain",
23    "/metadata/actor_chain",
24    "/metadata/actorChain",
25    "/metadata/harn/actor_chain",
26    "/metadata/harn/actorChain",
27    "/metadata/_harn/actorChain",
28    "/statusUpdate/actor_chain",
29    "/statusUpdate/actorChain",
30    "/statusUpdate/metadata/actor_chain",
31    "/statusUpdate/metadata/actorChain",
32    "/statusUpdate/metadata/harn/actor_chain",
33    "/statusUpdate/metadata/harn/actorChain",
34    "/statusUpdate/metadata/_harn/actorChain",
35    "/task/actor_chain",
36    "/task/actorChain",
37    "/task/metadata/actor_chain",
38    "/task/metadata/actorChain",
39    "/task/metadata/harn/actor_chain",
40    "/task/metadata/harn/actorChain",
41    "/task/metadata/_harn/actorChain",
42    "/message/metadata/actor_chain",
43    "/message/metadata/actorChain",
44    "/message/metadata/harn/actor_chain",
45    "/message/metadata/harn/actorChain",
46    "/message/metadata/_harn/actorChain",
47];
48
49#[derive(Clone, Debug, PartialEq, Eq)]
50pub struct ResolvedA2aEndpoint {
51    pub card_url: String,
52    pub rpc_url: String,
53    pub agent_id: Option<String>,
54    pub target_agent: String,
55}
56
57#[derive(Clone, Debug, PartialEq, Eq)]
58pub struct ResolvedA2aAgent {
59    pub endpoint: ResolvedA2aEndpoint,
60    pub card: Value,
61}
62
63#[derive(Clone, Debug, PartialEq, Eq)]
64pub enum DispatchAck {
65    InlineResult {
66        task_id: String,
67        result: Value,
68    },
69    PendingTask {
70        task_id: String,
71        state: String,
72        handle: Value,
73    },
74}
75
76#[derive(Debug)]
77pub enum A2aClientError {
78    InvalidTarget(String),
79    Discovery(String),
80    Protocol(String),
81    Denied(String),
82    Timeout(String),
83    Cancelled(String),
84}
85
86impl std::fmt::Display for A2aClientError {
87    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
88        match self {
89            Self::InvalidTarget(message)
90            | Self::Discovery(message)
91            | Self::Protocol(message)
92            | Self::Denied(message)
93            | Self::Timeout(message)
94            | Self::Cancelled(message) => f.write_str(message),
95        }
96    }
97}
98
99impl std::error::Error for A2aClientError {}
100
101/// Abstraction over outbound A2A dispatch, injectable for tests.
102#[async_trait]
103pub trait A2aClient: Send + Sync + 'static {
104    async fn dispatch(
105        &self,
106        target: &str,
107        allow_cleartext: bool,
108        binding_id: &str,
109        binding_key: &str,
110        event: &TriggerEvent,
111        cancel_rx: &mut broadcast::Receiver<()>,
112    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError>;
113}
114
115/// Production implementation that performs real HTTP A2A calls.
116pub struct RealA2aClient;
117
118#[async_trait]
119impl A2aClient for RealA2aClient {
120    async fn dispatch(
121        &self,
122        target: &str,
123        allow_cleartext: bool,
124        binding_id: &str,
125        binding_key: &str,
126        event: &TriggerEvent,
127        cancel_rx: &mut broadcast::Receiver<()>,
128    ) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
129        dispatch_trigger_event(
130            target,
131            allow_cleartext,
132            binding_id,
133            binding_key,
134            event,
135            cancel_rx,
136        )
137        .await
138    }
139}
140
141/// Return the first actor-chain metadata candidate accepted by Harn's A2A
142/// surfaces. Callers that accept invalid metadata should use
143/// [`actor_chain_from_metadata`]; callers that must reject malformed input can
144/// parse the returned value themselves and keep the pointer for diagnostics.
145pub fn actor_chain_metadata_candidate(value: &Value) -> Option<(&'static str, &Value)> {
146    for pointer in A2A_ACTOR_CHAIN_METADATA_POINTERS {
147        if let Some(candidate) = value.pointer(pointer) {
148            return Some((pointer, candidate));
149        }
150    }
151    None
152}
153
154pub fn actor_chain_from_metadata(value: &Value) -> Option<crate::actor_chain::ActorChain> {
155    actor_chain_metadata_candidate(value)
156        .and_then(|(_, candidate)| crate::actor_chain::ActorChain::from_json_value(candidate).ok())
157}
158
159#[derive(Debug)]
160enum AgentCardFetchError {
161    Cancelled(String),
162    Discovery(String),
163    ConnectRefused(String),
164    Denied(String),
165    Timeout(String),
166}
167
168pub async fn dispatch_trigger_event(
169    raw_target: &str,
170    allow_cleartext: bool,
171    binding_id: &str,
172    binding_key: &str,
173    event: &TriggerEvent,
174    cancel_rx: &mut broadcast::Receiver<()>,
175) -> Result<(ResolvedA2aEndpoint, DispatchAck), A2aClientError> {
176    let started = std::time::Instant::now();
177    let target = match parse_target(raw_target) {
178        Ok(target) => target,
179        Err(error) => {
180            record_a2a_metric(raw_target, "failed", started.elapsed());
181            return Err(error);
182        }
183    };
184    let endpoint = match resolve_endpoint(&target, allow_cleartext, cancel_rx).await {
185        Ok(endpoint) => endpoint,
186        Err(error) => {
187            record_a2a_metric(raw_target, "failed", started.elapsed());
188            return Err(error);
189        }
190    };
191    let message_id = format!("{}.{}", event.trace_id.0, event.id.0);
192    let actor_chain = crate::agent_sessions::current_actor_chain()
193        .as_ref()
194        .map(crate::actor_chain::ActorChain::to_json_value);
195    let mut envelope = serde_json::json!({
196        "kind": "harn.trigger.dispatch",
197        "message_id": message_id,
198        "trace_id": event.trace_id.0,
199        "event_id": event.id.0,
200        "trigger_id": binding_id,
201        "binding_key": binding_key,
202        "target_agent": endpoint.target_agent,
203        "event": event,
204    });
205    if let Some(chain) = actor_chain.as_ref() {
206        envelope["actor_chain"] = chain.clone();
207    }
208    let text = serde_json::to_string(&envelope)
209        .map_err(|error| A2aClientError::Protocol(format!("serialize A2A envelope: {error}")))?;
210    let push_config = push_notification_config();
211    let mut metadata = serde_json::json!({
212        "kind": "harn.trigger.dispatch",
213        "trace_id": event.trace_id.0,
214        "event_id": event.id.0,
215        "trigger_id": binding_id,
216        "binding_key": binding_key,
217        "target_agent": endpoint.target_agent,
218    });
219    if let Some(chain) = actor_chain.as_ref() {
220        metadata["actor_chain"] = chain.clone();
221        metadata["harn"] = serde_json::json!({"actor_chain": chain});
222    }
223    let mut params = serde_json::json!({
224        "contextId": event.trace_id.0,
225        "message": {
226            "messageId": message_id,
227            "role": "user",
228            "parts": [{
229                "type": "text",
230                "text": text,
231            }],
232            "metadata": metadata,
233        },
234    });
235    if let Some(config) = push_config.clone() {
236        params["configuration"] = serde_json::json!({
237            "blocking": false,
238            "returnImmediately": true,
239            "pushNotificationConfig": config,
240        });
241    }
242    let request = crate::jsonrpc::request(message_id.clone(), "message/send", params);
243
244    let body = match send_jsonrpc(&endpoint.rpc_url, &request, &event.trace_id.0, cancel_rx).await {
245        Ok(body) => body,
246        Err(error) => {
247            record_a2a_metric(raw_target, "failed", started.elapsed());
248            return Err(error);
249        }
250    };
251    let result = match body.get("result").cloned().ok_or_else(|| {
252        if let Some(error) = body.get("error") {
253            let message = error
254                .get("message")
255                .and_then(Value::as_str)
256                .unwrap_or("unknown A2A error");
257            A2aClientError::Protocol(format!("A2A task dispatch failed: {message}"))
258        } else {
259            A2aClientError::Protocol("A2A task dispatch response missing result".to_string())
260        }
261    }) {
262        Ok(result) => result,
263        Err(error) => {
264            record_a2a_metric(raw_target, "failed", started.elapsed());
265            return Err(error);
266        }
267    };
268
269    let task_id = match result
270        .get("id")
271        .and_then(Value::as_str)
272        .filter(|value| !value.is_empty())
273        .ok_or_else(|| A2aClientError::Protocol("A2A task response missing result.id".to_string()))
274    {
275        Ok(task_id) => task_id.to_string(),
276        Err(error) => {
277            record_a2a_metric(raw_target, "failed", started.elapsed());
278            return Err(error);
279        }
280    };
281    let state = match task_state(&result) {
282        Ok(state) => state.to_string(),
283        Err(error) => {
284            record_a2a_metric(raw_target, "failed", started.elapsed());
285            return Err(error);
286        }
287    };
288
289    if state == "completed" {
290        let inline = extract_inline_result(&result);
291        record_a2a_metric(raw_target, "succeeded", started.elapsed());
292        return Ok((
293            endpoint,
294            DispatchAck::InlineResult {
295                task_id,
296                result: inline,
297            },
298        ));
299    }
300
301    if state == "rejected" {
302        record_a2a_metric(raw_target, "failed", started.elapsed());
303        return Err(A2aClientError::Denied(format!(
304            "A2A task rejected by remote agent: {}",
305            task_status_message(&result).unwrap_or("permission rejected")
306        )));
307    }
308
309    if let Some(config) = push_config {
310        register_push_notification_config(
311            &endpoint.rpc_url,
312            &task_id,
313            config,
314            &event.trace_id.0,
315            cancel_rx,
316        )
317        .await
318        .inspect_err(|_| {
319            record_a2a_metric(raw_target, "failed", started.elapsed());
320        })?;
321    }
322    record_a2a_metric(raw_target, "succeeded", started.elapsed());
323    Ok((
324        endpoint.clone(),
325        DispatchAck::PendingTask {
326            task_id: task_id.clone(),
327            state: state.clone(),
328            handle: serde_json::json!({
329                "kind": "a2a_task_handle",
330                "task_id": task_id,
331                "state": state,
332                "target_agent": endpoint.target_agent,
333                "rpc_url": endpoint.rpc_url,
334                "card_url": endpoint.card_url,
335                "agent_id": endpoint.agent_id,
336            }),
337        },
338    ))
339}
340
341pub async fn resolve_agent(
342    raw_target: &str,
343    allow_cleartext: bool,
344    cancel_rx: &mut broadcast::Receiver<()>,
345) -> Result<ResolvedA2aAgent, A2aClientError> {
346    let target = parse_target(raw_target)?;
347    let resolved = resolve_endpoint_with_card(&target, allow_cleartext, cancel_rx).await?;
348    Ok(ResolvedA2aAgent {
349        endpoint: resolved.0,
350        card: resolved.1,
351    })
352}
353
354pub async fn send_jsonrpc_request(
355    rpc_url: &str,
356    request: &Value,
357    trace_id: &str,
358    cancel_rx: &mut broadcast::Receiver<()>,
359) -> Result<Value, A2aClientError> {
360    send_jsonrpc(rpc_url, request, trace_id, cancel_rx).await
361}
362
363fn push_notification_config() -> Option<Value> {
364    let url = std::env::var(A2A_PUSH_URL_ENV)
365        .ok()
366        .map(|value| value.trim().to_string())
367        .filter(|value| !value.is_empty())?;
368    let token = std::env::var(A2A_PUSH_TOKEN_ENV)
369        .ok()
370        .map(|value| value.trim().to_string())
371        .filter(|value| !value.is_empty());
372    let mut config = serde_json::json!({ "url": url });
373    if let Some(token) = token {
374        config["token"] = Value::String(token.clone());
375        config["authentication"] = serde_json::json!({
376            "scheme": "Bearer",
377            "credentials": token,
378        });
379    }
380    Some(config)
381}
382
383async fn register_push_notification_config(
384    rpc_url: &str,
385    task_id: &str,
386    config: Value,
387    trace_id: &str,
388    cancel_rx: &mut broadcast::Receiver<()>,
389) -> Result<(), A2aClientError> {
390    let request = crate::jsonrpc::request(
391        format!("{trace_id}.{task_id}.push-config"),
392        "tasks/pushNotificationConfig/set",
393        serde_json::json!({
394            "taskId": task_id,
395            "pushNotificationConfig": config,
396        }),
397    );
398    let response = send_jsonrpc(rpc_url, &request, trace_id, cancel_rx).await?;
399    if response.get("error").is_some() {
400        return Err(A2aClientError::Protocol(format!(
401            "A2A push notification registration failed: {}",
402            response["error"]
403        )));
404    }
405    Ok(())
406}
407
408fn record_a2a_metric(target: &str, outcome: &str, duration: std::time::Duration) {
409    if let Some(metrics) = crate::active_metrics_registry() {
410        metrics.record_a2a_hop(target, outcome, duration);
411    }
412}
413
414pub fn target_agent_label(raw_target: &str) -> String {
415    parse_target(raw_target)
416        .map(|target| target.target_agent_label())
417        .unwrap_or_else(|_| raw_target.to_string())
418}
419
420#[derive(Clone, Debug)]
421struct ParsedTarget {
422    authority: String,
423    target_agent: String,
424}
425
426impl ParsedTarget {
427    fn target_agent_label(&self) -> String {
428        if self.target_agent.is_empty() {
429            self.authority.clone()
430        } else {
431            self.target_agent.clone()
432        }
433    }
434}
435
436fn parse_target(raw_target: &str) -> Result<ParsedTarget, A2aClientError> {
437    let parsed = Url::parse(&format!("http://{raw_target}")).map_err(|error| {
438        A2aClientError::InvalidTarget(format!(
439            "invalid a2a dispatch target '{raw_target}': {error}"
440        ))
441    })?;
442    let host = parsed.host_str().ok_or_else(|| {
443        A2aClientError::InvalidTarget(format!(
444            "invalid a2a dispatch target '{raw_target}': missing host"
445        ))
446    })?;
447    let authority = if let Some(port) = parsed.port() {
448        format!("{host}:{port}")
449    } else {
450        host.to_string()
451    };
452    Ok(ParsedTarget {
453        authority,
454        target_agent: parsed.path().trim_start_matches('/').to_string(),
455    })
456}
457
458async fn resolve_endpoint(
459    target: &ParsedTarget,
460    allow_cleartext: bool,
461    cancel_rx: &mut broadcast::Receiver<()>,
462) -> Result<ResolvedA2aEndpoint, A2aClientError> {
463    Ok(
464        resolve_endpoint_with_card(target, allow_cleartext, cancel_rx)
465            .await?
466            .0,
467    )
468}
469
470async fn resolve_endpoint_with_card(
471    target: &ParsedTarget,
472    allow_cleartext: bool,
473    cancel_rx: &mut broadcast::Receiver<()>,
474) -> Result<(ResolvedA2aEndpoint, Value), A2aClientError> {
475    let mut last_error = None;
476    for scheme in card_resolution_schemes(allow_cleartext) {
477        let mut last_scheme_error = None;
478        for path in A2A_AGENT_CARD_PATHS {
479            let card_url = format!("{scheme}://{}/{path}", target.authority);
480            match fetch_agent_card(&card_url, cancel_rx).await {
481                Ok(card) => {
482                    let endpoint = endpoint_from_card(
483                        card_url,
484                        allow_cleartext,
485                        &target.authority,
486                        target.target_agent.clone(),
487                        &card,
488                    )?;
489                    return Ok((endpoint, card));
490                }
491                Err(AgentCardFetchError::Cancelled(message)) => {
492                    return Err(A2aClientError::Cancelled(message));
493                }
494                Err(AgentCardFetchError::Timeout(message)) => {
495                    return Err(A2aClientError::Timeout(message));
496                }
497                Err(AgentCardFetchError::Denied(message)) => {
498                    return Err(A2aClientError::Denied(message));
499                }
500                Err(error) => {
501                    last_error = Some(agent_card_fetch_error_message(&error));
502                    last_scheme_error = Some(error);
503                }
504            }
505        }
506        if last_scheme_error.as_ref().is_some_and(|error| {
507            should_try_cleartext_fallback(scheme, allow_cleartext, error, &target.authority)
508        }) {
509            continue;
510        }
511        break;
512    }
513    Err(A2aClientError::Discovery(format!(
514        "could not resolve A2A agent card for '{}': {}",
515        target.authority,
516        last_error.unwrap_or_else(|| "unknown discovery error".to_string())
517    )))
518}
519
520async fn fetch_agent_card(
521    card_url: &str,
522    cancel_rx: &mut broadcast::Receiver<()>,
523) -> Result<Value, AgentCardFetchError> {
524    let response = tokio::select! {
525        response = crate::llm::shared_utility_client().get(card_url).send() => {
526            match response {
527                Ok(response) => Ok(response),
528                Err(error) if error.is_timeout() => Err(AgentCardFetchError::Timeout(
529                    format!("A2A HTTP request timed out: {}", crate::egress::redact_reqwest_error(&error))
530                )),
531                Err(error) if is_connect_refused(&error) => Err(AgentCardFetchError::ConnectRefused(
532                    format!("A2A HTTP request failed: {}", crate::egress::redact_reqwest_error(&error))
533                )),
534                Err(error) => Err(AgentCardFetchError::Discovery(
535                    format!("A2A HTTP request failed: {}", crate::egress::redact_reqwest_error(&error))
536                )),
537            }
538        }
539        _ = recv_cancel(cancel_rx) => Err(AgentCardFetchError::Cancelled(
540            "A2A agent-card fetch cancelled".to_string()
541        )),
542    }?;
543    if matches!(
544        response.status(),
545        reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
546    ) {
547        return Err(AgentCardFetchError::Denied(format!(
548            "GET {card_url} returned HTTP {}",
549            response.status()
550        )));
551    }
552    if !response.status().is_success() {
553        let card_url = crate::egress::redact_diagnostic_text(card_url);
554        return Err(AgentCardFetchError::Discovery(format!(
555            "GET {card_url} returned HTTP {}",
556            response.status()
557        )));
558    }
559    response.json::<Value>().await.map_err(|error| {
560        AgentCardFetchError::Discovery(format!(
561            "parse {}: {error}",
562            crate::egress::redact_diagnostic_text(card_url)
563        ))
564    })
565}
566
567fn endpoint_from_card(
568    card_url: String,
569    allow_cleartext: bool,
570    requested_authority: &str,
571    target_agent: String,
572    card: &Value,
573) -> Result<ResolvedA2aEndpoint, A2aClientError> {
574    let rpc_url = if has_current_transport_fields(card) {
575        endpoint_from_current_card(card, allow_cleartext, requested_authority)?
576    } else if let Some(rpc_url) =
577        endpoint_from_legacy_supported_interfaces(card, allow_cleartext, requested_authority)?
578    {
579        rpc_url
580    } else {
581        return Err(A2aClientError::Discovery(
582            "A2A agent card missing preferredTransport/additionalInterfaces".to_string(),
583        ));
584    };
585
586    Ok(ResolvedA2aEndpoint {
587        card_url,
588        rpc_url: rpc_url.to_string(),
589        agent_id: card.get("id").and_then(Value::as_str).map(str::to_string),
590        target_agent,
591    })
592}
593
594fn has_current_transport_fields(card: &Value) -> bool {
595    card.get("preferredTransport").is_some() || card.get("additionalInterfaces").is_some()
596}
597
598fn endpoint_from_current_card(
599    card: &Value,
600    allow_cleartext: bool,
601    requested_authority: &str,
602) -> Result<Url, A2aClientError> {
603    let protocol_version = card
604        .get("protocolVersion")
605        .and_then(Value::as_str)
606        .ok_or_else(|| {
607            A2aClientError::Discovery("A2A agent card missing protocolVersion".to_string())
608        })?;
609    if protocol_version != A2A_PROTOCOL_VERSION {
610        return Err(A2aClientError::Discovery(format!(
611            "A2A agent card protocolVersion '{protocol_version}' is not supported; expected {A2A_PROTOCOL_VERSION}"
612        )));
613    }
614
615    let base_url = card
616        .get("url")
617        .and_then(Value::as_str)
618        .ok_or_else(|| A2aClientError::Discovery("A2A agent card missing url".to_string()))?;
619    let base_url = resolve_declared_url(
620        base_url,
621        allow_cleartext,
622        requested_authority,
623        "agent card url",
624    )?;
625
626    let preferred_transport = card
627        .get("preferredTransport")
628        .and_then(Value::as_str)
629        .ok_or_else(|| {
630            A2aClientError::Discovery("A2A agent card missing preferredTransport".to_string())
631        })?;
632    if transport_is_jsonrpc(preferred_transport) {
633        return Ok(base_url);
634    }
635
636    if let Some(interface_url) = current_additional_jsonrpc_url(card)? {
637        return resolve_declared_url(
638            interface_url,
639            allow_cleartext,
640            requested_authority,
641            "JSONRPC additionalInterface url",
642        );
643    }
644
645    Err(A2aClientError::Discovery(
646        "A2A agent card does not expose JSONRPC transport".to_string(),
647    ))
648}
649
650fn current_additional_jsonrpc_url(card: &Value) -> Result<Option<&str>, A2aClientError> {
651    let Some(interfaces) = card.get("additionalInterfaces") else {
652        return Ok(None);
653    };
654    let interfaces = interfaces.as_array().ok_or_else(|| {
655        A2aClientError::Discovery("A2A additionalInterfaces must be an array".to_string())
656    })?;
657    for interface in interfaces {
658        if interface
659            .get("transport")
660            .and_then(Value::as_str)
661            .is_some_and(transport_is_jsonrpc)
662        {
663            let interface_url = interface
664                .get("url")
665                .and_then(Value::as_str)
666                .ok_or_else(|| {
667                    A2aClientError::Discovery(
668                        "A2A JSONRPC additionalInterface missing url".to_string(),
669                    )
670                })?;
671            return Ok(Some(interface_url));
672        }
673    }
674    Ok(None)
675}
676
677fn endpoint_from_legacy_supported_interfaces(
678    card: &Value,
679    allow_cleartext: bool,
680    requested_authority: &str,
681) -> Result<Option<Url>, A2aClientError> {
682    let Some(interfaces) = card.get("supportedInterfaces") else {
683        return Ok(None);
684    };
685    let interfaces = interfaces.as_array().ok_or_else(|| {
686        A2aClientError::Discovery("A2A supportedInterfaces must be an array".to_string())
687    })?;
688    let mut saw_jsonrpc = false;
689    for interface in interfaces {
690        if !interface
691            .get("protocolBinding")
692            .and_then(Value::as_str)
693            .is_some_and(transport_is_jsonrpc)
694        {
695            continue;
696        }
697        saw_jsonrpc = true;
698        if interface.get("protocolVersion").and_then(Value::as_str) != Some(A2A_PROTOCOL_VERSION) {
699            continue;
700        }
701        let interface_url = interface
702            .get("url")
703            .and_then(Value::as_str)
704            .ok_or_else(|| {
705                A2aClientError::Discovery("A2A JSONRPC supportedInterface missing url".to_string())
706            })?;
707        return resolve_declared_url(
708            interface_url,
709            allow_cleartext,
710            requested_authority,
711            "JSONRPC supportedInterface url",
712        )
713        .map(Some);
714    }
715    if saw_jsonrpc {
716        return Err(A2aClientError::Discovery(format!(
717            "A2A supportedInterfaces does not expose JSONRPC for protocolVersion {A2A_PROTOCOL_VERSION}"
718        )));
719    }
720    Err(A2aClientError::Discovery(
721        "A2A agent card does not expose a JSONRPC supportedInterface".to_string(),
722    ))
723}
724
725fn transport_is_jsonrpc(transport: &str) -> bool {
726    transport.eq_ignore_ascii_case(A2A_JSONRPC_TRANSPORT)
727}
728
729fn resolve_declared_url(
730    raw_url: &str,
731    allow_cleartext: bool,
732    requested_authority: &str,
733    label: &str,
734) -> Result<Url, A2aClientError> {
735    let url = Url::parse(raw_url).map_err(|error| {
736        A2aClientError::Discovery(format!(
737            "invalid A2A {label} '{}': {error}",
738            crate::egress::redact_diagnostic_text(raw_url)
739        ))
740    })?;
741    ensure_cleartext_allowed(&url, allow_cleartext, label)?;
742    let declared_authority = url_authority(&url)?;
743    if !authorities_equivalent(&declared_authority, requested_authority) {
744        return Err(A2aClientError::Denied(format!(
745            "A2A {label} authority mismatch: requested '{requested_authority}', card returned '{declared_authority}'"
746        )));
747    }
748    Ok(url)
749}
750
751fn card_resolution_schemes(allow_cleartext: bool) -> &'static [&'static str] {
752    if allow_cleartext {
753        &["https", "http"]
754    } else {
755        &["https"]
756    }
757}
758
759/// Decide whether an HTTPS discovery failure should fall through to cleartext.
760///
761/// External targets only fall back on `ConnectionRefused` — the common "HTTPS
762/// port isn't listening" case. TLS handshake failures to an external host MUST
763/// NOT silently downgrade to HTTP, because an active network attacker can
764/// forge TLS errors to trigger a downgrade.
765///
766/// Loopback targets (`127.0.0.0/8`, `::1`, `localhost`) fall back on any
767/// discovery-style error. They cover the standard local-dev case where
768/// `harn serve` binds HTTP-only on `127.0.0.1:PORT`, and the SSRF threat
769/// model for loopback is already bounded — any attacker who can reach the
770/// local loopback already has code execution on the box.
771fn should_try_cleartext_fallback(
772    scheme: &str,
773    allow_cleartext: bool,
774    error: &AgentCardFetchError,
775    authority: &str,
776) -> bool {
777    if !allow_cleartext || scheme != "https" {
778        return false;
779    }
780    match error {
781        AgentCardFetchError::Cancelled(_)
782        | AgentCardFetchError::Denied(_)
783        | AgentCardFetchError::Timeout(_) => false,
784        AgentCardFetchError::ConnectRefused(_) => true,
785        AgentCardFetchError::Discovery(_) => is_loopback_authority(authority),
786    }
787}
788
789fn ensure_cleartext_allowed(
790    url: &Url,
791    allow_cleartext: bool,
792    label: &str,
793) -> Result<(), A2aClientError> {
794    if allow_cleartext || url.scheme() != "http" {
795        return Ok(());
796    }
797    Err(A2aClientError::Denied(format!(
798        "cleartext A2A {label} '{url}' requires `allow_cleartext = true` on the trigger binding"
799    )))
800}
801
802fn is_loopback_authority(authority: &str) -> bool {
803    let (host, _) = split_authority(authority);
804    if host.eq_ignore_ascii_case("localhost") {
805        return true;
806    }
807    if let Ok(ip) = host.parse::<std::net::IpAddr>() {
808        return ip.is_loopback();
809    }
810    false
811}
812
813/// Return true when two authority strings refer to the same A2A endpoint.
814///
815/// Exact string equality is the default — an agent card that reports a
816/// different host than the one the client asked for is a security-relevant
817/// discrepancy (see harn#248 SSRF hardening). The one well-defined exception
818/// is loopback: `localhost`, `127.0.0.1`, `::1`, and the rest of
819/// `127.0.0.0/8` are all the same socket on this machine, and `harn serve`
820/// hardcodes `http://localhost:PORT` in its agent card even when a caller
821/// dials `127.0.0.1:PORT`. Treating both sides as loopback avoids a spurious
822/// mismatch in that case without widening the external-host trust boundary.
823fn authorities_equivalent(card_authority: &str, requested_authority: &str) -> bool {
824    if card_authority == requested_authority {
825        return true;
826    }
827    let (_, card_port) = split_authority(card_authority);
828    let (_, requested_port) = split_authority(requested_authority);
829    if card_port != requested_port {
830        return false;
831    }
832    is_loopback_authority(card_authority) && is_loopback_authority(requested_authority)
833}
834
835/// Split an authority into `(host, port_or_empty)`. Strips IPv6 brackets so
836/// `[::1]:8080` becomes `("::1", "8080")`.
837fn split_authority(authority: &str) -> (&str, &str) {
838    let (host_raw, port) = if authority.starts_with('[') {
839        // IPv6 bracketed form: "[addr]:port" or "[addr]".
840        if let Some(end) = authority.rfind(']') {
841            let host = &authority[..=end];
842            let rest = &authority[end + 1..];
843            let port = rest.strip_prefix(':').unwrap_or("");
844            (host, port)
845        } else {
846            (authority, "")
847        }
848    } else {
849        match authority.rsplit_once(':') {
850            Some((host, port)) => (host, port),
851            None => (authority, ""),
852        }
853    };
854    let host = host_raw.trim_start_matches('[').trim_end_matches(']');
855    (host, port)
856}
857
858fn agent_card_fetch_error_message(error: &AgentCardFetchError) -> String {
859    match error {
860        AgentCardFetchError::Cancelled(message)
861        | AgentCardFetchError::Discovery(message)
862        | AgentCardFetchError::ConnectRefused(message)
863        | AgentCardFetchError::Denied(message)
864        | AgentCardFetchError::Timeout(message) => message.clone(),
865    }
866}
867
868fn is_connect_refused(error: &reqwest::Error) -> bool {
869    if !error.is_connect() {
870        return false;
871    }
872    let mut source = error.source();
873    while let Some(cause) = source {
874        if let Some(io_error) = cause.downcast_ref::<std::io::Error>() {
875            if io_error.kind() == std::io::ErrorKind::ConnectionRefused {
876                return true;
877            }
878        }
879        source = cause.source();
880    }
881    false
882}
883
884fn url_authority(url: &Url) -> Result<String, A2aClientError> {
885    let host = url
886        .host_str()
887        .ok_or_else(|| A2aClientError::Discovery(format!("A2A card url '{url}' missing host")))?;
888    Ok(if let Some(port) = url.port() {
889        format!("{host}:{port}")
890    } else {
891        host.to_string()
892    })
893}
894
895async fn send_jsonrpc(
896    rpc_url: &str,
897    request: &Value,
898    trace_id: &str,
899    cancel_rx: &mut broadcast::Receiver<()>,
900) -> Result<Value, A2aClientError> {
901    let response = send_http(
902        crate::llm::shared_blocking_client()
903            .post(rpc_url)
904            .header(reqwest::header::CONTENT_TYPE, "application/json")
905            .header("A2A-Version", A2A_PROTOCOL_VERSION)
906            .header("A2A-Trace-Id", trace_id)
907            .json(request),
908        cancel_rx,
909        "A2A task dispatch cancelled",
910    )
911    .await?;
912    if matches!(
913        response.status(),
914        reqwest::StatusCode::UNAUTHORIZED | reqwest::StatusCode::FORBIDDEN
915    ) {
916        return Err(A2aClientError::Denied(format!(
917            "A2A task dispatch returned HTTP {}",
918            response.status()
919        )));
920    }
921    if !response.status().is_success() {
922        return Err(A2aClientError::Protocol(format!(
923            "A2A task dispatch returned HTTP {}",
924            response.status()
925        )));
926    }
927    response
928        .json::<Value>()
929        .await
930        .map_err(|error| A2aClientError::Protocol(format!("parse A2A dispatch response: {error}")))
931}
932
933async fn send_http(
934    request: reqwest::RequestBuilder,
935    cancel_rx: &mut broadcast::Receiver<()>,
936    cancelled_message: &'static str,
937) -> Result<reqwest::Response, A2aClientError> {
938    tokio::select! {
939        response = request.send() => response.map_err(|error| {
940            if error.is_timeout() {
941                A2aClientError::Timeout(format!(
942                    "A2A HTTP request timed out: {}",
943                    crate::egress::redact_reqwest_error(&error)
944                ))
945            } else {
946                A2aClientError::Protocol(format!(
947                    "A2A HTTP request failed: {}",
948                    crate::egress::redact_reqwest_error(&error)
949                ))
950            }
951        }),
952        _ = recv_cancel(cancel_rx) => Err(A2aClientError::Cancelled(cancelled_message.to_string())),
953    }
954}
955
956fn task_state(task: &Value) -> Result<&str, A2aClientError> {
957    task.pointer("/status/state")
958        .and_then(Value::as_str)
959        .filter(|value| !value.is_empty())
960        .ok_or_else(|| {
961            A2aClientError::Protocol("A2A task response missing result.status.state".to_string())
962        })
963}
964
965fn task_status_message(task: &Value) -> Option<&str> {
966    task.pointer("/status/message/parts")
967        .and_then(Value::as_array)
968        .and_then(|parts| {
969            parts.iter().find_map(|part| {
970                if part.get("type").and_then(Value::as_str) == Some("text") {
971                    part.get("text").and_then(Value::as_str).map(str::trim)
972                } else {
973                    None
974                }
975            })
976        })
977        .filter(|message| !message.is_empty())
978}
979
980fn extract_inline_result(task: &Value) -> Value {
981    let text = task
982        .get("history")
983        .and_then(Value::as_array)
984        .and_then(|history| {
985            history.iter().rev().find_map(|message| {
986                let role = message.get("role").and_then(Value::as_str)?;
987                if role != "agent" {
988                    return None;
989                }
990                message
991                    .get("parts")
992                    .and_then(Value::as_array)
993                    .and_then(|parts| {
994                        parts.iter().find_map(|part| {
995                            if part.get("type").and_then(Value::as_str) == Some("text") {
996                                part.get("text").and_then(Value::as_str).map(str::trim_end)
997                            } else {
998                                None
999                            }
1000                        })
1001                    })
1002            })
1003        });
1004    match text {
1005        Some(text) if !text.is_empty() => {
1006            serde_json::from_str(text).unwrap_or_else(|_| Value::String(text.to_string()))
1007        }
1008        _ => task.clone(),
1009    }
1010}
1011
1012async fn recv_cancel(cancel_rx: &mut broadcast::Receiver<()>) {
1013    let _ = cancel_rx.recv().await;
1014}
1015
1016#[cfg(test)]
1017mod tests {
1018    use super::*;
1019
1020    #[test]
1021    fn target_agent_label_prefers_path() {
1022        assert_eq!(target_agent_label("reviewer.prod/triage"), "triage");
1023        assert_eq!(target_agent_label("reviewer.prod"), "reviewer.prod");
1024    }
1025
1026    #[test]
1027    fn extract_inline_result_parses_json_text() {
1028        let task = serde_json::json!({
1029            "history": [
1030                {"role": "user", "parts": [{"type": "text", "text": "ignored"}]},
1031                {"role": "agent", "parts": [{"type": "text", "text": "{\"trace_id\":\"trace_123\"}\n"}]},
1032            ]
1033        });
1034        assert_eq!(
1035            extract_inline_result(&task),
1036            serde_json::json!({"trace_id": "trace_123"})
1037        );
1038    }
1039
1040    #[test]
1041    fn discovery_prefers_https_before_http() {
1042        assert_eq!(card_resolution_schemes(false), ["https"]);
1043        assert_eq!(card_resolution_schemes(true), ["https", "http"]);
1044    }
1045
1046    #[test]
1047    fn endpoint_from_card_accepts_current_preferred_transport() {
1048        let endpoint = endpoint_from_card(
1049            "https://trusted.example/.well-known/agent-card.json".to_string(),
1050            false,
1051            "trusted.example",
1052            "triage".to_string(),
1053            &serde_json::json!({
1054                "name": "trusted",
1055                "protocolVersion": "0.3.0",
1056                "url": "https://trusted.example/rpc",
1057                "preferredTransport": "JSONRPC",
1058                "additionalInterfaces": [{
1059                    "url": "https://trusted.example/rpc",
1060                    "transport": "JSONRPC"
1061                }]
1062            }),
1063        )
1064        .expect("current A2A card should resolve");
1065        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1066        assert_eq!(
1067            endpoint.card_url,
1068            "https://trusted.example/.well-known/agent-card.json"
1069        );
1070        assert_eq!(endpoint.target_agent, "triage");
1071    }
1072
1073    #[test]
1074    fn endpoint_from_card_uses_additional_jsonrpc_interface_when_needed() {
1075        let endpoint = endpoint_from_card(
1076            "https://trusted.example/.well-known/agent-card.json".to_string(),
1077            false,
1078            "trusted.example",
1079            "triage".to_string(),
1080            &serde_json::json!({
1081                "name": "trusted",
1082                "protocolVersion": "0.3.0",
1083                "url": "https://trusted.example/rest",
1084                "preferredTransport": "HTTP+JSON",
1085                "additionalInterfaces": [{
1086                    "url": "https://trusted.example/rpc",
1087                    "transport": "JSONRPC"
1088                }]
1089            }),
1090        )
1091        .expect("current A2A card should resolve through additionalInterfaces");
1092        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1093    }
1094
1095    #[test]
1096    fn endpoint_from_card_accepts_legacy_supported_interfaces() {
1097        let endpoint = endpoint_from_card(
1098            "https://trusted.example/.well-known/agent-card.json".to_string(),
1099            false,
1100            "trusted.example",
1101            "triage".to_string(),
1102            &serde_json::json!({
1103                "name": "trusted",
1104                "supportedInterfaces": [{
1105                    "protocolBinding": "JSONRPC",
1106                    "protocolVersion": "0.3.0",
1107                    "url": "https://trusted.example/rpc"
1108                }],
1109            }),
1110        )
1111        .expect("legacy A2A card should resolve during the transition");
1112        assert_eq!(endpoint.rpc_url, "https://trusted.example/rpc");
1113    }
1114
1115    #[test]
1116    fn endpoint_from_card_rejects_removed_interfaces_shape() {
1117        let error = endpoint_from_card(
1118            "https://trusted.example/.well-known/agent-card.json".to_string(),
1119            false,
1120            "trusted.example",
1121            "triage".to_string(),
1122            &serde_json::json!({
1123                "url": "https://trusted.example",
1124                "interfaces": [{"protocol": "jsonrpc", "url": "/rpc"}],
1125            }),
1126        )
1127        .expect_err("pre-0.3 Harn discovery shape should be rejected");
1128        assert_eq!(
1129            error.to_string(),
1130            "A2A agent card missing preferredTransport/additionalInterfaces"
1131        );
1132    }
1133
1134    #[test]
1135    fn cleartext_fallback_only_after_https_connect_refused() {
1136        assert!(should_try_cleartext_fallback(
1137            "https",
1138            true,
1139            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1140            "reviewer.example:443",
1141        ));
1142        assert!(!should_try_cleartext_fallback(
1143            "http",
1144            true,
1145            &AgentCardFetchError::ConnectRefused("connect refused".to_string()),
1146            "reviewer.example:443",
1147        ));
1148        assert!(!should_try_cleartext_fallback(
1149            "https",
1150            true,
1151            &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1152            "reviewer.example:443",
1153        ));
1154    }
1155
1156    #[test]
1157    fn cleartext_fallback_requires_opt_in_even_for_loopback_authorities() {
1158        for authority in [
1159            "127.0.0.1:8080",
1160            "localhost:8080",
1161            "[::1]:8080",
1162            "127.1.2.3:9000",
1163        ] {
1164            assert!(
1165                !should_try_cleartext_fallback(
1166                    "https",
1167                    false,
1168                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1169                    authority,
1170                ),
1171                "cleartext fallback must stay disabled without opt-in for '{authority}'"
1172            );
1173        }
1174    }
1175
1176    #[test]
1177    fn cleartext_fallback_allows_loopback_after_opt_in() {
1178        // Local dev: harn serve is HTTP-only, so TLS handshake fails but we
1179        // still need the HTTP fallback to succeed.
1180        for authority in [
1181            "127.0.0.1:8080",
1182            "localhost:8080",
1183            "[::1]:8080",
1184            "127.1.2.3:9000",
1185        ] {
1186            assert!(
1187                should_try_cleartext_fallback(
1188                    "https",
1189                    true,
1190                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1191                    authority,
1192                ),
1193                "expected cleartext fallback for loopback authority '{authority}'"
1194            );
1195        }
1196    }
1197
1198    #[test]
1199    fn cleartext_fallback_denies_external_tls_failures() {
1200        // External target + TLS handshake failure must not downgrade — an
1201        // attacker able to forge TLS errors shouldn't force cleartext.
1202        for authority in [
1203            "reviewer.example:443",
1204            "8.8.8.8:443",
1205            "192.168.1.10:8080",
1206            "10.0.0.5:8443",
1207        ] {
1208            assert!(
1209                !should_try_cleartext_fallback(
1210                    "https",
1211                    true,
1212                    &AgentCardFetchError::Discovery("tls handshake failed".to_string()),
1213                    authority,
1214                ),
1215                "cleartext fallback must be denied for external authority '{authority}'"
1216            );
1217        }
1218    }
1219
1220    #[test]
1221    fn is_loopback_authority_recognises_loopback_forms() {
1222        assert!(is_loopback_authority("127.0.0.1:8080"));
1223        assert!(is_loopback_authority("localhost:8080"));
1224        assert!(is_loopback_authority("LOCALHOST:9000"));
1225        assert!(is_loopback_authority("[::1]:8080"));
1226        assert!(is_loopback_authority("127.5.5.5:1234"));
1227        assert!(!is_loopback_authority("8.8.8.8:443"));
1228        assert!(!is_loopback_authority("192.168.1.10:8080"));
1229        assert!(!is_loopback_authority("example.com:443"));
1230        assert!(!is_loopback_authority("reviewer.prod"));
1231    }
1232
1233    #[test]
1234    fn endpoint_from_card_rejects_card_url_authority_mismatch() {
1235        let error = endpoint_from_card(
1236            "https://trusted.example/.well-known/agent-card.json".to_string(),
1237            false,
1238            "trusted.example",
1239            "triage".to_string(),
1240            &serde_json::json!({
1241                "protocolVersion": "0.3.0",
1242                "url": "https://evil.example",
1243                "preferredTransport": "JSONRPC",
1244            }),
1245        )
1246        .unwrap_err();
1247        assert_eq!(
1248            error.to_string(),
1249            "A2A agent card url authority mismatch: requested 'trusted.example', card returned 'evil.example'"
1250        );
1251    }
1252
1253    #[test]
1254    fn endpoint_from_card_rejects_cleartext_without_opt_in() {
1255        let error = endpoint_from_card(
1256            "https://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1257            false,
1258            "127.0.0.1:8080",
1259            "triage".to_string(),
1260            &serde_json::json!({
1261                "protocolVersion": "0.3.0",
1262                "url": "http://localhost:8080",
1263                "preferredTransport": "JSONRPC",
1264            }),
1265        )
1266        .expect_err("cleartext card should require explicit opt-in");
1267        assert!(error
1268            .to_string()
1269            .contains("requires `allow_cleartext = true`"));
1270    }
1271
1272    #[test]
1273    fn endpoint_from_card_accepts_loopback_alias_pairs_when_cleartext_opted_in() {
1274        // harn serve reports `http://localhost:PORT` in its card, but clients
1275        // commonly dial `127.0.0.1:PORT`. Both refer to the same socket, so
1276        // the authority check must not spuriously reject the pair.
1277        let card = serde_json::json!({
1278            "protocolVersion": "0.3.0",
1279            "url": "http://localhost:8080",
1280            "preferredTransport": "JSONRPC",
1281        });
1282        let endpoint = endpoint_from_card(
1283            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1284            true,
1285            "127.0.0.1:8080",
1286            "triage".to_string(),
1287            &card,
1288        )
1289        .expect("loopback alias pair should be accepted");
1290        assert_eq!(endpoint.rpc_url, "http://localhost:8080/");
1291
1292        // IPv6 loopback `[::1]` also aliases to `127.0.0.1` / `localhost`.
1293        let card_v6 = serde_json::json!({
1294            "protocolVersion": "0.3.0",
1295            "url": "http://[::1]:8080",
1296            "preferredTransport": "JSONRPC",
1297        });
1298        let endpoint_v6 = endpoint_from_card(
1299            "http://localhost:8080/.well-known/agent-card.json".to_string(),
1300            true,
1301            "localhost:8080",
1302            "triage".to_string(),
1303            &card_v6,
1304        )
1305        .expect("IPv6 loopback alias should be accepted");
1306        assert_eq!(endpoint_v6.rpc_url, "http://[::1]:8080/");
1307
1308        // Port mismatch is still rejected even on loopback.
1309        let card_wrong_port = serde_json::json!({
1310            "protocolVersion": "0.3.0",
1311            "url": "http://localhost:9000",
1312            "preferredTransport": "JSONRPC",
1313        });
1314        let error = endpoint_from_card(
1315            "http://127.0.0.1:8080/.well-known/agent-card.json".to_string(),
1316            true,
1317            "127.0.0.1:8080",
1318            "triage".to_string(),
1319            &card_wrong_port,
1320        )
1321        .expect_err("mismatched ports must still be rejected even on loopback");
1322        assert!(error
1323            .to_string()
1324            .contains("A2A agent card url authority mismatch"));
1325    }
1326
1327    #[test]
1328    fn authorities_equivalent_rejects_non_loopback_host_mismatch() {
1329        assert!(!authorities_equivalent(
1330            "internal.corp.example:443",
1331            "trusted.example:443",
1332        ));
1333        assert!(!authorities_equivalent("10.0.0.5:8080", "127.0.0.1:8080",));
1334        assert!(authorities_equivalent(
1335            "trusted.example:443",
1336            "trusted.example:443",
1337        ));
1338    }
1339}