1use crate::a2a::{
2 AgentCard, JsonRpcRequest, JsonRpcResponse, Message, MessageSendParams,
3 TaskArtifactUpdateEvent, TaskStatusUpdateEvent, UpdateEvent,
4};
5use adk_core::Result;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10pub struct A2aClient {
12 http_client: reqwest::Client,
13 agent_card: AgentCard,
14}
15
16impl A2aClient {
17 pub fn new(agent_card: AgentCard) -> Self {
19 Self { http_client: reqwest::Client::new(), agent_card }
20 }
21
22 pub async fn resolve_agent_card(base_url: &str) -> Result<AgentCard> {
24 let url = format!("{}/.well-known/agent.json", base_url.trim_end_matches('/'));
25
26 let client = reqwest::Client::new();
27 let response =
28 client.get(&url).send().await.map_err(|e| {
29 adk_core::AdkError::agent(format!("Failed to fetch agent card: {e}"))
30 })?;
31
32 if !response.status().is_success() {
33 return Err(adk_core::AdkError::agent(format!(
34 "Failed to fetch agent card: HTTP {}",
35 response.status()
36 )));
37 }
38
39 let card: AgentCard = response
40 .json()
41 .await
42 .map_err(|e| adk_core::AdkError::agent(format!("Failed to parse agent card: {e}")))?;
43
44 Ok(card)
45 }
46
47 pub async fn from_url(base_url: &str) -> Result<Self> {
49 let card = Self::resolve_agent_card(base_url).await?;
50 Ok(Self::new(card))
51 }
52
53 pub fn agent_card(&self) -> &AgentCard {
55 &self.agent_card
56 }
57
58 pub async fn send_message(&self, message: Message) -> Result<JsonRpcResponse> {
60 let request = JsonRpcRequest {
61 jsonrpc: "2.0".to_string(),
62 method: "message/send".to_string(),
63 params: Some(
64 serde_json::to_value(MessageSendParams { message, config: None })
65 .map_err(|e| adk_core::AdkError::agent(e.to_string()))?,
66 ),
67 id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
68 };
69
70 let response = self
71 .http_client
72 .post(&self.agent_card.url)
73 .json(&request)
74 .send()
75 .await
76 .map_err(|e| adk_core::AdkError::agent(format!("Request failed: {e}")))?;
77
78 if !response.status().is_success() {
79 return Err(adk_core::AdkError::agent(format!(
80 "Request failed: HTTP {}",
81 response.status()
82 )));
83 }
84
85 let rpc_response: JsonRpcResponse = response
86 .json()
87 .await
88 .map_err(|e| adk_core::AdkError::agent(format!("Failed to parse response: {e}")))?;
89
90 Ok(rpc_response)
91 }
92
93 pub async fn send_streaming_message(
95 &self,
96 message: Message,
97 ) -> Result<Pin<Box<dyn Stream<Item = Result<UpdateEvent>> + Send>>> {
98 let stream_url = format!("{}/stream", self.agent_card.url.trim_end_matches('/'));
99
100 let request = JsonRpcRequest {
101 jsonrpc: "2.0".to_string(),
102 method: "message/stream".to_string(),
103 params: Some(
104 serde_json::to_value(MessageSendParams { message, config: None })
105 .map_err(|e| adk_core::AdkError::agent(e.to_string()))?,
106 ),
107 id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
108 };
109
110 let response = self
111 .http_client
112 .post(&stream_url)
113 .json(&request)
114 .send()
115 .await
116 .map_err(|e| adk_core::AdkError::agent(format!("Request failed: {e}")))?;
117
118 if !response.status().is_success() {
119 return Err(adk_core::AdkError::agent(format!(
120 "Request failed: HTTP {}",
121 response.status()
122 )));
123 }
124
125 let stream = async_stream::stream! {
127 let mut bytes_stream = response.bytes_stream();
128 let mut buffer = String::new();
129
130 use futures::StreamExt;
131 while let Some(chunk_result) = bytes_stream.next().await {
132 let chunk = match chunk_result {
133 Ok(c) => c,
134 Err(e) => {
135 yield Err(adk_core::AdkError::agent(format!("Stream error: {e}")));
136 break;
137 }
138 };
139
140 buffer.push_str(&String::from_utf8_lossy(&chunk));
141
142 while let Some(event_end) = buffer.find("\n\n") {
144 let event_data = buffer[..event_end].to_string();
145 buffer = buffer[event_end + 2..].to_string();
146
147 if let Some(data) = parse_sse_data(&event_data) {
149 if data.is_empty() {
151 continue;
152 }
153
154 match serde_json::from_str::<JsonRpcResponse>(&data) {
156 Ok(rpc_response) => {
157 if let Some(result) = rpc_response.result {
158 if let Ok(status_event) = serde_json::from_value::<TaskStatusUpdateEvent>(result.clone()) {
160 yield Ok(UpdateEvent::TaskStatusUpdate(status_event));
161 } else if let Ok(artifact_event) = serde_json::from_value::<TaskArtifactUpdateEvent>(result) {
162 yield Ok(UpdateEvent::TaskArtifactUpdate(artifact_event));
163 }
164 } else if let Some(error) = rpc_response.error {
165 yield Err(adk_core::AdkError::agent(format!(
166 "RPC error: {} ({})",
167 error.message, error.code
168 )));
169 }
170 }
171 Err(e) => {
172 tracing::debug!("Failed to parse SSE data: {e}");
174 }
175 }
176 }
177 }
178 }
179 };
180
181 Ok(Box::pin(stream))
182 }
183}
184
185fn parse_sse_data(event: &str) -> Option<String> {
187 for line in event.lines() {
188 if let Some(data) = line.strip_prefix("data:") {
189 return Some(data.trim().to_string());
190 }
191 }
192 None
193}
194
195#[cfg(test)]
196mod tests {
197 use super::*;
198
199 #[test]
200 fn test_parse_sse_data() {
201 let event = "event: message\ndata: {\"test\": true}\n";
202 assert_eq!(parse_sse_data(event), Some("{\"test\": true}".to_string()));
203 }
204
205 #[test]
206 fn test_parse_sse_data_no_data() {
207 let event = "event: ping\n";
208 assert_eq!(parse_sse_data(event), None);
209 }
210}
211
212#[cfg(feature = "a2a-v1")]
215pub mod v1_client {
216 use a2a_protocol_types::jsonrpc::JsonRpcRequest;
224 use a2a_protocol_types::task::{Task, TaskState};
225 use a2a_protocol_types::{AgentCard, Message, TaskPushNotificationConfig};
226 use reqwest::header::{HeaderMap, HeaderValue};
227 use serde_json::Value;
228 use std::time::Duration;
229
230 const A2A_VERSION_HEADER: &str = "a2a-version";
232
233 const A2A_VERSION: &str = "1.0";
235
236 const AGENT_CARD_PATH: &str = "/.well-known/agent-card.json";
238
239 #[derive(Debug, Clone)]
241 pub struct RetryConfig {
242 pub max_retries: u32,
244 pub base_delay: Duration,
246 }
247
248 impl Default for RetryConfig {
249 fn default() -> Self {
250 Self { max_retries: 3, base_delay: Duration::from_secs(1) }
251 }
252 }
253
254 #[derive(Debug, thiserror::Error)]
256 pub enum V1ClientError {
257 #[error("HTTP error: {0}")]
259 Http(#[from] reqwest::Error),
260
261 #[error("JSON-RPC error {code}: {message}")]
263 JsonRpc { code: i32, message: String, data: Option<Value> },
264
265 #[error("version not supported: requested {requested}, server supports: {supported:?}")]
267 VersionNotSupported { requested: String, supported: Vec<String> },
268
269 #[error("serialization error: {0}")]
271 Serde(#[from] serde_json::Error),
272
273 #[error("unexpected HTTP status {status}: {body}")]
275 UnexpectedStatus { status: u16, body: String },
276 }
277
278 #[derive(Debug, Clone, Default)]
280 struct CachedCard {
281 card: Option<AgentCard>,
282 etag: Option<String>,
283 last_modified: Option<String>,
284 }
285
286 pub struct A2aV1Client {
293 http_client: reqwest::Client,
294 agent_card: AgentCard,
295 retry_config: RetryConfig,
296 cached_card: std::sync::Mutex<CachedCard>,
297 }
298
299 impl A2aV1Client {
300 pub fn new(agent_card: AgentCard) -> Self {
302 Self {
303 http_client: reqwest::Client::new(),
304 agent_card,
305 retry_config: RetryConfig::default(),
306 cached_card: std::sync::Mutex::new(CachedCard::default()),
307 }
308 }
309
310 pub fn with_retry(agent_card: AgentCard, retry_config: RetryConfig) -> Self {
312 Self {
313 http_client: reqwest::Client::new(),
314 agent_card,
315 retry_config,
316 cached_card: std::sync::Mutex::new(CachedCard::default()),
317 }
318 }
319
320 pub fn agent_card(&self) -> &AgentCard {
322 &self.agent_card
323 }
324
325 fn jsonrpc_url(&self) -> Option<&str> {
328 self.agent_card
329 .supported_interfaces
330 .iter()
331 .find(|i| i.protocol_binding == "JSONRPC")
332 .map(|i| i.url.as_str())
333 }
334
335 fn rest_url(&self) -> Option<&str> {
338 self.agent_card
339 .supported_interfaces
340 .iter()
341 .find(|i| i.protocol_binding == "HTTP+JSON")
342 .map(|i| i.url.as_str())
343 }
344
345 fn default_headers() -> HeaderMap {
347 let mut headers = HeaderMap::new();
348 headers.insert(A2A_VERSION_HEADER, HeaderValue::from_static(A2A_VERSION));
349 headers
350 }
351
352 pub async fn resolve_agent_card(base_url: &str) -> Result<AgentCard, V1ClientError> {
360 let url = format!("{}{AGENT_CARD_PATH}", base_url.trim_end_matches('/'));
361 let client = reqwest::Client::new();
362 let response = client.get(&url).headers(Self::default_headers()).send().await?;
363
364 if !response.status().is_success() {
365 let status = response.status().as_u16();
366 let body = response.text().await.unwrap_or_default();
367 return Err(V1ClientError::UnexpectedStatus { status, body });
368 }
369
370 let card: AgentCard = response.json().await?;
371 Ok(card)
372 }
373
374 pub async fn resolve_agent_card_cached(
377 &self,
378 base_url: &str,
379 ) -> Result<Option<AgentCard>, V1ClientError> {
380 let url = format!("{}{AGENT_CARD_PATH}", base_url.trim_end_matches('/'));
381
382 let mut req = self.http_client.get(&url).headers(Self::default_headers());
383
384 {
386 let cache = self.cached_card.lock().unwrap();
387 if let Some(etag) = &cache.etag {
388 req = req.header("If-None-Match", etag.as_str());
389 }
390 if let Some(lm) = &cache.last_modified {
391 req = req.header("If-Modified-Since", lm.as_str());
392 }
393 }
394
395 let response = req.send().await?;
396
397 if response.status() == reqwest::StatusCode::NOT_MODIFIED {
398 return Ok(None);
399 }
400
401 if !response.status().is_success() {
402 let status = response.status().as_u16();
403 let body = response.text().await.unwrap_or_default();
404 return Err(V1ClientError::UnexpectedStatus { status, body });
405 }
406
407 let etag =
409 response.headers().get("etag").and_then(|v| v.to_str().ok()).map(String::from);
410 let last_modified = response
411 .headers()
412 .get("last-modified")
413 .and_then(|v| v.to_str().ok())
414 .map(String::from);
415
416 let card: AgentCard = response.json().await?;
417
418 {
419 let mut cache = self.cached_card.lock().unwrap();
420 cache.card = Some(card.clone());
421 cache.etag = etag;
422 cache.last_modified = last_modified;
423 }
424
425 Ok(Some(card))
426 }
427
428 async fn jsonrpc_call<T: serde::de::DeserializeOwned>(
432 &self,
433 method: &str,
434 params: Value,
435 ) -> Result<T, V1ClientError> {
436 let url = self.jsonrpc_url().ok_or_else(|| V1ClientError::UnexpectedStatus {
437 status: 0,
438 body: "no JSONRPC interface in agent card".to_string(),
439 })?;
440
441 let request = JsonRpcRequest::with_params(
442 serde_json::json!(uuid::Uuid::new_v4().to_string()),
443 method,
444 params,
445 );
446
447 let response = self.send_with_retry(url, &request).await?;
448 let status = response.status();
449
450 if status == reqwest::StatusCode::BAD_REQUEST {
452 let body: Value = response.json().await?;
453 if let Some(err) = body.get("error") {
454 let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0) as i32;
455 if code == -32009 {
456 return Err(Self::parse_version_error(err));
457 }
458 }
459 return Err(Self::parse_jsonrpc_error(&body));
460 }
461
462 let body: Value = response.json().await?;
463
464 if body.get("error").is_some() {
466 return Err(Self::parse_jsonrpc_error(&body));
467 }
468
469 let result = body.get("result").cloned().unwrap_or(Value::Null);
471 let parsed: T = serde_json::from_value(result)?;
472 Ok(parsed)
473 }
474
475 async fn send_with_retry(
477 &self,
478 url: &str,
479 request: &JsonRpcRequest,
480 ) -> Result<reqwest::Response, V1ClientError> {
481 let mut last_err = None;
482
483 for attempt in 0..=self.retry_config.max_retries {
484 if attempt > 0 {
485 let delay = self.retry_config.base_delay * 2u32.pow(attempt - 1);
486 tokio::time::sleep(delay).await;
487 }
488
489 match self
490 .http_client
491 .post(url)
492 .headers(Self::default_headers())
493 .json(request)
494 .send()
495 .await
496 {
497 Ok(resp) => {
498 let status = resp.status().as_u16();
499 if (status == 429 || status >= 500)
501 && attempt < self.retry_config.max_retries
502 {
503 last_err = Some(V1ClientError::UnexpectedStatus {
504 status,
505 body: format!("retryable status on attempt {attempt}"),
506 });
507 continue;
508 }
509 return Ok(resp);
510 }
511 Err(e) => {
512 if attempt < self.retry_config.max_retries && e.is_timeout() {
513 last_err = Some(V1ClientError::Http(e));
514 continue;
515 }
516 return Err(V1ClientError::Http(e));
517 }
518 }
519 }
520
521 Err(last_err.unwrap_or_else(|| V1ClientError::UnexpectedStatus {
522 status: 0,
523 body: "retry exhausted".to_string(),
524 }))
525 }
526
527 async fn rest_post<T: serde::de::DeserializeOwned>(
532 &self,
533 path: &str,
534 body: &Value,
535 ) -> Result<T, V1ClientError> {
536 let base = match self.rest_url() {
537 Some(url) => url.to_string(),
538 None => {
539 return Err(V1ClientError::UnexpectedStatus {
540 status: 0,
541 body: "no HTTP+JSON interface in agent card".to_string(),
542 });
543 }
544 };
545 let url = format!("{}{path}", base.trim_end_matches('/'));
546
547 let response = self
548 .http_client
549 .post(&url)
550 .headers(Self::default_headers())
551 .header("content-type", "application/a2a+json")
552 .json(body)
553 .send()
554 .await?;
555
556 if !response.status().is_success() {
557 let status = response.status().as_u16();
558 let body_text = response.text().await.unwrap_or_default();
559 return Err(V1ClientError::UnexpectedStatus { status, body: body_text });
560 }
561
562 let result: T = response.json().await?;
563 Ok(result)
564 }
565
566 async fn rest_get<T: serde::de::DeserializeOwned>(
568 &self,
569 path: &str,
570 ) -> Result<T, V1ClientError> {
571 let base = match self.rest_url() {
572 Some(url) => url.to_string(),
573 None => {
574 return Err(V1ClientError::UnexpectedStatus {
575 status: 0,
576 body: "no HTTP+JSON interface in agent card".to_string(),
577 });
578 }
579 };
580 let url = format!("{}{path}", base.trim_end_matches('/'));
581
582 let response =
583 self.http_client.get(&url).headers(Self::default_headers()).send().await?;
584
585 if !response.status().is_success() {
586 let status = response.status().as_u16();
587 let body_text = response.text().await.unwrap_or_default();
588 return Err(V1ClientError::UnexpectedStatus { status, body: body_text });
589 }
590
591 let result: T = response.json().await?;
592 Ok(result)
593 }
594
595 async fn rest_delete(&self, path: &str) -> Result<(), V1ClientError> {
597 let base = match self.rest_url() {
598 Some(url) => url.to_string(),
599 None => {
600 return Err(V1ClientError::UnexpectedStatus {
601 status: 0,
602 body: "no HTTP+JSON interface in agent card".to_string(),
603 });
604 }
605 };
606 let url = format!("{}{path}", base.trim_end_matches('/'));
607
608 let response =
609 self.http_client.delete(&url).headers(Self::default_headers()).send().await?;
610
611 if !response.status().is_success() {
612 let status = response.status().as_u16();
613 let body_text = response.text().await.unwrap_or_default();
614 return Err(V1ClientError::UnexpectedStatus { status, body: body_text });
615 }
616
617 Ok(())
618 }
619
620 fn parse_jsonrpc_error(body: &Value) -> V1ClientError {
624 let err = match body.get("error") {
625 Some(e) => e,
626 None => {
627 return V1ClientError::JsonRpc {
628 code: 0,
629 message: "unknown error".to_string(),
630 data: Some(body.clone()),
631 };
632 }
633 };
634
635 let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0) as i32;
636 let message =
637 err.get("message").and_then(|m| m.as_str()).unwrap_or("unknown error").to_string();
638 let data = err.get("data").cloned();
639
640 V1ClientError::JsonRpc { code, message, data }
641 }
642
643 fn parse_version_error(err: &Value) -> V1ClientError {
646 let data = err.get("data");
647 let mut supported = Vec::new();
648
649 if let Some(data_arr) = data.and_then(|d| d.as_array()) {
651 for info in data_arr {
652 if let Some(meta) = info.get("metadata") {
653 if let Some(versions) = meta.get("supported").and_then(|v| v.as_str()) {
654 supported = versions.split(", ").map(String::from).collect();
655 }
656 }
657 }
658 }
659
660 V1ClientError::VersionNotSupported { requested: A2A_VERSION.to_string(), supported }
661 }
662
663 pub async fn send_message(&self, message: Message) -> Result<Task, V1ClientError> {
667 self.jsonrpc_call("SendMessage", serde_json::json!({ "message": message })).await
668 }
669
670 pub async fn send_streaming_message(
674 &self,
675 message: Message,
676 ) -> Result<reqwest::Response, V1ClientError> {
677 let url = self.jsonrpc_url().ok_or_else(|| V1ClientError::UnexpectedStatus {
678 status: 0,
679 body: "no JSONRPC interface in agent card".to_string(),
680 })?;
681
682 let request = JsonRpcRequest::with_params(
683 serde_json::json!(uuid::Uuid::new_v4().to_string()),
684 "SendStreamingMessage",
685 serde_json::json!({ "message": message }),
686 );
687
688 let response = self
689 .http_client
690 .post(url)
691 .headers(Self::default_headers())
692 .json(&request)
693 .send()
694 .await?;
695
696 Ok(response)
697 }
698
699 pub async fn get_task(
701 &self,
702 task_id: &str,
703 history_length: Option<u32>,
704 ) -> Result<Task, V1ClientError> {
705 let mut params = serde_json::json!({ "id": task_id });
706 if let Some(len) = history_length {
707 params["historyLength"] = serde_json::json!(len);
708 }
709 self.jsonrpc_call("GetTask", params).await
710 }
711
712 pub async fn cancel_task(&self, task_id: &str) -> Result<Task, V1ClientError> {
714 self.jsonrpc_call("CancelTask", serde_json::json!({ "id": task_id })).await
715 }
716
717 pub async fn list_tasks(
719 &self,
720 context_id: Option<&str>,
721 status: Option<TaskState>,
722 page_size: Option<u32>,
723 page_token: Option<&str>,
724 ) -> Result<Vec<Task>, V1ClientError> {
725 let mut params = serde_json::json!({});
726 if let Some(cid) = context_id {
727 params["contextId"] = serde_json::json!(cid);
728 }
729 if let Some(s) = status {
730 params["status"] = serde_json::to_value(s)?;
731 }
732 if let Some(ps) = page_size {
733 params["pageSize"] = serde_json::json!(ps);
734 }
735 if let Some(pt) = page_token {
736 params["pageToken"] = serde_json::json!(pt);
737 }
738 self.jsonrpc_call("ListTasks", params).await
739 }
740
741 pub async fn subscribe_to_task(
745 &self,
746 task_id: &str,
747 ) -> Result<reqwest::Response, V1ClientError> {
748 let url = self.jsonrpc_url().ok_or_else(|| V1ClientError::UnexpectedStatus {
749 status: 0,
750 body: "no JSONRPC interface in agent card".to_string(),
751 })?;
752
753 let request = JsonRpcRequest::with_params(
754 serde_json::json!(uuid::Uuid::new_v4().to_string()),
755 "SubscribeToTask",
756 serde_json::json!({ "id": task_id }),
757 );
758
759 let response = self
760 .http_client
761 .post(url)
762 .headers(Self::default_headers())
763 .json(&request)
764 .send()
765 .await?;
766
767 Ok(response)
768 }
769
770 pub async fn create_push_notification_config(
773 &self,
774 config: TaskPushNotificationConfig,
775 ) -> Result<TaskPushNotificationConfig, V1ClientError> {
776 self.jsonrpc_call("CreateTaskPushNotificationConfig", serde_json::to_value(&config)?)
777 .await
778 }
779
780 pub async fn get_push_notification_config(
783 &self,
784 task_id: &str,
785 config_id: &str,
786 ) -> Result<TaskPushNotificationConfig, V1ClientError> {
787 self.jsonrpc_call(
788 "GetTaskPushNotificationConfig",
789 serde_json::json!({ "taskId": task_id, "id": config_id }),
790 )
791 .await
792 }
793
794 pub async fn list_push_notification_configs(
797 &self,
798 task_id: &str,
799 ) -> Result<Vec<TaskPushNotificationConfig>, V1ClientError> {
800 self.jsonrpc_call(
801 "ListTaskPushNotificationConfigs",
802 serde_json::json!({ "taskId": task_id }),
803 )
804 .await
805 }
806
807 pub async fn delete_push_notification_config(
810 &self,
811 task_id: &str,
812 config_id: &str,
813 ) -> Result<(), V1ClientError> {
814 let _: Value = self
815 .jsonrpc_call(
816 "DeleteTaskPushNotificationConfig",
817 serde_json::json!({ "taskId": task_id, "id": config_id }),
818 )
819 .await?;
820 Ok(())
821 }
822
823 pub async fn get_extended_agent_card(&self) -> Result<AgentCard, V1ClientError> {
825 self.jsonrpc_call("GetExtendedAgentCard", serde_json::json!({})).await
826 }
827
828 pub async fn send_message_rest(&self, message: Message) -> Result<Task, V1ClientError> {
832 self.rest_post("/message:send", &serde_json::json!({ "message": message })).await
833 }
834
835 pub async fn get_task_rest(&self, task_id: &str) -> Result<Task, V1ClientError> {
837 self.rest_get(&format!("/tasks/{task_id}")).await
838 }
839
840 pub async fn cancel_task_rest(&self, task_id: &str) -> Result<Task, V1ClientError> {
842 self.rest_post(&format!("/tasks/{task_id}:cancel"), &serde_json::json!({})).await
843 }
844
845 pub async fn list_tasks_rest(&self) -> Result<Vec<Task>, V1ClientError> {
847 self.rest_get("/tasks").await
848 }
849
850 pub async fn create_push_notification_config_rest(
853 &self,
854 task_id: &str,
855 config: TaskPushNotificationConfig,
856 ) -> Result<TaskPushNotificationConfig, V1ClientError> {
857 self.rest_post(
858 &format!("/tasks/{task_id}/pushNotificationConfigs"),
859 &serde_json::to_value(&config)?,
860 )
861 .await
862 }
863
864 pub async fn get_push_notification_config_rest(
867 &self,
868 task_id: &str,
869 config_id: &str,
870 ) -> Result<TaskPushNotificationConfig, V1ClientError> {
871 self.rest_get(&format!("/tasks/{task_id}/pushNotificationConfigs/{config_id}")).await
872 }
873
874 pub async fn list_push_notification_configs_rest(
877 &self,
878 task_id: &str,
879 ) -> Result<Vec<TaskPushNotificationConfig>, V1ClientError> {
880 self.rest_get(&format!("/tasks/{task_id}/pushNotificationConfigs")).await
881 }
882
883 pub async fn delete_push_notification_config_rest(
886 &self,
887 task_id: &str,
888 config_id: &str,
889 ) -> Result<(), V1ClientError> {
890 self.rest_delete(&format!("/tasks/{task_id}/pushNotificationConfigs/{config_id}")).await
891 }
892
893 pub async fn get_extended_agent_card_rest(&self) -> Result<AgentCard, V1ClientError> {
895 self.rest_get("/extendedAgentCard").await
896 }
897 }
898
899 #[cfg(test)]
900 mod tests {
901 use super::*;
902 use a2a_protocol_types::{AgentCapabilities, AgentCard, AgentInterface, AgentSkill};
903
904 fn make_test_card() -> AgentCard {
905 AgentCard {
906 name: "test-agent".to_string(),
907 url: Some("http://localhost:9999".to_string()),
908 description: "A test agent".to_string(),
909 version: "1.0.0".to_string(),
910 supported_interfaces: vec![
911 AgentInterface {
912 url: "http://localhost:9999/a2a".to_string(),
913 protocol_binding: "JSONRPC".to_string(),
914 protocol_version: "1.0".to_string(),
915 tenant: None,
916 },
917 AgentInterface {
918 url: "http://localhost:9999/rest".to_string(),
919 protocol_binding: "HTTP+JSON".to_string(),
920 protocol_version: "1.0".to_string(),
921 tenant: None,
922 },
923 ],
924 default_input_modes: vec!["text/plain".to_string()],
925 default_output_modes: vec!["text/plain".to_string()],
926 skills: vec![AgentSkill {
927 id: "echo".to_string(),
928 name: "Echo".to_string(),
929 description: "Echoes input".to_string(),
930 tags: vec![],
931 examples: None,
932 input_modes: None,
933 output_modes: None,
934 security_requirements: None,
935 }],
936 capabilities: AgentCapabilities::default(),
937 provider: None,
938 icon_url: None,
939 documentation_url: None,
940 security_schemes: None,
941 security_requirements: None,
942 signatures: None,
943 }
944 }
945
946 fn make_jsonrpc_only_card() -> AgentCard {
947 let mut card = make_test_card();
948 card.supported_interfaces.retain(|i| i.protocol_binding == "JSONRPC");
949 card
950 }
951
952 #[test]
953 fn new_client_stores_agent_card() {
954 let card = make_test_card();
955 let client = A2aV1Client::new(card.clone());
956 assert_eq!(client.agent_card().name, "test-agent");
957 assert_eq!(client.agent_card().version, "1.0.0");
958 }
959
960 #[test]
961 fn with_retry_stores_config() {
962 let card = make_test_card();
963 let config = RetryConfig { max_retries: 5, base_delay: Duration::from_millis(500) };
964 let client = A2aV1Client::with_retry(card, config);
965 assert_eq!(client.retry_config.max_retries, 5);
966 assert_eq!(client.retry_config.base_delay, Duration::from_millis(500));
967 }
968
969 #[test]
970 fn default_retry_config() {
971 let config = RetryConfig::default();
972 assert_eq!(config.max_retries, 3);
973 assert_eq!(config.base_delay, Duration::from_secs(1));
974 }
975
976 #[test]
977 fn jsonrpc_url_found() {
978 let client = A2aV1Client::new(make_test_card());
979 assert_eq!(client.jsonrpc_url(), Some("http://localhost:9999/a2a"));
980 }
981
982 #[test]
983 fn rest_url_found() {
984 let client = A2aV1Client::new(make_test_card());
985 assert_eq!(client.rest_url(), Some("http://localhost:9999/rest"));
986 }
987
988 #[test]
989 fn rest_url_none_when_not_available() {
990 let client = A2aV1Client::new(make_jsonrpc_only_card());
991 assert!(client.rest_url().is_none());
992 }
993
994 #[test]
995 fn default_headers_include_version() {
996 let headers = A2aV1Client::default_headers();
997 let version = headers.get(A2A_VERSION_HEADER).unwrap();
998 assert_eq!(version, "1.0");
999 }
1000
1001 #[test]
1002 fn parse_jsonrpc_error_extracts_fields() {
1003 let body = serde_json::json!({
1004 "jsonrpc": "2.0",
1005 "error": {
1006 "code": -32001,
1007 "message": "Task not found: task_123",
1008 "data": [{"@type": "type.googleapis.com/google.rpc.ErrorInfo"}]
1009 },
1010 "id": 1
1011 });
1012 let err = A2aV1Client::parse_jsonrpc_error(&body);
1013 match err {
1014 V1ClientError::JsonRpc { code, message, data } => {
1015 assert_eq!(code, -32001);
1016 assert_eq!(message, "Task not found: task_123");
1017 assert!(data.is_some());
1018 }
1019 other => panic!("expected JsonRpc error, got: {other}"),
1020 }
1021 }
1022
1023 #[test]
1024 fn parse_jsonrpc_error_handles_missing_error_field() {
1025 let body = serde_json::json!({"result": "ok"});
1026 let err = A2aV1Client::parse_jsonrpc_error(&body);
1027 match err {
1028 V1ClientError::JsonRpc { code, .. } => {
1029 assert_eq!(code, 0);
1030 }
1031 other => panic!("expected JsonRpc error, got: {other}"),
1032 }
1033 }
1034
1035 #[test]
1036 fn parse_version_error_extracts_supported_versions() {
1037 let err_obj = serde_json::json!({
1038 "code": -32009,
1039 "message": "Version not supported",
1040 "data": [{
1041 "@type": "type.googleapis.com/google.rpc.ErrorInfo",
1042 "reason": "VERSION_NOT_SUPPORTED",
1043 "domain": "a2a-protocol.org",
1044 "metadata": {
1045 "requested": "2.0",
1046 "supported": "0.3, 1.0"
1047 }
1048 }]
1049 });
1050 let err = A2aV1Client::parse_version_error(&err_obj);
1051 match err {
1052 V1ClientError::VersionNotSupported { requested, supported } => {
1053 assert_eq!(requested, "1.0");
1054 assert_eq!(supported, vec!["0.3", "1.0"]);
1055 }
1056 other => panic!("expected VersionNotSupported, got: {other}"),
1057 }
1058 }
1059
1060 #[test]
1061 fn parse_version_error_handles_empty_data() {
1062 let err_obj = serde_json::json!({
1063 "code": -32009,
1064 "message": "Version not supported"
1065 });
1066 let err = A2aV1Client::parse_version_error(&err_obj);
1067 match err {
1068 V1ClientError::VersionNotSupported { supported, .. } => {
1069 assert!(supported.is_empty());
1070 }
1071 other => panic!("expected VersionNotSupported, got: {other}"),
1072 }
1073 }
1074
1075 #[test]
1076 fn v1_client_error_display() {
1077 let err = V1ClientError::JsonRpc {
1078 code: -32001,
1079 message: "Task not found".to_string(),
1080 data: None,
1081 };
1082 assert_eq!(err.to_string(), "JSON-RPC error -32001: Task not found");
1083
1084 let err = V1ClientError::VersionNotSupported {
1085 requested: "2.0".to_string(),
1086 supported: vec!["0.3".to_string(), "1.0".to_string()],
1087 };
1088 assert!(err.to_string().contains("2.0"));
1089 assert!(err.to_string().contains("0.3"));
1090
1091 let err =
1092 V1ClientError::UnexpectedStatus { status: 500, body: "internal error".to_string() };
1093 assert!(err.to_string().contains("500"));
1094 }
1095
1096 #[test]
1097 fn cached_card_starts_empty() {
1098 let client = A2aV1Client::new(make_test_card());
1099 let cache = client.cached_card.lock().unwrap();
1100 assert!(cache.card.is_none());
1101 assert!(cache.etag.is_none());
1102 assert!(cache.last_modified.is_none());
1103 }
1104 }
1105}