Skip to main content

a2a_rs/port/
client.rs

//! The client-side `Transport` port.
//!
//! [`Transport`] is the outbound port a client uses to talk to a remote A2A
//! agent: the application names the capability it needs ("send a message", "get a
//! task", "subscribe to updates"), and a concrete transport **adapter**
//! (ConnectRPC, JSON-RPC 2.0, …) fulfils it over the wire. This is the mirror of
//! the inbound server ports — same hexagonal shape, opposite direction.
//!
//! Each adapter reports its wire protocol via [`Transport::protocol`] so a
//! card-driven negotiator can pick the right one from an agent card's
//! `supported_interfaces`.
//!
//! The port carries no feature gate (hex rule 5 — gate adapters, not ports); it
//! depends only on the always-available `async-trait`/`futures` and domain types.
15
16use async_trait::async_trait;
17use futures::Stream;
18use std::pin::Pin;
19
20use crate::domain::{
21    A2AError, ListTasksParams, ListTasksResult, Message, Task, TaskArtifactUpdateEvent,
22    TaskPushNotificationConfig, TaskStatusUpdateEvent,
23};
24
25/// The capability a client needs from a remote A2A agent, independent of wire
26/// protocol. Implemented by each transport adapter (`HttpClient` for ConnectRPC,
27/// `JsonRpcClient` for JSON-RPC 2.0, …).
28#[async_trait]
29pub trait Transport: Send + Sync {
30    /// The wire protocol this transport speaks, matching an agent interface's
31    /// `protocol_binding` (e.g. `"JSONRPC"`, `"CONNECTRPC"`, `"GRPC"`).
32    fn protocol(&self) -> &str;
33
34    /// Send a message to a task
35    async fn send_task_message(
36        &self,
37        task_id: &str,
38        message: &Message,
39        session_id: Option<&str>,
40        history_length: Option<u32>,
41    ) -> Result<Task, A2AError>;
42
43    /// Get a task by ID
44    async fn get_task(&self, task_id: &str, history_length: Option<u32>) -> Result<Task, A2AError>;
45
46    /// Cancel a task
47    async fn cancel_task(&self, task_id: &str) -> Result<Task, A2AError>;
48
49    /// Set up push notifications for a task
50    async fn set_task_push_notification(
51        &self,
52        config: &TaskPushNotificationConfig,
53    ) -> Result<TaskPushNotificationConfig, A2AError>;
54
55    /// Get push notification configuration for a task
56    async fn get_task_push_notification(
57        &self,
58        task_id: &str,
59    ) -> Result<TaskPushNotificationConfig, A2AError>;
60
61    /// List tasks with filtering and pagination (v1.0.0)
62    async fn list_tasks(&self, params: &ListTasksParams) -> Result<ListTasksResult, A2AError>;
63
64    /// List all push notification configs for a task (v1.0.0)
65    async fn list_push_notification_configs(
66        &self,
67        task_id: &str,
68    ) -> Result<Vec<TaskPushNotificationConfig>, A2AError>;
69
70    /// Get a specific push notification config by ID (v1.0.0)
71    async fn get_push_notification_config(
72        &self,
73        task_id: &str,
74        config_id: &str,
75    ) -> Result<TaskPushNotificationConfig, A2AError>;
76
77    /// Delete a specific push notification config (v1.0.0)
78    async fn delete_push_notification_config(
79        &self,
80        task_id: &str,
81        config_id: &str,
82    ) -> Result<(), A2AError>;
83
84    /// Subscribe to task updates (for streaming).
85    ///
86    /// Passing `last_event_id = None` is the spec-compliant subscribe: it maps
87    /// to the A2A `SubscribeToTask` call and streams from the task's current
88    /// state — exactly what a spec client expects.
89    ///
90    /// `last_event_id = Some(..)` opts into the a2a-rs **`Last-Event-ID`
91    /// resumption enhancement** (not part of the A2A v1.0 spec): a resumable
92    /// transport sends it as the SSE `Last-Event-ID` header so an a2a-rs server
93    /// replays the events after that id before streaming live. A spec-compliant
94    /// server ignores the header and simply streams from current state, so this
95    /// stays interoperable either way.
96    async fn subscribe_to_task(
97        &self,
98        task_id: &str,
99        history_length: Option<u32>,
100        last_event_id: Option<&str>,
101    ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, A2AError>> + Send>>, A2AError>;
102}
103
104/// A streamed [`StreamItem`] tagged with the server's SSE event id (when the
105/// transport supports it). A resilient client records the most recent `event_id`
106/// and echoes it as `Last-Event-ID` on reconnect to resume without gaps.
107///
108/// The `event_id` is part of the a2a-rs resumption enhancement (see
109/// [`subscribe_to_task`](Transport::subscribe_to_task)); spec clients that only
110/// read `item` are unaffected.
111#[derive(Debug, Clone)]
112pub struct StreamEvent {
113    /// The server-assigned per-task event id, parsed from the SSE `id:` field.
114    /// `None` for the initial task snapshot, for transports without event ids,
115    /// or when talking to a spec-compliant server that does not emit `id:`.
116    pub event_id: Option<u64>,
117    /// The update payload.
118    pub item: StreamItem,
119}
120
121impl StreamEvent {
122    /// Construct a stream event.
123    #[inline]
124    pub fn new(event_id: Option<u64>, item: StreamItem) -> Self {
125        Self { event_id, item }
126    }
127
128    /// A stream event with no id (initial snapshot / id-less transport).
129    #[inline]
130    pub fn untagged(item: StreamItem) -> Self {
131        Self {
132            event_id: None,
133            item,
134        }
135    }
136}
137
138/// Items that can be streamed from the server during task subscriptions.
139///
140/// When subscribing to streaming updates for a task, the server can send
141/// different types of items:
142/// - `Task`: The complete initial task state when subscription starts
143/// - `StatusUpdate`: Updates to the task's status (state changes, progress)
144/// - `ArtifactUpdate`: Notifications about new or updated artifacts
145///
146/// This allows clients to receive real-time updates about task progress
147/// and results as they become available.
148#[derive(Debug, Clone)]
149pub enum StreamItem {
150    /// The initial task state
151    Task(Task),
152    /// A task status update
153    StatusUpdate(TaskStatusUpdateEvent),
154    /// A task artifact update
155    ArtifactUpdate(TaskArtifactUpdateEvent),
156}