a2a_rs/port/client.rs
1//! The client-side `Transport` port.
2//!
3//! [`Transport`] is the outbound port a client uses to talk to a remote A2A
4//! agent: the application names the capability it needs ("send a message", "get a
5//! task", "subscribe to updates"), and a concrete transport **adapter**
6//! (ConnectRPC, JSON-RPC 2.0, …) fulfils it over the wire. This is the mirror of
7//! the inbound server ports — same hexagonal shape, opposite direction.
8//!
9//! Each adapter reports its wire protocol via [`Transport::protocol`] so a
10//! card-driven negotiator can pick the right one from an agent card's
11//! `supported_interfaces`.
12//!
13//! The port carries no feature gate (hex rule 5 — gate adapters, not ports); it
14//! depends only on the always-available `async-trait`/`futures` and domain types.
15
16use async_trait::async_trait;
17use futures::Stream;
18use std::pin::Pin;
19
20use crate::domain::{
21 A2AError, ListTasksParams, ListTasksResult, Message, Task, TaskArtifactUpdateEvent,
22 TaskPushNotificationConfig, TaskStatusUpdateEvent,
23};
24
25/// The capability a client needs from a remote A2A agent, independent of wire
26/// protocol. Implemented by each transport adapter (`HttpClient` for ConnectRPC,
27/// `JsonRpcClient` for JSON-RPC 2.0, …).
28#[async_trait]
29pub trait Transport: Send + Sync {
30 /// The wire protocol this transport speaks, matching an agent interface's
31 /// `protocol_binding` (e.g. `"JSONRPC"`, `"CONNECTRPC"`, `"GRPC"`).
32 fn protocol(&self) -> &str;
33
34 /// Send a message to a task
35 async fn send_task_message(
36 &self,
37 task_id: &str,
38 message: &Message,
39 session_id: Option<&str>,
40 history_length: Option<u32>,
41 ) -> Result<Task, A2AError>;
42
43 /// Get a task by ID
44 async fn get_task(&self, task_id: &str, history_length: Option<u32>) -> Result<Task, A2AError>;
45
46 /// Cancel a task
47 async fn cancel_task(&self, task_id: &str) -> Result<Task, A2AError>;
48
49 /// Set up push notifications for a task
50 async fn set_task_push_notification(
51 &self,
52 config: &TaskPushNotificationConfig,
53 ) -> Result<TaskPushNotificationConfig, A2AError>;
54
55 /// Get push notification configuration for a task
56 async fn get_task_push_notification(
57 &self,
58 task_id: &str,
59 ) -> Result<TaskPushNotificationConfig, A2AError>;
60
61 /// List tasks with filtering and pagination (v1.0.0)
62 async fn list_tasks(&self, params: &ListTasksParams) -> Result<ListTasksResult, A2AError>;
63
64 /// List all push notification configs for a task (v1.0.0)
65 async fn list_push_notification_configs(
66 &self,
67 task_id: &str,
68 ) -> Result<Vec<TaskPushNotificationConfig>, A2AError>;
69
70 /// Get a specific push notification config by ID (v1.0.0)
71 async fn get_push_notification_config(
72 &self,
73 task_id: &str,
74 config_id: &str,
75 ) -> Result<TaskPushNotificationConfig, A2AError>;
76
77 /// Delete a specific push notification config (v1.0.0)
78 async fn delete_push_notification_config(
79 &self,
80 task_id: &str,
81 config_id: &str,
82 ) -> Result<(), A2AError>;
83
84 /// Subscribe to task updates (for streaming).
85 ///
86 /// Passing `last_event_id = None` is the spec-compliant subscribe: it maps
87 /// to the A2A `SubscribeToTask` call and streams from the task's current
88 /// state — exactly what a spec client expects.
89 ///
90 /// `last_event_id = Some(..)` opts into the a2a-rs **`Last-Event-ID`
91 /// resumption enhancement** (not part of the A2A v1.0 spec): a resumable
92 /// transport sends it as the SSE `Last-Event-ID` header so an a2a-rs server
93 /// replays the events after that id before streaming live. A spec-compliant
94 /// server ignores the header and simply streams from current state, so this
95 /// stays interoperable either way.
96 async fn subscribe_to_task(
97 &self,
98 task_id: &str,
99 history_length: Option<u32>,
100 last_event_id: Option<&str>,
101 ) -> Result<Pin<Box<dyn Stream<Item = Result<StreamEvent, A2AError>> + Send>>, A2AError>;
102}
103
104/// A streamed [`StreamItem`] tagged with the server's SSE event id (when the
105/// transport supports it). A resilient client records the most recent `event_id`
106/// and echoes it as `Last-Event-ID` on reconnect to resume without gaps.
107///
108/// The `event_id` is part of the a2a-rs resumption enhancement (see
109/// [`subscribe_to_task`](Transport::subscribe_to_task)); spec clients that only
110/// read `item` are unaffected.
111#[derive(Debug, Clone)]
112pub struct StreamEvent {
113 /// The server-assigned per-task event id, parsed from the SSE `id:` field.
114 /// `None` for the initial task snapshot, for transports without event ids,
115 /// or when talking to a spec-compliant server that does not emit `id:`.
116 pub event_id: Option<u64>,
117 /// The update payload.
118 pub item: StreamItem,
119}
120
121impl StreamEvent {
122 /// Construct a stream event.
123 #[inline]
124 pub fn new(event_id: Option<u64>, item: StreamItem) -> Self {
125 Self { event_id, item }
126 }
127
128 /// A stream event with no id (initial snapshot / id-less transport).
129 #[inline]
130 pub fn untagged(item: StreamItem) -> Self {
131 Self {
132 event_id: None,
133 item,
134 }
135 }
136}
137
138/// Items that can be streamed from the server during task subscriptions.
139///
140/// When subscribing to streaming updates for a task, the server can send
141/// different types of items:
142/// - `Task`: The complete initial task state when subscription starts
143/// - `StatusUpdate`: Updates to the task's status (state changes, progress)
144/// - `ArtifactUpdate`: Notifications about new or updated artifacts
145///
146/// This allows clients to receive real-time updates about task progress
147/// and results as they become available.
148#[derive(Debug, Clone)]
149pub enum StreamItem {
150 /// The initial task state
151 Task(Task),
152 /// A task status update
153 StatusUpdate(TaskStatusUpdateEvent),
154 /// A task artifact update
155 ArtifactUpdate(TaskArtifactUpdateEvent),
156}