Skip to main content

a2a_client/
client.rs

1//! A2A Client for calling remote A2A agents
2//!
3//! This module provides a client for making A2A protocol calls to remote agents.
4//! It supports both streaming and non-streaming interactions.
5
6use self::sse_parser::SseParser;
7use crate::constants::{AGENT_CARD_PATH, JSONRPC_VERSION};
8use crate::error::{A2AError, A2AResult};
9use a2a_types::{self as v1, JSONRPCErrorResponse, JSONRPCId};
10use futures_core::Stream;
11use reqwest::{Client, RequestBuilder, Url};
12use serde::{Deserialize, Serialize};
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16
17#[cfg(not(target_arch = "wasm32"))]
18type BoxedResultStream<T> = Pin<Box<dyn Stream<Item = A2AResult<T>> + Send>>;
19#[cfg(target_arch = "wasm32")]
20type BoxedResultStream<T> = Pin<Box<dyn Stream<Item = A2AResult<T>>>>;
21
22type SseStream = BoxedResultStream<v1::StreamResponse>;
23
24/// A2A client for communicating with remote agents
25#[derive(Clone)]
26pub struct A2AClient {
27    /// HTTP client for making requests
28    client: Client,
29    /// JSON-RPC service endpoint URL from the agent card, if available.
30    rpc_endpoint_url: Option<String>,
31    /// HTTP+JSON base URL from the agent card, if available.
32    http_json_endpoint_url: Option<String>,
33    /// Optional authentication token
34    auth_token: Option<String>,
35    /// Request ID counter for JSON-RPC requests
36    request_id_counter: Arc<AtomicU64>,
37    /// Cached agent card
38    agent_card: Arc<v1::AgentCard>,
39}
40
41/// JSON-RPC 2.0 request structure
42#[derive(Debug, Serialize)]
43struct JsonRpcRequest<T> {
44    jsonrpc: String,
45    id: JSONRPCId,
46    method: String,
47    params: T,
48}
49
50/// JSON-RPC 2.0 response structure
51#[derive(Debug, Serialize, Deserialize)]
52#[serde(untagged)]
53enum JsonRpcResponse<T> {
54    Success { id: Option<JSONRPCId>, result: T },
55    Error(JSONRPCErrorResponse),
56}
57
58/// Default HTTP request timeout applied on native targets.
59#[cfg(not(target_arch = "wasm32"))]
60const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
61
62/// Builds a `reqwest::Client` with the default 30-second timeout.
63/// On WASM, `reqwest::ClientBuilder` does not support `timeout`, so we fall
64/// back to a plain default client.
65fn default_client() -> Client {
66    #[cfg(not(target_arch = "wasm32"))]
67    {
68        Client::builder()
69            .timeout(DEFAULT_TIMEOUT)
70            .build()
71            .unwrap_or_default()
72    }
73    #[cfg(target_arch = "wasm32")]
74    {
75        Client::new()
76    }
77}
78
79fn parse_agent_card_bytes(bytes: &[u8]) -> A2AResult<v1::AgentCard> {
80    serde_json::from_slice(bytes).map_err(|error| A2AError::SerializationError {
81        message: format!("Failed to parse agent card: {error}"),
82    })
83}
84
85fn normalize_endpoint_url(url: &str) -> Option<String> {
86    let trimmed = url.trim().trim_end_matches('/');
87    (!trimmed.is_empty()).then(|| trimmed.to_string())
88}
89
90fn record_endpoint(slot: &mut Option<String>, url: &str) {
91    if slot.is_none() {
92        *slot = normalize_endpoint_url(url);
93    }
94}
95
96fn resolve_endpoints(agent_card: &v1::AgentCard) -> A2AResult<(Option<String>, Option<String>)> {
97    let mut rpc_endpoint_url = None;
98    let mut http_json_endpoint_url = None;
99
100    for interface in &agent_card.supported_interfaces {
101        match interface.protocol_binding.as_str() {
102            "JSONRPC" => record_endpoint(&mut rpc_endpoint_url, &interface.url),
103            "HTTP+JSON" => record_endpoint(&mut http_json_endpoint_url, &interface.url),
104            _ => {}
105        }
106    }
107
108    if rpc_endpoint_url.is_none() && http_json_endpoint_url.is_none() {
109        return Err(A2AError::InvalidParameter {
110            message: "Agent card does not contain a supported JSON-RPC or HTTP+JSON endpoint"
111                .to_string(),
112        });
113    }
114
115    Ok((rpc_endpoint_url, http_json_endpoint_url))
116}
117
118/// Converts a `pbjson_types::Timestamp` to an RFC 3339 string for use as a query parameter.
119fn timestamp_to_rfc3339(ts: pbjson_types::Timestamp) -> A2AResult<String> {
120    chrono::DateTime::from_timestamp(ts.seconds, ts.nanos.cast_unsigned())
121        .map(|dt| dt.to_rfc3339())
122        .ok_or_else(|| A2AError::InvalidParameter {
123            message: format!(
124                "Invalid timestamp: seconds={} nanos={}",
125                ts.seconds, ts.nanos
126            ),
127        })
128}
129
130fn task_state_query_value(value: i32) -> A2AResult<Option<String>> {
131    let state = v1::TaskState::try_from(value).map_err(|_| A2AError::InvalidParameter {
132        message: format!("Unknown task state enum value {value}"),
133    })?;
134
135    match state {
136        v1::TaskState::Unspecified => Ok(None),
137        other => Ok(Some(other.as_str_name().to_string())),
138    }
139}
140
141/// Handles parsing of Server-Sent Events (SSE) streams, accommodating both WASM and native targets.
142mod sse_parser {
143    use super::{A2AError, A2AResult, JsonRpcResponse};
144    use futures_core::Stream;
145    use serde::de::DeserializeOwned;
146    use std::pin::Pin;
147    use std::task::{Context, Poll};
148
149    // Define a trait that abstracts over the `Send` bound, which is required for non-WASM targets.
150    #[cfg(not(target_arch = "wasm32"))]
151    pub trait ByteStreamTrait: Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send {}
152    #[cfg(not(target_arch = "wasm32"))]
153    impl<T: Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send> ByteStreamTrait for T {}
154
155    #[cfg(target_arch = "wasm32")]
156    pub trait ByteStreamTrait: Stream<Item = Result<bytes::Bytes, reqwest::Error>> {}
157    #[cfg(target_arch = "wasm32")]
158    impl<T: Stream<Item = Result<bytes::Bytes, reqwest::Error>>> ByteStreamTrait for T {}
159
160    // Define a type alias for the pinned byte stream to avoid repetition.
161    #[cfg(not(target_arch = "wasm32"))]
162    type PinnedByteStream =
163        Pin<Box<dyn Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send>>;
164    #[cfg(target_arch = "wasm32")]
165    type PinnedByteStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, reqwest::Error>>>>;
166
167    /// A parser for Server-Sent Events (SSE) streams.
168    pub struct SseParser<T> {
169        inner: PinnedByteStream,
170        buffer: String,
171        event_data_buffer: String,
172        pending_results: Vec<A2AResult<T>>,
173        parser: fn(&str) -> A2AResult<T>,
174    }
175
176    impl<T> SseParser<T> {
177        /// Creates a new SSE parser from a byte stream.
178        pub fn new(
179            inner: impl ByteStreamTrait + 'static,
180            parser: fn(&str) -> A2AResult<T>,
181        ) -> Self {
182            Self {
183                inner: Box::pin(inner),
184                buffer: String::new(),
185                event_data_buffer: String::new(),
186                pending_results: Vec::new(),
187                parser,
188            }
189        }
190
191        /// Processes a chunk of bytes from the stream, parsing full SSE events.
192        fn process_chunk(&mut self, chunk: bytes::Bytes) -> Vec<A2AResult<T>> {
193            self.buffer.push_str(&String::from_utf8_lossy(&chunk));
194            let mut results = Vec::new();
195
196            // Process buffer line by line.
197            while let Some(newline_pos) = self.buffer.find('\n') {
198                let line = self.buffer[..newline_pos]
199                    .trim_end_matches('\r')
200                    .to_string();
201                self.buffer = self.buffer[newline_pos + 1..].to_string();
202
203                if line.is_empty() {
204                    // An empty line signifies the end of an event.
205                    if !self.event_data_buffer.is_empty() {
206                        match (self.parser)(&self.event_data_buffer) {
207                            Ok(result) => results.push(Ok(result)),
208                            Err(e) => results.push(Err(e)),
209                        }
210                        self.event_data_buffer.clear();
211                    }
212                } else if let Some(data) = line.strip_prefix("data:") {
213                    // Accumulate data lines for a single event.
214                    if !self.event_data_buffer.is_empty() {
215                        self.event_data_buffer.push('\n');
216                    }
217                    self.event_data_buffer.push_str(data.trim_start());
218                } else if line.starts_with(':') {
219                    // Ignore comment lines.
220                }
221            }
222            results
223        }
224    }
225
226    impl<T: Unpin> Stream for SseParser<T> {
227        type Item = A2AResult<T>;
228
229        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
230            let this = self.get_mut();
231            // Drain any pending results from the last chunk processing.
232            if let Some(result) = this.pending_results.pop() {
233                return Poll::Ready(Some(result));
234            }
235
236            // Poll the underlying stream for more data.
237            match this.inner.as_mut().poll_next(cx) {
238                Poll::Ready(Some(Ok(chunk))) => {
239                    let mut results = this.process_chunk(chunk);
240                    if results.is_empty() {
241                        // If no full events were parsed, wait for more data.
242                        cx.waker().wake_by_ref();
243                        Poll::Pending
244                    } else {
245                        // Reverse results to return them in the correct order.
246                        results.reverse();
247                        this.pending_results = results;
248                        Poll::Ready(this.pending_results.pop())
249                    }
250                }
251                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(A2AError::NetworkError {
252                    message: format!("Stream error: {}", e),
253                }))),
254                Poll::Ready(None) => Poll::Ready(None),
255                Poll::Pending => Poll::Pending,
256            }
257        }
258    }
259
260    /// Processes the data part of a single SSE event carrying a JSON-RPC success envelope.
261    pub(super) fn process_jsonrpc_sse_event<T>(json_data: &str) -> A2AResult<T>
262    where
263        T: DeserializeOwned,
264    {
265        if json_data.trim().is_empty() {
266            return Err(A2AError::SerializationError {
267                message: "Empty SSE event data".to_string(),
268            });
269        }
270
271        let json_response: JsonRpcResponse<T> =
272            serde_json::from_str(json_data).map_err(|e| A2AError::SerializationError {
273                message: format!("Failed to parse SSE event data: {}", e),
274            })?;
275
276        match json_response {
277            JsonRpcResponse::Success { result, .. } => Ok(result),
278            JsonRpcResponse::Error(err) => Err(A2AError::RemoteAgentError {
279                message: format!("SSE event contained an error: {}", err.error.message),
280                code: Some(err.error.code),
281            }),
282        }
283    }
284
285    /// Processes the data part of a single SSE event carrying a direct JSON payload.
286    pub(super) fn process_direct_sse_event<T>(json_data: &str) -> A2AResult<T>
287    where
288        T: DeserializeOwned,
289    {
290        if json_data.trim().is_empty() {
291            return Err(A2AError::SerializationError {
292                message: "Empty SSE event data".to_string(),
293            });
294        }
295
296        serde_json::from_str(json_data).map_err(|e| A2AError::SerializationError {
297            message: format!("Failed to parse SSE event data: {}", e),
298        })
299    }
300
301    #[cfg(test)]
302    mod tests {
303        use super::*;
304        use a2a_types::{self as v1, JSONRPCError, JSONRPCErrorResponse, JSONRPCId};
305        use bytes::Bytes;
306        use futures_util::{StreamExt, stream};
307
308        fn sample_message(text: &str) -> v1::Message {
309            v1::Message {
310                message_id: format!("msg-{text}"),
311                context_id: "ctx-1".into(),
312                task_id: "task-1".into(),
313                role: v1::Role::Agent.into(),
314                parts: vec![v1::Part {
315                    content: Some(v1::part::Content::Text(text.to_string())),
316                    metadata: None,
317                    filename: String::new(),
318                    media_type: "text/plain".into(),
319                }],
320                metadata: None,
321                reference_task_ids: Vec::new(),
322                extensions: Vec::new(),
323            }
324        }
325
326        #[tokio::test]
327        async fn sse_parser_emits_multiple_events_in_order() {
328            let first = JsonRpcResponse::Success {
329                id: Some(JSONRPCId::Integer(1)),
330                result: v1::StreamResponse {
331                    payload: Some(v1::stream_response::Payload::Message(sample_message("one"))),
332                },
333            };
334            let second = JsonRpcResponse::Success {
335                id: Some(JSONRPCId::Integer(2)),
336                result: v1::StreamResponse {
337                    payload: Some(v1::stream_response::Payload::Message(sample_message("two"))),
338                },
339            };
340            let payload = format!(
341                "data: {}\n\ndata: {}\n\n",
342                serde_json::to_string(&first).expect("json"),
343                serde_json::to_string(&second).expect("json")
344            );
345            let byte_stream = stream::iter(vec![Ok::<Bytes, reqwest::Error>(Bytes::from(payload))]);
346
347            let mut parser =
348                SseParser::new(byte_stream, process_jsonrpc_sse_event::<v1::StreamResponse>);
349            let first_item: v1::StreamResponse =
350                parser.next().await.expect("first event").expect("ok");
351            let second_item: v1::StreamResponse =
352                parser.next().await.expect("second event").expect("ok");
353
354            match first_item.payload {
355                Some(v1::stream_response::Payload::Message(msg)) => {
356                    assert!(
357                        msg.parts.iter().any(|part| {
358                            matches!(part.content, Some(v1::part::Content::Text(_)))
359                        })
360                    );
361                }
362                other => panic!("expected message, got {other:?}"),
363            }
364
365            match second_item.payload {
366                Some(v1::stream_response::Payload::Message(msg)) => {
367                    assert!(msg.message_id.contains("two"));
368                }
369                other => panic!("expected message, got {other:?}"),
370            }
371        }
372
373        #[test]
374        fn process_sse_event_returns_error_for_remote_failure() {
375            let error = JsonRpcResponse::<v1::StreamResponse>::Error(JSONRPCErrorResponse {
376                jsonrpc: "2.0".into(),
377                error: JSONRPCError {
378                    code: -1,
379                    message: "boom".into(),
380                    data: None,
381                },
382                id: Some(JSONRPCId::Integer(1)),
383            });
384            let json = serde_json::to_string(&error).expect("json");
385            let result = process_jsonrpc_sse_event::<v1::StreamResponse>(&json);
386            assert!(matches!(result, Err(A2AError::RemoteAgentError { .. })));
387        }
388    }
389}
390
391impl A2AClient {
392    /// Create a new A2A client from an agent card URL
393    ///
394    /// This will fetch the agent card from the specified URL and use the
395    /// advertised v1 endpoints from the card for all subsequent requests.
396    ///
397    /// Uses a default `reqwest::Client` for HTTP requests. For custom HTTP
398    /// configuration, use `from_card_url_with_client()`.
399    ///
400    /// # Example
401    ///
402    /// ```no_run
403    /// use a2a_client::A2AClient;
404    ///
405    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
406    /// let client = A2AClient::from_card_url("https://agent.example.com").await?;
407    /// # Ok(())
408    /// # }
409    /// ```
410    ///
411    /// # Errors
412    ///
413    /// Returns an error if the agent card cannot be fetched, parsed, or does not advertise
414    /// a supported `JSONRPC` or `HTTP+JSON` interface.
415    pub async fn from_card_url(base_url: impl AsRef<str>) -> A2AResult<Self> {
416        Self::from_card_url_with_client(base_url, default_client()).await
417    }
418
419    /// Create a new A2A client from an agent card URL with a custom HTTP client
420    ///
421    /// This allows you to provide a pre-configured `reqwest::Client` with
422    /// custom settings like timeouts, proxies, TLS config, default headers, etc.
423    ///
424    /// # Example
425    ///
426    /// ```no_run
427    /// # #[cfg(not(target_family = "wasm"))]
428    /// # {
429    /// use a2a_client::A2AClient;
430    /// use reqwest::Client;
431    /// use std::time::Duration;
432    ///
433    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
434    /// let http_client = Client::builder()
435    ///     .timeout(Duration::from_secs(30))
436    ///     .build()?;
437    ///
438    /// let client = A2AClient::from_card_url_with_client(
439    ///     "https://agent.example.com",
440    ///     http_client
441    /// ).await?;
442    /// # Ok(())
443    /// # }
444    /// # }
445    /// ```
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the agent card cannot be fetched, the response status is not successful,
450    /// JSON parsing fails, or the card does not advertise a supported interface.
451    pub async fn from_card_url_with_client(
452        base_url: impl AsRef<str>,
453        http_client: Client,
454    ) -> A2AResult<Self> {
455        let base_url = base_url.as_ref().trim_end_matches('/');
456        let card_url = format!("{}/{}", base_url, AGENT_CARD_PATH);
457
458        let response = http_client
459            .get(&card_url)
460            .header("Accept", "application/json")
461            .send()
462            .await
463            .map_err(|e| A2AError::NetworkError {
464                message: format!("Failed to fetch agent card from {}: {}", card_url, e),
465            })?;
466
467        if !response.status().is_success() {
468            return Err(A2AError::NetworkError {
469                message: format!("Failed to fetch agent card: HTTP {}", response.status()),
470            });
471        }
472
473        let bytes = response
474            .bytes()
475            .await
476            .map_err(|e| A2AError::SerializationError {
477                message: format!("Failed to read agent card response: {}", e),
478            })?;
479        let agent_card = parse_agent_card_bytes(&bytes)?;
480        let (rpc_endpoint_url, http_json_endpoint_url) = resolve_endpoints(&agent_card)?;
481
482        Ok(Self {
483            client: http_client,
484            rpc_endpoint_url,
485            http_json_endpoint_url,
486            auth_token: None,
487            request_id_counter: Arc::new(AtomicU64::new(1)),
488            agent_card: Arc::new(agent_card),
489        })
490    }
491
492    /// Create a new A2A client directly from an agent card
493    ///
494    /// This is useful when you already have an agent card and don't need to fetch it.
495    /// Uses a default `reqwest::Client`. For custom HTTP configuration, use `from_card_with_client()`.
496    ///
497    /// # Example
498    ///
499    /// ```no_run
500    /// use a2a_client::A2AClient;
501    /// use a2a_types::AgentCard;
502    ///
503    /// # fn example(agent_card: AgentCard) -> Result<(), Box<dyn std::error::Error>> {
504    /// let client = A2AClient::from_card(agent_card)?;
505    /// # Ok(())
506    /// # }
507    /// ```
508    ///
509    /// # Errors
510    ///
511    /// Returns an error if the agent card does not advertise a supported `JSONRPC` or `HTTP+JSON` interface.
512    pub fn from_card(agent_card: v1::AgentCard) -> A2AResult<Self> {
513        Self::from_card_with_client(agent_card, default_client())
514    }
515
516    /// Create a new A2A client from an agent card with a custom HTTP client
517    ///
518    /// This allows you to provide a pre-configured `reqwest::Client` with
519    /// custom settings like timeouts, proxies, TLS config, default headers, etc.
520    ///
521    /// # Example
522    ///
523    /// ```no_run
524    /// # #[cfg(not(target_family = "wasm"))]
525    /// # {
526    /// use a2a_client::A2AClient;
527    /// use a2a_types::AgentCard;
528    /// use reqwest::Client;
529    /// use std::time::Duration;
530    ///
531    /// # fn example(agent_card: AgentCard) -> Result<(), Box<dyn std::error::Error>> {
532    /// let http_client = Client::builder()
533    ///     .timeout(Duration::from_secs(30))
534    ///     .default_headers({
535    ///         let mut headers = reqwest::header::HeaderMap::new();
536    ///         headers.insert("X-Custom-Header", "value".parse()?);
537    ///         headers
538    ///     })
539    ///     .build()?;
540    ///
541    /// let client = A2AClient::from_card_with_client(agent_card, http_client)?;
542    /// # Ok(())
543    /// # }
544    /// # }
545    /// ```
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if the agent card does not advertise a supported `JSONRPC` or `HTTP+JSON` interface.
550    pub fn from_card_with_client(
551        agent_card: v1::AgentCard,
552        http_client: Client,
553    ) -> A2AResult<Self> {
554        let (rpc_endpoint_url, http_json_endpoint_url) = resolve_endpoints(&agent_card)?;
555
556        Ok(Self {
557            client: http_client,
558            rpc_endpoint_url,
559            http_json_endpoint_url,
560            auth_token: None,
561            request_id_counter: Arc::new(AtomicU64::new(1)),
562            agent_card: Arc::new(agent_card),
563        })
564    }
565
566    /// Create a new A2A client from an agent card with custom headers
567    ///
568    /// This is a convenience method that builds a reqwest::Client with the provided
569    /// headers and uses it to create the A2AClient.
570    ///
571    /// # Example
572    ///
573    /// ```no_run
574    /// use a2a_client::A2AClient;
575    /// use a2a_types::AgentCard;
576    /// use std::collections::HashMap;
577    ///
578    /// # fn example(agent_card: AgentCard) -> Result<(), Box<dyn std::error::Error>> {
579    /// let mut headers = HashMap::new();
580    /// headers.insert("Authorization".to_string(), "Bearer token123".to_string());
581    /// headers.insert("X-API-Key".to_string(), "my-api-key".to_string());
582    ///
583    /// let client = A2AClient::from_card_with_headers(agent_card, headers)?;
584    /// # Ok(())
585    /// # }
586    /// ```
587    ///
588    /// # Errors
589    ///
590    /// Returns an error if the agent card is invalid, headers cannot be parsed, or the HTTP client cannot be built.
591    pub fn from_card_with_headers(
592        agent_card: v1::AgentCard,
593        headers: std::collections::HashMap<String, String>,
594    ) -> A2AResult<Self> {
595        use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
596        use std::str::FromStr;
597
598        let mut header_map = HeaderMap::new();
599        for (key, value) in headers {
600            let header_name =
601                HeaderName::from_str(&key).map_err(|e| A2AError::InvalidParameter {
602                    message: format!("Invalid header name '{}': {}", key, e),
603                })?;
604            let header_value =
605                HeaderValue::from_str(&value).map_err(|e| A2AError::InvalidParameter {
606                    message: format!("Invalid header value for '{}': {}", key, e),
607                })?;
608            header_map.insert(header_name, header_value);
609        }
610
611        let http_client = Client::builder()
612            .default_headers(header_map)
613            .build()
614            .map_err(|e| A2AError::NetworkError {
615                message: format!("Failed to build HTTP client with headers: {}", e),
616            })?;
617
618        Self::from_card_with_client(agent_card, http_client)
619    }
620
621    /// Set authentication token (builder pattern)
622    pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
623        self.auth_token = Some(token.into());
624        self
625    }
626
627    /// Get the cached agent card
628    pub fn agent_card(&self) -> &v1::AgentCard {
629        &self.agent_card
630    }
631
632    /// Fetch a fresh agent card from the base URL
633    ///
634    /// # Errors
635    ///
636    /// Returns an error if the network request fails, the response status is not successful, or JSON parsing fails.
637    pub async fn fetch_agent_card(&self, base_url: impl AsRef<str>) -> A2AResult<v1::AgentCard> {
638        let base_url = base_url.as_ref().trim_end_matches('/');
639        let card_url = format!("{}/{}", base_url, AGENT_CARD_PATH);
640
641        let mut req = self
642            .client
643            .get(&card_url)
644            .header("Accept", "application/json");
645
646        if let Some(token) = &self.auth_token {
647            req = req.bearer_auth(token);
648        }
649
650        let response = req.send().await.map_err(|e| A2AError::NetworkError {
651            message: format!("Failed to fetch agent card from {}: {}", card_url, e),
652        })?;
653
654        if !response.status().is_success() {
655            return Err(A2AError::NetworkError {
656                message: format!("Failed to fetch agent card: HTTP {}", response.status()),
657            });
658        }
659
660        let bytes = response
661            .bytes()
662            .await
663            .map_err(|e| A2AError::SerializationError {
664                message: format!("Failed to read agent card response: {}", e),
665            })?;
666
667        parse_agent_card_bytes(&bytes)
668    }
669
670    /// Fetch the extended agent card if the agent advertises one.
671    ///
672    /// Checks `capabilities.extended_agent_card` on the cached public card first.
673    /// If the agent does not advertise an extended card, returns `None` immediately
674    /// without making a network request.
675    ///
676    /// # Errors
677    ///
678    /// Returns an error if the network request fails or the response cannot be parsed.
679    pub async fn fetch_extended_agent_card_if_available(&self) -> A2AResult<Option<v1::AgentCard>> {
680        let advertises_extended = self
681            .agent_card
682            .capabilities
683            .as_ref()
684            .and_then(|c| c.extended_agent_card)
685            .unwrap_or(false);
686
687        if !advertises_extended {
688            return Ok(None);
689        }
690
691        let card = self
692            .get_extended_agent_card(v1::GetExtendedAgentCardRequest {
693                tenant: String::new(),
694            })
695            .await?;
696
697        Ok(Some(card))
698    }
699
700    /// Get the next request ID
701    fn next_request_id(&self) -> JSONRPCId {
702        let id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
703        JSONRPCId::Integer(id as i64)
704    }
705
706    fn rpc_endpoint(&self) -> A2AResult<&str> {
707        self.rpc_endpoint_url
708            .as_deref()
709            .ok_or_else(|| A2AError::InvalidParameter {
710                message: "Agent does not advertise a JSON-RPC endpoint".to_string(),
711            })
712    }
713
714    fn http_json_base_url(&self) -> Option<&str> {
715        self.http_json_endpoint_url.as_deref()
716    }
717
718    fn build_http_json_url(&self, segments: &[&str]) -> A2AResult<String> {
719        let base = self
720            .http_json_base_url()
721            .ok_or_else(|| A2AError::InvalidParameter {
722                message: "Agent does not advertise an HTTP+JSON endpoint".to_string(),
723            })?;
724        let mut url = Url::parse(base).map_err(|error| A2AError::InvalidParameter {
725            message: format!("Invalid HTTP+JSON base URL `{base}`: {error}"),
726        })?;
727        {
728            let mut path_segments =
729                url.path_segments_mut()
730                    .map_err(|()| A2AError::InvalidParameter {
731                        message: format!("HTTP+JSON base URL `{base}` cannot accept path segments"),
732                    })?;
733            for segment in segments {
734                path_segments.push(segment);
735            }
736        }
737        Ok(url.to_string())
738    }
739
740    /// Applies auth, tracing headers, and the optional `X-A2A-Tenant` header to a request.
741    fn prepare_request_with_tenant(&self, request: RequestBuilder, tenant: &str) -> RequestBuilder {
742        let mut req = self.prepare_request(request);
743        if !tenant.is_empty() {
744            req = req.header("X-A2A-Tenant", tenant);
745        }
746        req
747    }
748
749    fn prepare_request(&self, mut request: RequestBuilder) -> RequestBuilder {
750        for (key, value) in Self::inject_trace_context() {
751            request = request.header(key, value);
752        }
753
754        if let Some(token) = &self.auth_token {
755            request = request.bearer_auth(token);
756        }
757
758        request
759    }
760
761    async fn send_json_request<TResponse>(
762        &self,
763        request: RequestBuilder,
764        context: &str,
765        tenant: &str,
766    ) -> A2AResult<TResponse>
767    where
768        TResponse: for<'de> Deserialize<'de>,
769    {
770        let response = self
771            .prepare_request_with_tenant(request, tenant)
772            .send()
773            .await
774            .map_err(|e| A2AError::NetworkError {
775                message: format!("Failed to send {context} request: {e}"),
776            })?;
777
778        if !response.status().is_success() {
779            let status = response.status();
780            let error_text = response.text().await.unwrap_or_default();
781            if let Ok(error_json) = serde_json::from_str::<JSONRPCErrorResponse>(&error_text) {
782                return Err(A2AError::RemoteAgentError {
783                    message: error_json.error.message,
784                    code: Some(error_json.error.code),
785                });
786            }
787            return Err(A2AError::NetworkError {
788                message: format!("HTTP error {status}: {error_text}"),
789            });
790        }
791
792        let content_type = response
793            .headers()
794            .get("Content-Type")
795            .and_then(|v| v.to_str().ok())
796            .unwrap_or("")
797            .to_owned();
798
799        if !content_type.starts_with("application/json") {
800            let body = response.text().await.unwrap_or_default();
801            return Err(A2AError::SerializationError {
802                message: format!(
803                    "Expected Content-Type application/json for {context}, got '{content_type}': {body}"
804                ),
805            });
806        }
807
808        response
809            .json()
810            .await
811            .map_err(|e| A2AError::SerializationError {
812                message: format!("Failed to parse {context} response: {e}"),
813            })
814    }
815
816    async fn start_sse_request(
817        &self,
818        request: RequestBuilder,
819        context: &str,
820        tenant: &str,
821    ) -> A2AResult<reqwest::Response> {
822        let response = self
823            .prepare_request_with_tenant(request, tenant)
824            .send()
825            .await
826            .map_err(|e| A2AError::NetworkError {
827                message: format!("Failed to send {context} request: {e}"),
828            })?;
829
830        if !response.status().is_success() {
831            let status = response.status();
832            let error_text = response.text().await.unwrap_or_default();
833            return Err(A2AError::NetworkError {
834                message: format!("HTTP error {status}: {error_text}"),
835            });
836        }
837
838        let content_type = response
839            .headers()
840            .get("Content-Type")
841            .and_then(|v| v.to_str().ok())
842            .unwrap_or("");
843
844        if !content_type.starts_with("text/event-stream") {
845            return Err(A2AError::NetworkError {
846                message: format!(
847                    "Invalid response Content-Type for SSE stream. Expected 'text/event-stream', got '{content_type}'"
848                ),
849            });
850        }
851
852        Ok(response)
853    }
854
855    /// Inject W3C Trace Context into HTTP headers for distributed tracing
856    ///
857    /// Extracts the OpenTelemetry context from the current tracing span and
858    /// injects it into a carrier (HashMap) that can be used as HTTP headers.
859    /// This enables trace propagation across service boundaries.
860    fn inject_trace_context() -> std::collections::HashMap<String, String> {
861        use opentelemetry::global;
862        use tracing_opentelemetry::OpenTelemetrySpanExt;
863
864        let mut carrier = std::collections::HashMap::new();
865
866        // Get the OpenTelemetry context from the current tracing span
867        let context = tracing::Span::current().context();
868
869        // Inject the context into the carrier (adds traceparent, tracestate headers)
870        // OpenTelemetry 0.31+ uses a closure-based API
871        global::get_text_map_propagator(|propagator| {
872            propagator.inject_context(&context, &mut carrier);
873        });
874
875        carrier
876    }
877
878    /// Helper method to make a generic JSON-RPC POST request
879    async fn post_rpc_request<TParams, TResponse>(
880        &self,
881        method: &str,
882        params: TParams,
883    ) -> A2AResult<JsonRpcResponse<TResponse>>
884    where
885        TParams: Serialize,
886        TResponse: for<'de> Deserialize<'de>,
887    {
888        let request_id = self.next_request_id();
889        let rpc_request = JsonRpcRequest {
890            jsonrpc: JSONRPC_VERSION.to_string(),
891            method: method.to_string(),
892            params,
893            id: request_id.clone(),
894        };
895
896        let req = self
897            .client
898            .post(self.rpc_endpoint()?)
899            .header("Content-Type", "application/json")
900            .header("Accept", "application/json")
901            .json(&rpc_request);
902
903        let response =
904            self.prepare_request(req)
905                .send()
906                .await
907                .map_err(|e| A2AError::NetworkError {
908                    message: format!("Failed to send {method} request: {e}"),
909                })?;
910
911        if !response.status().is_success() {
912            let status = response.status();
913            let error_text = response.text().await.unwrap_or_default();
914            if let Ok(error_json) = serde_json::from_str::<JSONRPCErrorResponse>(&error_text) {
915                return Ok(JsonRpcResponse::Error(error_json));
916            }
917            return Err(A2AError::NetworkError {
918                message: format!("HTTP error {}: {}", status, error_text),
919            });
920        }
921
922        let content_type = response
923            .headers()
924            .get("Content-Type")
925            .and_then(|v| v.to_str().ok())
926            .unwrap_or("")
927            .to_owned();
928
929        if !content_type.starts_with("application/json") {
930            let body = response.text().await.unwrap_or_default();
931            return Err(A2AError::SerializationError {
932                message: format!(
933                    "Expected Content-Type application/json for {method}, got '{content_type}': {body}"
934                ),
935            });
936        }
937
938        let json_response: JsonRpcResponse<TResponse> =
939            response
940                .json()
941                .await
942                .map_err(|e| A2AError::SerializationError {
943                    message: format!("Failed to parse {} response: {}", method, e),
944                })?;
945
946        // Validate that the response ID matches the request ID per JSON-RPC §5.
947        if let JsonRpcResponse::Success {
948            id: Some(resp_id),
949            ..
950        } = &json_response
951            && resp_id != &request_id
952        {
953            return Err(A2AError::InvalidParameter {
954                message: format!(
955                    "JSON-RPC response ID mismatch for method '{method}': expected {request_id:?}, got {resp_id:?}"
956                ),
957            });
958        }
959
960        Ok(json_response)
961    }
962
963    fn unwrap_rpc_response<T>(&self, response: JsonRpcResponse<T>) -> A2AResult<T> {
964        match response {
965            JsonRpcResponse::Success { result, .. } => Ok(result),
966            JsonRpcResponse::Error(err) => Err(A2AError::RemoteAgentError {
967                message: format!("Remote agent error: {}", err.error.message),
968                code: Some(err.error.code),
969            }),
970        }
971    }
972
973    fn ensure_streaming_enabled(&self, action: &str) -> A2AResult<()> {
974        if self
975            .agent_card
976            .capabilities
977            .as_ref()
978            .and_then(|capabilities| capabilities.streaming)
979            .unwrap_or(false)
980        {
981            Ok(())
982        } else {
983            Err(A2AError::InvalidParameter {
984                message: format!("Agent does not support streaming (required for {action})"),
985            })
986        }
987    }
988
989    fn ensure_push_notifications_enabled(&self) -> A2AResult<()> {
990        if self
991            .agent_card
992            .capabilities
993            .as_ref()
994            .and_then(|capabilities| capabilities.push_notifications)
995            .unwrap_or(false)
996        {
997            Ok(())
998        } else {
999            Err(A2AError::InvalidParameter {
1000                message: "Agent does not support push notifications (capabilities.pushNotifications is not true)"
1001                    .to_string(),
1002            })
1003        }
1004    }
1005
1006    /// Send a message using the A2A v1 surface.
1007    pub async fn send_message(
1008        &self,
1009        request: v1::SendMessageRequest,
1010    ) -> A2AResult<v1::SendMessageResponse> {
1011        if self.http_json_base_url().is_some() {
1012            let tenant = request.tenant.clone();
1013            let url = self.build_http_json_url(&["message:send"])?;
1014            return self
1015                .send_json_request(
1016                    self.client
1017                        .post(url)
1018                        .header("Content-Type", "application/json")
1019                        .header("Accept", "application/json")
1020                        .json(&request),
1021                    "SendMessage",
1022                    &tenant,
1023                )
1024                .await;
1025        }
1026
1027        self.unwrap_rpc_response(self.post_rpc_request("SendMessage", request).await?)
1028    }
1029
1030    /// Send a streaming message using the A2A v1 surface.
1031    pub async fn send_streaming_message(
1032        &self,
1033        request: v1::SendMessageRequest,
1034    ) -> A2AResult<SseStream> {
1035        self.ensure_streaming_enabled("SendStreamingMessage")?;
1036
1037        if self.http_json_base_url().is_some() {
1038            let tenant = request.tenant.clone();
1039            let url = self.build_http_json_url(&["message:stream"])?;
1040            let response = self
1041                .start_sse_request(
1042                    self.client
1043                        .post(url)
1044                        .header("Content-Type", "application/json")
1045                        .header("Accept", "text/event-stream")
1046                        .json(&request),
1047                    "SendStreamingMessage",
1048                    &tenant,
1049                )
1050                .await?;
1051            return Ok(Box::pin(SseParser::new(
1052                response.bytes_stream(),
1053                sse_parser::process_direct_sse_event::<v1::StreamResponse>,
1054            )));
1055        }
1056
1057        let rpc_request = JsonRpcRequest {
1058            jsonrpc: JSONRPC_VERSION.to_string(),
1059            method: "SendStreamingMessage".to_string(),
1060            params: request,
1061            id: self.next_request_id(),
1062        };
1063        let response = self
1064            .start_sse_request(
1065                self.client
1066                    .post(self.rpc_endpoint()?)
1067                    .header("Content-Type", "application/json")
1068                    .header("Accept", "text/event-stream")
1069                    .json(&rpc_request),
1070                "SendStreamingMessage",
1071                "",
1072            )
1073            .await?;
1074
1075        Ok(Box::pin(SseParser::new(
1076            response.bytes_stream(),
1077            sse_parser::process_jsonrpc_sse_event::<v1::StreamResponse>,
1078        )))
1079    }
1080
1081    /// Retrieve a task using the A2A v1 surface.
1082    pub async fn get_task(&self, request: v1::GetTaskRequest) -> A2AResult<v1::Task> {
1083        if self.http_json_base_url().is_some() {
1084            #[derive(Serialize)]
1085            struct GetTaskQuery {
1086                #[serde(skip_serializing_if = "Option::is_none", rename = "historyLength")]
1087                history_length: Option<i32>,
1088            }
1089
1090            let tenant = request.tenant.clone();
1091            let url = self.build_http_json_url(&["tasks", &request.id])?;
1092            return self
1093                .send_json_request(
1094                    self.client
1095                        .get(url)
1096                        .header("Accept", "application/json")
1097                        .query(&GetTaskQuery {
1098                            history_length: request.history_length,
1099                        }),
1100                    "GetTask",
1101                    &tenant,
1102                )
1103                .await;
1104        }
1105
1106        self.unwrap_rpc_response(self.post_rpc_request("GetTask", request).await?)
1107    }
1108
1109    /// List tasks using the A2A v1 surface.
1110    pub async fn list_tasks(
1111        &self,
1112        request: v1::ListTasksRequest,
1113    ) -> A2AResult<v1::ListTasksResponse> {
1114        if self.http_json_base_url().is_some() {
1115            #[derive(Serialize)]
1116            struct ListTasksQuery {
1117                #[serde(skip_serializing_if = "String::is_empty", rename = "contextId")]
1118                context_id: String,
1119                #[serde(skip_serializing_if = "Option::is_none")]
1120                status: Option<String>,
1121                #[serde(skip_serializing_if = "Option::is_none", rename = "pageSize")]
1122                page_size: Option<i32>,
1123                #[serde(skip_serializing_if = "String::is_empty", rename = "pageToken")]
1124                page_token: String,
1125                #[serde(skip_serializing_if = "Option::is_none", rename = "historyLength")]
1126                history_length: Option<i32>,
1127                #[serde(
1128                    skip_serializing_if = "Option::is_none",
1129                    rename = "statusTimestampAfter"
1130                )]
1131                status_timestamp_after: Option<String>,
1132                #[serde(skip_serializing_if = "Option::is_none", rename = "includeArtifacts")]
1133                include_artifacts: Option<bool>,
1134            }
1135
1136            let tenant = request.tenant.clone();
1137            let url = self.build_http_json_url(&["tasks"])?;
1138            let status = task_state_query_value(request.status)?;
1139            let status_timestamp_after = request
1140                .status_timestamp_after
1141                .map(timestamp_to_rfc3339)
1142                .transpose()?;
1143            return self
1144                .send_json_request(
1145                    self.client
1146                        .get(url)
1147                        .header("Accept", "application/json")
1148                        .query(&ListTasksQuery {
1149                            context_id: request.context_id,
1150                            status,
1151                            page_size: request.page_size,
1152                            page_token: request.page_token,
1153                            history_length: request.history_length,
1154                            status_timestamp_after,
1155                            include_artifacts: request.include_artifacts,
1156                        }),
1157                    "ListTasks",
1158                    &tenant,
1159                )
1160                .await;
1161        }
1162
1163        self.unwrap_rpc_response(self.post_rpc_request("ListTasks", request).await?)
1164    }
1165
1166    /// Cancel a task using the A2A v1 surface.
1167    pub async fn cancel_task(&self, request: v1::CancelTaskRequest) -> A2AResult<v1::Task> {
1168        if self.http_json_base_url().is_some() {
1169            let cancel_segment = format!("{}:cancel", request.id);
1170            let tenant = request.tenant.clone();
1171            let url = self.build_http_json_url(&["tasks", &cancel_segment])?;
1172            return self
1173                .send_json_request(
1174                    self.client
1175                        .post(url)
1176                        .header("Content-Type", "application/json")
1177                        .header("Accept", "application/json")
1178                        .json(&request),
1179                    "CancelTask",
1180                    &tenant,
1181                )
1182                .await;
1183        }
1184
1185        self.unwrap_rpc_response(self.post_rpc_request("CancelTask", request).await?)
1186    }
1187
1188    /// Subscribe to a task stream using the A2A v1 surface.
1189    pub async fn subscribe_to_task(
1190        &self,
1191        request: v1::SubscribeToTaskRequest,
1192    ) -> A2AResult<SseStream> {
1193        self.ensure_streaming_enabled("SubscribeToTask")?;
1194
1195        if self.http_json_base_url().is_some() {
1196            let subscribe_segment = format!("{}:subscribe", request.id);
1197            let tenant = request.tenant.clone();
1198            let url = self.build_http_json_url(&["tasks", &subscribe_segment])?;
1199            let response = self
1200                .start_sse_request(
1201                    self.client.get(url).header("Accept", "text/event-stream"),
1202                    "SubscribeToTask",
1203                    &tenant,
1204                )
1205                .await?;
1206            return Ok(Box::pin(SseParser::new(
1207                response.bytes_stream(),
1208                sse_parser::process_direct_sse_event::<v1::StreamResponse>,
1209            )));
1210        }
1211
1212        let rpc_request = JsonRpcRequest {
1213            jsonrpc: JSONRPC_VERSION.to_string(),
1214            method: "SubscribeToTask".to_string(),
1215            params: request,
1216            id: self.next_request_id(),
1217        };
1218        let response = self
1219            .start_sse_request(
1220                self.client
1221                    .post(self.rpc_endpoint()?)
1222                    .header("Content-Type", "application/json")
1223                    .header("Accept", "text/event-stream")
1224                    .json(&rpc_request),
1225                "SubscribeToTask",
1226                "",
1227            )
1228            .await?;
1229
1230        Ok(Box::pin(SseParser::new(
1231            response.bytes_stream(),
1232            sse_parser::process_jsonrpc_sse_event::<v1::StreamResponse>,
1233        )))
1234    }
1235
1236    /// Fetch the extended agent card using the A2A v1 surface.
1237    pub async fn get_extended_agent_card(
1238        &self,
1239        request: v1::GetExtendedAgentCardRequest,
1240    ) -> A2AResult<v1::AgentCard> {
1241        if self.http_json_base_url().is_some() {
1242            let tenant = request.tenant.clone();
1243            let url = self.build_http_json_url(&["extendedAgentCard"])?;
1244            return self
1245                .send_json_request(
1246                    self.client.get(url).header("Accept", "application/json"),
1247                    "GetExtendedAgentCard",
1248                    &tenant,
1249                )
1250                .await;
1251        }
1252
1253        self.unwrap_rpc_response(
1254            self.post_rpc_request("GetExtendedAgentCard", request)
1255                .await?,
1256        )
1257    }
1258
1259    /// Create or replace a task push notification config using the A2A v1 surface.
1260    pub async fn create_task_push_notification_config(
1261        &self,
1262        request: v1::TaskPushNotificationConfig,
1263    ) -> A2AResult<v1::TaskPushNotificationConfig> {
1264        self.ensure_push_notifications_enabled()?;
1265
1266        if self.http_json_base_url().is_some() {
1267            let tenant = request.tenant.clone();
1268            let url =
1269                self.build_http_json_url(&["tasks", &request.task_id, "pushNotificationConfigs"])?;
1270            return self
1271                .send_json_request(
1272                    self.client
1273                        .post(url)
1274                        .header("Content-Type", "application/json")
1275                        .header("Accept", "application/json")
1276                        .json(&request),
1277                    "CreateTaskPushNotificationConfig",
1278                    &tenant,
1279                )
1280                .await;
1281        }
1282
1283        self.unwrap_rpc_response(
1284            self.post_rpc_request("CreateTaskPushNotificationConfig", request)
1285                .await?,
1286        )
1287    }
1288
1289    /// Fetch a task push notification config using the A2A v1 surface.
1290    pub async fn get_task_push_notification_config(
1291        &self,
1292        request: v1::GetTaskPushNotificationConfigRequest,
1293    ) -> A2AResult<v1::TaskPushNotificationConfig> {
1294        if self.http_json_base_url().is_some() {
1295            let tenant = request.tenant.clone();
1296            let url = self.build_http_json_url(&[
1297                "tasks",
1298                &request.task_id,
1299                "pushNotificationConfigs",
1300                &request.id,
1301            ])?;
1302            return self
1303                .send_json_request(
1304                    self.client.get(url).header("Accept", "application/json"),
1305                    "GetTaskPushNotificationConfig",
1306                    &tenant,
1307                )
1308                .await;
1309        }
1310
1311        self.unwrap_rpc_response(
1312            self.post_rpc_request("GetTaskPushNotificationConfig", request)
1313                .await?,
1314        )
1315    }
1316
1317    /// List task push notification configs using the A2A v1 surface.
1318    pub async fn list_task_push_notification_configs(
1319        &self,
1320        request: v1::ListTaskPushNotificationConfigsRequest,
1321    ) -> A2AResult<v1::ListTaskPushNotificationConfigsResponse> {
1322        if self.http_json_base_url().is_some() {
1323            #[derive(Serialize)]
1324            struct ListConfigsQuery {
1325                #[serde(rename = "pageSize")]
1326                page_size: i32,
1327                #[serde(skip_serializing_if = "String::is_empty", rename = "pageToken")]
1328                page_token: String,
1329            }
1330
1331            let tenant = request.tenant.clone();
1332            let url =
1333                self.build_http_json_url(&["tasks", &request.task_id, "pushNotificationConfigs"])?;
1334            return self
1335                .send_json_request(
1336                    self.client
1337                        .get(url)
1338                        .header("Accept", "application/json")
1339                        .query(&ListConfigsQuery {
1340                            page_size: request.page_size,
1341                            page_token: request.page_token,
1342                        }),
1343                    "ListTaskPushNotificationConfigs",
1344                    &tenant,
1345                )
1346                .await;
1347        }
1348
1349        self.unwrap_rpc_response(
1350            self.post_rpc_request("ListTaskPushNotificationConfigs", request)
1351                .await?,
1352        )
1353    }
1354
1355    /// Delete a task push notification config using the A2A v1 surface.
1356    pub async fn delete_task_push_notification_config(
1357        &self,
1358        request: v1::DeleteTaskPushNotificationConfigRequest,
1359    ) -> A2AResult<()> {
1360        if self.http_json_base_url().is_some() {
1361            let tenant = request.tenant.clone();
1362            let url = self.build_http_json_url(&[
1363                "tasks",
1364                &request.task_id,
1365                "pushNotificationConfigs",
1366                &request.id,
1367            ])?;
1368            // DELETE returns an empty body; send the request and only check for errors.
1369            let _: serde_json::Value = self
1370                .send_json_request(
1371                    self.client.delete(url).header("Accept", "application/json"),
1372                    "DeleteTaskPushNotificationConfig",
1373                    &tenant,
1374                )
1375                .await?;
1376            return Ok(());
1377        }
1378
1379        // JSON-RPC: unwrap_rpc_response propagates any error response,
1380        // including those returned with a 2xx HTTP status.
1381        let _: serde_json::Value = self.unwrap_rpc_response(
1382            self.post_rpc_request("DeleteTaskPushNotificationConfig", request)
1383                .await?,
1384        )?;
1385        Ok(())
1386    }
1387
1388    /// Call a custom extension method
1389    ///
1390    /// This allows calling custom JSON-RPC methods defined by agent extensions.
1391    ///
1392    /// # Errors
1393    ///
1394    /// Returns an error if the RPC request fails or the remote agent returns an error response.
1395    pub async fn call_extension_method<TParams, TResponse>(
1396        &self,
1397        method: &str,
1398        params: TParams,
1399    ) -> A2AResult<TResponse>
1400    where
1401        TParams: Serialize,
1402        TResponse: for<'de> Deserialize<'de>,
1403    {
1404        match self.post_rpc_request(method, params).await? {
1405            JsonRpcResponse::Success { result, .. } => Ok(result),
1406            JsonRpcResponse::Error(err) => Err(A2AError::RemoteAgentError {
1407                message: format!("Remote agent error: {}", err.error.message),
1408                code: Some(err.error.code),
1409            }),
1410        }
1411    }
1412}
1413
1414#[cfg(test)]
1415mod tests {
1416    use super::*;
1417
1418    #[test]
1419    fn test_client_requires_valid_card_url() {
1420        let card_without_url = v1::AgentCard {
1421            name: "Test".to_string(),
1422            description: "Test".to_string(),
1423            supported_interfaces: vec![],
1424            provider: None,
1425            version: "1.0.0".to_string(),
1426            documentation_url: None,
1427            capabilities: Some(v1::AgentCapabilities::default()),
1428            security_schemes: std::collections::HashMap::new(),
1429            security_requirements: Vec::new(),
1430            default_input_modes: vec![],
1431            default_output_modes: vec![],
1432            skills: vec![],
1433            signatures: vec![],
1434            icon_url: None,
1435        };
1436
1437        assert!(A2AClient::from_card(card_without_url).is_err());
1438    }
1439
1440    #[test]
1441    fn test_from_card_with_headers() {
1442        let mut headers = std::collections::HashMap::new();
1443        headers.insert("Authorization".to_string(), "Bearer token123".to_string());
1444        headers.insert("X-API-Key".to_string(), "my-api-key".to_string());
1445
1446        let card = v1::AgentCard {
1447            name: "Test".to_string(),
1448            description: "Test agent".to_string(),
1449            supported_interfaces: vec![v1::AgentInterface {
1450                url: "https://example.com".to_string(),
1451                protocol_binding: "JSONRPC".to_string(),
1452                tenant: String::new(),
1453                protocol_version: "1.0".to_string(),
1454            }],
1455            provider: None,
1456            version: "1.0.0".to_string(),
1457            documentation_url: None,
1458            capabilities: Some(v1::AgentCapabilities::default()),
1459            security_schemes: std::collections::HashMap::new(),
1460            security_requirements: Vec::new(),
1461            default_input_modes: vec![],
1462            default_output_modes: vec![],
1463            skills: vec![],
1464            signatures: vec![],
1465            icon_url: None,
1466        };
1467
1468        let result = A2AClient::from_card_with_headers(card, headers);
1469        assert!(result.is_ok());
1470
1471        let client = result.unwrap();
1472        assert_eq!(
1473            client.rpc_endpoint_url.as_deref(),
1474            Some("https://example.com")
1475        );
1476        assert_eq!(client.http_json_endpoint_url, None);
1477    }
1478
1479    #[test]
1480    fn test_from_card_with_invalid_header_name() {
1481        let mut headers = std::collections::HashMap::new();
1482        headers.insert("Invalid Header Name!".to_string(), "value".to_string());
1483
1484        let card = v1::AgentCard {
1485            name: "Test".to_string(),
1486            description: "Test agent".to_string(),
1487            supported_interfaces: vec![v1::AgentInterface {
1488                url: "https://example.com".to_string(),
1489                protocol_binding: "JSONRPC".to_string(),
1490                tenant: String::new(),
1491                protocol_version: "1.0".to_string(),
1492            }],
1493            provider: None,
1494            version: "1.0.0".to_string(),
1495            documentation_url: None,
1496            capabilities: Some(v1::AgentCapabilities::default()),
1497            security_schemes: std::collections::HashMap::new(),
1498            security_requirements: Vec::new(),
1499            default_input_modes: vec![],
1500            default_output_modes: vec![],
1501            skills: vec![],
1502            signatures: vec![],
1503            icon_url: None,
1504        };
1505
1506        let result = A2AClient::from_card_with_headers(card, headers);
1507        assert!(result.is_err());
1508        if let Err(err) = result {
1509            assert!(matches!(err, A2AError::InvalidParameter { .. }));
1510        }
1511    }
1512
1513    #[test]
1514    fn next_request_id_is_monotonic() {
1515        let client = A2AClient::from_card(v1::AgentCard {
1516            name: "Test".to_string(),
1517            description: "desc".to_string(),
1518            supported_interfaces: vec![v1::AgentInterface {
1519                url: "https://example.com".to_string(),
1520                protocol_binding: "JSONRPC".to_string(),
1521                tenant: String::new(),
1522                protocol_version: "1.0".to_string(),
1523            }],
1524            provider: None,
1525            version: "1.0.0".to_string(),
1526            documentation_url: None,
1527            capabilities: Some(v1::AgentCapabilities::default()),
1528            security_schemes: std::collections::HashMap::new(),
1529            security_requirements: Vec::new(),
1530            default_input_modes: vec![],
1531            default_output_modes: vec![],
1532            skills: vec![],
1533            signatures: vec![],
1534            icon_url: None,
1535        })
1536        .expect("valid card");
1537
1538        let first = match client.next_request_id() {
1539            JSONRPCId::Integer(value) => value,
1540            other => panic!("unexpected id variant: {other:?}"),
1541        };
1542        let second = match client.next_request_id() {
1543            JSONRPCId::Integer(value) => value,
1544            other => panic!("unexpected id variant: {other:?}"),
1545        };
1546
1547        assert_eq!(first, 1);
1548        assert_eq!(second, 2);
1549    }
1550
1551    #[test]
1552    fn parses_v1_agent_card_bytes() {
1553        let card = v1::AgentCard {
1554            name: "V1 Agent".to_string(),
1555            description: "Latest schema".to_string(),
1556            supported_interfaces: vec![
1557                v1::AgentInterface {
1558                    url: "https://example.com/rpc".to_string(),
1559                    protocol_binding: "JSONRPC".to_string(),
1560                    tenant: String::new(),
1561                    protocol_version: "1.0".to_string(),
1562                },
1563                v1::AgentInterface {
1564                    url: "https://example.com".to_string(),
1565                    protocol_binding: "HTTP+JSON".to_string(),
1566                    tenant: String::new(),
1567                    protocol_version: "1.0".to_string(),
1568                },
1569            ],
1570            provider: None,
1571            version: "1.2.3".to_string(),
1572            documentation_url: None,
1573            capabilities: Some(v1::AgentCapabilities {
1574                streaming: Some(true),
1575                push_notifications: Some(false),
1576                extensions: Vec::new(),
1577                extended_agent_card: Some(true),
1578            }),
1579            security_schemes: std::collections::HashMap::new(),
1580            security_requirements: Vec::new(),
1581            default_input_modes: vec!["text/plain".to_string()],
1582            default_output_modes: vec!["text/plain".to_string()],
1583            skills: Vec::new(),
1584            signatures: Vec::new(),
1585            icon_url: None,
1586        };
1587
1588        let json = serde_json::to_vec(&card).expect("v1 card json");
1589        let parsed = parse_agent_card_bytes(&json).expect("parsed card");
1590
1591        assert_eq!(parsed.name, "V1 Agent");
1592        assert_eq!(parsed.supported_interfaces[0].protocol_version, "1.0");
1593        assert_eq!(
1594            parsed.supported_interfaces[0].url,
1595            "https://example.com/rpc"
1596        );
1597        assert_eq!(
1598            parsed.capabilities.as_ref().and_then(|caps| caps.streaming),
1599            Some(true)
1600        );
1601        assert_eq!(
1602            parsed
1603                .capabilities
1604                .as_ref()
1605                .and_then(|caps| caps.extended_agent_card),
1606            Some(true)
1607        );
1608        assert_eq!(parsed.supported_interfaces.len(), 2);
1609        assert_eq!(parsed.supported_interfaces[1].protocol_binding, "HTTP+JSON");
1610        assert_eq!(parsed.supported_interfaces[1].url, "https://example.com");
1611    }
1612
1613    #[test]
1614    fn resolves_http_json_endpoint_from_additional_interfaces() {
1615        let client = A2AClient::from_card(v1::AgentCard {
1616            name: "Test".to_string(),
1617            description: "desc".to_string(),
1618            supported_interfaces: vec![
1619                v1::AgentInterface {
1620                    url: "https://example.com/rpc".to_string(),
1621                    protocol_binding: "JSONRPC".to_string(),
1622                    tenant: String::new(),
1623                    protocol_version: "1.0".to_string(),
1624                },
1625                v1::AgentInterface {
1626                    url: "https://example.com".to_string(),
1627                    protocol_binding: "HTTP+JSON".to_string(),
1628                    tenant: String::new(),
1629                    protocol_version: "1.0".to_string(),
1630                },
1631            ],
1632            provider: None,
1633            version: "1.0.0".to_string(),
1634            documentation_url: None,
1635            capabilities: Some(v1::AgentCapabilities::default()),
1636            security_schemes: std::collections::HashMap::new(),
1637            security_requirements: Vec::new(),
1638            default_input_modes: vec![],
1639            default_output_modes: vec![],
1640            skills: vec![],
1641            signatures: vec![],
1642            icon_url: None,
1643        })
1644        .expect("valid card");
1645
1646        assert_eq!(
1647            client.rpc_endpoint_url.as_deref(),
1648            Some("https://example.com/rpc")
1649        );
1650        assert_eq!(
1651            client.http_json_endpoint_url.as_deref(),
1652            Some("https://example.com")
1653        );
1654    }
1655
1656    #[test]
1657    fn build_http_json_url_does_not_include_tenant_in_path() {
1658        let client = A2AClient::from_card(v1::AgentCard {
1659            name: "Test".to_string(),
1660            description: "desc".to_string(),
1661            supported_interfaces: vec![v1::AgentInterface {
1662                url: "https://agent.example.com".to_string(),
1663                protocol_binding: "HTTP+JSON".to_string(),
1664                tenant: String::new(),
1665                protocol_version: "1.0".to_string(),
1666            }],
1667            provider: None,
1668            version: "1.0.0".to_string(),
1669            documentation_url: None,
1670            capabilities: Some(v1::AgentCapabilities::default()),
1671            security_schemes: std::collections::HashMap::new(),
1672            security_requirements: Vec::new(),
1673            default_input_modes: vec![],
1674            default_output_modes: vec![],
1675            skills: vec![],
1676            signatures: vec![],
1677            icon_url: None,
1678        })
1679        .expect("valid card");
1680
1681        let url = client
1682            .build_http_json_url(&["tasks", "task-1"])
1683            .expect("url");
1684        assert_eq!(url, "https://agent.example.com/tasks/task-1");
1685
1686        let url_with_action = client
1687            .build_http_json_url(&["tasks", "task-1:cancel"])
1688            .expect("url");
1689        assert_eq!(
1690            url_with_action,
1691            "https://agent.example.com/tasks/task-1:cancel"
1692        );
1693    }
1694
1695    #[test]
1696    fn timestamp_to_rfc3339_converts_correctly() {
1697        // 2024-01-15 12:00:00 UTC = 1705320000
1698        let ts = pbjson_types::Timestamp {
1699            seconds: 1_705_320_000,
1700            nanos: 0,
1701        };
1702        let result = timestamp_to_rfc3339(ts).expect("valid timestamp");
1703        assert!(result.starts_with("2024-01-15"), "got: {result}");
1704        assert!(result.contains("12:00:00"), "got: {result}");
1705    }
1706
1707    #[test]
1708    fn timestamp_to_rfc3339_rejects_invalid_timestamp() {
1709        let ts = pbjson_types::Timestamp {
1710            seconds: i64::MAX,
1711            nanos: i32::MAX,
1712        };
1713        assert!(timestamp_to_rfc3339(ts).is_err());
1714    }
1715
1716    #[test]
1717    fn fetch_extended_card_returns_none_when_not_advertised() {
1718        let client = A2AClient::from_card(v1::AgentCard {
1719            name: "Test".to_string(),
1720            description: "desc".to_string(),
1721            supported_interfaces: vec![v1::AgentInterface {
1722                url: "https://example.com/rpc".to_string(),
1723                protocol_binding: "JSONRPC".to_string(),
1724                tenant: String::new(),
1725                protocol_version: "1.0".to_string(),
1726            }],
1727            provider: None,
1728            version: "1.0.0".to_string(),
1729            documentation_url: None,
1730            // extended_agent_card not set / false
1731            capabilities: Some(v1::AgentCapabilities {
1732                streaming: Some(true),
1733                push_notifications: Some(false),
1734                extensions: Vec::new(),
1735                extended_agent_card: Some(false),
1736            }),
1737            security_schemes: std::collections::HashMap::new(),
1738            security_requirements: Vec::new(),
1739            default_input_modes: vec![],
1740            default_output_modes: vec![],
1741            skills: vec![],
1742            signatures: vec![],
1743            icon_url: None,
1744        })
1745        .expect("valid card");
1746
1747        // No network call — should return None immediately.
1748        let result = tokio::runtime::Runtime::new()
1749            .unwrap()
1750            .block_on(client.fetch_extended_agent_card_if_available());
1751        assert!(matches!(result, Ok(None)));
1752    }
1753}