1use self::sse_parser::SseParser;
7use crate::constants::{AGENT_CARD_PATH, JSONRPC_VERSION};
8use crate::error::{A2AError, A2AResult};
9use a2a_types::{self as v1, JSONRPCErrorResponse, JSONRPCId};
10use futures_core::Stream;
11use reqwest::{Client, RequestBuilder, Url};
12use serde::{Deserialize, Serialize};
13use std::pin::Pin;
14use std::sync::Arc;
15use std::sync::atomic::{AtomicU64, Ordering};
16
/// Boxed, pinned stream of fallible items (`A2AResult<T>`).
///
/// Native variant: the trait object additionally requires `Send`.
#[cfg(not(target_arch = "wasm32"))]
type BoxedResultStream<T> = Pin<Box<dyn Stream<Item = A2AResult<T>> + Send>>;
/// Boxed, pinned stream of fallible items (`A2AResult<T>`).
///
/// wasm32 variant: identical to the native alias but without the `Send` bound.
#[cfg(target_arch = "wasm32")]
type BoxedResultStream<T> = Pin<Box<dyn Stream<Item = A2AResult<T>>>>;

/// Stream of [`v1::StreamResponse`] events, as returned by the streaming client methods.
type SseStream = BoxedResultStream<v1::StreamResponse>;
23
/// Client for talking to a remote agent described by an agent card.
///
/// Cloning is cheap-ish: the request-id counter and the agent card are shared
/// between clones through `Arc`.
#[derive(Clone)]
pub struct A2AClient {
    /// HTTP client used for every outgoing request.
    client: Client,
    /// Normalized URL of the agent's `JSONRPC` interface, if the card lists one.
    rpc_endpoint_url: Option<String>,
    /// Normalized URL of the agent's `HTTP+JSON` interface, if the card lists one.
    /// When present, request methods use it in preference to JSON-RPC.
    http_json_endpoint_url: Option<String>,
    /// Optional bearer token attached to outgoing requests.
    auth_token: Option<String>,
    /// Source of JSON-RPC request ids; starts at 1 and is shared across clones.
    request_id_counter: Arc<AtomicU64>,
    /// Agent card this client was built from.
    agent_card: Arc<v1::AgentCard>,
}
40
/// Serializable JSON-RPC request envelope wrapping method-specific `params`.
#[derive(Debug, Serialize)]
struct JsonRpcRequest<T> {
    /// Protocol version string (filled from `JSONRPC_VERSION` by the callers in this file).
    jsonrpc: String,
    /// Request identifier used to correlate the response.
    id: JSONRPCId,
    /// Name of the remote method to invoke.
    method: String,
    /// Method parameters, serialized as-is.
    params: T,
}
49
/// JSON-RPC response envelope: either a successful result or an error object.
///
/// The enum is `untagged`, so (de)serialization relies on the shape of the
/// payload rather than on an explicit discriminator field.
#[derive(Debug, Serialize, Deserialize)]
#[serde(untagged)]
enum JsonRpcResponse<T> {
    /// Successful response carrying the optional echoed request id and the result payload.
    Success { id: Option<JSONRPCId>, result: T },
    /// Error response as defined by `a2a_types`.
    Error(JSONRPCErrorResponse),
}
57
/// Request timeout (30 seconds) applied to the default HTTP client on non-wasm targets.
#[cfg(not(target_arch = "wasm32"))]
const DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
61
/// Builds the HTTP client used when the caller does not supply one.
///
/// On non-wasm targets the client is configured with [`DEFAULT_TIMEOUT`]; if
/// the builder fails, `Client::default()` is used instead (which does not
/// carry that timeout). On wasm32 a plain `Client::new()` is returned.
fn default_client() -> Client {
    #[cfg(not(target_arch = "wasm32"))]
    {
        Client::builder()
            .timeout(DEFAULT_TIMEOUT)
            .build()
            // Best-effort: fall back to the default client rather than failing construction.
            .unwrap_or_default()
    }
    #[cfg(target_arch = "wasm32")]
    {
        Client::new()
    }
}
78
79fn parse_agent_card_bytes(bytes: &[u8]) -> A2AResult<v1::AgentCard> {
80 serde_json::from_slice(bytes).map_err(|error| A2AError::SerializationError {
81 message: format!("Failed to parse agent card: {error}"),
82 })
83}
84
/// Normalizes an endpoint URL taken from an agent card.
///
/// Surrounding whitespace is trimmed first, then every trailing `/` is
/// removed. Returns `None` when nothing is left after normalization.
fn normalize_endpoint_url(url: &str) -> Option<String> {
    let candidate = url.trim().trim_end_matches('/');
    if candidate.is_empty() {
        None
    } else {
        Some(candidate.to_string())
    }
}
89
90fn record_endpoint(slot: &mut Option<String>, url: &str) {
91 if slot.is_none() {
92 *slot = normalize_endpoint_url(url);
93 }
94}
95
96fn resolve_endpoints(agent_card: &v1::AgentCard) -> A2AResult<(Option<String>, Option<String>)> {
97 let mut rpc_endpoint_url = None;
98 let mut http_json_endpoint_url = None;
99
100 for interface in &agent_card.supported_interfaces {
101 match interface.protocol_binding.as_str() {
102 "JSONRPC" => record_endpoint(&mut rpc_endpoint_url, &interface.url),
103 "HTTP+JSON" => record_endpoint(&mut http_json_endpoint_url, &interface.url),
104 _ => {}
105 }
106 }
107
108 if rpc_endpoint_url.is_none() && http_json_endpoint_url.is_none() {
109 return Err(A2AError::InvalidParameter {
110 message: "Agent card does not contain a supported JSON-RPC or HTTP+JSON endpoint"
111 .to_string(),
112 });
113 }
114
115 Ok((rpc_endpoint_url, http_json_endpoint_url))
116}
117
118fn timestamp_to_rfc3339(ts: pbjson_types::Timestamp) -> A2AResult<String> {
120 chrono::DateTime::from_timestamp(ts.seconds, ts.nanos.cast_unsigned())
121 .map(|dt| dt.to_rfc3339())
122 .ok_or_else(|| A2AError::InvalidParameter {
123 message: format!(
124 "Invalid timestamp: seconds={} nanos={}",
125 ts.seconds, ts.nanos
126 ),
127 })
128}
129
130fn task_state_query_value(value: i32) -> A2AResult<Option<String>> {
131 let state = v1::TaskState::try_from(value).map_err(|_| A2AError::InvalidParameter {
132 message: format!("Unknown task state enum value {value}"),
133 })?;
134
135 match state {
136 v1::TaskState::Unspecified => Ok(None),
137 other => Ok(Some(other.as_str_name().to_string())),
138 }
139}
140
141mod sse_parser {
143 use super::{A2AError, A2AResult, JsonRpcResponse};
144 use futures_core::Stream;
145 use serde::de::DeserializeOwned;
146 use std::pin::Pin;
147 use std::task::{Context, Poll};
148
    /// Marker trait for byte streams accepted by [`SseParser::new`].
    ///
    /// Native variant: the stream must also be `Send`.
    #[cfg(not(target_arch = "wasm32"))]
    pub trait ByteStreamTrait: Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send {}
    // Blanket impl: every matching `Send` stream is a `ByteStreamTrait`.
    #[cfg(not(target_arch = "wasm32"))]
    impl<T: Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send> ByteStreamTrait for T {}

    /// Marker trait for byte streams accepted by [`SseParser::new`].
    ///
    /// wasm32 variant: same item type, without the `Send` requirement.
    #[cfg(target_arch = "wasm32")]
    pub trait ByteStreamTrait: Stream<Item = Result<bytes::Bytes, reqwest::Error>> {}
    // Blanket impl: every matching stream is a `ByteStreamTrait`.
    #[cfg(target_arch = "wasm32")]
    impl<T: Stream<Item = Result<bytes::Bytes, reqwest::Error>>> ByteStreamTrait for T {}
159
    /// Type-erased, pinned byte stream stored inside the parser (native: `Send`).
    #[cfg(not(target_arch = "wasm32"))]
    type PinnedByteStream =
        Pin<Box<dyn Stream<Item = Result<bytes::Bytes, reqwest::Error>> + Send>>;
    /// Type-erased, pinned byte stream stored inside the parser (wasm32: no `Send` bound).
    #[cfg(target_arch = "wasm32")]
    type PinnedByteStream = Pin<Box<dyn Stream<Item = Result<bytes::Bytes, reqwest::Error>>>>;
166
167 pub struct SseParser<T> {
169 inner: PinnedByteStream,
170 buffer: String,
171 event_data_buffer: String,
172 pending_results: Vec<A2AResult<T>>,
173 parser: fn(&str) -> A2AResult<T>,
174 }
175
176 impl<T> SseParser<T> {
177 pub fn new(
179 inner: impl ByteStreamTrait + 'static,
180 parser: fn(&str) -> A2AResult<T>,
181 ) -> Self {
182 Self {
183 inner: Box::pin(inner),
184 buffer: String::new(),
185 event_data_buffer: String::new(),
186 pending_results: Vec::new(),
187 parser,
188 }
189 }
190
191 fn process_chunk(&mut self, chunk: bytes::Bytes) -> Vec<A2AResult<T>> {
193 self.buffer.push_str(&String::from_utf8_lossy(&chunk));
194 let mut results = Vec::new();
195
196 while let Some(newline_pos) = self.buffer.find('\n') {
198 let line = self.buffer[..newline_pos]
199 .trim_end_matches('\r')
200 .to_string();
201 self.buffer = self.buffer[newline_pos + 1..].to_string();
202
203 if line.is_empty() {
204 if !self.event_data_buffer.is_empty() {
206 match (self.parser)(&self.event_data_buffer) {
207 Ok(result) => results.push(Ok(result)),
208 Err(e) => results.push(Err(e)),
209 }
210 self.event_data_buffer.clear();
211 }
212 } else if let Some(data) = line.strip_prefix("data:") {
213 if !self.event_data_buffer.is_empty() {
215 self.event_data_buffer.push('\n');
216 }
217 self.event_data_buffer.push_str(data.trim_start());
218 } else if line.starts_with(':') {
219 }
221 }
222 results
223 }
224 }
225
226 impl<T: Unpin> Stream for SseParser<T> {
227 type Item = A2AResult<T>;
228
229 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
230 let this = self.get_mut();
231 if let Some(result) = this.pending_results.pop() {
233 return Poll::Ready(Some(result));
234 }
235
236 match this.inner.as_mut().poll_next(cx) {
238 Poll::Ready(Some(Ok(chunk))) => {
239 let mut results = this.process_chunk(chunk);
240 if results.is_empty() {
241 cx.waker().wake_by_ref();
243 Poll::Pending
244 } else {
245 results.reverse();
247 this.pending_results = results;
248 Poll::Ready(this.pending_results.pop())
249 }
250 }
251 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(A2AError::NetworkError {
252 message: format!("Stream error: {}", e),
253 }))),
254 Poll::Ready(None) => Poll::Ready(None),
255 Poll::Pending => Poll::Pending,
256 }
257 }
258 }
259
260 pub(super) fn process_jsonrpc_sse_event<T>(json_data: &str) -> A2AResult<T>
262 where
263 T: DeserializeOwned,
264 {
265 if json_data.trim().is_empty() {
266 return Err(A2AError::SerializationError {
267 message: "Empty SSE event data".to_string(),
268 });
269 }
270
271 let json_response: JsonRpcResponse<T> =
272 serde_json::from_str(json_data).map_err(|e| A2AError::SerializationError {
273 message: format!("Failed to parse SSE event data: {}", e),
274 })?;
275
276 match json_response {
277 JsonRpcResponse::Success { result, .. } => Ok(result),
278 JsonRpcResponse::Error(err) => Err(A2AError::RemoteAgentError {
279 message: format!("SSE event contained an error: {}", err.error.message),
280 code: Some(err.error.code),
281 }),
282 }
283 }
284
285 pub(super) fn process_direct_sse_event<T>(json_data: &str) -> A2AResult<T>
287 where
288 T: DeserializeOwned,
289 {
290 if json_data.trim().is_empty() {
291 return Err(A2AError::SerializationError {
292 message: "Empty SSE event data".to_string(),
293 });
294 }
295
296 serde_json::from_str(json_data).map_err(|e| A2AError::SerializationError {
297 message: format!("Failed to parse SSE event data: {}", e),
298 })
299 }
300
301 #[cfg(test)]
302 mod tests {
303 use super::*;
304 use a2a_types::{self as v1, JSONRPCError, JSONRPCErrorResponse, JSONRPCId};
305 use bytes::Bytes;
306 use futures_util::{StreamExt, stream};
307
308 fn sample_message(text: &str) -> v1::Message {
309 v1::Message {
310 message_id: format!("msg-{text}"),
311 context_id: "ctx-1".into(),
312 task_id: "task-1".into(),
313 role: v1::Role::Agent.into(),
314 parts: vec![v1::Part {
315 content: Some(v1::part::Content::Text(text.to_string())),
316 metadata: None,
317 filename: String::new(),
318 media_type: "text/plain".into(),
319 }],
320 metadata: None,
321 reference_task_ids: Vec::new(),
322 extensions: Vec::new(),
323 }
324 }
325
326 #[tokio::test]
327 async fn sse_parser_emits_multiple_events_in_order() {
328 let first = JsonRpcResponse::Success {
329 id: Some(JSONRPCId::Integer(1)),
330 result: v1::StreamResponse {
331 payload: Some(v1::stream_response::Payload::Message(sample_message("one"))),
332 },
333 };
334 let second = JsonRpcResponse::Success {
335 id: Some(JSONRPCId::Integer(2)),
336 result: v1::StreamResponse {
337 payload: Some(v1::stream_response::Payload::Message(sample_message("two"))),
338 },
339 };
340 let payload = format!(
341 "data: {}\n\ndata: {}\n\n",
342 serde_json::to_string(&first).expect("json"),
343 serde_json::to_string(&second).expect("json")
344 );
345 let byte_stream = stream::iter(vec![Ok::<Bytes, reqwest::Error>(Bytes::from(payload))]);
346
347 let mut parser =
348 SseParser::new(byte_stream, process_jsonrpc_sse_event::<v1::StreamResponse>);
349 let first_item: v1::StreamResponse =
350 parser.next().await.expect("first event").expect("ok");
351 let second_item: v1::StreamResponse =
352 parser.next().await.expect("second event").expect("ok");
353
354 match first_item.payload {
355 Some(v1::stream_response::Payload::Message(msg)) => {
356 assert!(
357 msg.parts.iter().any(|part| {
358 matches!(part.content, Some(v1::part::Content::Text(_)))
359 })
360 );
361 }
362 other => panic!("expected message, got {other:?}"),
363 }
364
365 match second_item.payload {
366 Some(v1::stream_response::Payload::Message(msg)) => {
367 assert!(msg.message_id.contains("two"));
368 }
369 other => panic!("expected message, got {other:?}"),
370 }
371 }
372
373 #[test]
374 fn process_sse_event_returns_error_for_remote_failure() {
375 let error = JsonRpcResponse::<v1::StreamResponse>::Error(JSONRPCErrorResponse {
376 jsonrpc: "2.0".into(),
377 error: JSONRPCError {
378 code: -1,
379 message: "boom".into(),
380 data: None,
381 },
382 id: Some(JSONRPCId::Integer(1)),
383 });
384 let json = serde_json::to_string(&error).expect("json");
385 let result = process_jsonrpc_sse_event::<v1::StreamResponse>(&json);
386 assert!(matches!(result, Err(A2AError::RemoteAgentError { .. })));
387 }
388 }
389}
390
391impl A2AClient {
392 pub async fn from_card_url(base_url: impl AsRef<str>) -> A2AResult<Self> {
416 Self::from_card_url_with_client(base_url, default_client()).await
417 }
418
419 pub async fn from_card_url_with_client(
452 base_url: impl AsRef<str>,
453 http_client: Client,
454 ) -> A2AResult<Self> {
455 let base_url = base_url.as_ref().trim_end_matches('/');
456 let card_url = format!("{}/{}", base_url, AGENT_CARD_PATH);
457
458 let response = http_client
459 .get(&card_url)
460 .header("Accept", "application/json")
461 .send()
462 .await
463 .map_err(|e| A2AError::NetworkError {
464 message: format!("Failed to fetch agent card from {}: {}", card_url, e),
465 })?;
466
467 if !response.status().is_success() {
468 return Err(A2AError::NetworkError {
469 message: format!("Failed to fetch agent card: HTTP {}", response.status()),
470 });
471 }
472
473 let bytes = response
474 .bytes()
475 .await
476 .map_err(|e| A2AError::SerializationError {
477 message: format!("Failed to read agent card response: {}", e),
478 })?;
479 let agent_card = parse_agent_card_bytes(&bytes)?;
480 let (rpc_endpoint_url, http_json_endpoint_url) = resolve_endpoints(&agent_card)?;
481
482 Ok(Self {
483 client: http_client,
484 rpc_endpoint_url,
485 http_json_endpoint_url,
486 auth_token: None,
487 request_id_counter: Arc::new(AtomicU64::new(1)),
488 agent_card: Arc::new(agent_card),
489 })
490 }
491
492 pub fn from_card(agent_card: v1::AgentCard) -> A2AResult<Self> {
513 Self::from_card_with_client(agent_card, default_client())
514 }
515
516 pub fn from_card_with_client(
551 agent_card: v1::AgentCard,
552 http_client: Client,
553 ) -> A2AResult<Self> {
554 let (rpc_endpoint_url, http_json_endpoint_url) = resolve_endpoints(&agent_card)?;
555
556 Ok(Self {
557 client: http_client,
558 rpc_endpoint_url,
559 http_json_endpoint_url,
560 auth_token: None,
561 request_id_counter: Arc::new(AtomicU64::new(1)),
562 agent_card: Arc::new(agent_card),
563 })
564 }
565
566 pub fn from_card_with_headers(
592 agent_card: v1::AgentCard,
593 headers: std::collections::HashMap<String, String>,
594 ) -> A2AResult<Self> {
595 use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
596 use std::str::FromStr;
597
598 let mut header_map = HeaderMap::new();
599 for (key, value) in headers {
600 let header_name =
601 HeaderName::from_str(&key).map_err(|e| A2AError::InvalidParameter {
602 message: format!("Invalid header name '{}': {}", key, e),
603 })?;
604 let header_value =
605 HeaderValue::from_str(&value).map_err(|e| A2AError::InvalidParameter {
606 message: format!("Invalid header value for '{}': {}", key, e),
607 })?;
608 header_map.insert(header_name, header_value);
609 }
610
611 let http_client = Client::builder()
612 .default_headers(header_map)
613 .build()
614 .map_err(|e| A2AError::NetworkError {
615 message: format!("Failed to build HTTP client with headers: {}", e),
616 })?;
617
618 Self::from_card_with_client(agent_card, http_client)
619 }
620
621 pub fn with_auth_token(mut self, token: impl Into<String>) -> Self {
623 self.auth_token = Some(token.into());
624 self
625 }
626
    /// Returns the agent card this client was created from.
    pub fn agent_card(&self) -> &v1::AgentCard {
        &self.agent_card
    }
631
632 pub async fn fetch_agent_card(&self, base_url: impl AsRef<str>) -> A2AResult<v1::AgentCard> {
638 let base_url = base_url.as_ref().trim_end_matches('/');
639 let card_url = format!("{}/{}", base_url, AGENT_CARD_PATH);
640
641 let mut req = self
642 .client
643 .get(&card_url)
644 .header("Accept", "application/json");
645
646 if let Some(token) = &self.auth_token {
647 req = req.bearer_auth(token);
648 }
649
650 let response = req.send().await.map_err(|e| A2AError::NetworkError {
651 message: format!("Failed to fetch agent card from {}: {}", card_url, e),
652 })?;
653
654 if !response.status().is_success() {
655 return Err(A2AError::NetworkError {
656 message: format!("Failed to fetch agent card: HTTP {}", response.status()),
657 });
658 }
659
660 let bytes = response
661 .bytes()
662 .await
663 .map_err(|e| A2AError::SerializationError {
664 message: format!("Failed to read agent card response: {}", e),
665 })?;
666
667 parse_agent_card_bytes(&bytes)
668 }
669
670 pub async fn fetch_extended_agent_card_if_available(&self) -> A2AResult<Option<v1::AgentCard>> {
680 let advertises_extended = self
681 .agent_card
682 .capabilities
683 .as_ref()
684 .and_then(|c| c.extended_agent_card)
685 .unwrap_or(false);
686
687 if !advertises_extended {
688 return Ok(None);
689 }
690
691 let card = self
692 .get_extended_agent_card(v1::GetExtendedAgentCardRequest {
693 tenant: String::new(),
694 })
695 .await?;
696
697 Ok(Some(card))
698 }
699
700 fn next_request_id(&self) -> JSONRPCId {
702 let id = self.request_id_counter.fetch_add(1, Ordering::SeqCst);
703 JSONRPCId::Integer(id as i64)
704 }
705
706 fn rpc_endpoint(&self) -> A2AResult<&str> {
707 self.rpc_endpoint_url
708 .as_deref()
709 .ok_or_else(|| A2AError::InvalidParameter {
710 message: "Agent does not advertise a JSON-RPC endpoint".to_string(),
711 })
712 }
713
    /// Returns the HTTP+JSON base URL, or `None` when the agent card did not
    /// provide an HTTP+JSON interface.
    fn http_json_base_url(&self) -> Option<&str> {
        self.http_json_endpoint_url.as_deref()
    }
717
718 fn build_http_json_url(&self, segments: &[&str]) -> A2AResult<String> {
719 let base = self
720 .http_json_base_url()
721 .ok_or_else(|| A2AError::InvalidParameter {
722 message: "Agent does not advertise an HTTP+JSON endpoint".to_string(),
723 })?;
724 let mut url = Url::parse(base).map_err(|error| A2AError::InvalidParameter {
725 message: format!("Invalid HTTP+JSON base URL `{base}`: {error}"),
726 })?;
727 {
728 let mut path_segments =
729 url.path_segments_mut()
730 .map_err(|()| A2AError::InvalidParameter {
731 message: format!("HTTP+JSON base URL `{base}` cannot accept path segments"),
732 })?;
733 for segment in segments {
734 path_segments.push(segment);
735 }
736 }
737 Ok(url.to_string())
738 }
739
740 fn prepare_request_with_tenant(&self, request: RequestBuilder, tenant: &str) -> RequestBuilder {
742 let mut req = self.prepare_request(request);
743 if !tenant.is_empty() {
744 req = req.header("X-A2A-Tenant", tenant);
745 }
746 req
747 }
748
749 fn prepare_request(&self, mut request: RequestBuilder) -> RequestBuilder {
750 for (key, value) in Self::inject_trace_context() {
751 request = request.header(key, value);
752 }
753
754 if let Some(token) = &self.auth_token {
755 request = request.bearer_auth(token);
756 }
757
758 request
759 }
760
761 async fn send_json_request<TResponse>(
762 &self,
763 request: RequestBuilder,
764 context: &str,
765 tenant: &str,
766 ) -> A2AResult<TResponse>
767 where
768 TResponse: for<'de> Deserialize<'de>,
769 {
770 let response = self
771 .prepare_request_with_tenant(request, tenant)
772 .send()
773 .await
774 .map_err(|e| A2AError::NetworkError {
775 message: format!("Failed to send {context} request: {e}"),
776 })?;
777
778 if !response.status().is_success() {
779 let status = response.status();
780 let error_text = response.text().await.unwrap_or_default();
781 if let Ok(error_json) = serde_json::from_str::<JSONRPCErrorResponse>(&error_text) {
782 return Err(A2AError::RemoteAgentError {
783 message: error_json.error.message,
784 code: Some(error_json.error.code),
785 });
786 }
787 return Err(A2AError::NetworkError {
788 message: format!("HTTP error {status}: {error_text}"),
789 });
790 }
791
792 let content_type = response
793 .headers()
794 .get("Content-Type")
795 .and_then(|v| v.to_str().ok())
796 .unwrap_or("")
797 .to_owned();
798
799 if !content_type.starts_with("application/json") {
800 let body = response.text().await.unwrap_or_default();
801 return Err(A2AError::SerializationError {
802 message: format!(
803 "Expected Content-Type application/json for {context}, got '{content_type}': {body}"
804 ),
805 });
806 }
807
808 response
809 .json()
810 .await
811 .map_err(|e| A2AError::SerializationError {
812 message: format!("Failed to parse {context} response: {e}"),
813 })
814 }
815
816 async fn start_sse_request(
817 &self,
818 request: RequestBuilder,
819 context: &str,
820 tenant: &str,
821 ) -> A2AResult<reqwest::Response> {
822 let response = self
823 .prepare_request_with_tenant(request, tenant)
824 .send()
825 .await
826 .map_err(|e| A2AError::NetworkError {
827 message: format!("Failed to send {context} request: {e}"),
828 })?;
829
830 if !response.status().is_success() {
831 let status = response.status();
832 let error_text = response.text().await.unwrap_or_default();
833 return Err(A2AError::NetworkError {
834 message: format!("HTTP error {status}: {error_text}"),
835 });
836 }
837
838 let content_type = response
839 .headers()
840 .get("Content-Type")
841 .and_then(|v| v.to_str().ok())
842 .unwrap_or("");
843
844 if !content_type.starts_with("text/event-stream") {
845 return Err(A2AError::NetworkError {
846 message: format!(
847 "Invalid response Content-Type for SSE stream. Expected 'text/event-stream', got '{content_type}'"
848 ),
849 });
850 }
851
852 Ok(response)
853 }
854
855 fn inject_trace_context() -> std::collections::HashMap<String, String> {
861 use opentelemetry::global;
862 use tracing_opentelemetry::OpenTelemetrySpanExt;
863
864 let mut carrier = std::collections::HashMap::new();
865
866 let context = tracing::Span::current().context();
868
869 global::get_text_map_propagator(|propagator| {
872 propagator.inject_context(&context, &mut carrier);
873 });
874
875 carrier
876 }
877
878 async fn post_rpc_request<TParams, TResponse>(
880 &self,
881 method: &str,
882 params: TParams,
883 ) -> A2AResult<JsonRpcResponse<TResponse>>
884 where
885 TParams: Serialize,
886 TResponse: for<'de> Deserialize<'de>,
887 {
888 let request_id = self.next_request_id();
889 let rpc_request = JsonRpcRequest {
890 jsonrpc: JSONRPC_VERSION.to_string(),
891 method: method.to_string(),
892 params,
893 id: request_id.clone(),
894 };
895
896 let req = self
897 .client
898 .post(self.rpc_endpoint()?)
899 .header("Content-Type", "application/json")
900 .header("Accept", "application/json")
901 .json(&rpc_request);
902
903 let response =
904 self.prepare_request(req)
905 .send()
906 .await
907 .map_err(|e| A2AError::NetworkError {
908 message: format!("Failed to send {method} request: {e}"),
909 })?;
910
911 if !response.status().is_success() {
912 let status = response.status();
913 let error_text = response.text().await.unwrap_or_default();
914 if let Ok(error_json) = serde_json::from_str::<JSONRPCErrorResponse>(&error_text) {
915 return Ok(JsonRpcResponse::Error(error_json));
916 }
917 return Err(A2AError::NetworkError {
918 message: format!("HTTP error {}: {}", status, error_text),
919 });
920 }
921
922 let content_type = response
923 .headers()
924 .get("Content-Type")
925 .and_then(|v| v.to_str().ok())
926 .unwrap_or("")
927 .to_owned();
928
929 if !content_type.starts_with("application/json") {
930 let body = response.text().await.unwrap_or_default();
931 return Err(A2AError::SerializationError {
932 message: format!(
933 "Expected Content-Type application/json for {method}, got '{content_type}': {body}"
934 ),
935 });
936 }
937
938 let json_response: JsonRpcResponse<TResponse> =
939 response
940 .json()
941 .await
942 .map_err(|e| A2AError::SerializationError {
943 message: format!("Failed to parse {} response: {}", method, e),
944 })?;
945
946 if let JsonRpcResponse::Success {
948 id: Some(resp_id),
949 ..
950 } = &json_response
951 && resp_id != &request_id
952 {
953 return Err(A2AError::InvalidParameter {
954 message: format!(
955 "JSON-RPC response ID mismatch for method '{method}': expected {request_id:?}, got {resp_id:?}"
956 ),
957 });
958 }
959
960 Ok(json_response)
961 }
962
963 fn unwrap_rpc_response<T>(&self, response: JsonRpcResponse<T>) -> A2AResult<T> {
964 match response {
965 JsonRpcResponse::Success { result, .. } => Ok(result),
966 JsonRpcResponse::Error(err) => Err(A2AError::RemoteAgentError {
967 message: format!("Remote agent error: {}", err.error.message),
968 code: Some(err.error.code),
969 }),
970 }
971 }
972
973 fn ensure_streaming_enabled(&self, action: &str) -> A2AResult<()> {
974 if self
975 .agent_card
976 .capabilities
977 .as_ref()
978 .and_then(|capabilities| capabilities.streaming)
979 .unwrap_or(false)
980 {
981 Ok(())
982 } else {
983 Err(A2AError::InvalidParameter {
984 message: format!("Agent does not support streaming (required for {action})"),
985 })
986 }
987 }
988
989 fn ensure_push_notifications_enabled(&self) -> A2AResult<()> {
990 if self
991 .agent_card
992 .capabilities
993 .as_ref()
994 .and_then(|capabilities| capabilities.push_notifications)
995 .unwrap_or(false)
996 {
997 Ok(())
998 } else {
999 Err(A2AError::InvalidParameter {
1000 message: "Agent does not support push notifications (capabilities.pushNotifications is not true)"
1001 .to_string(),
1002 })
1003 }
1004 }
1005
1006 pub async fn send_message(
1008 &self,
1009 request: v1::SendMessageRequest,
1010 ) -> A2AResult<v1::SendMessageResponse> {
1011 if self.http_json_base_url().is_some() {
1012 let tenant = request.tenant.clone();
1013 let url = self.build_http_json_url(&["message:send"])?;
1014 return self
1015 .send_json_request(
1016 self.client
1017 .post(url)
1018 .header("Content-Type", "application/json")
1019 .header("Accept", "application/json")
1020 .json(&request),
1021 "SendMessage",
1022 &tenant,
1023 )
1024 .await;
1025 }
1026
1027 self.unwrap_rpc_response(self.post_rpc_request("SendMessage", request).await?)
1028 }
1029
1030 pub async fn send_streaming_message(
1032 &self,
1033 request: v1::SendMessageRequest,
1034 ) -> A2AResult<SseStream> {
1035 self.ensure_streaming_enabled("SendStreamingMessage")?;
1036
1037 if self.http_json_base_url().is_some() {
1038 let tenant = request.tenant.clone();
1039 let url = self.build_http_json_url(&["message:stream"])?;
1040 let response = self
1041 .start_sse_request(
1042 self.client
1043 .post(url)
1044 .header("Content-Type", "application/json")
1045 .header("Accept", "text/event-stream")
1046 .json(&request),
1047 "SendStreamingMessage",
1048 &tenant,
1049 )
1050 .await?;
1051 return Ok(Box::pin(SseParser::new(
1052 response.bytes_stream(),
1053 sse_parser::process_direct_sse_event::<v1::StreamResponse>,
1054 )));
1055 }
1056
1057 let rpc_request = JsonRpcRequest {
1058 jsonrpc: JSONRPC_VERSION.to_string(),
1059 method: "SendStreamingMessage".to_string(),
1060 params: request,
1061 id: self.next_request_id(),
1062 };
1063 let response = self
1064 .start_sse_request(
1065 self.client
1066 .post(self.rpc_endpoint()?)
1067 .header("Content-Type", "application/json")
1068 .header("Accept", "text/event-stream")
1069 .json(&rpc_request),
1070 "SendStreamingMessage",
1071 "",
1072 )
1073 .await?;
1074
1075 Ok(Box::pin(SseParser::new(
1076 response.bytes_stream(),
1077 sse_parser::process_jsonrpc_sse_event::<v1::StreamResponse>,
1078 )))
1079 }
1080
1081 pub async fn get_task(&self, request: v1::GetTaskRequest) -> A2AResult<v1::Task> {
1083 if self.http_json_base_url().is_some() {
1084 #[derive(Serialize)]
1085 struct GetTaskQuery {
1086 #[serde(skip_serializing_if = "Option::is_none", rename = "historyLength")]
1087 history_length: Option<i32>,
1088 }
1089
1090 let tenant = request.tenant.clone();
1091 let url = self.build_http_json_url(&["tasks", &request.id])?;
1092 return self
1093 .send_json_request(
1094 self.client
1095 .get(url)
1096 .header("Accept", "application/json")
1097 .query(&GetTaskQuery {
1098 history_length: request.history_length,
1099 }),
1100 "GetTask",
1101 &tenant,
1102 )
1103 .await;
1104 }
1105
1106 self.unwrap_rpc_response(self.post_rpc_request("GetTask", request).await?)
1107 }
1108
1109 pub async fn list_tasks(
1111 &self,
1112 request: v1::ListTasksRequest,
1113 ) -> A2AResult<v1::ListTasksResponse> {
1114 if self.http_json_base_url().is_some() {
1115 #[derive(Serialize)]
1116 struct ListTasksQuery {
1117 #[serde(skip_serializing_if = "String::is_empty", rename = "contextId")]
1118 context_id: String,
1119 #[serde(skip_serializing_if = "Option::is_none")]
1120 status: Option<String>,
1121 #[serde(skip_serializing_if = "Option::is_none", rename = "pageSize")]
1122 page_size: Option<i32>,
1123 #[serde(skip_serializing_if = "String::is_empty", rename = "pageToken")]
1124 page_token: String,
1125 #[serde(skip_serializing_if = "Option::is_none", rename = "historyLength")]
1126 history_length: Option<i32>,
1127 #[serde(
1128 skip_serializing_if = "Option::is_none",
1129 rename = "statusTimestampAfter"
1130 )]
1131 status_timestamp_after: Option<String>,
1132 #[serde(skip_serializing_if = "Option::is_none", rename = "includeArtifacts")]
1133 include_artifacts: Option<bool>,
1134 }
1135
1136 let tenant = request.tenant.clone();
1137 let url = self.build_http_json_url(&["tasks"])?;
1138 let status = task_state_query_value(request.status)?;
1139 let status_timestamp_after = request
1140 .status_timestamp_after
1141 .map(timestamp_to_rfc3339)
1142 .transpose()?;
1143 return self
1144 .send_json_request(
1145 self.client
1146 .get(url)
1147 .header("Accept", "application/json")
1148 .query(&ListTasksQuery {
1149 context_id: request.context_id,
1150 status,
1151 page_size: request.page_size,
1152 page_token: request.page_token,
1153 history_length: request.history_length,
1154 status_timestamp_after,
1155 include_artifacts: request.include_artifacts,
1156 }),
1157 "ListTasks",
1158 &tenant,
1159 )
1160 .await;
1161 }
1162
1163 self.unwrap_rpc_response(self.post_rpc_request("ListTasks", request).await?)
1164 }
1165
1166 pub async fn cancel_task(&self, request: v1::CancelTaskRequest) -> A2AResult<v1::Task> {
1168 if self.http_json_base_url().is_some() {
1169 let cancel_segment = format!("{}:cancel", request.id);
1170 let tenant = request.tenant.clone();
1171 let url = self.build_http_json_url(&["tasks", &cancel_segment])?;
1172 return self
1173 .send_json_request(
1174 self.client
1175 .post(url)
1176 .header("Content-Type", "application/json")
1177 .header("Accept", "application/json")
1178 .json(&request),
1179 "CancelTask",
1180 &tenant,
1181 )
1182 .await;
1183 }
1184
1185 self.unwrap_rpc_response(self.post_rpc_request("CancelTask", request).await?)
1186 }
1187
1188 pub async fn subscribe_to_task(
1190 &self,
1191 request: v1::SubscribeToTaskRequest,
1192 ) -> A2AResult<SseStream> {
1193 self.ensure_streaming_enabled("SubscribeToTask")?;
1194
1195 if self.http_json_base_url().is_some() {
1196 let subscribe_segment = format!("{}:subscribe", request.id);
1197 let tenant = request.tenant.clone();
1198 let url = self.build_http_json_url(&["tasks", &subscribe_segment])?;
1199 let response = self
1200 .start_sse_request(
1201 self.client.get(url).header("Accept", "text/event-stream"),
1202 "SubscribeToTask",
1203 &tenant,
1204 )
1205 .await?;
1206 return Ok(Box::pin(SseParser::new(
1207 response.bytes_stream(),
1208 sse_parser::process_direct_sse_event::<v1::StreamResponse>,
1209 )));
1210 }
1211
1212 let rpc_request = JsonRpcRequest {
1213 jsonrpc: JSONRPC_VERSION.to_string(),
1214 method: "SubscribeToTask".to_string(),
1215 params: request,
1216 id: self.next_request_id(),
1217 };
1218 let response = self
1219 .start_sse_request(
1220 self.client
1221 .post(self.rpc_endpoint()?)
1222 .header("Content-Type", "application/json")
1223 .header("Accept", "text/event-stream")
1224 .json(&rpc_request),
1225 "SubscribeToTask",
1226 "",
1227 )
1228 .await?;
1229
1230 Ok(Box::pin(SseParser::new(
1231 response.bytes_stream(),
1232 sse_parser::process_jsonrpc_sse_event::<v1::StreamResponse>,
1233 )))
1234 }
1235
1236 pub async fn get_extended_agent_card(
1238 &self,
1239 request: v1::GetExtendedAgentCardRequest,
1240 ) -> A2AResult<v1::AgentCard> {
1241 if self.http_json_base_url().is_some() {
1242 let tenant = request.tenant.clone();
1243 let url = self.build_http_json_url(&["extendedAgentCard"])?;
1244 return self
1245 .send_json_request(
1246 self.client.get(url).header("Accept", "application/json"),
1247 "GetExtendedAgentCard",
1248 &tenant,
1249 )
1250 .await;
1251 }
1252
1253 self.unwrap_rpc_response(
1254 self.post_rpc_request("GetExtendedAgentCard", request)
1255 .await?,
1256 )
1257 }
1258
1259 pub async fn create_task_push_notification_config(
1261 &self,
1262 request: v1::TaskPushNotificationConfig,
1263 ) -> A2AResult<v1::TaskPushNotificationConfig> {
1264 self.ensure_push_notifications_enabled()?;
1265
1266 if self.http_json_base_url().is_some() {
1267 let tenant = request.tenant.clone();
1268 let url =
1269 self.build_http_json_url(&["tasks", &request.task_id, "pushNotificationConfigs"])?;
1270 return self
1271 .send_json_request(
1272 self.client
1273 .post(url)
1274 .header("Content-Type", "application/json")
1275 .header("Accept", "application/json")
1276 .json(&request),
1277 "CreateTaskPushNotificationConfig",
1278 &tenant,
1279 )
1280 .await;
1281 }
1282
1283 self.unwrap_rpc_response(
1284 self.post_rpc_request("CreateTaskPushNotificationConfig", request)
1285 .await?,
1286 )
1287 }
1288
1289 pub async fn get_task_push_notification_config(
1291 &self,
1292 request: v1::GetTaskPushNotificationConfigRequest,
1293 ) -> A2AResult<v1::TaskPushNotificationConfig> {
1294 if self.http_json_base_url().is_some() {
1295 let tenant = request.tenant.clone();
1296 let url = self.build_http_json_url(&[
1297 "tasks",
1298 &request.task_id,
1299 "pushNotificationConfigs",
1300 &request.id,
1301 ])?;
1302 return self
1303 .send_json_request(
1304 self.client.get(url).header("Accept", "application/json"),
1305 "GetTaskPushNotificationConfig",
1306 &tenant,
1307 )
1308 .await;
1309 }
1310
1311 self.unwrap_rpc_response(
1312 self.post_rpc_request("GetTaskPushNotificationConfig", request)
1313 .await?,
1314 )
1315 }
1316
1317 pub async fn list_task_push_notification_configs(
1319 &self,
1320 request: v1::ListTaskPushNotificationConfigsRequest,
1321 ) -> A2AResult<v1::ListTaskPushNotificationConfigsResponse> {
1322 if self.http_json_base_url().is_some() {
1323 #[derive(Serialize)]
1324 struct ListConfigsQuery {
1325 #[serde(rename = "pageSize")]
1326 page_size: i32,
1327 #[serde(skip_serializing_if = "String::is_empty", rename = "pageToken")]
1328 page_token: String,
1329 }
1330
1331 let tenant = request.tenant.clone();
1332 let url =
1333 self.build_http_json_url(&["tasks", &request.task_id, "pushNotificationConfigs"])?;
1334 return self
1335 .send_json_request(
1336 self.client
1337 .get(url)
1338 .header("Accept", "application/json")
1339 .query(&ListConfigsQuery {
1340 page_size: request.page_size,
1341 page_token: request.page_token,
1342 }),
1343 "ListTaskPushNotificationConfigs",
1344 &tenant,
1345 )
1346 .await;
1347 }
1348
1349 self.unwrap_rpc_response(
1350 self.post_rpc_request("ListTaskPushNotificationConfigs", request)
1351 .await?,
1352 )
1353 }
1354
1355 pub async fn delete_task_push_notification_config(
1357 &self,
1358 request: v1::DeleteTaskPushNotificationConfigRequest,
1359 ) -> A2AResult<()> {
1360 if self.http_json_base_url().is_some() {
1361 let tenant = request.tenant.clone();
1362 let url = self.build_http_json_url(&[
1363 "tasks",
1364 &request.task_id,
1365 "pushNotificationConfigs",
1366 &request.id,
1367 ])?;
1368 let _: serde_json::Value = self
1370 .send_json_request(
1371 self.client.delete(url).header("Accept", "application/json"),
1372 "DeleteTaskPushNotificationConfig",
1373 &tenant,
1374 )
1375 .await?;
1376 return Ok(());
1377 }
1378
1379 let _: serde_json::Value = self.unwrap_rpc_response(
1382 self.post_rpc_request("DeleteTaskPushNotificationConfig", request)
1383 .await?,
1384 )?;
1385 Ok(())
1386 }
1387
1388 pub async fn call_extension_method<TParams, TResponse>(
1396 &self,
1397 method: &str,
1398 params: TParams,
1399 ) -> A2AResult<TResponse>
1400 where
1401 TParams: Serialize,
1402 TResponse: for<'de> Deserialize<'de>,
1403 {
1404 match self.post_rpc_request(method, params).await? {
1405 JsonRpcResponse::Success { result, .. } => Ok(result),
1406 JsonRpcResponse::Error(err) => Err(A2AError::RemoteAgentError {
1407 message: format!("Remote agent error: {}", err.error.message),
1408 code: Some(err.error.code),
1409 }),
1410 }
1411 }
1412}
1413
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an agent interface for `url` with the given protocol binding,
    /// an empty tenant and protocol version `1.0`.
    fn interface(url: &str, protocol_binding: &str) -> v1::AgentInterface {
        v1::AgentInterface {
            url: url.to_string(),
            protocol_binding: protocol_binding.to_string(),
            tenant: String::new(),
            protocol_version: "1.0".to_string(),
        }
    }

    /// Builds a `JSONRPC` interface pointing at `url`.
    fn jsonrpc_interface(url: &str) -> v1::AgentInterface {
        interface(url, "JSONRPC")
    }

    /// Builds an `HTTP+JSON` interface pointing at `url`.
    fn http_json_interface(url: &str) -> v1::AgentInterface {
        interface(url, "HTTP+JSON")
    }

    /// Builds a minimal agent card exposing `supported_interfaces`.
    ///
    /// Every other field carries a neutral value; tests that care about a
    /// specific field override it with struct update syntax.
    fn card_with_interfaces(supported_interfaces: Vec<v1::AgentInterface>) -> v1::AgentCard {
        v1::AgentCard {
            name: "Test".to_string(),
            description: "Test agent".to_string(),
            supported_interfaces,
            provider: None,
            version: "1.0.0".to_string(),
            documentation_url: None,
            capabilities: Some(v1::AgentCapabilities::default()),
            security_schemes: std::collections::HashMap::new(),
            security_requirements: Vec::new(),
            default_input_modes: vec![],
            default_output_modes: vec![],
            skills: vec![],
            signatures: vec![],
            icon_url: None,
        }
    }

    #[test]
    fn test_client_requires_valid_card_url() {
        let card_without_url = card_with_interfaces(vec![]);

        assert!(A2AClient::from_card(card_without_url).is_err());
    }

    #[test]
    fn test_from_card_with_headers() {
        let mut headers = std::collections::HashMap::new();
        headers.insert("Authorization".to_string(), "Bearer token123".to_string());
        headers.insert("X-API-Key".to_string(), "my-api-key".to_string());

        let card = card_with_interfaces(vec![jsonrpc_interface("https://example.com")]);

        let result = A2AClient::from_card_with_headers(card, headers);
        assert!(result.is_ok());

        let client = result.unwrap();
        assert_eq!(
            client.rpc_endpoint_url.as_deref(),
            Some("https://example.com")
        );
        assert_eq!(client.http_json_endpoint_url, None);
    }

    #[test]
    fn test_from_card_with_invalid_header_name() {
        let mut headers = std::collections::HashMap::new();
        headers.insert("Invalid Header Name!".to_string(), "value".to_string());

        let card = card_with_interfaces(vec![jsonrpc_interface("https://example.com")]);

        let result = A2AClient::from_card_with_headers(card, headers);
        // A single assertion covers both "is an error" and "is the right variant".
        assert!(matches!(result, Err(A2AError::InvalidParameter { .. })));
    }

    #[test]
    fn next_request_id_is_monotonic() {
        let client = A2AClient::from_card(card_with_interfaces(vec![jsonrpc_interface(
            "https://example.com",
        )]))
        .expect("valid card");

        let as_integer = |id: JSONRPCId| match id {
            JSONRPCId::Integer(value) => value,
            other => panic!("unexpected id variant: {other:?}"),
        };

        let first = as_integer(client.next_request_id());
        let second = as_integer(client.next_request_id());

        assert_eq!(first, 1);
        assert_eq!(second, 2);
    }

    #[test]
    fn parses_v1_agent_card_bytes() {
        let card = v1::AgentCard {
            name: "V1 Agent".to_string(),
            description: "Latest schema".to_string(),
            version: "1.2.3".to_string(),
            capabilities: Some(v1::AgentCapabilities {
                streaming: Some(true),
                push_notifications: Some(false),
                extensions: Vec::new(),
                extended_agent_card: Some(true),
            }),
            default_input_modes: vec!["text/plain".to_string()],
            default_output_modes: vec!["text/plain".to_string()],
            ..card_with_interfaces(vec![
                jsonrpc_interface("https://example.com/rpc"),
                http_json_interface("https://example.com"),
            ])
        };

        let json = serde_json::to_vec(&card).expect("v1 card json");
        let parsed = parse_agent_card_bytes(&json).expect("parsed card");

        assert_eq!(parsed.name, "V1 Agent");
        assert_eq!(parsed.supported_interfaces[0].protocol_version, "1.0");
        assert_eq!(
            parsed.supported_interfaces[0].url,
            "https://example.com/rpc"
        );
        assert_eq!(
            parsed.capabilities.as_ref().and_then(|caps| caps.streaming),
            Some(true)
        );
        assert_eq!(
            parsed
                .capabilities
                .as_ref()
                .and_then(|caps| caps.extended_agent_card),
            Some(true)
        );
        assert_eq!(parsed.supported_interfaces.len(), 2);
        assert_eq!(parsed.supported_interfaces[1].protocol_binding, "HTTP+JSON");
        assert_eq!(parsed.supported_interfaces[1].url, "https://example.com");
    }

    #[test]
    fn resolves_http_json_endpoint_from_additional_interfaces() {
        let client = A2AClient::from_card(card_with_interfaces(vec![
            jsonrpc_interface("https://example.com/rpc"),
            http_json_interface("https://example.com"),
        ]))
        .expect("valid card");

        assert_eq!(
            client.rpc_endpoint_url.as_deref(),
            Some("https://example.com/rpc")
        );
        assert_eq!(
            client.http_json_endpoint_url.as_deref(),
            Some("https://example.com")
        );
    }

    #[test]
    fn build_http_json_url_does_not_include_tenant_in_path() {
        let client = A2AClient::from_card(card_with_interfaces(vec![http_json_interface(
            "https://agent.example.com",
        )]))
        .expect("valid card");

        let url = client
            .build_http_json_url(&["tasks", "task-1"])
            .expect("url");
        assert_eq!(url, "https://agent.example.com/tasks/task-1");

        let url_with_action = client
            .build_http_json_url(&["tasks", "task-1:cancel"])
            .expect("url");
        assert_eq!(
            url_with_action,
            "https://agent.example.com/tasks/task-1:cancel"
        );
    }

    #[test]
    fn timestamp_to_rfc3339_converts_correctly() {
        // 1_705_320_000 seconds after the Unix epoch is 2024-01-15T12:00:00Z.
        let ts = pbjson_types::Timestamp {
            seconds: 1_705_320_000,
            nanos: 0,
        };
        let result = timestamp_to_rfc3339(ts).expect("valid timestamp");
        assert!(result.starts_with("2024-01-15"), "got: {result}");
        assert!(result.contains("12:00:00"), "got: {result}");
    }

    #[test]
    fn timestamp_to_rfc3339_rejects_invalid_timestamp() {
        let ts = pbjson_types::Timestamp {
            seconds: i64::MAX,
            nanos: i32::MAX,
        };
        assert!(timestamp_to_rfc3339(ts).is_err());
    }

    #[test]
    fn fetch_extended_card_returns_none_when_not_advertised() {
        let card = v1::AgentCard {
            capabilities: Some(v1::AgentCapabilities {
                streaming: Some(true),
                push_notifications: Some(false),
                extensions: Vec::new(),
                extended_agent_card: Some(false),
            }),
            ..card_with_interfaces(vec![jsonrpc_interface("https://example.com/rpc")])
        };
        let client = A2AClient::from_card(card).expect("valid card");

        let result = tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(client.fetch_extended_agent_card_if_available());
        assert!(matches!(result, Ok(None)));
    }
}