car-a2a 0.20.0

Bridge between Common Agent Runtime and the Linux Foundation Agent2Agent (A2A) v1.0 protocol
Documentation
//! Outbound A2A client.
//!
//! Lets a CAR instance call out to *another* A2A peer over the
//! HTTP+JSON-RPC binding. Mirrors the dispatcher's surface from the
//! caller's side: build a request, get back a typed response.
//!
//! The bridge crate's primary purpose is *inbound* — exposing CAR as
//! an A2A agent — but having a thin outbound client is useful for:
//!
//! - CAR-to-CAR communication when one CAR runtime needs to drive
//!   another over the network.
//! - Smoke-testing the inbound surface from an integration test
//!   without standing up Python or JS.
//! - Cooking up workflows that call multiple A2A peers in sequence
//!   from inside a CAR proposal.
//!
//! For richer client features (multi-transport selection, automatic
//! retries, agent-card-driven negotiation, OAuth2 token refresh),
//! the upstream crates `a2a-protocol-client` and `a2a-client` on
//! crates.io are the right pick. This module is intentionally
//! minimal.

use crate::types::{
    AgentCard, GetTaskParams, ListTasksParams, ListTasksResult, Message, PushNotificationConfig,
    SendMessageParams, SendMessageResult, Task,
};
use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value;
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    #[error("transport error: {0}")]
    Transport(#[from] reqwest::Error),
    /// Peer returned a non-2xx HTTP status. Most often 401/403
    /// (auth failure) or 5xx (peer overload). Body is captured
    /// (truncated) for debugging.
    #[error("a2a peer returned HTTP {code}: {body}")]
    Status { code: u16, body: String },
    /// Couldn't serialise the outbound request body. Caller bug.
    #[error("serialization error: {0}")]
    Serialize(#[from] serde_json::Error),
    /// Peer returned a JSON-RPC error envelope.
    #[error("a2a peer returned error code {code}: {message}")]
    Rpc { code: i32, message: String },
    /// Peer returned a JSON-RPC success envelope, but the `result`
    /// field couldn't be deserialised into the caller's expected
    /// type. Distinct from `Serialize` (request-side) so users can
    /// tell "I sent garbage" from "the peer returned an
    /// unexpected shape."
    #[error("a2a peer returned an unexpected result shape: {0}")]
    BadResultShape(String),
    /// Peer returned a response envelope missing `result` and `error`
    /// (i.e. neither success nor failure). JSON-RPC violation.
    #[error("a2a peer returned malformed response: {0}")]
    Malformed(String),
}

/// Auth credentials a peer agent expects on every request. Mirrors
/// the inbound auth surface in `crate::auth`.
#[derive(Debug, Clone)]
pub enum ClientAuth {
    None,
    /// `Authorization: Bearer <token>`
    Bearer(String),
    /// Custom header (e.g. `X-API-Key: <key>`).
    Header {
        name: String,
        value: String,
    },
}

impl Default for ClientAuth {
    fn default() -> Self {
        Self::None
    }
}

/// Outbound A2A client.
///
/// Holds a `reqwest::Client`, the peer's base URL, and optional
/// auth. Methods make a single JSON-RPC call per invocation.
pub struct A2aClient {
    http: reqwest::Client,
    base_url: String,
    auth: ClientAuth,
    next_id: AtomicU64,
}

impl A2aClient {
    /// Build a client pointing at `base_url`. Trailing slashes are
    /// trimmed.
    pub fn new(base_url: impl Into<String>) -> Self {
        let base = base_url.into();
        let trimmed = base.trim_end_matches('/').to_string();
        Self {
            http: reqwest::Client::builder()
                .timeout(std::time::Duration::from_secs(30))
                .build()
                .expect("default reqwest client must build"),
            base_url: trimmed,
            auth: ClientAuth::None,
            next_id: AtomicU64::new(1),
        }
    }

    /// Replace the HTTP client (e.g. with a proxy-configured one).
    pub fn with_http_client(mut self, http: reqwest::Client) -> Self {
        self.http = http;
        self
    }

    pub fn with_auth(mut self, auth: ClientAuth) -> Self {
        self.auth = auth;
        self
    }

    /// Fetch the peer's well-known Agent Card.
    pub async fn agent_card(&self) -> Result<AgentCard, ClientError> {
        let url = format!("{}/.well-known/agent-card.json", self.base_url);
        let resp = self.apply_auth(self.http.get(url)).send().await?;
        let body: Value = resp.json().await?;
        Ok(serde_json::from_value(body)?)
    }

    /// `SendMessage` — non-streaming send. Peers running A2A v0.3 only
    /// reject the v1.0 method name; for those, use `.call()` with
    /// `"message/send"` and the same params struct.
    ///
    /// `blocking`: when true, asks the peer to wait for the task to
    /// reach a terminal state before responding. Peers may ignore
    /// the flag and return early — the returned `Task` may be in
    /// `submitted`/`working` state regardless.
    pub async fn send_message(
        &self,
        message: Message,
        blocking: bool,
    ) -> Result<SendMessageResult, ClientError> {
        let params = SendMessageParams {
            message,
            configuration: Some(crate::types::MessageConfiguration {
                blocking,
                ..Default::default()
            }),
        };
        self.call("SendMessage", &params).await
    }

    pub async fn get_task(&self, task_id: impl Into<String>) -> Result<Task, ClientError> {
        let params = GetTaskParams {
            id: task_id.into(),
            history_length: None,
        };
        self.call("GetTask", &params).await
    }

    pub async fn list_tasks(
        &self,
        filter: ListTasksParams,
    ) -> Result<ListTasksResult, ClientError> {
        self.call("ListTasks", &filter).await
    }

    pub async fn cancel_task(&self, task_id: impl Into<String>) -> Result<Task, ClientError> {
        #[derive(Serialize)]
        struct P {
            id: String,
        }
        self.call("CancelTask", &P { id: task_id.into() }).await
    }

    /// Register a push-notification webhook for a task. Returns the
    /// peer-assigned config id.
    pub async fn set_push_config(
        &self,
        task_id: impl Into<String>,
        config: PushNotificationConfig,
    ) -> Result<String, ClientError> {
        #[derive(Serialize)]
        #[serde(rename_all = "camelCase")]
        struct P {
            task_id: String,
            config: PushNotificationConfig,
        }
        let resp: Value = self
            .call(
                "CreateTaskPushNotificationConfig",
                &P {
                    task_id: task_id.into(),
                    config,
                },
            )
            .await?;
        resp.get("configId")
            .and_then(|v| v.as_str())
            .map(|s| s.to_string())
            .ok_or_else(|| ClientError::Malformed("missing configId in response".into()))
    }

    /// Generic JSON-RPC call. Public so callers with peer-specific
    /// methods (or peers that have added extensions) can drive them.
    pub async fn call<P: Serialize, R: DeserializeOwned>(
        &self,
        method: &str,
        params: &P,
    ) -> Result<R, ClientError> {
        let id = self.next_id.fetch_add(1, Ordering::SeqCst);
        let body = serde_json::json!({
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": id,
        });
        let resp = self
            .apply_auth(self.http.post(&self.base_url).json(&body))
            .send()
            .await?;
        let status = resp.status();
        if !status.is_success() {
            // Capture (truncated, redacted) body so the caller can
            // debug 401/403/5xx. Cap at 1KB to avoid logging whole
            // error pages. Run a redaction pass first so peers that
            // echo the request back can't surface our bearer token
            // through caller logs.
            let body = resp.text().await.unwrap_or_default();
            let truncated: String = body.chars().take(1024).collect();
            return Err(ClientError::Status {
                code: status.as_u16(),
                body: redact_credentials(&truncated),
            });
        }
        let envelope: Value = resp.json().await?;
        if let Some(err) = envelope.get("error") {
            let code = err.get("code").and_then(|v| v.as_i64()).unwrap_or(0) as i32;
            let message = err
                .get("message")
                .and_then(|v| v.as_str())
                .unwrap_or("(no message)")
                .to_string();
            return Err(ClientError::Rpc { code, message });
        }
        let result = envelope
            .get("result")
            .ok_or_else(|| ClientError::Malformed("missing `result` field".into()))?
            .clone();
        // Deserialise failure here is a peer-shape problem, not a
        // caller-side serialisation bug. Surface it distinctly.
        serde_json::from_value(result).map_err(|e| ClientError::BadResultShape(e.to_string()))
    }

    fn apply_auth(&self, req: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
        match &self.auth {
            ClientAuth::None => req,
            ClientAuth::Bearer(token) => req.bearer_auth(token),
            ClientAuth::Header { name, value } => req.header(name.as_str(), value.as_str()),
        }
    }
}

/// Strip `Authorization: Bearer …`, `X-API-Key: …`, and similar
/// header echoes from a captured body string. Doesn't catch every
/// case (peers can echo arbitrary fields), but kills the obvious
/// vector where a misbehaving peer's 4xx response surfaces our
/// bearer token in the caller's logs.
fn redact_credentials(input: &str) -> String {
    // Cheap: scan for known token-like patterns line by line.
    // Avoids pulling in `regex` for one redaction pass.
    let mut out = String::with_capacity(input.len());
    for line in input.split_inclusive('\n') {
        let trimmed = line.trim_start();
        let lower = trimmed.to_ascii_lowercase();
        let is_creds = lower.starts_with("authorization:")
            || lower.starts_with("authorization\":")
            || lower.starts_with("\"authorization\":")
            || lower.starts_with("x-api-key:")
            || lower.starts_with("\"x-api-key\":")
            || lower.starts_with("api-key:")
            || lower.starts_with("bearer ")
            || lower.contains(" bearer ");
        if is_creds {
            // Keep up to the first colon (or quote) for context; redact the rest.
            if let Some(idx) = line.find(':') {
                out.push_str(&line[..=idx]);
                out.push_str(" [REDACTED]");
                if line.ends_with('\n') {
                    out.push('\n');
                }
            } else {
                out.push_str("[REDACTED]");
                if line.ends_with('\n') {
                    out.push('\n');
                }
            }
        } else {
            out.push_str(line);
        }
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::{MessageRole, Part, TextPart};
    use std::collections::HashMap;

    #[test]
    fn client_trims_trailing_slash() {
        let c = A2aClient::new("http://example.test/");
        assert_eq!(c.base_url, "http://example.test");
    }

    #[test]
    fn client_auth_default_is_none() {
        let c = A2aClient::new("http://x");
        assert!(matches!(c.auth, ClientAuth::None));
    }

    #[test]
    fn redact_credentials_strips_authorization_lines() {
        let body = "HTTP/1.1 401 Unauthorized\r\nAuthorization: Bearer SECRET123\r\nX-API-Key: KEY456\r\nContent-Type: text/plain\r\n\r\nrequest rejected";
        let red = redact_credentials(body);
        assert!(!red.contains("SECRET123"), "bearer not redacted: {}", red);
        assert!(!red.contains("KEY456"), "api key not redacted: {}", red);
        assert!(red.contains("[REDACTED]"));
        assert!(red.contains("Content-Type: text/plain"));
    }

    #[test]
    fn message_can_be_constructed_for_call() {
        // Smoke test that the type plumbing compiles end-to-end.
        let _msg = Message {
            message_id: "m".into(),
            role: MessageRole::User,
            parts: vec![Part::Text(TextPart {
                text: "hi".into(),
                metadata: HashMap::new(),
            })],
            task_id: None,
            context_id: None,
            metadata: HashMap::new(),
        };
    }
}