Skip to main content

brainwires_a2a/client/
mod.rs

1//! A2A client — unified client with transport selection.
2
3/// Agent card discovery.
4pub mod discovery;
5/// gRPC transport.
6pub mod grpc_transport;
7/// JSON-RPC over HTTP+SSE transport.
8pub mod jsonrpc_transport;
9/// HTTP/REST transport.
10pub mod rest_transport;
11/// SSE stream parser.
12pub mod sse;
13
14pub use discovery::discover_agent_card;
15#[cfg(feature = "grpc-client")]
16pub use grpc_transport::GrpcTransport;
17pub use jsonrpc_transport::JsonRpcTransport;
18pub use rest_transport::RestTransport;
19
20use std::pin::Pin;
21use std::sync::Arc;
22
23use futures::Stream;
24use url::Url;
25
26use crate::agent_card::AgentCard;
27use crate::error::A2aError;
28use crate::jsonrpc;
29use crate::params::*;
30use crate::push_notification::TaskPushNotificationConfig;
31use crate::streaming::{SendMessageResponse, StreamResponse};
32use crate::task::Task;
33
34/// Transport selection.
35#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum Transport {
37    /// JSON-RPC 2.0 over HTTP with SSE streaming.
38    JsonRpc,
39    /// HTTP/REST endpoints.
40    Rest,
41    /// gRPC (requires `grpc-client` feature).
42    Grpc,
43}
44
45/// Unified A2A client.
46pub struct A2aClient {
47    transport: Transport,
48    jsonrpc: Option<Arc<JsonRpcTransport>>,
49    rest: Option<Arc<RestTransport>>,
50    #[cfg(feature = "grpc-client")]
51    grpc: Option<Arc<tokio::sync::Mutex<GrpcTransport>>>,
52}
53
54impl A2aClient {
55    /// Create a client using JSON-RPC transport.
56    pub fn new_jsonrpc(base_url: Url) -> Self {
57        let client = reqwest::Client::new();
58        Self {
59            transport: Transport::JsonRpc,
60            jsonrpc: Some(Arc::new(JsonRpcTransport::new(base_url, client, None))),
61            rest: None,
62            #[cfg(feature = "grpc-client")]
63            grpc: None,
64        }
65    }
66
67    /// Create a client using REST transport.
68    pub fn new_rest(base_url: Url) -> Self {
69        let client = reqwest::Client::new();
70        Self {
71            transport: Transport::Rest,
72            jsonrpc: None,
73            rest: Some(Arc::new(RestTransport::new(base_url, client, None))),
74            #[cfg(feature = "grpc-client")]
75            grpc: None,
76        }
77    }
78
79    /// Create a client using gRPC transport.
80    #[cfg(feature = "grpc-client")]
81    pub async fn new_grpc(endpoint: &str) -> Result<Self, A2aError> {
82        let transport = GrpcTransport::connect(endpoint).await?;
83        Ok(Self {
84            transport: Transport::Grpc,
85            jsonrpc: None,
86            rest: None,
87            grpc: Some(Arc::new(tokio::sync::Mutex::new(transport))),
88        })
89    }
90
91    /// Set a bearer token for authentication.
92    ///
93    /// Returns a new client with the token applied to the active transport.
94    pub fn with_bearer_token(self, token: &str) -> Self {
95        let token = token.to_string();
96        match self.transport {
97            Transport::JsonRpc => {
98                if let Some(t) = &self.jsonrpc {
99                    let new_transport = JsonRpcTransport::new(
100                        t.base_url().clone(),
101                        t.http_client().clone(),
102                        Some(token),
103                    );
104                    Self {
105                        jsonrpc: Some(Arc::new(new_transport)),
106                        ..self
107                    }
108                } else {
109                    self
110                }
111            }
112            Transport::Rest => {
113                if let Some(t) = &self.rest {
114                    let new_transport = RestTransport::new(
115                        t.base_url().clone(),
116                        t.http_client().clone(),
117                        Some(token),
118                    );
119                    Self {
120                        rest: Some(Arc::new(new_transport)),
121                        ..self
122                    }
123                } else {
124                    self
125                }
126            }
127            Transport::Grpc => {
128                // gRPC auth is set at connect time; log a warning
129                tracing::warn!(
130                    "Bearer token on existing gRPC transport not supported; pass token at connect time"
131                );
132                self
133            }
134        }
135    }
136
137    /// Discover an agent card from a well-known URL.
138    pub async fn discover(base_url: &str) -> Result<AgentCard, A2aError> {
139        discover_agent_card(base_url).await
140    }
141
142    /// Send a message.
143    pub async fn send_message(
144        &self,
145        req: SendMessageRequest,
146    ) -> Result<SendMessageResponse, A2aError> {
147        match self.transport {
148            Transport::JsonRpc => {
149                let t = self
150                    .jsonrpc
151                    .as_ref()
152                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
153                let params = serde_json::to_value(&req)?;
154                let result = t.call(jsonrpc::METHOD_MESSAGE_SEND, params).await?;
155                serde_json::from_value(result).map_err(Into::into)
156            }
157            Transport::Rest => {
158                let t = self
159                    .rest
160                    .as_ref()
161                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
162                let result = t.post("/message:send", &req).await?;
163                serde_json::from_value(result).map_err(Into::into)
164            }
165            #[cfg(feature = "grpc-client")]
166            Transport::Grpc => {
167                let t = self
168                    .grpc
169                    .as_ref()
170                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
171                let mut guard = t.lock().await;
172                guard.send_message(req).await
173            }
174            #[cfg(not(feature = "grpc-client"))]
175            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
176        }
177    }
178
179    /// Stream a message (returns SSE events).
180    pub fn stream_message(
181        &self,
182        req: SendMessageRequest,
183    ) -> Pin<Box<dyn Stream<Item = Result<StreamResponse, A2aError>> + Send>> {
184        match self.transport {
185            Transport::JsonRpc => {
186                if let Some(t) = &self.jsonrpc {
187                    let params = serde_json::to_value(&req).unwrap_or_default();
188                    t.call_stream(jsonrpc::METHOD_MESSAGE_STREAM, params)
189                } else {
190                    Box::pin(futures::stream::once(async {
191                        Err(A2aError::internal("No JSON-RPC transport"))
192                    }))
193                }
194            }
195            Transport::Rest => {
196                if let Some(t) = &self.rest {
197                    let body = serde_json::to_value(&req).unwrap_or_default();
198                    t.post_stream("/message:stream", body)
199                } else {
200                    Box::pin(futures::stream::once(async {
201                        Err(A2aError::internal("No REST transport"))
202                    }))
203                }
204            }
205            #[cfg(feature = "grpc-client")]
206            Transport::Grpc => {
207                if let Some(t) = &self.grpc {
208                    let grpc = t.clone();
209                    Box::pin(async_stream::stream! {
210                        let inner = {
211                            let mut guard = grpc.lock().await;
212                            guard.send_streaming_message(req).await
213                        }; // lock dropped here
214                        match inner {
215                            Ok(mut stream) => {
216                                use futures::StreamExt;
217                                while let Some(item) = stream.next().await {
218                                    yield item;
219                                }
220                            }
221                            Err(e) => yield Err(e),
222                        }
223                    })
224                } else {
225                    Box::pin(futures::stream::once(async {
226                        Err(A2aError::internal("No gRPC transport"))
227                    }))
228                }
229            }
230            #[cfg(not(feature = "grpc-client"))]
231            Transport::Grpc => Box::pin(futures::stream::once(async {
232                Err(A2aError::unsupported_operation("gRPC not enabled"))
233            })),
234        }
235    }
236
237    /// Get a task by ID.
238    pub async fn get_task(&self, req: GetTaskRequest) -> Result<Task, A2aError> {
239        match self.transport {
240            Transport::JsonRpc => {
241                let t = self
242                    .jsonrpc
243                    .as_ref()
244                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
245                let params = serde_json::to_value(&req)?;
246                let result = t.call(jsonrpc::METHOD_TASKS_GET, params).await?;
247                serde_json::from_value(result).map_err(Into::into)
248            }
249            Transport::Rest => {
250                let t = self
251                    .rest
252                    .as_ref()
253                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
254                let result = t.get(&format!("/tasks/{}", req.id)).await?;
255                serde_json::from_value(result).map_err(Into::into)
256            }
257            #[cfg(feature = "grpc-client")]
258            Transport::Grpc => {
259                let t = self
260                    .grpc
261                    .as_ref()
262                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
263                let mut guard = t.lock().await;
264                guard.get_task(req).await
265            }
266            #[cfg(not(feature = "grpc-client"))]
267            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
268        }
269    }
270
271    /// List tasks.
272    pub async fn list_tasks(&self, req: ListTasksRequest) -> Result<ListTasksResponse, A2aError> {
273        match self.transport {
274            Transport::JsonRpc => {
275                let t = self
276                    .jsonrpc
277                    .as_ref()
278                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
279                let params = serde_json::to_value(&req)?;
280                let result = t.call(jsonrpc::METHOD_TASKS_LIST, params).await?;
281                serde_json::from_value(result).map_err(Into::into)
282            }
283            Transport::Rest => {
284                let t = self
285                    .rest
286                    .as_ref()
287                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
288                let result = t.get("/tasks").await?;
289                serde_json::from_value(result).map_err(Into::into)
290            }
291            #[cfg(feature = "grpc-client")]
292            Transport::Grpc => {
293                let t = self
294                    .grpc
295                    .as_ref()
296                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
297                let mut guard = t.lock().await;
298                guard.list_tasks(req).await
299            }
300            #[cfg(not(feature = "grpc-client"))]
301            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
302        }
303    }
304
305    /// Cancel a task.
306    pub async fn cancel_task(&self, req: CancelTaskRequest) -> Result<Task, A2aError> {
307        match self.transport {
308            Transport::JsonRpc => {
309                let t = self
310                    .jsonrpc
311                    .as_ref()
312                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
313                let params = serde_json::to_value(&req)?;
314                let result = t.call(jsonrpc::METHOD_TASKS_CANCEL, params).await?;
315                serde_json::from_value(result).map_err(Into::into)
316            }
317            Transport::Rest => {
318                let t = self
319                    .rest
320                    .as_ref()
321                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
322                let result = t.post(&format!("/tasks/{}:cancel", req.id), &req).await?;
323                serde_json::from_value(result).map_err(Into::into)
324            }
325            #[cfg(feature = "grpc-client")]
326            Transport::Grpc => {
327                let t = self
328                    .grpc
329                    .as_ref()
330                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
331                let mut guard = t.lock().await;
332                guard.cancel_task(req).await
333            }
334            #[cfg(not(feature = "grpc-client"))]
335            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
336        }
337    }
338
339    /// Subscribe to task updates.
340    pub fn subscribe_to_task(
341        &self,
342        req: SubscribeToTaskRequest,
343    ) -> Pin<Box<dyn Stream<Item = Result<StreamResponse, A2aError>> + Send>> {
344        match self.transport {
345            Transport::JsonRpc => {
346                if let Some(t) = &self.jsonrpc {
347                    let params = serde_json::to_value(&req).unwrap_or_default();
348                    t.call_stream(jsonrpc::METHOD_TASKS_RESUBSCRIBE, params)
349                } else {
350                    Box::pin(futures::stream::once(async {
351                        Err(A2aError::internal("No JSON-RPC transport"))
352                    }))
353                }
354            }
355            Transport::Rest => {
356                if let Some(t) = &self.rest {
357                    let body = serde_json::to_value(&req).unwrap_or_default();
358                    t.post_stream(&format!("/tasks/{}:subscribe", req.id), body)
359                } else {
360                    Box::pin(futures::stream::once(async {
361                        Err(A2aError::internal("No REST transport"))
362                    }))
363                }
364            }
365            #[cfg(feature = "grpc-client")]
366            Transport::Grpc => {
367                if let Some(t) = &self.grpc {
368                    let grpc = t.clone();
369                    Box::pin(async_stream::stream! {
370                        let inner = {
371                            let mut guard = grpc.lock().await;
372                            guard.subscribe_to_task(req).await
373                        }; // lock dropped here
374                        match inner {
375                            Ok(mut stream) => {
376                                use futures::StreamExt;
377                                while let Some(item) = stream.next().await {
378                                    yield item;
379                                }
380                            }
381                            Err(e) => yield Err(e),
382                        }
383                    })
384                } else {
385                    Box::pin(futures::stream::once(async {
386                        Err(A2aError::internal("No gRPC transport"))
387                    }))
388                }
389            }
390            #[cfg(not(feature = "grpc-client"))]
391            Transport::Grpc => Box::pin(futures::stream::once(async {
392                Err(A2aError::unsupported_operation("gRPC not enabled"))
393            })),
394        }
395    }
396
397    /// Set push notification config.
398    pub async fn set_push_config(
399        &self,
400        config: TaskPushNotificationConfig,
401    ) -> Result<TaskPushNotificationConfig, A2aError> {
402        match self.transport {
403            Transport::JsonRpc => {
404                let t = self
405                    .jsonrpc
406                    .as_ref()
407                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
408                let params = serde_json::to_value(&config)?;
409                let result = t.call(jsonrpc::METHOD_PUSH_CONFIG_SET, params).await?;
410                serde_json::from_value(result).map_err(Into::into)
411            }
412            Transport::Rest => {
413                let t = self
414                    .rest
415                    .as_ref()
416                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
417                let path = format!("/tasks/{}/pushNotificationConfigs", config.task_id);
418                let result = t.post(&path, &config).await?;
419                serde_json::from_value(result).map_err(Into::into)
420            }
421            #[cfg(feature = "grpc-client")]
422            Transport::Grpc => {
423                let t = self
424                    .grpc
425                    .as_ref()
426                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
427                let mut guard = t.lock().await;
428                guard.create_push_config(config).await
429            }
430            #[cfg(not(feature = "grpc-client"))]
431            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
432        }
433    }
434
435    /// Get push notification config.
436    pub async fn get_push_config(
437        &self,
438        req: GetTaskPushNotificationConfigRequest,
439    ) -> Result<TaskPushNotificationConfig, A2aError> {
440        match self.transport {
441            Transport::JsonRpc => {
442                let t = self
443                    .jsonrpc
444                    .as_ref()
445                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
446                let params = serde_json::to_value(&req)?;
447                let result = t.call(jsonrpc::METHOD_PUSH_CONFIG_GET, params).await?;
448                serde_json::from_value(result).map_err(Into::into)
449            }
450            Transport::Rest => {
451                let t = self
452                    .rest
453                    .as_ref()
454                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
455                let path = format!("/tasks/{}/pushNotificationConfigs/{}", req.task_id, req.config_id);
456                let result = t.get(&path).await?;
457                serde_json::from_value(result).map_err(Into::into)
458            }
459            #[cfg(feature = "grpc-client")]
460            Transport::Grpc => {
461                let t = self
462                    .grpc
463                    .as_ref()
464                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
465                let mut guard = t.lock().await;
466                guard.get_push_config(req).await
467            }
468            #[cfg(not(feature = "grpc-client"))]
469            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
470        }
471    }
472
473    /// Delete push notification config.
474    pub async fn delete_push_config(
475        &self,
476        req: DeleteTaskPushNotificationConfigRequest,
477    ) -> Result<(), A2aError> {
478        match self.transport {
479            Transport::JsonRpc => {
480                let t = self
481                    .jsonrpc
482                    .as_ref()
483                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
484                let params = serde_json::to_value(&req)?;
485                let _ = t.call(jsonrpc::METHOD_PUSH_CONFIG_DELETE, params).await?;
486                Ok(())
487            }
488            Transport::Rest => {
489                let t = self
490                    .rest
491                    .as_ref()
492                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
493                let path = format!("/tasks/{}/pushNotificationConfigs/{}", req.task_id, req.config_id);
494                t.delete(&path).await
495            }
496            #[cfg(feature = "grpc-client")]
497            Transport::Grpc => {
498                let t = self
499                    .grpc
500                    .as_ref()
501                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
502                let mut guard = t.lock().await;
503                guard.delete_push_config(req).await
504            }
505            #[cfg(not(feature = "grpc-client"))]
506            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
507        }
508    }
509
510    /// List push notification configs.
511    pub async fn list_push_configs(
512        &self,
513        req: ListTaskPushNotificationConfigsRequest,
514    ) -> Result<ListTaskPushNotificationConfigsResponse, A2aError> {
515        match self.transport {
516            Transport::JsonRpc => {
517                let t = self
518                    .jsonrpc
519                    .as_ref()
520                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
521                let params = serde_json::to_value(&req)?;
522                let result = t.call(jsonrpc::METHOD_PUSH_CONFIG_LIST, params).await?;
523                serde_json::from_value(result).map_err(Into::into)
524            }
525            Transport::Rest => {
526                let t = self
527                    .rest
528                    .as_ref()
529                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
530                let path = format!("/tasks/{}/pushNotificationConfigs", req.task_id);
531                let result = t.get(&path).await?;
532                serde_json::from_value(result).map_err(Into::into)
533            }
534            #[cfg(feature = "grpc-client")]
535            Transport::Grpc => {
536                let t = self
537                    .grpc
538                    .as_ref()
539                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
540                let mut guard = t.lock().await;
541                guard.list_push_configs(req).await
542            }
543            #[cfg(not(feature = "grpc-client"))]
544            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
545        }
546    }
547
548    /// Get the authenticated extended agent card.
549    pub async fn get_authenticated_extended_card(
550        &self,
551        req: GetExtendedAgentCardRequest,
552    ) -> Result<AgentCard, A2aError> {
553        match self.transport {
554            Transport::JsonRpc => {
555                let t = self
556                    .jsonrpc
557                    .as_ref()
558                    .ok_or_else(|| A2aError::internal("No JSON-RPC transport"))?;
559                let params = serde_json::to_value(&req)?;
560                let result = t.call(jsonrpc::METHOD_EXTENDED_CARD, params).await?;
561                serde_json::from_value(result).map_err(Into::into)
562            }
563            Transport::Rest => {
564                let t = self
565                    .rest
566                    .as_ref()
567                    .ok_or_else(|| A2aError::internal("No REST transport"))?;
568                let result = t.get("/extendedAgentCard").await?;
569                serde_json::from_value(result).map_err(Into::into)
570            }
571            #[cfg(feature = "grpc-client")]
572            Transport::Grpc => {
573                let t = self
574                    .grpc
575                    .as_ref()
576                    .ok_or_else(|| A2aError::internal("No gRPC transport"))?;
577                let mut guard = t.lock().await;
578                guard.get_extended_agent_card(req).await
579            }
580            #[cfg(not(feature = "grpc-client"))]
581            Transport::Grpc => Err(A2aError::unsupported_operation("gRPC not enabled")),
582        }
583    }
584}