Skip to main content

turul_a2a_client/
lib.rs

1//! A2A Protocol v1.0 client library.
2//!
3//! Independent of the server crate — depends only on turul-a2a-types and turul-a2a-proto.
4
5pub mod builders;
6mod error;
7#[cfg(feature = "grpc")]
8pub mod grpc;
9pub mod prelude;
10pub mod response;
11pub mod sse;
12
13use turul_a2a_proto as pb;
14use turul_a2a_types::wire;
15
16pub use builders::MessageBuilder;
17pub use error::A2aClientError;
18pub use sse::{SseEvent, SseStream, StreamEvent, TypedSseEvent, TypedSseStream};
19
/// Auth configuration for the client.
///
/// The `Debug` implementation is written by hand so that credentials are
/// never printed: the bearer token and the API key value are replaced by
/// `<redacted>`. The API key *header name* is not a secret and is shown.
#[derive(Clone, Default)]
pub enum ClientAuth {
    /// Send no authentication headers.
    #[default]
    None,
    /// Bearer token credential.
    Bearer(String),
    /// API key sent in a caller-chosen header.
    ApiKey {
        /// Name of the header that carries the key.
        header: String,
        /// The secret key value.
        key: String,
    },
}

impl std::fmt::Debug for ClientAuth {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::None => f.write_str("None"),
            // Secrets are deliberately withheld so that logging a client
            // (which embeds this enum) cannot leak credentials.
            Self::Bearer(_) => f
                .debug_tuple("Bearer")
                .field(&format_args!("<redacted>"))
                .finish(),
            Self::ApiKey { header, .. } => f
                .debug_struct("ApiKey")
                .field("header", header)
                .field("key", &format_args!("<redacted>"))
                .finish(),
        }
    }
}
31
32/// A2A client for communicating with A2A agents.
33#[derive(Debug, Clone)]
34pub struct A2aClient {
35    base_url: String,
36    tenant: Option<String>,
37    auth: ClientAuth,
38    http: reqwest::Client,
39    agent_card: Option<pb::AgentCard>,
40}
41
42impl A2aClient {
43    /// Create a client pointing at a base URL.
44    pub fn new(base_url: impl Into<String>) -> Self {
45        Self {
46            base_url: base_url.into().trim_end_matches('/').to_string(),
47            tenant: None,
48            auth: ClientAuth::None,
49            http: reqwest::Client::new(),
50            agent_card: None,
51        }
52    }
53
54    pub fn with_auth(mut self, auth: ClientAuth) -> Self {
55        self.auth = auth;
56        self
57    }
58
59    pub fn with_tenant(mut self, tenant: impl Into<String>) -> Self {
60        self.tenant = Some(tenant.into());
61        self
62    }
63
64    /// Discover the agent by fetching `/.well-known/agent-card.json`.
65    pub async fn discover(base_url: impl Into<String>) -> Result<Self, A2aClientError> {
66        let mut client = Self::new(base_url);
67        let card = client.fetch_agent_card().await?;
68        client.agent_card = Some(card);
69        Ok(client)
70    }
71
72    /// Fetch the agent card from the well-known endpoint.
73    pub async fn fetch_agent_card(&self) -> Result<pb::AgentCard, A2aClientError> {
74        let url = format!("{}{}", self.base_url, wire::http::WELL_KNOWN_AGENT_CARD);
75        let resp = self.http.get(&url).send().await?;
76        if !resp.status().is_success() {
77            return Err(A2aClientError::Http {
78                status: resp.status().as_u16(),
79                message: resp.text().await.unwrap_or_default(),
80            });
81        }
82        let card: pb::AgentCard = resp.json().await?;
83        Ok(card)
84    }
85
86    /// Get the cached agent card (from `discover()` or `fetch_agent_card()`).
87    pub fn agent_card(&self) -> Option<&pb::AgentCard> {
88        self.agent_card.as_ref()
89    }
90
91    /// Build the URL with optional tenant prefix.
92    /// Tenant is percent-encoded to handle special characters safely.
93    fn url(&self, path: &str) -> String {
94        match &self.tenant {
95            Some(tenant) => {
96                let encoded_tenant = reqwest::Url::parse("http://x")
97                    .unwrap()
98                    .join(&format!("{}/", tenant))
99                    .map(|u| {
100                        u.path()
101                            .trim_end_matches('/')
102                            .trim_start_matches('/')
103                            .to_string()
104                    })
105                    .unwrap_or_else(|_| tenant.clone());
106                format!("{}/{}{}", self.base_url, encoded_tenant, path)
107            }
108            None => format!("{}{}", self.base_url, path),
109        }
110    }
111
112    /// Build a request with auth headers and A2A-Version.
113    fn request(&self, method: reqwest::Method, url: &str) -> reqwest::RequestBuilder {
114        let mut req = self.http.request(method, url).header("a2a-version", "1.0");
115        match &self.auth {
116            ClientAuth::None => {}
117            ClientAuth::Bearer(token) => {
118                req = req.bearer_auth(token);
119            }
120            ClientAuth::ApiKey { header, key } => {
121                req = req.header(header.as_str(), key.as_str());
122            }
123        }
124        req
125    }
126
127    // =========================================================
128    // Primary API — wrapper-first
129    // =========================================================
130
131    /// Send a message to the agent. Returns a wrapper `SendResponse`.
132    ///
133    /// Accepts `MessageBuilder` directly (no `.build()` needed) or a raw proto request.
134    pub async fn send_message(
135        &self,
136        request: impl Into<pb::SendMessageRequest>,
137    ) -> Result<crate::response::SendResponse, A2aClientError> {
138        let proto_resp = self.send_message_proto(request.into()).await?;
139        crate::response::SendResponse::try_from(proto_resp)
140    }
141
142    /// Get a task by ID. Returns a wrapper `Task`.
143    pub async fn get_task(
144        &self,
145        task_id: &str,
146        history_length: Option<i32>,
147    ) -> Result<turul_a2a_types::Task, A2aClientError> {
148        let proto_task = self.get_task_proto(task_id, history_length).await?;
149        turul_a2a_types::Task::try_from(proto_task)
150            .map_err(|e| A2aClientError::Conversion(e.to_string()))
151    }
152
153    /// Cancel a task. Returns a wrapper `Task`.
154    pub async fn cancel_task(
155        &self,
156        task_id: &str,
157    ) -> Result<turul_a2a_types::Task, A2aClientError> {
158        let proto_task = self.cancel_task_proto(task_id).await?;
159        turul_a2a_types::Task::try_from(proto_task)
160            .map_err(|e| A2aClientError::Conversion(e.to_string()))
161    }
162
163    /// List tasks. Returns a wrapper `ListResponse` with wrapper `Task`s.
164    pub async fn list_tasks(
165        &self,
166        params: &ListTasksParams,
167    ) -> Result<crate::response::ListResponse, A2aClientError> {
168        let proto_resp = self.list_tasks_proto(params).await?;
169        crate::response::ListResponse::try_from(proto_resp)
170    }
171
172    // =========================================================
173    // Proto-level escape hatches
174    // =========================================================
175
176    /// Send a message, returning the raw proto response.
177    pub async fn send_message_proto(
178        &self,
179        request: pb::SendMessageRequest,
180    ) -> Result<pb::SendMessageResponse, A2aClientError> {
181        let url = self.url("/message:send");
182        let resp = self
183            .request(reqwest::Method::POST, &url)
184            .header("content-type", "application/json")
185            .json(&request)
186            .send()
187            .await?;
188
189        if !resp.status().is_success() {
190            return Err(self.parse_error(resp).await);
191        }
192        let response: pb::SendMessageResponse = resp.json().await?;
193        Ok(response)
194    }
195
196    /// Get a task by ID, returning the raw proto Task.
197    pub async fn get_task_proto(
198        &self,
199        task_id: &str,
200        history_length: Option<i32>,
201    ) -> Result<pb::Task, A2aClientError> {
202        let url = self.url(&format!("/tasks/{task_id}"));
203        let mut req = self.request(reqwest::Method::GET, &url);
204        if let Some(hl) = history_length {
205            req = req.query(&[("historyLength", hl.to_string())]);
206        }
207        let resp = req.send().await?;
208
209        if !resp.status().is_success() {
210            return Err(self.parse_error(resp).await);
211        }
212        let task: pb::Task = resp.json().await?;
213        Ok(task)
214    }
215
216    /// Cancel a task, returning the raw proto Task.
217    pub async fn cancel_task_proto(&self, task_id: &str) -> Result<pb::Task, A2aClientError> {
218        let url = self.url(&format!("/tasks/{task_id}:cancel"));
219        let resp = self
220            .request(reqwest::Method::POST, &url)
221            .header("a2a-version", "1.0")
222            .send()
223            .await?;
224
225        if !resp.status().is_success() {
226            return Err(self.parse_error(resp).await);
227        }
228        let task: pb::Task = resp.json().await?;
229        Ok(task)
230    }
231
232    /// List tasks, returning the raw proto response.
233    pub async fn list_tasks_proto(
234        &self,
235        params: &ListTasksParams,
236    ) -> Result<pb::ListTasksResponse, A2aClientError> {
237        let url = self.url("/tasks");
238        let mut req = self.request(reqwest::Method::GET, &url);
239        if let Some(ref ctx) = params.context_id {
240            req = req.query(&[("contextId", ctx.as_str())]);
241        }
242        if let Some(ref status) = params.status {
243            req = req.query(&[("status", status.as_str())]);
244        }
245        if let Some(ps) = params.page_size {
246            req = req.query(&[("pageSize", &ps.to_string())]);
247        }
248        if let Some(ref pt) = params.page_token {
249            req = req.query(&[("pageToken", pt.as_str())]);
250        }
251
252        let resp = req.send().await?;
253
254        if !resp.status().is_success() {
255            return Err(self.parse_error(resp).await);
256        }
257        let response: pb::ListTasksResponse = resp.json().await?;
258        Ok(response)
259    }
260
261    // =========================================================
262    // Streaming methods
263    // =========================================================
264
265    /// Send a streaming message. Returns a typed event stream.
266    ///
267    /// Events are `StreamEvent` variants: `Task`, `Message`, `StatusUpdate`, `ArtifactUpdate`.
268    /// The stream closes when the task reaches a terminal state.
269    pub async fn send_streaming_message(
270        &self,
271        request: impl Into<pb::SendMessageRequest>,
272    ) -> Result<TypedSseStream, A2aClientError> {
273        let raw = self.send_streaming_message_raw(request).await?;
274        Ok(TypedSseStream::from_raw(raw))
275    }
276
277    /// Subscribe to task events. Returns a typed event stream.
278    ///
279    /// The first event is a `StreamEvent::Task` snapshot (spec ยง3.1.6).
280    /// Subsequent events are `StatusUpdate` / `ArtifactUpdate` from the durable store.
281    pub async fn subscribe_to_task(
282        &self,
283        task_id: &str,
284        last_event_id: Option<&str>,
285    ) -> Result<TypedSseStream, A2aClientError> {
286        let raw = self.subscribe_to_task_raw(task_id, last_event_id).await?;
287        Ok(TypedSseStream::from_raw(raw))
288    }
289
290    /// Send a streaming message, returning raw SSE events (untyped JSON).
291    pub async fn send_streaming_message_raw(
292        &self,
293        request: impl Into<pb::SendMessageRequest>,
294    ) -> Result<SseStream, A2aClientError> {
295        let request = request.into();
296        let url = self.url("/message:stream");
297        let resp = self
298            .request(reqwest::Method::POST, &url)
299            .header("content-type", "application/json")
300            .header("accept", "text/event-stream")
301            .json(&request)
302            .send()
303            .await?;
304
305        if !resp.status().is_success() {
306            return Err(self.parse_error_from_status(resp).await);
307        }
308        Ok(SseStream::from_response(resp))
309    }
310
311    /// Subscribe to task events, returning raw SSE events (untyped JSON).
312    pub async fn subscribe_to_task_raw(
313        &self,
314        task_id: &str,
315        last_event_id: Option<&str>,
316    ) -> Result<SseStream, A2aClientError> {
317        let url = self.url(&format!("/tasks/{task_id}:subscribe"));
318        let mut req = self
319            .request(reqwest::Method::GET, &url)
320            .header("accept", "text/event-stream");
321
322        if let Some(lei) = last_event_id {
323            req = req.header("Last-Event-ID", lei);
324        }
325
326        let resp = req.send().await?;
327
328        if !resp.status().is_success() {
329            return Err(self.parse_error_from_status(resp).await);
330        }
331        Ok(SseStream::from_response(resp))
332    }
333
334    // =========================================================
335    // Push notification config CRUD
336    // =========================================================
337
338    /// Create a push notification config for a task using `url` + `token`
339    /// (the 80% case). For `authentication` or other advanced fields, build
340    /// a [`turul_a2a_types::PushConfig`] via
341    /// [`turul_a2a_types::PushConfigBuilder`] and call
342    /// [`Self::create_push_config_with`].
343    pub async fn create_push_config(
344        &self,
345        task_id: &str,
346        url: impl Into<String>,
347        token: impl Into<String>,
348    ) -> Result<turul_a2a_types::PushConfig, A2aClientError> {
349        let cfg = turul_a2a_types::PushConfigBuilder::new(url, token)
350            .task_id(task_id)
351            .build();
352        self.create_push_config_with(task_id, cfg).await
353    }
354
355    /// Create a push notification config from a fully-constructed
356    /// [`turul_a2a_types::PushConfig`] โ€” use this when you need
357    /// `authentication`, a custom `tenant`, or any other field beyond
358    /// `url` + `token`.
359    pub async fn create_push_config_with(
360        &self,
361        task_id: &str,
362        config: turul_a2a_types::PushConfig,
363    ) -> Result<turul_a2a_types::PushConfig, A2aClientError> {
364        let endpoint = self.url(&format!("/tasks/{task_id}/pushNotificationConfigs"));
365        let resp = self
366            .request(reqwest::Method::POST, &endpoint)
367            .header("content-type", "application/json")
368            .json(config.as_proto())
369            .send()
370            .await?;
371
372        if !resp.status().is_success() {
373            return Err(self.parse_error_from_status(resp).await);
374        }
375        let proto: pb::TaskPushNotificationConfig = resp.json().await?;
376        Ok(proto.into())
377    }
378
379    /// Get a push notification config by ID.
380    pub async fn get_push_config(
381        &self,
382        task_id: &str,
383        config_id: &str,
384    ) -> Result<turul_a2a_types::PushConfig, A2aClientError> {
385        let url = self.url(&format!(
386            "/tasks/{task_id}/pushNotificationConfigs/{config_id}"
387        ));
388        let resp = self.request(reqwest::Method::GET, &url).send().await?;
389
390        if !resp.status().is_success() {
391            return Err(self.parse_error_from_status(resp).await);
392        }
393        let proto: pb::TaskPushNotificationConfig = resp.json().await?;
394        Ok(proto.into())
395    }
396
397    /// List push notification configs for a task.
398    pub async fn list_push_configs(
399        &self,
400        task_id: &str,
401        page_size: Option<i32>,
402        page_token: Option<&str>,
403    ) -> Result<turul_a2a_types::PushConfigPage, A2aClientError> {
404        let url = self.url(&format!("/tasks/{task_id}/pushNotificationConfigs"));
405        let mut req = self.request(reqwest::Method::GET, &url);
406        if let Some(ps) = page_size {
407            req = req.query(&[("pageSize", ps.to_string())]);
408        }
409        if let Some(pt) = page_token {
410            req = req.query(&[("pageToken", pt)]);
411        }
412        let resp = req.send().await?;
413
414        if !resp.status().is_success() {
415            return Err(self.parse_error_from_status(resp).await);
416        }
417        let proto: pb::ListTaskPushNotificationConfigsResponse = resp.json().await?;
418        Ok(turul_a2a_types::PushConfigPage::new(
419            proto.configs.into_iter().map(Into::into).collect(),
420            (!proto.next_page_token.is_empty()).then_some(proto.next_page_token),
421        ))
422    }
423
424    /// Delete a push notification config.
425    pub async fn delete_push_config(
426        &self,
427        task_id: &str,
428        config_id: &str,
429    ) -> Result<(), A2aClientError> {
430        let url = self.url(&format!(
431            "/tasks/{task_id}/pushNotificationConfigs/{config_id}"
432        ));
433        let resp = self.request(reqwest::Method::DELETE, &url).send().await?;
434
435        if !resp.status().is_success() {
436            return Err(self.parse_error_from_status(resp).await);
437        }
438        Ok(())
439    }
440
441    // =========================================================
442    // Extended agent card
443    // =========================================================
444
445    /// Fetch the extended agent card (requires authentication).
446    pub async fn fetch_extended_agent_card(&self) -> Result<pb::AgentCard, A2aClientError> {
447        let url = self.url("/extendedAgentCard");
448        let resp = self.request(reqwest::Method::GET, &url).send().await?;
449
450        if !resp.status().is_success() {
451            return Err(self.parse_error_from_status(resp).await);
452        }
453        Ok(resp.json().await?)
454    }
455
456    // =========================================================
457    // Internal helpers
458    // =========================================================
459
460    /// Parse error from a non-success response (takes ownership).
461    async fn parse_error_from_status(&self, resp: reqwest::Response) -> A2aClientError {
462        self.parse_error(resp).await
463    }
464
465    /// Parse an error response from the server.
466    async fn parse_error(&self, resp: reqwest::Response) -> A2aClientError {
467        let status = resp.status().as_u16();
468        let body = resp.text().await.unwrap_or_default();
469
470        // Try to parse as AIP-193 error
471        if let Ok(json) = serde_json::from_str::<serde_json::Value>(&body) {
472            if let Some(error) = json.get("error") {
473                let message = error
474                    .get("message")
475                    .and_then(|m| m.as_str())
476                    .unwrap_or("Unknown error")
477                    .to_string();
478
479                // Extract ErrorInfo reason if present
480                let reason = error
481                    .get("details")
482                    .and_then(|d| d.as_array())
483                    .and_then(|arr| arr.first())
484                    .and_then(|info| info.get("reason"))
485                    .and_then(|r| r.as_str())
486                    .map(|s| s.to_string());
487
488                return A2aClientError::A2aError {
489                    status,
490                    message,
491                    reason,
492                };
493            }
494        }
495
496        A2aClientError::Http {
497            status,
498            message: body,
499        }
500    }
501}
502
/// Optional filters and paging controls for listing tasks.
///
/// Every field defaults to `None`; `list_tasks_proto` sends only the fields
/// that are `Some`, each as one query parameter.
#[derive(Debug, Clone, Default)]
pub struct ListTasksParams {
    /// Sent as the `contextId` query parameter.
    pub context_id: Option<String>,
    /// Sent as the `status` query parameter.
    pub status: Option<String>,
    /// Sent as the `pageSize` query parameter.
    pub page_size: Option<i32>,
    /// Sent as the `pageToken` query parameter.
    pub page_token: Option<String>,
}