1pub mod builders;
6mod error;
7#[cfg(feature = "grpc")]
8pub mod grpc;
9pub mod prelude;
10pub mod response;
11pub mod sse;
12
13use turul_a2a_proto as pb;
14use turul_a2a_types::wire;
15
16pub use builders::MessageBuilder;
17pub use error::A2aClientError;
18pub use sse::{SseEvent, SseStream, StreamEvent, TypedSseEvent, TypedSseStream};
19
/// Credentials attached to every authenticated request made by the client.
///
/// The [`Debug`](std::fmt::Debug) implementation deliberately redacts the
/// secret material (bearer token and API key value) so that logging a
/// `ClientAuth` — or any struct that embeds one — never leaks credentials.
#[derive(Clone, Default)]
pub enum ClientAuth {
    /// Send no credentials. This is the default.
    #[default]
    None,
    /// Send the contained token as a bearer token.
    Bearer(String),
    /// Send `key` as the value of the request header named `header`.
    ApiKey {
        /// Name of the header that carries the key.
        header: String,
        /// Secret key value.
        key: String,
    },
}

impl std::fmt::Debug for ClientAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Placeholder printed instead of any secret value.
        const REDACTED: &str = "<redacted>";
        match self {
            Self::None => f.write_str("None"),
            Self::Bearer(_) => f.debug_tuple("Bearer").field(&REDACTED).finish(),
            // The header name is not secret and is useful for diagnostics;
            // only the key value is hidden.
            Self::ApiKey { header, .. } => f
                .debug_struct("ApiKey")
                .field("header", header)
                .field("key", &REDACTED)
                .finish(),
        }
    }
}
31
32#[derive(Debug, Clone)]
34pub struct A2aClient {
35 base_url: String,
36 tenant: Option<String>,
37 auth: ClientAuth,
38 http: reqwest::Client,
39 agent_card: Option<pb::AgentCard>,
40}
41
42impl A2aClient {
43 pub fn new(base_url: impl Into<String>) -> Self {
45 Self {
46 base_url: base_url.into().trim_end_matches('/').to_string(),
47 tenant: None,
48 auth: ClientAuth::None,
49 http: reqwest::Client::new(),
50 agent_card: None,
51 }
52 }
53
54 pub fn with_auth(mut self, auth: ClientAuth) -> Self {
55 self.auth = auth;
56 self
57 }
58
59 pub fn with_tenant(mut self, tenant: impl Into<String>) -> Self {
60 self.tenant = Some(tenant.into());
61 self
62 }
63
64 pub async fn discover(base_url: impl Into<String>) -> Result<Self, A2aClientError> {
66 let mut client = Self::new(base_url);
67 let card = client.fetch_agent_card().await?;
68 client.agent_card = Some(card);
69 Ok(client)
70 }
71
72 pub async fn fetch_agent_card(&self) -> Result<pb::AgentCard, A2aClientError> {
74 let url = format!("{}{}", self.base_url, wire::http::WELL_KNOWN_AGENT_CARD);
75 let resp = self.http.get(&url).send().await?;
76 if !resp.status().is_success() {
77 return Err(A2aClientError::Http {
78 status: resp.status().as_u16(),
79 message: resp.text().await.unwrap_or_default(),
80 });
81 }
82 let card: pb::AgentCard = resp.json().await?;
83 Ok(card)
84 }
85
86 pub fn agent_card(&self) -> Option<&pb::AgentCard> {
88 self.agent_card.as_ref()
89 }
90
91 fn url(&self, path: &str) -> String {
94 match &self.tenant {
95 Some(tenant) => {
96 let encoded_tenant = reqwest::Url::parse("http://x")
97 .unwrap()
98 .join(&format!("{}/", tenant))
99 .map(|u| {
100 u.path()
101 .trim_end_matches('/')
102 .trim_start_matches('/')
103 .to_string()
104 })
105 .unwrap_or_else(|_| tenant.clone());
106 format!("{}/{}{}", self.base_url, encoded_tenant, path)
107 }
108 None => format!("{}{}", self.base_url, path),
109 }
110 }
111
112 fn request(&self, method: reqwest::Method, url: &str) -> reqwest::RequestBuilder {
114 let mut req = self.http.request(method, url).header("a2a-version", "1.0");
115 match &self.auth {
116 ClientAuth::None => {}
117 ClientAuth::Bearer(token) => {
118 req = req.bearer_auth(token);
119 }
120 ClientAuth::ApiKey { header, key } => {
121 req = req.header(header.as_str(), key.as_str());
122 }
123 }
124 req
125 }
126
127 pub async fn send_message(
135 &self,
136 request: impl Into<pb::SendMessageRequest>,
137 ) -> Result<crate::response::SendResponse, A2aClientError> {
138 let proto_resp = self.send_message_proto(request.into()).await?;
139 crate::response::SendResponse::try_from(proto_resp)
140 }
141
142 pub async fn get_task(
144 &self,
145 task_id: &str,
146 history_length: Option<i32>,
147 ) -> Result<turul_a2a_types::Task, A2aClientError> {
148 let proto_task = self.get_task_proto(task_id, history_length).await?;
149 turul_a2a_types::Task::try_from(proto_task)
150 .map_err(|e| A2aClientError::Conversion(e.to_string()))
151 }
152
153 pub async fn cancel_task(
155 &self,
156 task_id: &str,
157 ) -> Result<turul_a2a_types::Task, A2aClientError> {
158 let proto_task = self.cancel_task_proto(task_id).await?;
159 turul_a2a_types::Task::try_from(proto_task)
160 .map_err(|e| A2aClientError::Conversion(e.to_string()))
161 }
162
163 pub async fn list_tasks(
165 &self,
166 params: &ListTasksParams,
167 ) -> Result<crate::response::ListResponse, A2aClientError> {
168 let proto_resp = self.list_tasks_proto(params).await?;
169 crate::response::ListResponse::try_from(proto_resp)
170 }
171
172 pub async fn send_message_proto(
178 &self,
179 request: pb::SendMessageRequest,
180 ) -> Result<pb::SendMessageResponse, A2aClientError> {
181 let url = self.url("/message:send");
182 let resp = self
183 .request(reqwest::Method::POST, &url)
184 .header("content-type", "application/json")
185 .json(&request)
186 .send()
187 .await?;
188
189 if !resp.status().is_success() {
190 return Err(self.parse_error(resp).await);
191 }
192 let response: pb::SendMessageResponse = resp.json().await?;
193 Ok(response)
194 }
195
196 pub async fn get_task_proto(
198 &self,
199 task_id: &str,
200 history_length: Option<i32>,
201 ) -> Result<pb::Task, A2aClientError> {
202 let url = self.url(&format!("/tasks/{task_id}"));
203 let mut req = self.request(reqwest::Method::GET, &url);
204 if let Some(hl) = history_length {
205 req = req.query(&[("historyLength", hl.to_string())]);
206 }
207 let resp = req.send().await?;
208
209 if !resp.status().is_success() {
210 return Err(self.parse_error(resp).await);
211 }
212 let task: pb::Task = resp.json().await?;
213 Ok(task)
214 }
215
216 pub async fn cancel_task_proto(&self, task_id: &str) -> Result<pb::Task, A2aClientError> {
218 let url = self.url(&format!("/tasks/{task_id}:cancel"));
219 let resp = self
220 .request(reqwest::Method::POST, &url)
221 .header("a2a-version", "1.0")
222 .send()
223 .await?;
224
225 if !resp.status().is_success() {
226 return Err(self.parse_error(resp).await);
227 }
228 let task: pb::Task = resp.json().await?;
229 Ok(task)
230 }
231
232 pub async fn list_tasks_proto(
234 &self,
235 params: &ListTasksParams,
236 ) -> Result<pb::ListTasksResponse, A2aClientError> {
237 let url = self.url("/tasks");
238 let mut req = self.request(reqwest::Method::GET, &url);
239 if let Some(ref ctx) = params.context_id {
240 req = req.query(&[("contextId", ctx.as_str())]);
241 }
242 if let Some(ref status) = params.status {
243 req = req.query(&[("status", status.as_str())]);
244 }
245 if let Some(ps) = params.page_size {
246 req = req.query(&[("pageSize", &ps.to_string())]);
247 }
248 if let Some(ref pt) = params.page_token {
249 req = req.query(&[("pageToken", pt.as_str())]);
250 }
251
252 let resp = req.send().await?;
253
254 if !resp.status().is_success() {
255 return Err(self.parse_error(resp).await);
256 }
257 let response: pb::ListTasksResponse = resp.json().await?;
258 Ok(response)
259 }
260
261 pub async fn send_streaming_message(
270 &self,
271 request: impl Into<pb::SendMessageRequest>,
272 ) -> Result<TypedSseStream, A2aClientError> {
273 let raw = self.send_streaming_message_raw(request).await?;
274 Ok(TypedSseStream::from_raw(raw))
275 }
276
277 pub async fn subscribe_to_task(
282 &self,
283 task_id: &str,
284 last_event_id: Option<&str>,
285 ) -> Result<TypedSseStream, A2aClientError> {
286 let raw = self.subscribe_to_task_raw(task_id, last_event_id).await?;
287 Ok(TypedSseStream::from_raw(raw))
288 }
289
290 pub async fn send_streaming_message_raw(
292 &self,
293 request: impl Into<pb::SendMessageRequest>,
294 ) -> Result<SseStream, A2aClientError> {
295 let request = request.into();
296 let url = self.url("/message:stream");
297 let resp = self
298 .request(reqwest::Method::POST, &url)
299 .header("content-type", "application/json")
300 .header("accept", "text/event-stream")
301 .json(&request)
302 .send()
303 .await?;
304
305 if !resp.status().is_success() {
306 return Err(self.parse_error_from_status(resp).await);
307 }
308 Ok(SseStream::from_response(resp))
309 }
310
311 pub async fn subscribe_to_task_raw(
313 &self,
314 task_id: &str,
315 last_event_id: Option<&str>,
316 ) -> Result<SseStream, A2aClientError> {
317 let url = self.url(&format!("/tasks/{task_id}:subscribe"));
318 let mut req = self
319 .request(reqwest::Method::GET, &url)
320 .header("accept", "text/event-stream");
321
322 if let Some(lei) = last_event_id {
323 req = req.header("Last-Event-ID", lei);
324 }
325
326 let resp = req.send().await?;
327
328 if !resp.status().is_success() {
329 return Err(self.parse_error_from_status(resp).await);
330 }
331 Ok(SseStream::from_response(resp))
332 }
333
334 pub async fn create_push_config(
344 &self,
345 task_id: &str,
346 url: impl Into<String>,
347 token: impl Into<String>,
348 ) -> Result<turul_a2a_types::PushConfig, A2aClientError> {
349 let cfg = turul_a2a_types::PushConfigBuilder::new(url, token)
350 .task_id(task_id)
351 .build();
352 self.create_push_config_with(task_id, cfg).await
353 }
354
355 pub async fn create_push_config_with(
360 &self,
361 task_id: &str,
362 config: turul_a2a_types::PushConfig,
363 ) -> Result<turul_a2a_types::PushConfig, A2aClientError> {
364 let endpoint = self.url(&format!("/tasks/{task_id}/pushNotificationConfigs"));
365 let resp = self
366 .request(reqwest::Method::POST, &endpoint)
367 .header("content-type", "application/json")
368 .json(config.as_proto())
369 .send()
370 .await?;
371
372 if !resp.status().is_success() {
373 return Err(self.parse_error_from_status(resp).await);
374 }
375 let proto: pb::TaskPushNotificationConfig = resp.json().await?;
376 Ok(proto.into())
377 }
378
379 pub async fn get_push_config(
381 &self,
382 task_id: &str,
383 config_id: &str,
384 ) -> Result<turul_a2a_types::PushConfig, A2aClientError> {
385 let url = self.url(&format!(
386 "/tasks/{task_id}/pushNotificationConfigs/{config_id}"
387 ));
388 let resp = self.request(reqwest::Method::GET, &url).send().await?;
389
390 if !resp.status().is_success() {
391 return Err(self.parse_error_from_status(resp).await);
392 }
393 let proto: pb::TaskPushNotificationConfig = resp.json().await?;
394 Ok(proto.into())
395 }
396
397 pub async fn list_push_configs(
399 &self,
400 task_id: &str,
401 page_size: Option<i32>,
402 page_token: Option<&str>,
403 ) -> Result<turul_a2a_types::PushConfigPage, A2aClientError> {
404 let url = self.url(&format!("/tasks/{task_id}/pushNotificationConfigs"));
405 let mut req = self.request(reqwest::Method::GET, &url);
406 if let Some(ps) = page_size {
407 req = req.query(&[("pageSize", ps.to_string())]);
408 }
409 if let Some(pt) = page_token {
410 req = req.query(&[("pageToken", pt)]);
411 }
412 let resp = req.send().await?;
413
414 if !resp.status().is_success() {
415 return Err(self.parse_error_from_status(resp).await);
416 }
417 let proto: pb::ListTaskPushNotificationConfigsResponse = resp.json().await?;
418 Ok(turul_a2a_types::PushConfigPage::new(
419 proto.configs.into_iter().map(Into::into).collect(),
420 (!proto.next_page_token.is_empty()).then_some(proto.next_page_token),
421 ))
422 }
423
424 pub async fn delete_push_config(
426 &self,
427 task_id: &str,
428 config_id: &str,
429 ) -> Result<(), A2aClientError> {
430 let url = self.url(&format!(
431 "/tasks/{task_id}/pushNotificationConfigs/{config_id}"
432 ));
433 let resp = self.request(reqwest::Method::DELETE, &url).send().await?;
434
435 if !resp.status().is_success() {
436 return Err(self.parse_error_from_status(resp).await);
437 }
438 Ok(())
439 }
440
441 pub async fn fetch_extended_agent_card(&self) -> Result<pb::AgentCard, A2aClientError> {
447 let url = self.url("/extendedAgentCard");
448 let resp = self.request(reqwest::Method::GET, &url).send().await?;
449
450 if !resp.status().is_success() {
451 return Err(self.parse_error_from_status(resp).await);
452 }
453 Ok(resp.json().await?)
454 }
455
456 async fn parse_error_from_status(&self, resp: reqwest::Response) -> A2aClientError {
462 self.parse_error(resp).await
463 }
464
465 async fn parse_error(&self, resp: reqwest::Response) -> A2aClientError {
467 let status = resp.status().as_u16();
468 let body = resp.text().await.unwrap_or_default();
469
470 if let Ok(json) = serde_json::from_str::<serde_json::Value>(&body) {
472 if let Some(error) = json.get("error") {
473 let message = error
474 .get("message")
475 .and_then(|m| m.as_str())
476 .unwrap_or("Unknown error")
477 .to_string();
478
479 let reason = error
481 .get("details")
482 .and_then(|d| d.as_array())
483 .and_then(|arr| arr.first())
484 .and_then(|info| info.get("reason"))
485 .and_then(|r| r.as_str())
486 .map(|s| s.to_string());
487
488 return A2aClientError::A2aError {
489 status,
490 message,
491 reason,
492 };
493 }
494 }
495
496 A2aClientError::Http {
497 status,
498 message: body,
499 }
500 }
501}
502
/// Optional filters and paging controls for listing tasks.
///
/// Every field defaults to `None`; `list_tasks_proto` only sends the query
/// parameters whose field is `Some`.
#[derive(Debug, Clone, Default)]
pub struct ListTasksParams {
    /// Sent as the `contextId` query parameter.
    pub context_id: Option<String>,
    /// Sent as the `status` query parameter.
    pub status: Option<String>,
    /// Sent as the `pageSize` query parameter.
    pub page_size: Option<i32>,
    /// Sent as the `pageToken` query parameter.
    pub page_token: Option<String>,
}