Skip to main content

adk_server/a2a/
client.rs

1use crate::a2a::{
2    AgentCard, JsonRpcRequest, JsonRpcResponse, Message, MessageSendParams,
3    TaskArtifactUpdateEvent, TaskStatusUpdateEvent, UpdateEvent,
4};
5use adk_core::Result;
6use futures::stream::Stream;
7use serde_json::Value;
8use std::pin::Pin;
9
10/// A2A client for communicating with remote A2A agents
11pub struct A2aClient {
12    http_client: reqwest::Client,
13    agent_card: AgentCard,
14}
15
16impl A2aClient {
17    /// Create a new A2A client from an agent card
18    pub fn new(agent_card: AgentCard) -> Self {
19        Self { http_client: reqwest::Client::new(), agent_card }
20    }
21
22    /// Resolve an agent card from a URL (fetch from /.well-known/agent.json)
23    pub async fn resolve_agent_card(base_url: &str) -> Result<AgentCard> {
24        let url = format!("{}/.well-known/agent.json", base_url.trim_end_matches('/'));
25
26        let client = reqwest::Client::new();
27        let response =
28            client.get(&url).send().await.map_err(|e| {
29                adk_core::AdkError::agent(format!("Failed to fetch agent card: {e}"))
30            })?;
31
32        if !response.status().is_success() {
33            return Err(adk_core::AdkError::agent(format!(
34                "Failed to fetch agent card: HTTP {}",
35                response.status()
36            )));
37        }
38
39        let card: AgentCard = response
40            .json()
41            .await
42            .map_err(|e| adk_core::AdkError::agent(format!("Failed to parse agent card: {e}")))?;
43
44        Ok(card)
45    }
46
47    /// Create a client by resolving an agent card from a URL
48    pub async fn from_url(base_url: &str) -> Result<Self> {
49        let card = Self::resolve_agent_card(base_url).await?;
50        Ok(Self::new(card))
51    }
52
53    /// Get the agent card
54    pub fn agent_card(&self) -> &AgentCard {
55        &self.agent_card
56    }
57
58    /// Send a message to the remote agent (blocking/non-streaming)
59    pub async fn send_message(&self, message: Message) -> Result<JsonRpcResponse> {
60        let request = JsonRpcRequest {
61            jsonrpc: "2.0".to_string(),
62            method: "message/send".to_string(),
63            params: Some(
64                serde_json::to_value(MessageSendParams { message, config: None })
65                    .map_err(|e| adk_core::AdkError::agent(e.to_string()))?,
66            ),
67            id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
68        };
69
70        let response = self
71            .http_client
72            .post(&self.agent_card.url)
73            .json(&request)
74            .send()
75            .await
76            .map_err(|e| adk_core::AdkError::agent(format!("Request failed: {e}")))?;
77
78        if !response.status().is_success() {
79            return Err(adk_core::AdkError::agent(format!(
80                "Request failed: HTTP {}",
81                response.status()
82            )));
83        }
84
85        let rpc_response: JsonRpcResponse = response
86            .json()
87            .await
88            .map_err(|e| adk_core::AdkError::agent(format!("Failed to parse response: {e}")))?;
89
90        Ok(rpc_response)
91    }
92
93    /// Send a message and receive streaming events via SSE
94    pub async fn send_streaming_message(
95        &self,
96        message: Message,
97    ) -> Result<Pin<Box<dyn Stream<Item = Result<UpdateEvent>> + Send>>> {
98        let stream_url = format!("{}/stream", self.agent_card.url.trim_end_matches('/'));
99
100        let request = JsonRpcRequest {
101            jsonrpc: "2.0".to_string(),
102            method: "message/stream".to_string(),
103            params: Some(
104                serde_json::to_value(MessageSendParams { message, config: None })
105                    .map_err(|e| adk_core::AdkError::agent(e.to_string()))?,
106            ),
107            id: Some(Value::String(uuid::Uuid::new_v4().to_string())),
108        };
109
110        let response = self
111            .http_client
112            .post(&stream_url)
113            .json(&request)
114            .send()
115            .await
116            .map_err(|e| adk_core::AdkError::agent(format!("Request failed: {e}")))?;
117
118        if !response.status().is_success() {
119            return Err(adk_core::AdkError::agent(format!(
120                "Request failed: HTTP {}",
121                response.status()
122            )));
123        }
124
125        // Parse SSE stream
126        let stream = async_stream::stream! {
127            let mut bytes_stream = response.bytes_stream();
128            let mut buffer = String::new();
129
130            use futures::StreamExt;
131            while let Some(chunk_result) = bytes_stream.next().await {
132                let chunk = match chunk_result {
133                    Ok(c) => c,
134                    Err(e) => {
135                        yield Err(adk_core::AdkError::agent(format!("Stream error: {e}")));
136                        break;
137                    }
138                };
139
140                buffer.push_str(&String::from_utf8_lossy(&chunk));
141
142                // Process complete SSE events
143                while let Some(event_end) = buffer.find("\n\n") {
144                    let event_data = buffer[..event_end].to_string();
145                    buffer = buffer[event_end + 2..].to_string();
146
147                    // Parse SSE event
148                    if let Some(data) = parse_sse_data(&event_data) {
149                        // Skip done events
150                        if data.is_empty() {
151                            continue;
152                        }
153
154                        // Parse JSON-RPC response
155                        match serde_json::from_str::<JsonRpcResponse>(&data) {
156                            Ok(rpc_response) => {
157                                if let Some(result) = rpc_response.result {
158                                    // Try to parse as different event types
159                                    if let Ok(status_event) = serde_json::from_value::<TaskStatusUpdateEvent>(result.clone()) {
160                                        yield Ok(UpdateEvent::TaskStatusUpdate(status_event));
161                                    } else if let Ok(artifact_event) = serde_json::from_value::<TaskArtifactUpdateEvent>(result) {
162                                        yield Ok(UpdateEvent::TaskArtifactUpdate(artifact_event));
163                                    }
164                                } else if let Some(error) = rpc_response.error {
165                                    yield Err(adk_core::AdkError::agent(format!(
166                                        "RPC error: {} ({})",
167                                        error.message, error.code
168                                    )));
169                                }
170                            }
171                            Err(e) => {
172                                // Skip parse errors for non-JSON data
173                                tracing::debug!("Failed to parse SSE data: {e}");
174                            }
175                        }
176                    }
177                }
178            }
179        };
180
181        Ok(Box::pin(stream))
182    }
183}
184
185/// Parse the data field from an SSE event
186fn parse_sse_data(event: &str) -> Option<String> {
187    for line in event.lines() {
188        if let Some(data) = line.strip_prefix("data:") {
189            return Some(data.trim().to_string());
190        }
191    }
192    None
193}
194
195#[cfg(test)]
196mod tests {
197    use super::*;
198
199    #[test]
200    fn test_parse_sse_data() {
201        let event = "event: message\ndata: {\"test\": true}\n";
202        assert_eq!(parse_sse_data(event), Some("{\"test\": true}".to_string()));
203    }
204
205    #[test]
206    fn test_parse_sse_data_no_data() {
207        let event = "event: ping\n";
208        assert_eq!(parse_sse_data(event), None);
209    }
210}
211
212// ── A2A v1.0.0 Client ───────────────────────────────────────────────────────
213
214#[cfg(feature = "a2a-v1")]
215pub mod v1_client {
216    //! A2A v1.0.0 client for communicating with remote A2A agents.
217    //!
218    //! Sends the `A2A-Version: 1.0` header on all requests, supports all 11
219    //! v1.0.0 operations via JSON-RPC, optional REST binding, structured error
220    //! parsing, configurable retry, and agent card caching with conditional
221    //! request headers.
222
223    use a2a_protocol_types::jsonrpc::JsonRpcRequest;
224    use a2a_protocol_types::task::{Task, TaskState};
225    use a2a_protocol_types::{AgentCard, Message, TaskPushNotificationConfig};
226    use reqwest::header::{HeaderMap, HeaderValue};
227    use serde_json::Value;
228    use std::time::Duration;
229
230    /// Header name for A2A protocol version negotiation.
231    const A2A_VERSION_HEADER: &str = "a2a-version";
232
233    /// The v1.0.0 protocol version string.
234    const A2A_VERSION: &str = "1.0";
235
236    /// Well-known path for v1 agent cards.
237    const AGENT_CARD_PATH: &str = "/.well-known/agent-card.json";
238
239    /// Retry configuration for transient failures.
240    #[derive(Debug, Clone)]
241    pub struct RetryConfig {
242        /// Maximum number of retry attempts (0 = no retries).
243        pub max_retries: u32,
244        /// Base delay between retries (doubles each attempt).
245        pub base_delay: Duration,
246    }
247
248    impl Default for RetryConfig {
249        fn default() -> Self {
250            Self { max_retries: 3, base_delay: Duration::from_secs(1) }
251        }
252    }
253
254    /// Error returned by the v1 client.
255    #[derive(Debug, thiserror::Error)]
256    pub enum V1ClientError {
257        /// HTTP transport error.
258        #[error("HTTP error: {0}")]
259        Http(#[from] reqwest::Error),
260
261        /// JSON-RPC error returned by the server.
262        #[error("JSON-RPC error {code}: {message}")]
263        JsonRpc { code: i32, message: String, data: Option<Value> },
264
265        /// Version negotiation failed — server does not support requested version.
266        #[error("version not supported: requested {requested}, server supports: {supported:?}")]
267        VersionNotSupported { requested: String, supported: Vec<String> },
268
269        /// Serialization/deserialization error.
270        #[error("serialization error: {0}")]
271        Serde(#[from] serde_json::Error),
272
273        /// The server returned an unexpected HTTP status.
274        #[error("unexpected HTTP status {status}: {body}")]
275        UnexpectedStatus { status: u16, body: String },
276    }
277
278    /// Cached agent card with ETag and Last-Modified for conditional requests.
279    #[derive(Debug, Clone, Default)]
280    struct CachedCard {
281        card: Option<AgentCard>,
282        etag: Option<String>,
283        last_modified: Option<String>,
284    }
285
286    /// A2A v1.0.0 client.
287    ///
288    /// Sends `A2A-Version: 1.0` on every request, supports all 11 operations
289    /// via JSON-RPC (and optionally REST), parses structured error responses,
290    /// caches agent cards with conditional headers, and retries transient
291    /// failures.
292    pub struct A2aV1Client {
293        http_client: reqwest::Client,
294        agent_card: AgentCard,
295        retry_config: RetryConfig,
296        cached_card: std::sync::Mutex<CachedCard>,
297    }
298
299    impl A2aV1Client {
300        /// Creates a new v1 client from an already-resolved agent card.
301        pub fn new(agent_card: AgentCard) -> Self {
302            Self {
303                http_client: reqwest::Client::new(),
304                agent_card,
305                retry_config: RetryConfig::default(),
306                cached_card: std::sync::Mutex::new(CachedCard::default()),
307            }
308        }
309
310        /// Creates a new v1 client with custom retry configuration.
311        pub fn with_retry(agent_card: AgentCard, retry_config: RetryConfig) -> Self {
312            Self {
313                http_client: reqwest::Client::new(),
314                agent_card,
315                retry_config,
316                cached_card: std::sync::Mutex::new(CachedCard::default()),
317            }
318        }
319
320        /// Returns a reference to the agent card.
321        pub fn agent_card(&self) -> &AgentCard {
322            &self.agent_card
323        }
324
325        /// Returns the JSON-RPC endpoint URL from the agent card's
326        /// `supportedInterfaces`.
327        fn jsonrpc_url(&self) -> Option<&str> {
328            self.agent_card
329                .supported_interfaces
330                .iter()
331                .find(|i| i.protocol_binding == "JSONRPC")
332                .map(|i| i.url.as_str())
333        }
334
335        /// Returns the REST endpoint URL from the agent card's
336        /// `supportedInterfaces`, if available.
337        fn rest_url(&self) -> Option<&str> {
338            self.agent_card
339                .supported_interfaces
340                .iter()
341                .find(|i| i.protocol_binding == "HTTP+JSON")
342                .map(|i| i.url.as_str())
343        }
344
345        /// Builds default headers including `A2A-Version: 1.0`.
346        fn default_headers() -> HeaderMap {
347            let mut headers = HeaderMap::new();
348            headers.insert(A2A_VERSION_HEADER, HeaderValue::from_static(A2A_VERSION));
349            headers
350        }
351
352        // ── Agent card resolution ────────────────────────────────────────
353
354        /// Resolves an agent card from a base URL, fetching from
355        /// `/.well-known/agent-card.json` with `A2A-Version: 1.0`.
356        ///
357        /// Caches the ETag and Last-Modified headers for subsequent
358        /// conditional requests.
359        pub async fn resolve_agent_card(base_url: &str) -> Result<AgentCard, V1ClientError> {
360            let url = format!("{}{AGENT_CARD_PATH}", base_url.trim_end_matches('/'));
361            let client = reqwest::Client::new();
362            let response = client.get(&url).headers(Self::default_headers()).send().await?;
363
364            if !response.status().is_success() {
365                let status = response.status().as_u16();
366                let body = response.text().await.unwrap_or_default();
367                return Err(V1ClientError::UnexpectedStatus { status, body });
368            }
369
370            let card: AgentCard = response.json().await?;
371            Ok(card)
372        }
373
374        /// Resolves an agent card using conditional headers if a cached
375        /// version exists. Returns `None` if the server responds 304.
376        pub async fn resolve_agent_card_cached(
377            &self,
378            base_url: &str,
379        ) -> Result<Option<AgentCard>, V1ClientError> {
380            let url = format!("{}{AGENT_CARD_PATH}", base_url.trim_end_matches('/'));
381
382            let mut req = self.http_client.get(&url).headers(Self::default_headers());
383
384            // Add conditional headers from cache
385            {
386                let cache = self.cached_card.lock().unwrap();
387                if let Some(etag) = &cache.etag {
388                    req = req.header("If-None-Match", etag.as_str());
389                }
390                if let Some(lm) = &cache.last_modified {
391                    req = req.header("If-Modified-Since", lm.as_str());
392                }
393            }
394
395            let response = req.send().await?;
396
397            if response.status() == reqwest::StatusCode::NOT_MODIFIED {
398                return Ok(None);
399            }
400
401            if !response.status().is_success() {
402                let status = response.status().as_u16();
403                let body = response.text().await.unwrap_or_default();
404                return Err(V1ClientError::UnexpectedStatus { status, body });
405            }
406
407            // Cache ETag and Last-Modified from response
408            let etag =
409                response.headers().get("etag").and_then(|v| v.to_str().ok()).map(String::from);
410            let last_modified = response
411                .headers()
412                .get("last-modified")
413                .and_then(|v| v.to_str().ok())
414                .map(String::from);
415
416            let card: AgentCard = response.json().await?;
417
418            {
419                let mut cache = self.cached_card.lock().unwrap();
420                cache.card = Some(card.clone());
421                cache.etag = etag;
422                cache.last_modified = last_modified;
423            }
424
425            Ok(Some(card))
426        }
427
428        // ── JSON-RPC transport ───────────────────────────────────────────
429
430        /// Sends a JSON-RPC request and returns the parsed result.
431        async fn jsonrpc_call<T: serde::de::DeserializeOwned>(
432            &self,
433            method: &str,
434            params: Value,
435        ) -> Result<T, V1ClientError> {
436            let url = self.jsonrpc_url().ok_or_else(|| V1ClientError::UnexpectedStatus {
437                status: 0,
438                body: "no JSONRPC interface in agent card".to_string(),
439            })?;
440
441            let request = JsonRpcRequest::with_params(
442                serde_json::json!(uuid::Uuid::new_v4().to_string()),
443                method,
444                params,
445            );
446
447            let response = self.send_with_retry(url, &request).await?;
448            let status = response.status();
449
450            // Check for version negotiation failure
451            if status == reqwest::StatusCode::BAD_REQUEST {
452                let body: Value = response.json().await?;
453                if let Some(err) = body.get("error") {
454                    let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0) as i32;
455                    if code == -32009 {
456                        return Err(Self::parse_version_error(err));
457                    }
458                }
459                return Err(Self::parse_jsonrpc_error(&body));
460            }
461
462            let body: Value = response.json().await?;
463
464            // Check for JSON-RPC error
465            if body.get("error").is_some() {
466                return Err(Self::parse_jsonrpc_error(&body));
467            }
468
469            // Extract result
470            let result = body.get("result").cloned().unwrap_or(Value::Null);
471            let parsed: T = serde_json::from_value(result)?;
472            Ok(parsed)
473        }
474
475        /// Sends an HTTP request with retry logic for transient failures.
476        async fn send_with_retry(
477            &self,
478            url: &str,
479            request: &JsonRpcRequest,
480        ) -> Result<reqwest::Response, V1ClientError> {
481            let mut last_err = None;
482
483            for attempt in 0..=self.retry_config.max_retries {
484                if attempt > 0 {
485                    let delay = self.retry_config.base_delay * 2u32.pow(attempt - 1);
486                    tokio::time::sleep(delay).await;
487                }
488
489                match self
490                    .http_client
491                    .post(url)
492                    .headers(Self::default_headers())
493                    .json(request)
494                    .send()
495                    .await
496                {
497                    Ok(resp) => {
498                        let status = resp.status().as_u16();
499                        // Retry on 429 and 5xx
500                        if (status == 429 || status >= 500)
501                            && attempt < self.retry_config.max_retries
502                        {
503                            last_err = Some(V1ClientError::UnexpectedStatus {
504                                status,
505                                body: format!("retryable status on attempt {attempt}"),
506                            });
507                            continue;
508                        }
509                        return Ok(resp);
510                    }
511                    Err(e) => {
512                        if attempt < self.retry_config.max_retries && e.is_timeout() {
513                            last_err = Some(V1ClientError::Http(e));
514                            continue;
515                        }
516                        return Err(V1ClientError::Http(e));
517                    }
518                }
519            }
520
521            Err(last_err.unwrap_or_else(|| V1ClientError::UnexpectedStatus {
522                status: 0,
523                body: "retry exhausted".to_string(),
524            }))
525        }
526
527        // ── REST transport ───────────────────────────────────────────────
528
529        /// Sends a REST request (POST with JSON body) and returns the parsed
530        /// result. Falls back to JSON-RPC if no REST interface is available.
531        async fn rest_post<T: serde::de::DeserializeOwned>(
532            &self,
533            path: &str,
534            body: &Value,
535        ) -> Result<T, V1ClientError> {
536            let base = match self.rest_url() {
537                Some(url) => url.to_string(),
538                None => {
539                    return Err(V1ClientError::UnexpectedStatus {
540                        status: 0,
541                        body: "no HTTP+JSON interface in agent card".to_string(),
542                    });
543                }
544            };
545            let url = format!("{}{path}", base.trim_end_matches('/'));
546
547            let response = self
548                .http_client
549                .post(&url)
550                .headers(Self::default_headers())
551                .header("content-type", "application/a2a+json")
552                .json(body)
553                .send()
554                .await?;
555
556            if !response.status().is_success() {
557                let status = response.status().as_u16();
558                let body_text = response.text().await.unwrap_or_default();
559                return Err(V1ClientError::UnexpectedStatus { status, body: body_text });
560            }
561
562            let result: T = response.json().await?;
563            Ok(result)
564        }
565
566        /// Sends a REST GET request and returns the parsed result.
567        async fn rest_get<T: serde::de::DeserializeOwned>(
568            &self,
569            path: &str,
570        ) -> Result<T, V1ClientError> {
571            let base = match self.rest_url() {
572                Some(url) => url.to_string(),
573                None => {
574                    return Err(V1ClientError::UnexpectedStatus {
575                        status: 0,
576                        body: "no HTTP+JSON interface in agent card".to_string(),
577                    });
578                }
579            };
580            let url = format!("{}{path}", base.trim_end_matches('/'));
581
582            let response =
583                self.http_client.get(&url).headers(Self::default_headers()).send().await?;
584
585            if !response.status().is_success() {
586                let status = response.status().as_u16();
587                let body_text = response.text().await.unwrap_or_default();
588                return Err(V1ClientError::UnexpectedStatus { status, body: body_text });
589            }
590
591            let result: T = response.json().await?;
592            Ok(result)
593        }
594
595        /// Sends a REST DELETE request.
596        async fn rest_delete(&self, path: &str) -> Result<(), V1ClientError> {
597            let base = match self.rest_url() {
598                Some(url) => url.to_string(),
599                None => {
600                    return Err(V1ClientError::UnexpectedStatus {
601                        status: 0,
602                        body: "no HTTP+JSON interface in agent card".to_string(),
603                    });
604                }
605            };
606            let url = format!("{}{path}", base.trim_end_matches('/'));
607
608            let response =
609                self.http_client.delete(&url).headers(Self::default_headers()).send().await?;
610
611            if !response.status().is_success() {
612                let status = response.status().as_u16();
613                let body_text = response.text().await.unwrap_or_default();
614                return Err(V1ClientError::UnexpectedStatus { status, body: body_text });
615            }
616
617            Ok(())
618        }
619
620        // ── Error parsing ────────────────────────────────────────────────
621
622        /// Parses a JSON-RPC error response into a `V1ClientError`.
623        fn parse_jsonrpc_error(body: &Value) -> V1ClientError {
624            let err = match body.get("error") {
625                Some(e) => e,
626                None => {
627                    return V1ClientError::JsonRpc {
628                        code: 0,
629                        message: "unknown error".to_string(),
630                        data: Some(body.clone()),
631                    };
632                }
633            };
634
635            let code = err.get("code").and_then(|c| c.as_i64()).unwrap_or(0) as i32;
636            let message =
637                err.get("message").and_then(|m| m.as_str()).unwrap_or("unknown error").to_string();
638            let data = err.get("data").cloned();
639
640            V1ClientError::JsonRpc { code, message, data }
641        }
642
643        /// Parses a version negotiation error, extracting supported versions
644        /// from the structured `data` field.
645        fn parse_version_error(err: &Value) -> V1ClientError {
646            let data = err.get("data");
647            let mut supported = Vec::new();
648
649            // Try to extract supported versions from ErrorInfo metadata
650            if let Some(data_arr) = data.and_then(|d| d.as_array()) {
651                for info in data_arr {
652                    if let Some(meta) = info.get("metadata") {
653                        if let Some(versions) = meta.get("supported").and_then(|v| v.as_str()) {
654                            supported = versions.split(", ").map(String::from).collect();
655                        }
656                    }
657                }
658            }
659
660            V1ClientError::VersionNotSupported { requested: A2A_VERSION.to_string(), supported }
661        }
662
663        // ── 11 v1.0.0 Operations (JSON-RPC) ─────────────────────────────
664
665        /// Sends a message to the remote agent (JSON-RPC `SendMessage`).
666        pub async fn send_message(&self, message: Message) -> Result<Task, V1ClientError> {
667            self.jsonrpc_call("SendMessage", serde_json::json!({ "message": message })).await
668        }
669
670        /// Sends a streaming message (JSON-RPC `SendStreamingMessage`).
671        ///
672        /// Returns the raw response for SSE parsing by the caller.
673        pub async fn send_streaming_message(
674            &self,
675            message: Message,
676        ) -> Result<reqwest::Response, V1ClientError> {
677            let url = self.jsonrpc_url().ok_or_else(|| V1ClientError::UnexpectedStatus {
678                status: 0,
679                body: "no JSONRPC interface in agent card".to_string(),
680            })?;
681
682            let request = JsonRpcRequest::with_params(
683                serde_json::json!(uuid::Uuid::new_v4().to_string()),
684                "SendStreamingMessage",
685                serde_json::json!({ "message": message }),
686            );
687
688            let response = self
689                .http_client
690                .post(url)
691                .headers(Self::default_headers())
692                .json(&request)
693                .send()
694                .await?;
695
696            Ok(response)
697        }
698
699        /// Retrieves a task by ID (JSON-RPC `GetTask`).
700        pub async fn get_task(
701            &self,
702            task_id: &str,
703            history_length: Option<u32>,
704        ) -> Result<Task, V1ClientError> {
705            let mut params = serde_json::json!({ "id": task_id });
706            if let Some(len) = history_length {
707                params["historyLength"] = serde_json::json!(len);
708            }
709            self.jsonrpc_call("GetTask", params).await
710        }
711
712        /// Cancels a task (JSON-RPC `CancelTask`).
713        pub async fn cancel_task(&self, task_id: &str) -> Result<Task, V1ClientError> {
714            self.jsonrpc_call("CancelTask", serde_json::json!({ "id": task_id })).await
715        }
716
717        /// Lists tasks with optional filtering (JSON-RPC `ListTasks`).
718        pub async fn list_tasks(
719            &self,
720            context_id: Option<&str>,
721            status: Option<TaskState>,
722            page_size: Option<u32>,
723            page_token: Option<&str>,
724        ) -> Result<Vec<Task>, V1ClientError> {
725            let mut params = serde_json::json!({});
726            if let Some(cid) = context_id {
727                params["contextId"] = serde_json::json!(cid);
728            }
729            if let Some(s) = status {
730                params["status"] = serde_json::to_value(s)?;
731            }
732            if let Some(ps) = page_size {
733                params["pageSize"] = serde_json::json!(ps);
734            }
735            if let Some(pt) = page_token {
736                params["pageToken"] = serde_json::json!(pt);
737            }
738            self.jsonrpc_call("ListTasks", params).await
739        }
740
741        /// Subscribes to task updates (JSON-RPC `SubscribeToTask`).
742        ///
743        /// Returns the raw response for SSE parsing by the caller.
744        pub async fn subscribe_to_task(
745            &self,
746            task_id: &str,
747        ) -> Result<reqwest::Response, V1ClientError> {
748            let url = self.jsonrpc_url().ok_or_else(|| V1ClientError::UnexpectedStatus {
749                status: 0,
750                body: "no JSONRPC interface in agent card".to_string(),
751            })?;
752
753            let request = JsonRpcRequest::with_params(
754                serde_json::json!(uuid::Uuid::new_v4().to_string()),
755                "SubscribeToTask",
756                serde_json::json!({ "id": task_id }),
757            );
758
759            let response = self
760                .http_client
761                .post(url)
762                .headers(Self::default_headers())
763                .json(&request)
764                .send()
765                .await?;
766
767            Ok(response)
768        }
769
770        /// Creates a push notification config (JSON-RPC
771        /// `CreateTaskPushNotificationConfig`).
772        pub async fn create_push_notification_config(
773            &self,
774            config: TaskPushNotificationConfig,
775        ) -> Result<TaskPushNotificationConfig, V1ClientError> {
776            self.jsonrpc_call("CreateTaskPushNotificationConfig", serde_json::to_value(&config)?)
777                .await
778        }
779
780        /// Gets a push notification config (JSON-RPC
781        /// `GetTaskPushNotificationConfig`).
782        pub async fn get_push_notification_config(
783            &self,
784            task_id: &str,
785            config_id: &str,
786        ) -> Result<TaskPushNotificationConfig, V1ClientError> {
787            self.jsonrpc_call(
788                "GetTaskPushNotificationConfig",
789                serde_json::json!({ "taskId": task_id, "id": config_id }),
790            )
791            .await
792        }
793
794        /// Lists push notification configs (JSON-RPC
795        /// `ListTaskPushNotificationConfigs`).
796        pub async fn list_push_notification_configs(
797            &self,
798            task_id: &str,
799        ) -> Result<Vec<TaskPushNotificationConfig>, V1ClientError> {
800            self.jsonrpc_call(
801                "ListTaskPushNotificationConfigs",
802                serde_json::json!({ "taskId": task_id }),
803            )
804            .await
805        }
806
807        /// Deletes a push notification config (JSON-RPC
808        /// `DeleteTaskPushNotificationConfig`).
809        pub async fn delete_push_notification_config(
810            &self,
811            task_id: &str,
812            config_id: &str,
813        ) -> Result<(), V1ClientError> {
814            let _: Value = self
815                .jsonrpc_call(
816                    "DeleteTaskPushNotificationConfig",
817                    serde_json::json!({ "taskId": task_id, "id": config_id }),
818                )
819                .await?;
820            Ok(())
821        }
822
823        /// Gets the extended agent card (JSON-RPC `GetExtendedAgentCard`).
824        pub async fn get_extended_agent_card(&self) -> Result<AgentCard, V1ClientError> {
825            self.jsonrpc_call("GetExtendedAgentCard", serde_json::json!({})).await
826        }
827
828        // ── REST binding operations ──────────────────────────────────────
829
830        /// Sends a message via REST (`POST /message:send`).
831        pub async fn send_message_rest(&self, message: Message) -> Result<Task, V1ClientError> {
832            self.rest_post("/message:send", &serde_json::json!({ "message": message })).await
833        }
834
835        /// Gets a task via REST (`GET /tasks/{taskId}`).
836        pub async fn get_task_rest(&self, task_id: &str) -> Result<Task, V1ClientError> {
837            self.rest_get(&format!("/tasks/{task_id}")).await
838        }
839
840        /// Cancels a task via REST (`POST /tasks/{taskId}:cancel`).
841        pub async fn cancel_task_rest(&self, task_id: &str) -> Result<Task, V1ClientError> {
842            self.rest_post(&format!("/tasks/{task_id}:cancel"), &serde_json::json!({})).await
843        }
844
845        /// Lists tasks via REST (`GET /tasks`).
846        pub async fn list_tasks_rest(&self) -> Result<Vec<Task>, V1ClientError> {
847            self.rest_get("/tasks").await
848        }
849
850        /// Creates a push notification config via REST
851        /// (`POST /tasks/{taskId}/pushNotificationConfigs`).
852        pub async fn create_push_notification_config_rest(
853            &self,
854            task_id: &str,
855            config: TaskPushNotificationConfig,
856        ) -> Result<TaskPushNotificationConfig, V1ClientError> {
857            self.rest_post(
858                &format!("/tasks/{task_id}/pushNotificationConfigs"),
859                &serde_json::to_value(&config)?,
860            )
861            .await
862        }
863
864        /// Gets a push notification config via REST
865        /// (`GET /tasks/{taskId}/pushNotificationConfigs/{configId}`).
866        pub async fn get_push_notification_config_rest(
867            &self,
868            task_id: &str,
869            config_id: &str,
870        ) -> Result<TaskPushNotificationConfig, V1ClientError> {
871            self.rest_get(&format!("/tasks/{task_id}/pushNotificationConfigs/{config_id}")).await
872        }
873
874        /// Lists push notification configs via REST
875        /// (`GET /tasks/{taskId}/pushNotificationConfigs`).
876        pub async fn list_push_notification_configs_rest(
877            &self,
878            task_id: &str,
879        ) -> Result<Vec<TaskPushNotificationConfig>, V1ClientError> {
880            self.rest_get(&format!("/tasks/{task_id}/pushNotificationConfigs")).await
881        }
882
883        /// Deletes a push notification config via REST
884        /// (`DELETE /tasks/{taskId}/pushNotificationConfigs/{configId}`).
885        pub async fn delete_push_notification_config_rest(
886            &self,
887            task_id: &str,
888            config_id: &str,
889        ) -> Result<(), V1ClientError> {
890            self.rest_delete(&format!("/tasks/{task_id}/pushNotificationConfigs/{config_id}")).await
891        }
892
893        /// Gets the extended agent card via REST (`GET /extendedAgentCard`).
894        pub async fn get_extended_agent_card_rest(&self) -> Result<AgentCard, V1ClientError> {
895            self.rest_get("/extendedAgentCard").await
896        }
897    }
898
899    #[cfg(test)]
900    mod tests {
901        use super::*;
902        use a2a_protocol_types::{AgentCapabilities, AgentCard, AgentInterface, AgentSkill};
903
904        fn make_test_card() -> AgentCard {
905            AgentCard {
906                name: "test-agent".to_string(),
907                url: Some("http://localhost:9999".to_string()),
908                description: "A test agent".to_string(),
909                version: "1.0.0".to_string(),
910                supported_interfaces: vec![
911                    AgentInterface {
912                        url: "http://localhost:9999/a2a".to_string(),
913                        protocol_binding: "JSONRPC".to_string(),
914                        protocol_version: "1.0".to_string(),
915                        tenant: None,
916                    },
917                    AgentInterface {
918                        url: "http://localhost:9999/rest".to_string(),
919                        protocol_binding: "HTTP+JSON".to_string(),
920                        protocol_version: "1.0".to_string(),
921                        tenant: None,
922                    },
923                ],
924                default_input_modes: vec!["text/plain".to_string()],
925                default_output_modes: vec!["text/plain".to_string()],
926                skills: vec![AgentSkill {
927                    id: "echo".to_string(),
928                    name: "Echo".to_string(),
929                    description: "Echoes input".to_string(),
930                    tags: vec![],
931                    examples: None,
932                    input_modes: None,
933                    output_modes: None,
934                    security_requirements: None,
935                }],
936                capabilities: AgentCapabilities::default(),
937                provider: None,
938                icon_url: None,
939                documentation_url: None,
940                security_schemes: None,
941                security_requirements: None,
942                signatures: None,
943            }
944        }
945
946        fn make_jsonrpc_only_card() -> AgentCard {
947            let mut card = make_test_card();
948            card.supported_interfaces.retain(|i| i.protocol_binding == "JSONRPC");
949            card
950        }
951
952        #[test]
953        fn new_client_stores_agent_card() {
954            let card = make_test_card();
955            let client = A2aV1Client::new(card.clone());
956            assert_eq!(client.agent_card().name, "test-agent");
957            assert_eq!(client.agent_card().version, "1.0.0");
958        }
959
960        #[test]
961        fn with_retry_stores_config() {
962            let card = make_test_card();
963            let config = RetryConfig { max_retries: 5, base_delay: Duration::from_millis(500) };
964            let client = A2aV1Client::with_retry(card, config);
965            assert_eq!(client.retry_config.max_retries, 5);
966            assert_eq!(client.retry_config.base_delay, Duration::from_millis(500));
967        }
968
969        #[test]
970        fn default_retry_config() {
971            let config = RetryConfig::default();
972            assert_eq!(config.max_retries, 3);
973            assert_eq!(config.base_delay, Duration::from_secs(1));
974        }
975
976        #[test]
977        fn jsonrpc_url_found() {
978            let client = A2aV1Client::new(make_test_card());
979            assert_eq!(client.jsonrpc_url(), Some("http://localhost:9999/a2a"));
980        }
981
982        #[test]
983        fn rest_url_found() {
984            let client = A2aV1Client::new(make_test_card());
985            assert_eq!(client.rest_url(), Some("http://localhost:9999/rest"));
986        }
987
988        #[test]
989        fn rest_url_none_when_not_available() {
990            let client = A2aV1Client::new(make_jsonrpc_only_card());
991            assert!(client.rest_url().is_none());
992        }
993
994        #[test]
995        fn default_headers_include_version() {
996            let headers = A2aV1Client::default_headers();
997            let version = headers.get(A2A_VERSION_HEADER).unwrap();
998            assert_eq!(version, "1.0");
999        }
1000
1001        #[test]
1002        fn parse_jsonrpc_error_extracts_fields() {
1003            let body = serde_json::json!({
1004                "jsonrpc": "2.0",
1005                "error": {
1006                    "code": -32001,
1007                    "message": "Task not found: task_123",
1008                    "data": [{"@type": "type.googleapis.com/google.rpc.ErrorInfo"}]
1009                },
1010                "id": 1
1011            });
1012            let err = A2aV1Client::parse_jsonrpc_error(&body);
1013            match err {
1014                V1ClientError::JsonRpc { code, message, data } => {
1015                    assert_eq!(code, -32001);
1016                    assert_eq!(message, "Task not found: task_123");
1017                    assert!(data.is_some());
1018                }
1019                other => panic!("expected JsonRpc error, got: {other}"),
1020            }
1021        }
1022
1023        #[test]
1024        fn parse_jsonrpc_error_handles_missing_error_field() {
1025            let body = serde_json::json!({"result": "ok"});
1026            let err = A2aV1Client::parse_jsonrpc_error(&body);
1027            match err {
1028                V1ClientError::JsonRpc { code, .. } => {
1029                    assert_eq!(code, 0);
1030                }
1031                other => panic!("expected JsonRpc error, got: {other}"),
1032            }
1033        }
1034
1035        #[test]
1036        fn parse_version_error_extracts_supported_versions() {
1037            let err_obj = serde_json::json!({
1038                "code": -32009,
1039                "message": "Version not supported",
1040                "data": [{
1041                    "@type": "type.googleapis.com/google.rpc.ErrorInfo",
1042                    "reason": "VERSION_NOT_SUPPORTED",
1043                    "domain": "a2a-protocol.org",
1044                    "metadata": {
1045                        "requested": "2.0",
1046                        "supported": "0.3, 1.0"
1047                    }
1048                }]
1049            });
1050            let err = A2aV1Client::parse_version_error(&err_obj);
1051            match err {
1052                V1ClientError::VersionNotSupported { requested, supported } => {
1053                    assert_eq!(requested, "1.0");
1054                    assert_eq!(supported, vec!["0.3", "1.0"]);
1055                }
1056                other => panic!("expected VersionNotSupported, got: {other}"),
1057            }
1058        }
1059
1060        #[test]
1061        fn parse_version_error_handles_empty_data() {
1062            let err_obj = serde_json::json!({
1063                "code": -32009,
1064                "message": "Version not supported"
1065            });
1066            let err = A2aV1Client::parse_version_error(&err_obj);
1067            match err {
1068                V1ClientError::VersionNotSupported { supported, .. } => {
1069                    assert!(supported.is_empty());
1070                }
1071                other => panic!("expected VersionNotSupported, got: {other}"),
1072            }
1073        }
1074
1075        #[test]
1076        fn v1_client_error_display() {
1077            let err = V1ClientError::JsonRpc {
1078                code: -32001,
1079                message: "Task not found".to_string(),
1080                data: None,
1081            };
1082            assert_eq!(err.to_string(), "JSON-RPC error -32001: Task not found");
1083
1084            let err = V1ClientError::VersionNotSupported {
1085                requested: "2.0".to_string(),
1086                supported: vec!["0.3".to_string(), "1.0".to_string()],
1087            };
1088            assert!(err.to_string().contains("2.0"));
1089            assert!(err.to_string().contains("0.3"));
1090
1091            let err =
1092                V1ClientError::UnexpectedStatus { status: 500, body: "internal error".to_string() };
1093            assert!(err.to_string().contains("500"));
1094        }
1095
1096        #[test]
1097        fn cached_card_starts_empty() {
1098            let client = A2aV1Client::new(make_test_card());
1099            let cache = client.cached_card.lock().unwrap();
1100            assert!(cache.card.is_none());
1101            assert!(cache.etag.is_none());
1102            assert!(cache.last_modified.is_none());
1103        }
1104    }
1105}