1pub mod discovery;
5pub mod grpc_transport;
7pub mod jsonrpc_transport;
9pub mod rest_transport;
11pub mod sse;
13
14pub use discovery::discover_agent_card;
15#[cfg(feature = "grpc-client")]
16pub use grpc_transport::GrpcTransport;
17pub use jsonrpc_transport::JsonRpcTransport;
18pub use rest_transport::RestTransport;
19
20use std::pin::Pin;
21use std::sync::Arc;
22
23use futures::Stream;
24use url::Url;
25
26use crate::agent_card::AgentCard;
27use crate::error::A2aError;
28use crate::jsonrpc;
29use crate::params::*;
30use crate::push_notification::TaskPushNotificationConfig;
31use crate::streaming::{SendMessageResponse, StreamResponse};
32use crate::task::Task;
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum Transport {
37 JsonRpc,
39 Rest,
41 Grpc,
43}
44
45pub struct A2aClient {
47 transport: Transport,
48 jsonrpc: Option<Arc<JsonRpcTransport>>,
49 rest: Option<Arc<RestTransport>>,
50 #[cfg(feature = "grpc-client")]
51 grpc: Option<Arc<tokio::sync::Mutex<GrpcTransport>>>,
52}
53
54impl A2aClient {
55 pub fn new_jsonrpc(base_url: Url) -> Self {
57 let client = reqwest::Client::new();
58 Self {
59 transport: Transport::JsonRpc,
60 jsonrpc: Some(Arc::new(JsonRpcTransport::new(base_url, client, None))),
61 rest: None,
62 #[cfg(feature = "grpc-client")]
63 grpc: None,
64 }
65 }
66
67 pub fn new_rest(base_url: Url) -> Self {
69 let client = reqwest::Client::new();
70 Self {
71 transport: Transport::Rest,
72 jsonrpc: None,
73 rest: Some(Arc::new(RestTransport::new(base_url, client, None))),
74 #[cfg(feature = "grpc-client")]
75 grpc: None,
76 }
77 }
78
79 #[cfg(feature = "grpc-client")]
81 pub async fn new_grpc(endpoint: &str) -> Result<Self, A2aError> {
82 let transport = GrpcTransport::connect(endpoint).await?;
83 Ok(Self {
84 transport: Transport::Grpc,
85 jsonrpc: None,
86 rest: None,
87 grpc: Some(Arc::new(tokio::sync::Mutex::new(transport))),
88 })
89 }
90
91 pub fn with_bearer_token(self, token: &str) -> Self {
95 let token = token.to_string();
96 match self.transport {
97 Transport::JsonRpc => {
98 if let Some(t) = &self.jsonrpc {
99 let new_transport = JsonRpcTransport::new(
100 t.base_url().clone(),
101 t.http_client().clone(),
102 Some(token),
103 );
104 Self {
105 jsonrpc: Some(Arc::new(new_transport)),
106 ..self
107 }
108 } else {
109 self
110 }
111 }
112 Transport::Rest => {
113 if let Some(t) = &self.rest {
114 let new_transport = RestTransport::new(
115 t.base_url().clone(),
116 t.http_client().clone(),
117 Some(token),
118 );
119 Self {
120 rest: Some(Arc::new(new_transport)),
121 ..self
122 }
123 } else {
124 self
125 }
126 }
127 Transport::Grpc => {
128 tracing::warn!(
130 "Bearer token on existing gRPC transport not supported; pass token at connect time"
131 );
132 self
133 }
134 }
135 }
136
137 pub async fn discover(base_url: &str) -> Result<AgentCard, A2aError> {
139 discover_agent_card(base_url).await
140 }
141
142 pub async fn send_message(
144 &self,
145 req: SendMessageRequest,
146 ) -> Result<SendMessageResponse, A2aError> {
147 match self.transport {
148 Transport::JsonRpc => {
149 let t = self
150 .jsonrpc
151 .as_ref()
152 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
153 let params = serde_json::to_value(&req)?;
154 let result = t.call(jsonrpc::METHOD_MESSAGE_SEND, params).await?;
155 serde_json::from_value(result).map_err(Into::into)
156 }
157 Transport::Rest => {
158 let t = self
159 .rest
160 .as_ref()
161 .ok_or_else(|| A2aError::internal("No REST transport"))?;
162 let result = t.post("/message:send", &req).await?;
163 serde_json::from_value(result).map_err(Into::into)
164 }
165 #[cfg(feature = "grpc-client")]
166 Transport::Grpc => {
167 let t = self
168 .grpc
169 .as_ref()
170 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
171 let mut guard = t.lock().await;
172 guard.send_message(req).await
173 }
174 #[cfg(not(feature = "grpc-client"))]
175 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
176 }
177 }
178
179 pub fn stream_message(
181 &self,
182 req: SendMessageRequest,
183 ) -> Pin<Box<dyn Stream<Item = Result<StreamResponse, A2aError>> + Send>> {
184 match self.transport {
185 Transport::JsonRpc => {
186 if let Some(t) = &self.jsonrpc {
187 let params = serde_json::to_value(&req).unwrap_or_default();
188 t.call_stream(jsonrpc::METHOD_MESSAGE_STREAM, params)
189 } else {
190 Box::pin(futures::stream::once(async {
191 Err(A2aError::internal("No JSON-RPC transport"))
192 }))
193 }
194 }
195 Transport::Rest => {
196 if let Some(t) = &self.rest {
197 let body = serde_json::to_value(&req).unwrap_or_default();
198 t.post_stream("/message:stream", body)
199 } else {
200 Box::pin(futures::stream::once(async {
201 Err(A2aError::internal("No REST transport"))
202 }))
203 }
204 }
205 #[cfg(feature = "grpc-client")]
206 Transport::Grpc => {
207 if let Some(t) = &self.grpc {
208 let grpc = t.clone();
209 Box::pin(async_stream::stream! {
210 let inner = {
211 let mut guard = grpc.lock().await;
212 guard.send_streaming_message(req).await
213 }; match inner {
215 Ok(mut stream) => {
216 use futures::StreamExt;
217 while let Some(item) = stream.next().await {
218 yield item;
219 }
220 }
221 Err(e) => yield Err(e),
222 }
223 })
224 } else {
225 Box::pin(futures::stream::once(async {
226 Err(A2aError::internal("No gRPC transport"))
227 }))
228 }
229 }
230 #[cfg(not(feature = "grpc-client"))]
231 Transport::Grpc => Box::pin(futures::stream::once(async {
232 Err(A2aError::unsupported_operation("gRPC not enabled"))
233 })),
234 }
235 }
236
237 pub async fn get_task(&self, req: GetTaskRequest) -> Result<Task, A2aError> {
239 match self.transport {
240 Transport::JsonRpc => {
241 let t = self
242 .jsonrpc
243 .as_ref()
244 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
245 let params = serde_json::to_value(&req)?;
246 let result = t.call(jsonrpc::METHOD_TASKS_GET, params).await?;
247 serde_json::from_value(result).map_err(Into::into)
248 }
249 Transport::Rest => {
250 let t = self
251 .rest
252 .as_ref()
253 .ok_or_else(|| A2aError::internal("No REST transport"))?;
254 let result = t.get(&format!("/tasks/{}", req.id)).await?;
255 serde_json::from_value(result).map_err(Into::into)
256 }
257 #[cfg(feature = "grpc-client")]
258 Transport::Grpc => {
259 let t = self
260 .grpc
261 .as_ref()
262 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
263 let mut guard = t.lock().await;
264 guard.get_task(req).await
265 }
266 #[cfg(not(feature = "grpc-client"))]
267 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
268 }
269 }
270
271 pub async fn list_tasks(&self, req: ListTasksRequest) -> Result<ListTasksResponse, A2aError> {
273 match self.transport {
274 Transport::JsonRpc => {
275 let t = self
276 .jsonrpc
277 .as_ref()
278 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
279 let params = serde_json::to_value(&req)?;
280 let result = t.call(jsonrpc::METHOD_TASKS_LIST, params).await?;
281 serde_json::from_value(result).map_err(Into::into)
282 }
283 Transport::Rest => {
284 let t = self
285 .rest
286 .as_ref()
287 .ok_or_else(|| A2aError::internal("No REST transport"))?;
288 let result = t.get("/tasks").await?;
289 serde_json::from_value(result).map_err(Into::into)
290 }
291 #[cfg(feature = "grpc-client")]
292 Transport::Grpc => {
293 let t = self
294 .grpc
295 .as_ref()
296 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
297 let mut guard = t.lock().await;
298 guard.list_tasks(req).await
299 }
300 #[cfg(not(feature = "grpc-client"))]
301 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
302 }
303 }
304
305 pub async fn cancel_task(&self, req: CancelTaskRequest) -> Result<Task, A2aError> {
307 match self.transport {
308 Transport::JsonRpc => {
309 let t = self
310 .jsonrpc
311 .as_ref()
312 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
313 let params = serde_json::to_value(&req)?;
314 let result = t.call(jsonrpc::METHOD_TASKS_CANCEL, params).await?;
315 serde_json::from_value(result).map_err(Into::into)
316 }
317 Transport::Rest => {
318 let t = self
319 .rest
320 .as_ref()
321 .ok_or_else(|| A2aError::internal("No REST transport"))?;
322 let result = t.post(&format!("/tasks/{}:cancel", req.id), &req).await?;
323 serde_json::from_value(result).map_err(Into::into)
324 }
325 #[cfg(feature = "grpc-client")]
326 Transport::Grpc => {
327 let t = self
328 .grpc
329 .as_ref()
330 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
331 let mut guard = t.lock().await;
332 guard.cancel_task(req).await
333 }
334 #[cfg(not(feature = "grpc-client"))]
335 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
336 }
337 }
338
339 pub fn subscribe_to_task(
341 &self,
342 req: SubscribeToTaskRequest,
343 ) -> Pin<Box<dyn Stream<Item = Result<StreamResponse, A2aError>> + Send>> {
344 match self.transport {
345 Transport::JsonRpc => {
346 if let Some(t) = &self.jsonrpc {
347 let params = serde_json::to_value(&req).unwrap_or_default();
348 t.call_stream(jsonrpc::METHOD_TASKS_RESUBSCRIBE, params)
349 } else {
350 Box::pin(futures::stream::once(async {
351 Err(A2aError::internal("No JSON-RPC transport"))
352 }))
353 }
354 }
355 Transport::Rest => {
356 if let Some(t) = &self.rest {
357 let body = serde_json::to_value(&req).unwrap_or_default();
358 t.post_stream(&format!("/tasks/{}:subscribe", req.id), body)
359 } else {
360 Box::pin(futures::stream::once(async {
361 Err(A2aError::internal("No REST transport"))
362 }))
363 }
364 }
365 #[cfg(feature = "grpc-client")]
366 Transport::Grpc => {
367 if let Some(t) = &self.grpc {
368 let grpc = t.clone();
369 Box::pin(async_stream::stream! {
370 let inner = {
371 let mut guard = grpc.lock().await;
372 guard.subscribe_to_task(req).await
373 }; match inner {
375 Ok(mut stream) => {
376 use futures::StreamExt;
377 while let Some(item) = stream.next().await {
378 yield item;
379 }
380 }
381 Err(e) => yield Err(e),
382 }
383 })
384 } else {
385 Box::pin(futures::stream::once(async {
386 Err(A2aError::internal("No gRPC transport"))
387 }))
388 }
389 }
390 #[cfg(not(feature = "grpc-client"))]
391 Transport::Grpc => Box::pin(futures::stream::once(async {
392 Err(A2aError::unsupported_operation("gRPC not enabled"))
393 })),
394 }
395 }
396
397 pub async fn set_push_config(
399 &self,
400 config: TaskPushNotificationConfig,
401 ) -> Result<TaskPushNotificationConfig, A2aError> {
402 match self.transport {
403 Transport::JsonRpc => {
404 let t = self
405 .jsonrpc
406 .as_ref()
407 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
408 let params = serde_json::to_value(&config)?;
409 let result = t.call(jsonrpc::METHOD_PUSH_CONFIG_SET, params).await?;
410 serde_json::from_value(result).map_err(Into::into)
411 }
412 Transport::Rest => {
413 let t = self
414 .rest
415 .as_ref()
416 .ok_or_else(|| A2aError::internal("No REST transport"))?;
417 let path = format!("/tasks/{}/pushNotificationConfigs", config.task_id);
418 let result = t.post(&path, &config).await?;
419 serde_json::from_value(result).map_err(Into::into)
420 }
421 #[cfg(feature = "grpc-client")]
422 Transport::Grpc => {
423 let t = self
424 .grpc
425 .as_ref()
426 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
427 let mut guard = t.lock().await;
428 guard.create_push_config(config).await
429 }
430 #[cfg(not(feature = "grpc-client"))]
431 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
432 }
433 }
434
435 pub async fn get_push_config(
437 &self,
438 req: GetTaskPushNotificationConfigRequest,
439 ) -> Result<TaskPushNotificationConfig, A2aError> {
440 match self.transport {
441 Transport::JsonRpc => {
442 let t = self
443 .jsonrpc
444 .as_ref()
445 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
446 let params = serde_json::to_value(&req)?;
447 let result = t.call(jsonrpc::METHOD_PUSH_CONFIG_GET, params).await?;
448 serde_json::from_value(result).map_err(Into::into)
449 }
450 Transport::Rest => {
451 let t = self
452 .rest
453 .as_ref()
454 .ok_or_else(|| A2aError::internal("No REST transport"))?;
455 let path = format!("/tasks/{}/pushNotificationConfigs/{}", req.task_id, req.config_id);
456 let result = t.get(&path).await?;
457 serde_json::from_value(result).map_err(Into::into)
458 }
459 #[cfg(feature = "grpc-client")]
460 Transport::Grpc => {
461 let t = self
462 .grpc
463 .as_ref()
464 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
465 let mut guard = t.lock().await;
466 guard.get_push_config(req).await
467 }
468 #[cfg(not(feature = "grpc-client"))]
469 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
470 }
471 }
472
473 pub async fn delete_push_config(
475 &self,
476 req: DeleteTaskPushNotificationConfigRequest,
477 ) -> Result<(), A2aError> {
478 match self.transport {
479 Transport::JsonRpc => {
480 let t = self
481 .jsonrpc
482 .as_ref()
483 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
484 let params = serde_json::to_value(&req)?;
485 let _ = t.call(jsonrpc::METHOD_PUSH_CONFIG_DELETE, params).await?;
486 Ok(())
487 }
488 Transport::Rest => {
489 let t = self
490 .rest
491 .as_ref()
492 .ok_or_else(|| A2aError::internal("No REST transport"))?;
493 let path = format!("/tasks/{}/pushNotificationConfigs/{}", req.task_id, req.config_id);
494 t.delete(&path).await
495 }
496 #[cfg(feature = "grpc-client")]
497 Transport::Grpc => {
498 let t = self
499 .grpc
500 .as_ref()
501 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
502 let mut guard = t.lock().await;
503 guard.delete_push_config(req).await
504 }
505 #[cfg(not(feature = "grpc-client"))]
506 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
507 }
508 }
509
510 pub async fn list_push_configs(
512 &self,
513 req: ListTaskPushNotificationConfigsRequest,
514 ) -> Result<ListTaskPushNotificationConfigsResponse, A2aError> {
515 match self.transport {
516 Transport::JsonRpc => {
517 let t = self
518 .jsonrpc
519 .as_ref()
520 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
521 let params = serde_json::to_value(&req)?;
522 let result = t.call(jsonrpc::METHOD_PUSH_CONFIG_LIST, params).await?;
523 serde_json::from_value(result).map_err(Into::into)
524 }
525 Transport::Rest => {
526 let t = self
527 .rest
528 .as_ref()
529 .ok_or_else(|| A2aError::internal("No REST transport"))?;
530 let path = format!("/tasks/{}/pushNotificationConfigs", req.task_id);
531 let result = t.get(&path).await?;
532 serde_json::from_value(result).map_err(Into::into)
533 }
534 #[cfg(feature = "grpc-client")]
535 Transport::Grpc => {
536 let t = self
537 .grpc
538 .as_ref()
539 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
540 let mut guard = t.lock().await;
541 guard.list_push_configs(req).await
542 }
543 #[cfg(not(feature = "grpc-client"))]
544 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
545 }
546 }
547
548 pub async fn get_authenticated_extended_card(
550 &self,
551 req: GetExtendedAgentCardRequest,
552 ) -> Result<AgentCard, A2aError> {
553 match self.transport {
554 Transport::JsonRpc => {
555 let t = self
556 .jsonrpc
557 .as_ref()
558 .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
559 let params = serde_json::to_value(&req)?;
560 let result = t.call(jsonrpc::METHOD_EXTENDED_CARD, params).await?;
561 serde_json::from_value(result).map_err(Into::into)
562 }
563 Transport::Rest => {
564 let t = self
565 .rest
566 .as_ref()
567 .ok_or_else(|| A2aError::internal("No REST transport"))?;
568 let result = t.get("/extendedAgentCard").await?;
569 serde_json::from_value(result).map_err(Into::into)
570 }
571 #[cfg(feature = "grpc-client")]
572 Transport::Grpc => {
573 let t = self
574 .grpc
575 .as_ref()
576 .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
577 let mut guard = t.lock().await;
578 guard.get_extended_agent_card(req).await
579 }
580 #[cfg(not(feature = "grpc-client"))]
581 Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
582 }
583 }
584}