Skip to main content

a2a_rust/client/
api.rs

1use std::pin::Pin;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Duration;
5
6use futures_core::Stream;
7use futures_util::stream;
8use reqwest::Url;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::A2AError;
13use crate::error::ProblemDetails;
14use crate::jsonrpc::{
15    CONTENT_TYPE_NOT_SUPPORTED, EXTENDED_AGENT_CARD_NOT_CONFIGURED, EXTENSION_SUPPORT_REQUIRED,
16    INTERNAL_ERROR, INVALID_AGENT_RESPONSE, INVALID_PARAMS, INVALID_REQUEST, JSONRPC_VERSION,
17    JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse, METHOD_CANCEL_TASK,
18    METHOD_CREATE_TASK_PUSH_NOTIFICATION_CONFIG, METHOD_DELETE_TASK_PUSH_NOTIFICATION_CONFIG,
19    METHOD_GET_EXTENDED_AGENT_CARD, METHOD_GET_TASK, METHOD_GET_TASK_PUSH_NOTIFICATION_CONFIG,
20    METHOD_LIST_TASK_PUSH_NOTIFICATION_CONFIGS, METHOD_LIST_TASKS, METHOD_NOT_FOUND,
21    METHOD_SEND_MESSAGE, PARSE_ERROR, PROTOCOL_VERSION, PUSH_NOTIFICATION_NOT_SUPPORTED,
22    TASK_NOT_CANCELABLE, TASK_NOT_FOUND, UNSUPPORTED_OPERATION, VERSION_NOT_SUPPORTED,
23};
24use crate::types::{
25    AgentCard, AgentInterface, CancelTaskRequest, DeleteTaskPushNotificationConfigRequest,
26    GetExtendedAgentCardRequest, GetTaskPushNotificationConfigRequest, GetTaskRequest,
27    ListTaskPushNotificationConfigsRequest, ListTaskPushNotificationConfigsResponse,
28    ListTasksRequest, ListTasksResponse, SendMessageRequest, SendMessageResponse, StreamResponse,
29    SubscribeToTaskRequest, Task, TaskPushNotificationConfig,
30};
31
32use super::discovery::{
33    AgentCardDiscovery, AgentCardDiscoveryConfig, ensure_trailing_slash, normalize_base_url,
34    resolve_interface_url,
35};
36
/// Tunable settings for [`A2AClient`].
#[derive(Clone, Debug)]
pub struct A2AClientConfig {
    /// How long a discovered agent card stays fresh in the discovery cache.
    pub discovery_ttl: Duration,
    /// Extension URIs; when non-empty they are sent comma-joined in the
    /// `A2A-Extensions` request header (`uri1,uri2`).
    pub extensions: Vec<String>,
}

impl Default for A2AClientConfig {
    /// Five-minute discovery TTL and no extensions.
    fn default() -> Self {
        let discovery_ttl = Duration::from_secs(5 * 60);
        let extensions = vec![];

        Self {
            discovery_ttl,
            extensions,
        }
    }
}
54
55#[derive(Debug)]
56enum TransportEndpoint {
57    JsonRpc(Url),
58    HttpJson(Url),
59}
60
61/// Stream of validated SSE items returned by streaming client operations.
62pub type A2AClientStream =
63    Pin<Box<dyn Stream<Item = Result<StreamResponse, A2AError>> + Send + 'static>>;
64
65/// HTTP client for discovery, unary calls, and SSE streams against a remote agent.
66#[derive(Debug)]
67pub struct A2AClient {
68    base_url: Url,
69    client: reqwest::Client,
70    discovery: AgentCardDiscovery,
71    config: A2AClientConfig,
72    request_ids: Arc<AtomicU64>,
73}
74
75impl A2AClient {
76    /// Create a client with default configuration and a default `reqwest` client.
77    pub fn new(base_url: &str) -> Result<Self, A2AError> {
78        Self::with_config(base_url, A2AClientConfig::default())
79    }
80
81    /// Create a client with explicit SDK configuration.
82    pub fn with_config(base_url: &str, config: A2AClientConfig) -> Result<Self, A2AError> {
83        Self::with_http_client(base_url, reqwest::Client::new(), config)
84    }
85
86    /// Create a client with an explicit `reqwest` client and SDK configuration.
87    pub fn with_http_client(
88        base_url: &str,
89        client: reqwest::Client,
90        config: A2AClientConfig,
91    ) -> Result<Self, A2AError> {
92        let base_url = normalize_base_url(base_url)?;
93        let discovery = AgentCardDiscovery::with_http_client(
94            client.clone(),
95            AgentCardDiscoveryConfig {
96                ttl: config.discovery_ttl,
97            },
98        );
99
100        Ok(Self {
101            base_url,
102            client,
103            discovery,
104            config,
105            request_ids: Arc::new(AtomicU64::new(1)),
106        })
107    }
108
109    /// Discover the remote agent card, using the discovery cache when fresh.
110    pub async fn discover_agent_card(&self) -> Result<AgentCard, A2AError> {
111        self.discovery.discover(self.base_url.as_ref()).await
112    }
113
114    /// Refresh the remote agent card and replace any cached copy.
115    pub async fn refresh_agent_card(&self) -> Result<AgentCard, A2AError> {
116        self.discovery.refresh(self.base_url.as_ref()).await
117    }
118
119    /// Invoke `SendMessage` over the server's preferred unary transport.
120    pub async fn send_message(
121        &self,
122        request: SendMessageRequest,
123    ) -> Result<SendMessageResponse, A2AError> {
124        request.validate()?;
125
126        let response: SendMessageResponse = match self.transport().await? {
127            TransportEndpoint::JsonRpc(url) => {
128                self.jsonrpc_call(&url, METHOD_SEND_MESSAGE, &request)
129                    .await?
130            }
131            TransportEndpoint::HttpJson(base_url) => {
132                let url = rest_url(&base_url, request.tenant.as_deref(), &["message:send"])?;
133                self.read_json_response(
134                    self.apply_protocol_headers(self.client.post(url))
135                        .json(&request)
136                        .send()
137                        .await?,
138                )
139                .await?
140            }
141        };
142
143        response.validate()?;
144        Ok(response)
145    }
146
147    /// Invoke `SendStreamingMessage` over HTTP+JSON SSE.
148    pub async fn send_streaming_message(
149        &self,
150        request: SendMessageRequest,
151    ) -> Result<A2AClientStream, A2AError> {
152        request.validate()?;
153
154        let base_url = self.http_json_transport().await?;
155        let url = rest_url(&base_url, request.tenant.as_deref(), &["message:stream"])?;
156        let response = self
157            .apply_protocol_headers(
158                self.client
159                    .post(url)
160                    .header(reqwest::header::ACCEPT, "text/event-stream"),
161            )
162            .json(&request)
163            .send()
164            .await?;
165
166        self.read_sse_response(response).await
167    }
168
169    /// Fetch a task by identifier.
170    pub async fn get_task(&self, request: GetTaskRequest) -> Result<Task, A2AError> {
171        match self.transport().await? {
172            TransportEndpoint::JsonRpc(url) => {
173                self.jsonrpc_call(&url, METHOD_GET_TASK, &request).await
174            }
175            TransportEndpoint::HttpJson(base_url) => {
176                let url = rest_url(
177                    &base_url,
178                    request.tenant.as_deref(),
179                    &["tasks", &request.id],
180                )?;
181                self.read_json_response(
182                    self.apply_protocol_headers(self.client.get(url))
183                        .query(&GetTaskQuery {
184                            history_length: request.history_length,
185                        })
186                        .send()
187                        .await?,
188                )
189                .await
190            }
191        }
192    }
193
194    /// List tasks using the server's preferred unary transport.
195    pub async fn list_tasks(
196        &self,
197        request: ListTasksRequest,
198    ) -> Result<ListTasksResponse, A2AError> {
199        request.validate()?;
200
201        match self.transport().await? {
202            TransportEndpoint::JsonRpc(url) => {
203                self.jsonrpc_call(&url, METHOD_LIST_TASKS, &request).await
204            }
205            TransportEndpoint::HttpJson(base_url) => {
206                let url = rest_url(&base_url, request.tenant.as_deref(), &["tasks"])?;
207                self.read_json_response(
208                    self.apply_protocol_headers(self.client.get(url))
209                        .query(&ListTasksQuery {
210                            context_id: request.context_id,
211                            status: request.status,
212                            page_size: request.page_size,
213                            page_token: request.page_token,
214                            history_length: request.history_length,
215                            status_timestamp_after: request.status_timestamp_after,
216                            include_artifacts: request.include_artifacts,
217                        })
218                        .send()
219                        .await?,
220                )
221                .await
222            }
223        }
224    }
225
226    /// Request cancellation of a task.
227    pub async fn cancel_task(&self, request: CancelTaskRequest) -> Result<Task, A2AError> {
228        match self.transport().await? {
229            TransportEndpoint::JsonRpc(url) => {
230                self.jsonrpc_call(&url, METHOD_CANCEL_TASK, &request).await
231            }
232            TransportEndpoint::HttpJson(base_url) => {
233                let cancel_segment = format!("{}:cancel", request.id);
234                let url = rest_url(
235                    &base_url,
236                    request.tenant.as_deref(),
237                    &["tasks", &cancel_segment],
238                )?;
239                let builder = self.apply_protocol_headers(self.client.post(url));
240                let builder = if let Some(metadata) = &request.metadata {
241                    builder.json(&CancelTaskBody {
242                        metadata: Some(metadata.clone()),
243                    })
244                } else {
245                    builder
246                };
247                self.read_json_response(builder.send().await?).await
248            }
249        }
250    }
251
252    /// Fetch the extended agent card when the remote agent advertises it.
253    pub async fn get_extended_agent_card(
254        &self,
255        request: GetExtendedAgentCardRequest,
256    ) -> Result<AgentCard, A2AError> {
257        match self.transport().await? {
258            TransportEndpoint::JsonRpc(url) => {
259                self.jsonrpc_call(&url, METHOD_GET_EXTENDED_AGENT_CARD, &request)
260                    .await
261            }
262            TransportEndpoint::HttpJson(base_url) => {
263                let url = rest_url(&base_url, request.tenant.as_deref(), &["extendedAgentCard"])?;
264                self.read_json_response(
265                    self.apply_protocol_headers(self.client.get(url))
266                        .send()
267                        .await?,
268                )
269                .await
270            }
271        }
272    }
273
274    /// Create or replace a push-notification configuration for a task.
275    pub async fn create_task_push_notification_config(
276        &self,
277        request: TaskPushNotificationConfig,
278    ) -> Result<TaskPushNotificationConfig, A2AError> {
279        match self.transport().await? {
280            TransportEndpoint::JsonRpc(url) => {
281                self.jsonrpc_call(&url, METHOD_CREATE_TASK_PUSH_NOTIFICATION_CONFIG, &request)
282                    .await
283            }
284            TransportEndpoint::HttpJson(base_url) => {
285                let url = rest_url(
286                    &base_url,
287                    request.tenant.as_deref(),
288                    &["tasks", &request.task_id, "pushNotificationConfigs"],
289                )?;
290                self.read_json_response(
291                    self.apply_protocol_headers(self.client.post(url))
292                        .json(&request)
293                        .send()
294                        .await?,
295                )
296                .await
297            }
298        }
299    }
300
301    /// Fetch a single push-notification configuration by identifier.
302    pub async fn get_task_push_notification_config(
303        &self,
304        request: GetTaskPushNotificationConfigRequest,
305    ) -> Result<TaskPushNotificationConfig, A2AError> {
306        match self.transport().await? {
307            TransportEndpoint::JsonRpc(url) => {
308                self.jsonrpc_call(&url, METHOD_GET_TASK_PUSH_NOTIFICATION_CONFIG, &request)
309                    .await
310            }
311            TransportEndpoint::HttpJson(base_url) => {
312                let url = rest_url(
313                    &base_url,
314                    request.tenant.as_deref(),
315                    &[
316                        "tasks",
317                        &request.task_id,
318                        "pushNotificationConfigs",
319                        &request.id,
320                    ],
321                )?;
322                self.read_json_response(
323                    self.apply_protocol_headers(self.client.get(url))
324                        .send()
325                        .await?,
326                )
327                .await
328            }
329        }
330    }
331
332    /// List push-notification configurations for a task.
333    pub async fn list_task_push_notification_configs(
334        &self,
335        request: ListTaskPushNotificationConfigsRequest,
336    ) -> Result<ListTaskPushNotificationConfigsResponse, A2AError> {
337        request.validate()?;
338
339        match self.transport().await? {
340            TransportEndpoint::JsonRpc(url) => {
341                self.jsonrpc_call(&url, METHOD_LIST_TASK_PUSH_NOTIFICATION_CONFIGS, &request)
342                    .await
343            }
344            TransportEndpoint::HttpJson(base_url) => {
345                let url = rest_url(
346                    &base_url,
347                    request.tenant.as_deref(),
348                    &["tasks", &request.task_id, "pushNotificationConfigs"],
349                )?;
350                self.read_json_response(
351                    self.apply_protocol_headers(self.client.get(url))
352                        .query(&ListTaskPushNotificationConfigsQuery {
353                            page_size: request.page_size,
354                            page_token: request.page_token,
355                        })
356                        .send()
357                        .await?,
358                )
359                .await
360            }
361        }
362    }
363
364    /// Delete a push-notification configuration by identifier.
365    pub async fn delete_task_push_notification_config(
366        &self,
367        request: DeleteTaskPushNotificationConfigRequest,
368    ) -> Result<(), A2AError> {
369        match self.transport().await? {
370            TransportEndpoint::JsonRpc(url) => self
371                .jsonrpc_call::<_, serde_json::Value>(
372                    &url,
373                    METHOD_DELETE_TASK_PUSH_NOTIFICATION_CONFIG,
374                    &request,
375                )
376                .await
377                .map(|_| ()),
378            TransportEndpoint::HttpJson(base_url) => {
379                let url = rest_url(
380                    &base_url,
381                    request.tenant.as_deref(),
382                    &[
383                        "tasks",
384                        &request.task_id,
385                        "pushNotificationConfigs",
386                        &request.id,
387                    ],
388                )?;
389                self.read_json_response::<serde_json::Value>(
390                    self.apply_protocol_headers(self.client.delete(url))
391                        .send()
392                        .await?,
393                )
394                .await
395                .map(|_| ())
396            }
397        }
398    }
399
400    /// Subscribe to task updates over HTTP+JSON SSE.
401    pub async fn subscribe_to_task(
402        &self,
403        request: SubscribeToTaskRequest,
404    ) -> Result<A2AClientStream, A2AError> {
405        let base_url = self.http_json_transport().await?;
406        let subscribe_segment = format!("{}:subscribe", request.id);
407        let url = rest_url(
408            &base_url,
409            request.tenant.as_deref(),
410            &["tasks", &subscribe_segment],
411        )?;
412        let response = self
413            .apply_protocol_headers(
414                self.client
415                    .get(url)
416                    .header(reqwest::header::ACCEPT, "text/event-stream"),
417            )
418            .send()
419            .await?;
420
421        self.read_sse_response(response).await
422    }
423
424    async fn transport(&self) -> Result<TransportEndpoint, A2AError> {
425        let card = self.discover_agent_card().await?;
426        select_transport(&self.base_url, &card.supported_interfaces)
427    }
428
429    async fn http_json_transport(&self) -> Result<Url, A2AError> {
430        let card = self.discover_agent_card().await?;
431        select_http_json_transport(&self.base_url, &card.supported_interfaces)
432    }
433
434    async fn jsonrpc_call<P, R>(&self, url: &Url, method: &str, params: &P) -> Result<R, A2AError>
435    where
436        P: Serialize,
437        R: DeserializeOwned,
438    {
439        let id = JsonRpcId::String(format!(
440            "req-{}",
441            self.request_ids.fetch_add(1, Ordering::Relaxed)
442        ));
443        let request = JsonRpcRequest {
444            jsonrpc: JSONRPC_VERSION.to_owned(),
445            method: method.to_owned(),
446            params: Some(serde_json::to_value(params)?),
447            id: id.clone(),
448        };
449
450        let response = self
451            .apply_protocol_headers(self.client.post(url.clone()))
452            .json(&request)
453            .send()
454            .await?;
455        let bytes = response.bytes().await?;
456        let envelope: JsonRpcResponse = serde_json::from_slice(&bytes)
457            .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()))?;
458
459        if envelope.jsonrpc != JSONRPC_VERSION {
460            return Err(A2AError::InvalidAgentResponse(
461                "jsonrpc must be \"2.0\"".to_owned(),
462            ));
463        }
464
465        if envelope.id != id {
466            return Err(A2AError::InvalidAgentResponse(
467                "response id did not match request id".to_owned(),
468            ));
469        }
470
471        match (envelope.result, envelope.error) {
472            (Some(result), None) => serde_json::from_value(result)
473                .map_err(|error| A2AError::InvalidAgentResponse(error.to_string())),
474            (None, Some(error)) => Err(map_jsonrpc_error(error)),
475            _ => Err(A2AError::InvalidAgentResponse(
476                "response must contain exactly one of result or error".to_owned(),
477            )),
478        }
479    }
480
481    async fn read_json_response<T>(&self, response: reqwest::Response) -> Result<T, A2AError>
482    where
483        T: DeserializeOwned,
484    {
485        let status = response.status();
486        let bytes = response.bytes().await?;
487
488        if status.is_success() {
489            return serde_json::from_slice(&bytes)
490                .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()));
491        }
492
493        if let Ok(problem) = serde_json::from_slice::<ProblemDetails>(&bytes) {
494            return Err(problem.to_a2a_error());
495        }
496
497        if let Ok(error) = serde_json::from_slice::<LegacyRestErrorEnvelope>(&bytes) {
498            return Err(map_jsonrpc_error(error.error));
499        }
500
501        Err(A2AError::InvalidAgentResponse(format!(
502            "unexpected HTTP status {}",
503            status
504        )))
505    }
506
507    async fn read_sse_response(
508        &self,
509        response: reqwest::Response,
510    ) -> Result<A2AClientStream, A2AError> {
511        let status = response.status();
512        if !status.is_success() {
513            let bytes = response.bytes().await?;
514            if let Ok(problem) = serde_json::from_slice::<ProblemDetails>(&bytes) {
515                return Err(problem.to_a2a_error());
516            }
517
518            if let Ok(error) = serde_json::from_slice::<LegacyRestErrorEnvelope>(&bytes) {
519                return Err(map_jsonrpc_error(error.error));
520            }
521
522            return Err(A2AError::InvalidAgentResponse(format!(
523                "unexpected HTTP status {}",
524                status
525            )));
526        }
527
528        Ok(Box::pin(sse_stream(response)))
529    }
530
531    fn apply_protocol_headers(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
532        let mut builder = builder.header("A2A-Version", PROTOCOL_VERSION);
533        if !self.config.extensions.is_empty() {
534            builder = builder.header("A2A-Extensions", self.config.extensions.join(","));
535        }
536
537        builder
538    }
539}
540
541fn select_transport(
542    base_url: &Url,
543    interfaces: &[AgentInterface],
544) -> Result<TransportEndpoint, A2AError> {
545    let mut advertised_versions = Vec::new();
546
547    for interface in interfaces {
548        if !interface
549            .protocol_version
550            .eq_ignore_ascii_case(PROTOCOL_VERSION)
551        {
552            advertised_versions.push(interface.protocol_version.clone());
553            continue;
554        }
555
556        if interface.protocol_binding.eq_ignore_ascii_case("JSONRPC") {
557            return resolve_interface_url(base_url, &interface.url).map(TransportEndpoint::JsonRpc);
558        }
559
560        if interface.protocol_binding.eq_ignore_ascii_case("HTTP+JSON") {
561            return resolve_interface_url(base_url, &interface.url)
562                .map(ensure_trailing_slash)
563                .map(TransportEndpoint::HttpJson);
564        }
565    }
566
567    if !advertised_versions.is_empty() {
568        advertised_versions.sort();
569        advertised_versions.dedup();
570        return Err(A2AError::VersionNotSupported(format!(
571            "client supports A2A-Version {}, agent advertised {}",
572            PROTOCOL_VERSION,
573            advertised_versions.join(", ")
574        )));
575    }
576
577    Err(A2AError::InvalidAgentResponse(
578        "agent card does not advertise a supported interface".to_owned(),
579    ))
580}
581
582fn select_http_json_transport(
583    base_url: &Url,
584    interfaces: &[AgentInterface],
585) -> Result<Url, A2AError> {
586    let mut advertised_versions = Vec::new();
587
588    for interface in interfaces {
589        if !interface.protocol_binding.eq_ignore_ascii_case("HTTP+JSON") {
590            continue;
591        }
592
593        if !interface
594            .protocol_version
595            .eq_ignore_ascii_case(PROTOCOL_VERSION)
596        {
597            advertised_versions.push(interface.protocol_version.clone());
598            continue;
599        }
600
601        return resolve_interface_url(base_url, &interface.url).map(ensure_trailing_slash);
602    }
603
604    if !advertised_versions.is_empty() {
605        advertised_versions.sort();
606        advertised_versions.dedup();
607        return Err(A2AError::VersionNotSupported(format!(
608            "client supports A2A-Version {}, agent advertised HTTP+JSON {}",
609            PROTOCOL_VERSION,
610            advertised_versions.join(", ")
611        )));
612    }
613
614    Err(A2AError::InvalidAgentResponse(
615        "agent card does not advertise an HTTP+JSON interface".to_owned(),
616    ))
617}
618
619fn rest_url(base_url: &Url, tenant: Option<&str>, segments: &[&str]) -> Result<Url, A2AError> {
620    let mut url = ensure_trailing_slash(base_url.clone());
621    {
622        let mut path_segments = url
623            .path_segments_mut()
624            .map_err(|_| A2AError::InvalidRequest("base URL cannot be a base".to_owned()))?;
625        path_segments.pop_if_empty();
626        if let Some(tenant) = tenant {
627            path_segments.push(tenant);
628        }
629        for segment in segments {
630            path_segments.push(segment);
631        }
632    }
633
634    Ok(url)
635}
636
637fn map_jsonrpc_error(error: JsonRpcError) -> A2AError {
638    if let Some(info) = error.first_error_info() {
639        return A2AError::from_error_info(error.code, &error.message, Some(&info));
640    }
641
642    let detail = error
643        .data
644        .as_ref()
645        .and_then(serde_json::Value::as_str)
646        .unwrap_or(&error.message)
647        .to_owned();
648
649    match error.code {
650        TASK_NOT_FOUND => A2AError::TaskNotFound(detail),
651        TASK_NOT_CANCELABLE => A2AError::TaskNotCancelable(detail),
652        PUSH_NOTIFICATION_NOT_SUPPORTED => A2AError::PushNotificationNotSupported(detail),
653        UNSUPPORTED_OPERATION => A2AError::UnsupportedOperation(detail),
654        CONTENT_TYPE_NOT_SUPPORTED => A2AError::ContentTypeNotSupported(detail),
655        INVALID_AGENT_RESPONSE => A2AError::InvalidAgentResponse(detail),
656        EXTENDED_AGENT_CARD_NOT_CONFIGURED => A2AError::ExtendedAgentCardNotConfigured(detail),
657        EXTENSION_SUPPORT_REQUIRED => A2AError::ExtensionSupportRequired(detail),
658        VERSION_NOT_SUPPORTED => A2AError::VersionNotSupported(detail),
659        PARSE_ERROR => A2AError::ParseError(detail),
660        INVALID_REQUEST => A2AError::InvalidRequest(detail),
661        METHOD_NOT_FOUND => A2AError::MethodNotFound(detail),
662        INVALID_PARAMS => A2AError::InvalidParams(detail),
663        INTERNAL_ERROR => A2AError::Internal(detail),
664        code => A2AError::Internal(format!("jsonrpc error {}: {}", code, error.message)),
665    }
666}
667
668fn sse_stream(
669    response: reqwest::Response,
670) -> impl Stream<Item = Result<StreamResponse, A2AError>> + Send {
671    stream::try_unfold(
672        SseState {
673            response,
674            buffer: Vec::new(),
675        },
676        |mut state| async move {
677            loop {
678                if let Some(frame) = take_sse_frame(&mut state.buffer, false)?
679                    && let Some(item) = parse_sse_frame(frame)?
680                {
681                    item.validate()?;
682                    return Ok(Some((item, state)));
683                }
684
685                match state.response.chunk().await? {
686                    Some(chunk) => state.buffer.extend_from_slice(&chunk),
687                    None => match take_sse_frame(&mut state.buffer, true)? {
688                        Some(frame) => {
689                            if let Some(item) = parse_sse_frame(frame)? {
690                                item.validate()?;
691                                return Ok(Some((item, state)));
692                            }
693                        }
694                        None => return Ok(None),
695                    },
696                }
697            }
698        },
699    )
700}
701
702#[derive(Debug)]
703struct SseState {
704    response: reqwest::Response,
705    buffer: Vec<u8>,
706}
707
708fn take_sse_frame(buffer: &mut Vec<u8>, eof: bool) -> Result<Option<Vec<u8>>, A2AError> {
709    if let Some((index, delimiter_len)) = sse_frame_boundary(buffer) {
710        let frame = buffer[..index].to_vec();
711        buffer.drain(..index + delimiter_len);
712        return Ok(Some(frame));
713    }
714
715    if eof && !buffer.is_empty() {
716        return Ok(Some(std::mem::take(buffer)));
717    }
718
719    Ok(None)
720}
721
/// Locate the first blank-line delimiter that terminates an SSE frame.
///
/// Returns `(index, delimiter_len)`, where `index` is the offset at which the delimiter
/// starts (equivalently, the frame length) and `delimiter_len` is the number of delimiter
/// bytes to skip. Recognised delimiters are `\n\n`, `\r\n\r\n`, and the mixed form
/// `\n\r\n` (an LF-terminated line followed by a CRLF blank line). `\r\n\n` is matched
/// through its `\n\n` tail, which leaves a trailing `\r` on the frame. Bare-CR line
/// endings are not recognised.
///
/// Returns `None` when `buffer` holds no complete delimiter yet.
fn sse_frame_boundary(buffer: &[u8]) -> Option<(usize, usize)> {
    // At any given offset at most one of these can match, so their order is irrelevant;
    // the earliest offset wins.
    const DELIMITERS: [&[u8]; 3] = [b"\n\n", b"\r\n\r\n", b"\n\r\n"];

    (0..buffer.len()).find_map(|start| {
        let rest = &buffer[start..];
        DELIMITERS
            .iter()
            .find(|delimiter| rest.starts_with(delimiter))
            .map(|delimiter| (start, delimiter.len()))
    })
}
735
736fn parse_sse_frame(frame: Vec<u8>) -> Result<Option<StreamResponse>, A2AError> {
737    let text = String::from_utf8(frame)
738        .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()))?;
739    let mut data_lines = Vec::new();
740
741    for line in text.lines() {
742        let line = line.strip_suffix('\r').unwrap_or(line);
743        if line.is_empty() || line.starts_with(':') {
744            continue;
745        }
746
747        if let Some(data) = line.strip_prefix("data:") {
748            let data = data.strip_prefix(' ').unwrap_or(data);
749            data_lines.push(data.to_owned());
750        }
751    }
752
753    if data_lines.is_empty() {
754        return Ok(None);
755    }
756
757    serde_json::from_str::<StreamResponse>(&data_lines.join("\n"))
758        .map(Some)
759        .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()))
760}
761
762#[derive(serde::Serialize)]
763#[serde(rename_all = "camelCase")]
764struct GetTaskQuery {
765    #[serde(skip_serializing_if = "Option::is_none")]
766    history_length: Option<i32>,
767}
768
769#[derive(serde::Serialize)]
770#[serde(rename_all = "camelCase")]
771struct ListTasksQuery {
772    #[serde(skip_serializing_if = "Option::is_none")]
773    context_id: Option<String>,
774    #[serde(skip_serializing_if = "Option::is_none")]
775    status: Option<crate::types::TaskState>,
776    #[serde(skip_serializing_if = "Option::is_none")]
777    page_size: Option<i32>,
778    #[serde(skip_serializing_if = "Option::is_none")]
779    page_token: Option<String>,
780    #[serde(skip_serializing_if = "Option::is_none")]
781    history_length: Option<i32>,
782    #[serde(skip_serializing_if = "Option::is_none")]
783    status_timestamp_after: Option<String>,
784    #[serde(skip_serializing_if = "Option::is_none")]
785    include_artifacts: Option<bool>,
786}
787
788#[derive(serde::Serialize)]
789#[serde(rename_all = "camelCase")]
790struct ListTaskPushNotificationConfigsQuery {
791    #[serde(skip_serializing_if = "Option::is_none")]
792    page_size: Option<i32>,
793    #[serde(skip_serializing_if = "Option::is_none")]
794    page_token: Option<String>,
795}
796
797#[derive(serde::Serialize)]
798#[serde(rename_all = "camelCase")]
799struct CancelTaskBody {
800    #[serde(skip_serializing_if = "Option::is_none")]
801    metadata: Option<crate::types::JsonObject>,
802}
803
804#[derive(serde::Deserialize)]
805struct LegacyRestErrorEnvelope {
806    error: JsonRpcError,
807}
808
#[cfg(test)]
mod tests {
    use super::map_jsonrpc_error;
    use crate::A2AError;
    use crate::jsonrpc::{
        CONTENT_TYPE_NOT_SUPPORTED, EXTENDED_AGENT_CARD_NOT_CONFIGURED, EXTENSION_SUPPORT_REQUIRED,
        INTERNAL_ERROR, INVALID_AGENT_RESPONSE, INVALID_PARAMS, INVALID_REQUEST, JsonRpcError,
        METHOD_NOT_FOUND, PARSE_ERROR, PUSH_NOTIFICATION_NOT_SUPPORTED, TASK_NOT_CANCELABLE,
        TASK_NOT_FOUND, UNSUPPORTED_OPERATION, VERSION_NOT_SUPPORTED,
    };

    // Asserts that `$error` is the single-field `A2AError::$variant` and that its
    // payload compares equal to `$expected`. `$error` is consumed by the check.
    macro_rules! assert_variant {
        ($error:expr, $variant:ident, $expected:expr) => {
            assert!(matches!($error, A2AError::$variant(payload) if payload == $expected))
        };
    }

    /// Feeds every protocol error code through `map_jsonrpc_error` with a plain
    /// string in `data` and checks that the matching `A2AError` variant comes
    /// back carrying that string.
    #[test]
    fn map_jsonrpc_error_covers_all_protocol_codes() {
        // (error code, string placed in the error's `data` field)
        let expectations = [
            (TASK_NOT_FOUND, "task missing"),
            (TASK_NOT_CANCELABLE, "task busy"),
            (PUSH_NOTIFICATION_NOT_SUPPORTED, "push unsupported"),
            (UNSUPPORTED_OPERATION, "operation unsupported"),
            (CONTENT_TYPE_NOT_SUPPORTED, "content type unsupported"),
            (INVALID_AGENT_RESPONSE, "invalid agent response"),
            (
                EXTENDED_AGENT_CARD_NOT_CONFIGURED,
                "extended agent card missing",
            ),
            (EXTENSION_SUPPORT_REQUIRED, "extension required"),
            (VERSION_NOT_SUPPORTED, "version unsupported"),
            (PARSE_ERROR, "parse error"),
            (INVALID_REQUEST, "invalid request"),
            (METHOD_NOT_FOUND, "missing method"),
            (INVALID_PARAMS, "invalid params"),
            (INTERNAL_ERROR, "internal error"),
        ];

        for (code, detail) in expectations {
            let rpc_error = JsonRpcError {
                code,
                message: format!("message for {code}"),
                data: Some(serde_json::Value::String(String::from(detail))),
            };
            let mapped = map_jsonrpc_error(rpc_error);

            match code {
                TASK_NOT_FOUND => assert_variant!(mapped, TaskNotFound, detail),
                TASK_NOT_CANCELABLE => assert_variant!(mapped, TaskNotCancelable, detail),
                PUSH_NOTIFICATION_NOT_SUPPORTED => {
                    assert_variant!(mapped, PushNotificationNotSupported, detail)
                }
                UNSUPPORTED_OPERATION => assert_variant!(mapped, UnsupportedOperation, detail),
                CONTENT_TYPE_NOT_SUPPORTED => {
                    assert_variant!(mapped, ContentTypeNotSupported, detail)
                }
                INVALID_AGENT_RESPONSE => assert_variant!(mapped, InvalidAgentResponse, detail),
                EXTENDED_AGENT_CARD_NOT_CONFIGURED => {
                    assert_variant!(mapped, ExtendedAgentCardNotConfigured, detail)
                }
                EXTENSION_SUPPORT_REQUIRED => {
                    assert_variant!(mapped, ExtensionSupportRequired, detail)
                }
                VERSION_NOT_SUPPORTED => assert_variant!(mapped, VersionNotSupported, detail),
                PARSE_ERROR => assert_variant!(mapped, ParseError, detail),
                INVALID_REQUEST => assert_variant!(mapped, InvalidRequest, detail),
                METHOD_NOT_FOUND => assert_variant!(mapped, MethodNotFound, detail),
                INVALID_PARAMS => assert_variant!(mapped, InvalidParams, detail),
                INTERNAL_ERROR => assert_variant!(mapped, Internal, detail),
                _ => unreachable!("all cases should be covered"),
            }
        }
    }

    /// When `data` is an ErrorInfo-shaped object, the mapped error must carry
    /// the `metadata.taskId` value rather than the top-level `message`.
    #[test]
    fn map_jsonrpc_error_prefers_structured_error_info() {
        let error_info = serde_json::json!({
            "@type": "type.googleapis.com/google.rpc.ErrorInfo",
            "reason": "TASK_NOT_FOUND",
            "domain": "a2a-protocol.org",
            "metadata": {
                "taskId": "task-123"
            }
        });
        let rpc_error = JsonRpcError {
            code: TASK_NOT_FOUND,
            message: String::from("fallback message"),
            data: Some(error_info),
        };

        let mapped = map_jsonrpc_error(rpc_error);

        assert_variant!(mapped, TaskNotFound, "task-123");
    }
}