1use std::pin::Pin;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4use std::time::Duration;
5
6use futures_core::Stream;
7use futures_util::stream;
8use reqwest::Url;
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11
12use crate::A2AError;
13use crate::error::ProblemDetails;
14use crate::jsonrpc::{
15 CONTENT_TYPE_NOT_SUPPORTED, EXTENDED_AGENT_CARD_NOT_CONFIGURED, EXTENSION_SUPPORT_REQUIRED,
16 INTERNAL_ERROR, INVALID_AGENT_RESPONSE, INVALID_PARAMS, INVALID_REQUEST, JSONRPC_VERSION,
17 JsonRpcError, JsonRpcId, JsonRpcRequest, JsonRpcResponse, METHOD_CANCEL_TASK,
18 METHOD_CREATE_TASK_PUSH_NOTIFICATION_CONFIG, METHOD_DELETE_TASK_PUSH_NOTIFICATION_CONFIG,
19 METHOD_GET_EXTENDED_AGENT_CARD, METHOD_GET_TASK, METHOD_GET_TASK_PUSH_NOTIFICATION_CONFIG,
20 METHOD_LIST_TASK_PUSH_NOTIFICATION_CONFIGS, METHOD_LIST_TASKS, METHOD_NOT_FOUND,
21 METHOD_SEND_MESSAGE, PARSE_ERROR, PROTOCOL_VERSION, PUSH_NOTIFICATION_NOT_SUPPORTED,
22 TASK_NOT_CANCELABLE, TASK_NOT_FOUND, UNSUPPORTED_OPERATION, VERSION_NOT_SUPPORTED,
23};
24use crate::types::{
25 AgentCard, AgentInterface, CancelTaskRequest, DeleteTaskPushNotificationConfigRequest,
26 GetExtendedAgentCardRequest, GetTaskPushNotificationConfigRequest, GetTaskRequest,
27 ListTaskPushNotificationConfigsRequest, ListTaskPushNotificationConfigsResponse,
28 ListTasksRequest, ListTasksResponse, SendMessageRequest, SendMessageResponse, StreamResponse,
29 SubscribeToTaskRequest, Task, TaskPushNotificationConfig,
30};
31
32use super::discovery::{
33 AgentCardDiscovery, AgentCardDiscoveryConfig, ensure_trailing_slash, normalize_base_url,
34 resolve_interface_url,
35};
36
37#[derive(Debug, Clone)]
39pub struct A2AClientConfig {
40 pub discovery_ttl: Duration,
42 pub extensions: Vec<String>,
44}
45
46impl Default for A2AClientConfig {
47 fn default() -> Self {
48 Self {
49 discovery_ttl: Duration::from_secs(300),
50 extensions: Vec::new(),
51 }
52 }
53}
54
55#[derive(Debug)]
56enum TransportEndpoint {
57 JsonRpc(Url),
58 HttpJson(Url),
59}
60
61pub type A2AClientStream =
63 Pin<Box<dyn Stream<Item = Result<StreamResponse, A2AError>> + Send + 'static>>;
64
65#[derive(Debug)]
67pub struct A2AClient {
68 base_url: Url,
69 client: reqwest::Client,
70 discovery: AgentCardDiscovery,
71 config: A2AClientConfig,
72 request_ids: Arc<AtomicU64>,
73}
74
75impl A2AClient {
76 pub fn new(base_url: &str) -> Result<Self, A2AError> {
78 Self::with_config(base_url, A2AClientConfig::default())
79 }
80
81 pub fn with_config(base_url: &str, config: A2AClientConfig) -> Result<Self, A2AError> {
83 Self::with_http_client(base_url, reqwest::Client::new(), config)
84 }
85
86 pub fn with_http_client(
88 base_url: &str,
89 client: reqwest::Client,
90 config: A2AClientConfig,
91 ) -> Result<Self, A2AError> {
92 let base_url = normalize_base_url(base_url)?;
93 let discovery = AgentCardDiscovery::with_http_client(
94 client.clone(),
95 AgentCardDiscoveryConfig {
96 ttl: config.discovery_ttl,
97 },
98 );
99
100 Ok(Self {
101 base_url,
102 client,
103 discovery,
104 config,
105 request_ids: Arc::new(AtomicU64::new(1)),
106 })
107 }
108
109 pub async fn discover_agent_card(&self) -> Result<AgentCard, A2AError> {
111 self.discovery.discover(self.base_url.as_ref()).await
112 }
113
114 pub async fn refresh_agent_card(&self) -> Result<AgentCard, A2AError> {
116 self.discovery.refresh(self.base_url.as_ref()).await
117 }
118
119 pub async fn send_message(
121 &self,
122 request: SendMessageRequest,
123 ) -> Result<SendMessageResponse, A2AError> {
124 request.validate()?;
125
126 let response: SendMessageResponse = match self.transport().await? {
127 TransportEndpoint::JsonRpc(url) => {
128 self.jsonrpc_call(&url, METHOD_SEND_MESSAGE, &request)
129 .await?
130 }
131 TransportEndpoint::HttpJson(base_url) => {
132 let url = rest_url(&base_url, request.tenant.as_deref(), &["message:send"])?;
133 self.read_json_response(
134 self.apply_protocol_headers(self.client.post(url))
135 .json(&request)
136 .send()
137 .await?,
138 )
139 .await?
140 }
141 };
142
143 response.validate()?;
144 Ok(response)
145 }
146
147 pub async fn send_streaming_message(
149 &self,
150 request: SendMessageRequest,
151 ) -> Result<A2AClientStream, A2AError> {
152 request.validate()?;
153
154 let base_url = self.http_json_transport().await?;
155 let url = rest_url(&base_url, request.tenant.as_deref(), &["message:stream"])?;
156 let response = self
157 .apply_protocol_headers(
158 self.client
159 .post(url)
160 .header(reqwest::header::ACCEPT, "text/event-stream"),
161 )
162 .json(&request)
163 .send()
164 .await?;
165
166 self.read_sse_response(response).await
167 }
168
169 pub async fn get_task(&self, request: GetTaskRequest) -> Result<Task, A2AError> {
171 match self.transport().await? {
172 TransportEndpoint::JsonRpc(url) => {
173 self.jsonrpc_call(&url, METHOD_GET_TASK, &request).await
174 }
175 TransportEndpoint::HttpJson(base_url) => {
176 let url = rest_url(
177 &base_url,
178 request.tenant.as_deref(),
179 &["tasks", &request.id],
180 )?;
181 self.read_json_response(
182 self.apply_protocol_headers(self.client.get(url))
183 .query(&GetTaskQuery {
184 history_length: request.history_length,
185 })
186 .send()
187 .await?,
188 )
189 .await
190 }
191 }
192 }
193
194 pub async fn list_tasks(
196 &self,
197 request: ListTasksRequest,
198 ) -> Result<ListTasksResponse, A2AError> {
199 request.validate()?;
200
201 match self.transport().await? {
202 TransportEndpoint::JsonRpc(url) => {
203 self.jsonrpc_call(&url, METHOD_LIST_TASKS, &request).await
204 }
205 TransportEndpoint::HttpJson(base_url) => {
206 let url = rest_url(&base_url, request.tenant.as_deref(), &["tasks"])?;
207 self.read_json_response(
208 self.apply_protocol_headers(self.client.get(url))
209 .query(&ListTasksQuery {
210 context_id: request.context_id,
211 status: request.status,
212 page_size: request.page_size,
213 page_token: request.page_token,
214 history_length: request.history_length,
215 status_timestamp_after: request.status_timestamp_after,
216 include_artifacts: request.include_artifacts,
217 })
218 .send()
219 .await?,
220 )
221 .await
222 }
223 }
224 }
225
226 pub async fn cancel_task(&self, request: CancelTaskRequest) -> Result<Task, A2AError> {
228 match self.transport().await? {
229 TransportEndpoint::JsonRpc(url) => {
230 self.jsonrpc_call(&url, METHOD_CANCEL_TASK, &request).await
231 }
232 TransportEndpoint::HttpJson(base_url) => {
233 let cancel_segment = format!("{}:cancel", request.id);
234 let url = rest_url(
235 &base_url,
236 request.tenant.as_deref(),
237 &["tasks", &cancel_segment],
238 )?;
239 let builder = self.apply_protocol_headers(self.client.post(url));
240 let builder = if let Some(metadata) = &request.metadata {
241 builder.json(&CancelTaskBody {
242 metadata: Some(metadata.clone()),
243 })
244 } else {
245 builder
246 };
247 self.read_json_response(builder.send().await?).await
248 }
249 }
250 }
251
252 pub async fn get_extended_agent_card(
254 &self,
255 request: GetExtendedAgentCardRequest,
256 ) -> Result<AgentCard, A2AError> {
257 match self.transport().await? {
258 TransportEndpoint::JsonRpc(url) => {
259 self.jsonrpc_call(&url, METHOD_GET_EXTENDED_AGENT_CARD, &request)
260 .await
261 }
262 TransportEndpoint::HttpJson(base_url) => {
263 let url = rest_url(&base_url, request.tenant.as_deref(), &["extendedAgentCard"])?;
264 self.read_json_response(
265 self.apply_protocol_headers(self.client.get(url))
266 .send()
267 .await?,
268 )
269 .await
270 }
271 }
272 }
273
274 pub async fn create_task_push_notification_config(
276 &self,
277 request: TaskPushNotificationConfig,
278 ) -> Result<TaskPushNotificationConfig, A2AError> {
279 match self.transport().await? {
280 TransportEndpoint::JsonRpc(url) => {
281 self.jsonrpc_call(&url, METHOD_CREATE_TASK_PUSH_NOTIFICATION_CONFIG, &request)
282 .await
283 }
284 TransportEndpoint::HttpJson(base_url) => {
285 let url = rest_url(
286 &base_url,
287 request.tenant.as_deref(),
288 &["tasks", &request.task_id, "pushNotificationConfigs"],
289 )?;
290 self.read_json_response(
291 self.apply_protocol_headers(self.client.post(url))
292 .json(&request)
293 .send()
294 .await?,
295 )
296 .await
297 }
298 }
299 }
300
301 pub async fn get_task_push_notification_config(
303 &self,
304 request: GetTaskPushNotificationConfigRequest,
305 ) -> Result<TaskPushNotificationConfig, A2AError> {
306 match self.transport().await? {
307 TransportEndpoint::JsonRpc(url) => {
308 self.jsonrpc_call(&url, METHOD_GET_TASK_PUSH_NOTIFICATION_CONFIG, &request)
309 .await
310 }
311 TransportEndpoint::HttpJson(base_url) => {
312 let url = rest_url(
313 &base_url,
314 request.tenant.as_deref(),
315 &[
316 "tasks",
317 &request.task_id,
318 "pushNotificationConfigs",
319 &request.id,
320 ],
321 )?;
322 self.read_json_response(
323 self.apply_protocol_headers(self.client.get(url))
324 .send()
325 .await?,
326 )
327 .await
328 }
329 }
330 }
331
332 pub async fn list_task_push_notification_configs(
334 &self,
335 request: ListTaskPushNotificationConfigsRequest,
336 ) -> Result<ListTaskPushNotificationConfigsResponse, A2AError> {
337 request.validate()?;
338
339 match self.transport().await? {
340 TransportEndpoint::JsonRpc(url) => {
341 self.jsonrpc_call(&url, METHOD_LIST_TASK_PUSH_NOTIFICATION_CONFIGS, &request)
342 .await
343 }
344 TransportEndpoint::HttpJson(base_url) => {
345 let url = rest_url(
346 &base_url,
347 request.tenant.as_deref(),
348 &["tasks", &request.task_id, "pushNotificationConfigs"],
349 )?;
350 self.read_json_response(
351 self.apply_protocol_headers(self.client.get(url))
352 .query(&ListTaskPushNotificationConfigsQuery {
353 page_size: request.page_size,
354 page_token: request.page_token,
355 })
356 .send()
357 .await?,
358 )
359 .await
360 }
361 }
362 }
363
364 pub async fn delete_task_push_notification_config(
366 &self,
367 request: DeleteTaskPushNotificationConfigRequest,
368 ) -> Result<(), A2AError> {
369 match self.transport().await? {
370 TransportEndpoint::JsonRpc(url) => self
371 .jsonrpc_call::<_, serde_json::Value>(
372 &url,
373 METHOD_DELETE_TASK_PUSH_NOTIFICATION_CONFIG,
374 &request,
375 )
376 .await
377 .map(|_| ()),
378 TransportEndpoint::HttpJson(base_url) => {
379 let url = rest_url(
380 &base_url,
381 request.tenant.as_deref(),
382 &[
383 "tasks",
384 &request.task_id,
385 "pushNotificationConfigs",
386 &request.id,
387 ],
388 )?;
389 self.read_json_response::<serde_json::Value>(
390 self.apply_protocol_headers(self.client.delete(url))
391 .send()
392 .await?,
393 )
394 .await
395 .map(|_| ())
396 }
397 }
398 }
399
400 pub async fn subscribe_to_task(
402 &self,
403 request: SubscribeToTaskRequest,
404 ) -> Result<A2AClientStream, A2AError> {
405 let base_url = self.http_json_transport().await?;
406 let subscribe_segment = format!("{}:subscribe", request.id);
407 let url = rest_url(
408 &base_url,
409 request.tenant.as_deref(),
410 &["tasks", &subscribe_segment],
411 )?;
412 let response = self
413 .apply_protocol_headers(
414 self.client
415 .get(url)
416 .header(reqwest::header::ACCEPT, "text/event-stream"),
417 )
418 .send()
419 .await?;
420
421 self.read_sse_response(response).await
422 }
423
424 async fn transport(&self) -> Result<TransportEndpoint, A2AError> {
425 let card = self.discover_agent_card().await?;
426 select_transport(&self.base_url, &card.supported_interfaces)
427 }
428
429 async fn http_json_transport(&self) -> Result<Url, A2AError> {
430 let card = self.discover_agent_card().await?;
431 select_http_json_transport(&self.base_url, &card.supported_interfaces)
432 }
433
434 async fn jsonrpc_call<P, R>(&self, url: &Url, method: &str, params: &P) -> Result<R, A2AError>
435 where
436 P: Serialize,
437 R: DeserializeOwned,
438 {
439 let id = JsonRpcId::String(format!(
440 "req-{}",
441 self.request_ids.fetch_add(1, Ordering::Relaxed)
442 ));
443 let request = JsonRpcRequest {
444 jsonrpc: JSONRPC_VERSION.to_owned(),
445 method: method.to_owned(),
446 params: Some(serde_json::to_value(params)?),
447 id: id.clone(),
448 };
449
450 let response = self
451 .apply_protocol_headers(self.client.post(url.clone()))
452 .json(&request)
453 .send()
454 .await?;
455 let bytes = response.bytes().await?;
456 let envelope: JsonRpcResponse = serde_json::from_slice(&bytes)
457 .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()))?;
458
459 if envelope.jsonrpc != JSONRPC_VERSION {
460 return Err(A2AError::InvalidAgentResponse(
461 "jsonrpc must be \"2.0\"".to_owned(),
462 ));
463 }
464
465 if envelope.id != id {
466 return Err(A2AError::InvalidAgentResponse(
467 "response id did not match request id".to_owned(),
468 ));
469 }
470
471 match (envelope.result, envelope.error) {
472 (Some(result), None) => serde_json::from_value(result)
473 .map_err(|error| A2AError::InvalidAgentResponse(error.to_string())),
474 (None, Some(error)) => Err(map_jsonrpc_error(error)),
475 _ => Err(A2AError::InvalidAgentResponse(
476 "response must contain exactly one of result or error".to_owned(),
477 )),
478 }
479 }
480
481 async fn read_json_response<T>(&self, response: reqwest::Response) -> Result<T, A2AError>
482 where
483 T: DeserializeOwned,
484 {
485 let status = response.status();
486 let bytes = response.bytes().await?;
487
488 if status.is_success() {
489 return serde_json::from_slice(&bytes)
490 .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()));
491 }
492
493 if let Ok(problem) = serde_json::from_slice::<ProblemDetails>(&bytes) {
494 return Err(problem.to_a2a_error());
495 }
496
497 if let Ok(error) = serde_json::from_slice::<LegacyRestErrorEnvelope>(&bytes) {
498 return Err(map_jsonrpc_error(error.error));
499 }
500
501 Err(A2AError::InvalidAgentResponse(format!(
502 "unexpected HTTP status {}",
503 status
504 )))
505 }
506
507 async fn read_sse_response(
508 &self,
509 response: reqwest::Response,
510 ) -> Result<A2AClientStream, A2AError> {
511 let status = response.status();
512 if !status.is_success() {
513 let bytes = response.bytes().await?;
514 if let Ok(problem) = serde_json::from_slice::<ProblemDetails>(&bytes) {
515 return Err(problem.to_a2a_error());
516 }
517
518 if let Ok(error) = serde_json::from_slice::<LegacyRestErrorEnvelope>(&bytes) {
519 return Err(map_jsonrpc_error(error.error));
520 }
521
522 return Err(A2AError::InvalidAgentResponse(format!(
523 "unexpected HTTP status {}",
524 status
525 )));
526 }
527
528 Ok(Box::pin(sse_stream(response)))
529 }
530
531 fn apply_protocol_headers(&self, builder: reqwest::RequestBuilder) -> reqwest::RequestBuilder {
532 let mut builder = builder.header("A2A-Version", PROTOCOL_VERSION);
533 if !self.config.extensions.is_empty() {
534 builder = builder.header("A2A-Extensions", self.config.extensions.join(","));
535 }
536
537 builder
538 }
539}
540
541fn select_transport(
542 base_url: &Url,
543 interfaces: &[AgentInterface],
544) -> Result<TransportEndpoint, A2AError> {
545 let mut advertised_versions = Vec::new();
546
547 for interface in interfaces {
548 if !interface
549 .protocol_version
550 .eq_ignore_ascii_case(PROTOCOL_VERSION)
551 {
552 advertised_versions.push(interface.protocol_version.clone());
553 continue;
554 }
555
556 if interface.protocol_binding.eq_ignore_ascii_case("JSONRPC") {
557 return resolve_interface_url(base_url, &interface.url).map(TransportEndpoint::JsonRpc);
558 }
559
560 if interface.protocol_binding.eq_ignore_ascii_case("HTTP+JSON") {
561 return resolve_interface_url(base_url, &interface.url)
562 .map(ensure_trailing_slash)
563 .map(TransportEndpoint::HttpJson);
564 }
565 }
566
567 if !advertised_versions.is_empty() {
568 advertised_versions.sort();
569 advertised_versions.dedup();
570 return Err(A2AError::VersionNotSupported(format!(
571 "client supports A2A-Version {}, agent advertised {}",
572 PROTOCOL_VERSION,
573 advertised_versions.join(", ")
574 )));
575 }
576
577 Err(A2AError::InvalidAgentResponse(
578 "agent card does not advertise a supported interface".to_owned(),
579 ))
580}
581
582fn select_http_json_transport(
583 base_url: &Url,
584 interfaces: &[AgentInterface],
585) -> Result<Url, A2AError> {
586 let mut advertised_versions = Vec::new();
587
588 for interface in interfaces {
589 if !interface.protocol_binding.eq_ignore_ascii_case("HTTP+JSON") {
590 continue;
591 }
592
593 if !interface
594 .protocol_version
595 .eq_ignore_ascii_case(PROTOCOL_VERSION)
596 {
597 advertised_versions.push(interface.protocol_version.clone());
598 continue;
599 }
600
601 return resolve_interface_url(base_url, &interface.url).map(ensure_trailing_slash);
602 }
603
604 if !advertised_versions.is_empty() {
605 advertised_versions.sort();
606 advertised_versions.dedup();
607 return Err(A2AError::VersionNotSupported(format!(
608 "client supports A2A-Version {}, agent advertised HTTP+JSON {}",
609 PROTOCOL_VERSION,
610 advertised_versions.join(", ")
611 )));
612 }
613
614 Err(A2AError::InvalidAgentResponse(
615 "agent card does not advertise an HTTP+JSON interface".to_owned(),
616 ))
617}
618
619fn rest_url(base_url: &Url, tenant: Option<&str>, segments: &[&str]) -> Result<Url, A2AError> {
620 let mut url = ensure_trailing_slash(base_url.clone());
621 {
622 let mut path_segments = url
623 .path_segments_mut()
624 .map_err(|_| A2AError::InvalidRequest("base URL cannot be a base".to_owned()))?;
625 path_segments.pop_if_empty();
626 if let Some(tenant) = tenant {
627 path_segments.push(tenant);
628 }
629 for segment in segments {
630 path_segments.push(segment);
631 }
632 }
633
634 Ok(url)
635}
636
637fn map_jsonrpc_error(error: JsonRpcError) -> A2AError {
638 if let Some(info) = error.first_error_info() {
639 return A2AError::from_error_info(error.code, &error.message, Some(&info));
640 }
641
642 let detail = error
643 .data
644 .as_ref()
645 .and_then(serde_json::Value::as_str)
646 .unwrap_or(&error.message)
647 .to_owned();
648
649 match error.code {
650 TASK_NOT_FOUND => A2AError::TaskNotFound(detail),
651 TASK_NOT_CANCELABLE => A2AError::TaskNotCancelable(detail),
652 PUSH_NOTIFICATION_NOT_SUPPORTED => A2AError::PushNotificationNotSupported(detail),
653 UNSUPPORTED_OPERATION => A2AError::UnsupportedOperation(detail),
654 CONTENT_TYPE_NOT_SUPPORTED => A2AError::ContentTypeNotSupported(detail),
655 INVALID_AGENT_RESPONSE => A2AError::InvalidAgentResponse(detail),
656 EXTENDED_AGENT_CARD_NOT_CONFIGURED => A2AError::ExtendedAgentCardNotConfigured(detail),
657 EXTENSION_SUPPORT_REQUIRED => A2AError::ExtensionSupportRequired(detail),
658 VERSION_NOT_SUPPORTED => A2AError::VersionNotSupported(detail),
659 PARSE_ERROR => A2AError::ParseError(detail),
660 INVALID_REQUEST => A2AError::InvalidRequest(detail),
661 METHOD_NOT_FOUND => A2AError::MethodNotFound(detail),
662 INVALID_PARAMS => A2AError::InvalidParams(detail),
663 INTERNAL_ERROR => A2AError::Internal(detail),
664 code => A2AError::Internal(format!("jsonrpc error {}: {}", code, error.message)),
665 }
666}
667
668fn sse_stream(
669 response: reqwest::Response,
670) -> impl Stream<Item = Result<StreamResponse, A2AError>> + Send {
671 stream::try_unfold(
672 SseState {
673 response,
674 buffer: Vec::new(),
675 },
676 |mut state| async move {
677 loop {
678 if let Some(frame) = take_sse_frame(&mut state.buffer, false)?
679 && let Some(item) = parse_sse_frame(frame)?
680 {
681 item.validate()?;
682 return Ok(Some((item, state)));
683 }
684
685 match state.response.chunk().await? {
686 Some(chunk) => state.buffer.extend_from_slice(&chunk),
687 None => match take_sse_frame(&mut state.buffer, true)? {
688 Some(frame) => {
689 if let Some(item) = parse_sse_frame(frame)? {
690 item.validate()?;
691 return Ok(Some((item, state)));
692 }
693 }
694 None => return Ok(None),
695 },
696 }
697 }
698 },
699 )
700}
701
702#[derive(Debug)]
703struct SseState {
704 response: reqwest::Response,
705 buffer: Vec<u8>,
706}
707
708fn take_sse_frame(buffer: &mut Vec<u8>, eof: bool) -> Result<Option<Vec<u8>>, A2AError> {
709 if let Some((index, delimiter_len)) = sse_frame_boundary(buffer) {
710 let frame = buffer[..index].to_vec();
711 buffer.drain(..index + delimiter_len);
712 return Ok(Some(frame));
713 }
714
715 if eof && !buffer.is_empty() {
716 return Ok(Some(std::mem::take(buffer)));
717 }
718
719 Ok(None)
720}
721
722fn sse_frame_boundary(buffer: &[u8]) -> Option<(usize, usize)> {
723 for index in 0..buffer.len().saturating_sub(1) {
724 if buffer[index] == b'\n' && buffer[index + 1] == b'\n' {
725 return Some((index, 2));
726 }
727
728 if index + 3 < buffer.len() && &buffer[index..index + 4] == b"\r\n\r\n" {
729 return Some((index, 4));
730 }
731 }
732
733 None
734}
735
736fn parse_sse_frame(frame: Vec<u8>) -> Result<Option<StreamResponse>, A2AError> {
737 let text = String::from_utf8(frame)
738 .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()))?;
739 let mut data_lines = Vec::new();
740
741 for line in text.lines() {
742 let line = line.strip_suffix('\r').unwrap_or(line);
743 if line.is_empty() || line.starts_with(':') {
744 continue;
745 }
746
747 if let Some(data) = line.strip_prefix("data:") {
748 let data = data.strip_prefix(' ').unwrap_or(data);
749 data_lines.push(data.to_owned());
750 }
751 }
752
753 if data_lines.is_empty() {
754 return Ok(None);
755 }
756
757 serde_json::from_str::<StreamResponse>(&data_lines.join("\n"))
758 .map(Some)
759 .map_err(|error| A2AError::InvalidAgentResponse(error.to_string()))
760}
761
762#[derive(serde::Serialize)]
763#[serde(rename_all = "camelCase")]
764struct GetTaskQuery {
765 #[serde(skip_serializing_if = "Option::is_none")]
766 history_length: Option<i32>,
767}
768
769#[derive(serde::Serialize)]
770#[serde(rename_all = "camelCase")]
771struct ListTasksQuery {
772 #[serde(skip_serializing_if = "Option::is_none")]
773 context_id: Option<String>,
774 #[serde(skip_serializing_if = "Option::is_none")]
775 status: Option<crate::types::TaskState>,
776 #[serde(skip_serializing_if = "Option::is_none")]
777 page_size: Option<i32>,
778 #[serde(skip_serializing_if = "Option::is_none")]
779 page_token: Option<String>,
780 #[serde(skip_serializing_if = "Option::is_none")]
781 history_length: Option<i32>,
782 #[serde(skip_serializing_if = "Option::is_none")]
783 status_timestamp_after: Option<String>,
784 #[serde(skip_serializing_if = "Option::is_none")]
785 include_artifacts: Option<bool>,
786}
787
788#[derive(serde::Serialize)]
789#[serde(rename_all = "camelCase")]
790struct ListTaskPushNotificationConfigsQuery {
791 #[serde(skip_serializing_if = "Option::is_none")]
792 page_size: Option<i32>,
793 #[serde(skip_serializing_if = "Option::is_none")]
794 page_token: Option<String>,
795}
796
797#[derive(serde::Serialize)]
798#[serde(rename_all = "camelCase")]
799struct CancelTaskBody {
800 #[serde(skip_serializing_if = "Option::is_none")]
801 metadata: Option<crate::types::JsonObject>,
802}
803
804#[derive(serde::Deserialize)]
805struct LegacyRestErrorEnvelope {
806 error: JsonRpcError,
807}
808
809#[cfg(test)]
810mod tests {
811 use super::map_jsonrpc_error;
812 use crate::A2AError;
813 use crate::jsonrpc::JsonRpcError;
814 use crate::jsonrpc::{
815 CONTENT_TYPE_NOT_SUPPORTED, EXTENDED_AGENT_CARD_NOT_CONFIGURED, EXTENSION_SUPPORT_REQUIRED,
816 INTERNAL_ERROR, INVALID_AGENT_RESPONSE, INVALID_PARAMS, INVALID_REQUEST, METHOD_NOT_FOUND,
817 PARSE_ERROR, PUSH_NOTIFICATION_NOT_SUPPORTED, TASK_NOT_CANCELABLE, TASK_NOT_FOUND,
818 UNSUPPORTED_OPERATION, VERSION_NOT_SUPPORTED,
819 };
820
821 #[test]
822 fn map_jsonrpc_error_covers_all_protocol_codes() {
823 let cases = [
824 (TASK_NOT_FOUND, "task missing"),
825 (TASK_NOT_CANCELABLE, "task busy"),
826 (PUSH_NOTIFICATION_NOT_SUPPORTED, "push unsupported"),
827 (UNSUPPORTED_OPERATION, "operation unsupported"),
828 (CONTENT_TYPE_NOT_SUPPORTED, "content type unsupported"),
829 (INVALID_AGENT_RESPONSE, "invalid agent response"),
830 (
831 EXTENDED_AGENT_CARD_NOT_CONFIGURED,
832 "extended agent card missing",
833 ),
834 (EXTENSION_SUPPORT_REQUIRED, "extension required"),
835 (VERSION_NOT_SUPPORTED, "version unsupported"),
836 (PARSE_ERROR, "parse error"),
837 (INVALID_REQUEST, "invalid request"),
838 (METHOD_NOT_FOUND, "missing method"),
839 (INVALID_PARAMS, "invalid params"),
840 (INTERNAL_ERROR, "internal error"),
841 ];
842
843 for (code, detail) in cases {
844 let mapped = map_jsonrpc_error(JsonRpcError {
845 code,
846 message: format!("message for {code}"),
847 data: Some(serde_json::Value::String(detail.to_owned())),
848 });
849
850 match code {
851 TASK_NOT_FOUND => {
852 assert!(matches!(mapped, A2AError::TaskNotFound(value) if value == detail));
853 }
854 TASK_NOT_CANCELABLE => {
855 assert!(
856 matches!(mapped, A2AError::TaskNotCancelable(value) if value == detail)
857 );
858 }
859 PUSH_NOTIFICATION_NOT_SUPPORTED => {
860 assert!(
861 matches!(mapped, A2AError::PushNotificationNotSupported(value) if value == detail)
862 );
863 }
864 UNSUPPORTED_OPERATION => {
865 assert!(
866 matches!(mapped, A2AError::UnsupportedOperation(value) if value == detail)
867 );
868 }
869 CONTENT_TYPE_NOT_SUPPORTED => {
870 assert!(
871 matches!(mapped, A2AError::ContentTypeNotSupported(value) if value == detail)
872 );
873 }
874 INVALID_AGENT_RESPONSE => {
875 assert!(
876 matches!(mapped, A2AError::InvalidAgentResponse(value) if value == detail)
877 );
878 }
879 EXTENDED_AGENT_CARD_NOT_CONFIGURED => {
880 assert!(
881 matches!(mapped, A2AError::ExtendedAgentCardNotConfigured(value) if value == detail)
882 );
883 }
884 EXTENSION_SUPPORT_REQUIRED => {
885 assert!(
886 matches!(mapped, A2AError::ExtensionSupportRequired(value) if value == detail)
887 );
888 }
889 VERSION_NOT_SUPPORTED => {
890 assert!(
891 matches!(mapped, A2AError::VersionNotSupported(value) if value == detail)
892 );
893 }
894 PARSE_ERROR => {
895 assert!(matches!(mapped, A2AError::ParseError(value) if value == detail));
896 }
897 INVALID_REQUEST => {
898 assert!(matches!(mapped, A2AError::InvalidRequest(value) if value == detail));
899 }
900 METHOD_NOT_FOUND => {
901 assert!(matches!(mapped, A2AError::MethodNotFound(value) if value == detail));
902 }
903 INVALID_PARAMS => {
904 assert!(matches!(mapped, A2AError::InvalidParams(value) if value == detail));
905 }
906 INTERNAL_ERROR => {
907 assert!(matches!(mapped, A2AError::Internal(value) if value == detail));
908 }
909 _ => unreachable!("all cases should be covered"),
910 }
911 }
912 }
913
914 #[test]
915 fn map_jsonrpc_error_prefers_structured_error_info() {
916 let mapped = map_jsonrpc_error(JsonRpcError {
917 code: TASK_NOT_FOUND,
918 message: "fallback message".to_owned(),
919 data: Some(serde_json::json!({
920 "@type": "type.googleapis.com/google.rpc.ErrorInfo",
921 "reason": "TASK_NOT_FOUND",
922 "domain": "a2a-protocol.org",
923 "metadata": {
924 "taskId": "task-123"
925 }
926 })),
927 });
928
929 assert!(matches!(mapped, A2AError::TaskNotFound(value) if value == "task-123"));
930 }
931}