adk_code/a2a_compat.rs
1//! A2A-compatible event mapping boundary for collaboration transport.
2//!
3//! This module defines the [`CollaborationTransport`] trait that abstracts the
4//! transport layer for [`CollaborationEvent`]s, and provides a [`LocalTransport`]
5//! implementation backed by an in-process broadcast channel.
6//!
7//! ## A2A Concept Mapping
8//!
9//! The collaboration event model is designed to map cleanly onto the ADK A2A
10//! protocol (see `adk-server/src/a2a/types.rs`) for future remote specialist
11//! execution. The mapping is:
12//!
13//! | Collaboration Concept | A2A Concept |
14//! |---|---|
15//! | [`CollaborationEvent`] | A2A `Message` or `TaskStatusUpdateEvent` |
16//! | `correlation_id` | A2A `task_id` — links related events into one task |
17//! | `producer` | A2A agent card sender — the agent originating the event |
18//! | `consumer` | A2A agent card receiver — the intended recipient agent |
19//! | `kind` | A2A `TaskState` or message role (see kind mapping below) |
20//! | `payload` | A2A `Artifact` parts or `Message` parts (structured data) |
21//! | `topic` | A2A message metadata or artifact name |
22//! | `timestamp` | A2A event timestamp |
23//!
24//! ### Event Kind → A2A Task State Mapping
25//!
//! | [`CollaborationEventKind`](crate::CollaborationEventKind) | A2A `TaskState` | Notes |
27//! |---|---|---|
28//! | `NeedWork` | `Submitted` | A new task is submitted for another agent |
29//! | `WorkClaimed` | `Working` | The receiving agent has accepted the task |
30//! | `WorkPublished` | `Completed` + artifact | Task completed with output artifact |
31//! | `FeedbackRequested` | `InputRequired` | Agent needs input before continuing |
32//! | `FeedbackProvided` | `Submitted` (response) | Input provided as a new message |
33//! | `Blocked` | `InputRequired` + metadata | Agent blocked, needs external decision |
34//! | `Completed` | `Completed` (final) | Terminal state, no more updates |
35//!
36//! ### Transport Neutrality
37//!
38//! Phase 1 uses [`LocalTransport`] (in-process broadcast channel). The event
39//! envelope is transport-neutral: the same [`CollaborationEvent`] struct works
40//! whether the transport is local or remote. A future `A2aTransport` would
41//! serialize events into A2A JSON-RPC messages and route them over HTTP/SSE
42//! without changing the [`CollaborationTransport`] trait contract.
43//!
44//! ### Migration Path to A2A
45//!
46//! To add remote specialist execution later:
47//!
48//! 1. Implement `CollaborationTransport` backed by an A2A client
49//! 2. Map `CollaborationEvent` → A2A `Message` on publish
50//! 3. Map A2A `TaskStatusUpdateEvent` / `TaskArtifactUpdateEvent` → `CollaborationEvent` on receive
51//! 4. Use `correlation_id` as the A2A `task_id` for routing
52//! 5. Use `producer`/`consumer` to resolve agent card URLs
53//!
//! The [`Workspace`](crate::Workspace) API remains unchanged — only the transport
//! implementation swaps from local to remote.
56//!
57//! ## Example
58//!
59//! ```rust
60//! use adk_code::a2a_compat::{CollaborationTransport, LocalTransport};
61//! use adk_code::{CollaborationEvent, CollaborationEventKind};
62//!
63//! # async fn example() {
64//! let transport = LocalTransport::new(64);
65//! let mut receiver = transport.subscribe();
66//!
67//! let event = CollaborationEvent::new(
68//! "corr-1", "api-routes", "backend", CollaborationEventKind::WorkPublished,
69//! );
70//! transport.publish(event).await.unwrap();
71//!
72//! let received = receiver.recv().await;
73//! assert!(received.is_some());
74//! # }
75//! ```
76
77use async_trait::async_trait;
78use tokio::sync::broadcast;
79
80use crate::{CollaborationEvent, ExecutionError};
81
82/// Async trait abstracting the collaboration event transport layer.
83///
84/// Implementations can be local (in-process broadcast) or remote (A2A over
85/// HTTP/SSE). The trait is intentionally minimal — publish and subscribe —
86/// so that higher-level semantics (correlation, wait/resume) stay in
87/// [`Workspace`](crate::Workspace).
88///
89/// # Example
90///
91/// ```rust
92/// use adk_code::a2a_compat::{CollaborationTransport, LocalTransport};
93/// use adk_code::{CollaborationEvent, CollaborationEventKind};
94///
95/// # async fn example() {
96/// let transport = LocalTransport::new(64);
97/// let event = CollaborationEvent::new(
98/// "c1", "topic", "agent", CollaborationEventKind::NeedWork,
99/// );
100/// transport.publish(event).await.unwrap();
101/// # }
102/// ```
103#[async_trait]
104pub trait CollaborationTransport: Send + Sync {
105 /// Publish a collaboration event to all subscribers.
106 async fn publish(&self, event: CollaborationEvent) -> Result<(), ExecutionError>;
107
108 /// Create a new receiver for collaboration events.
109 fn subscribe(&self) -> Box<dyn CollaborationReceiver>;
110}
111
112/// Async trait for receiving collaboration events from a transport.
113///
114/// Each receiver gets its own independent stream of events published
115/// after the subscription was created.
116#[async_trait]
117pub trait CollaborationReceiver: Send {
118 /// Receive the next collaboration event.
119 ///
120 /// Returns `None` if the transport is closed.
121 async fn recv(&mut self) -> Option<CollaborationEvent>;
122}
123
124/// In-process collaboration transport backed by a [`broadcast`] channel.
125///
126/// This is the phase 1 default transport. It keeps all collaboration local
127/// to the process while preserving the same [`CollaborationTransport`] trait
128/// contract that a future A2A-backed transport would implement.
129///
130/// # Example
131///
132/// ```rust
133/// use adk_code::a2a_compat::{CollaborationTransport, LocalTransport};
134/// use adk_code::{CollaborationEvent, CollaborationEventKind};
135///
136/// # async fn example() {
137/// let transport = LocalTransport::new(128);
138/// let mut rx = transport.subscribe();
139///
140/// transport.publish(CollaborationEvent::new(
141/// "c1", "topic", "agent", CollaborationEventKind::WorkPublished,
142/// )).await.unwrap();
143///
144/// let event = rx.recv().await.unwrap();
145/// assert_eq!(event.correlation_id, "c1");
146/// # }
147/// ```
148#[derive(Debug)]
149pub struct LocalTransport {
150 tx: broadcast::Sender<CollaborationEvent>,
151}
152
153impl LocalTransport {
154 /// Create a new local transport with the given channel capacity.
155 ///
156 /// The capacity determines how many events can be buffered before slow
157 /// subscribers start lagging. A reasonable default is 256.
158 pub fn new(capacity: usize) -> Self {
159 let (tx, _rx) = broadcast::channel(capacity);
160 Self { tx }
161 }
162}
163
164impl Default for LocalTransport {
165 fn default() -> Self {
166 Self::new(256)
167 }
168}
169
170#[async_trait]
171impl CollaborationTransport for LocalTransport {
172 async fn publish(&self, event: CollaborationEvent) -> Result<(), ExecutionError> {
173 // If no receivers are listening, send returns Err — that is fine,
174 // the event is simply not delivered (same as Workspace::publish).
175 let _ = self.tx.send(event);
176 Ok(())
177 }
178
179 fn subscribe(&self) -> Box<dyn CollaborationReceiver> {
180 Box::new(LocalReceiver { rx: self.tx.subscribe() })
181 }
182}
183
184/// Receiver side of a [`LocalTransport`].
185struct LocalReceiver {
186 rx: broadcast::Receiver<CollaborationEvent>,
187}
188
189#[async_trait]
190impl CollaborationReceiver for LocalReceiver {
191 async fn recv(&mut self) -> Option<CollaborationEvent> {
192 loop {
193 match self.rx.recv().await {
194 Ok(event) => return Some(event),
195 Err(broadcast::error::RecvError::Lagged(skipped)) => {
196 tracing::warn!(
197 skipped,
198 "local transport receiver lagged, {skipped} events dropped"
199 );
200 continue;
201 }
202 Err(broadcast::error::RecvError::Closed) => return None,
203 }
204 }
205 }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CollaborationEventKind;

    /// Publishes `event` on `transport` and panics if publishing fails.
    async fn publish_ok(transport: &LocalTransport, event: CollaborationEvent) {
        transport.publish(event).await.unwrap();
    }

    /// A single subscriber receives the event that was published.
    #[tokio::test]
    async fn local_transport_publish_and_receive() {
        let transport = LocalTransport::new(16);
        let mut rx = transport.subscribe();

        publish_ok(
            &transport,
            CollaborationEvent::new(
                "c1",
                "api-routes",
                "backend",
                CollaborationEventKind::WorkPublished,
            ),
        )
        .await;

        let received = rx.recv().await.unwrap();
        assert_eq!(received.correlation_id, "c1");
        assert_eq!(received.kind, CollaborationEventKind::WorkPublished);
    }

    /// Every subscriber gets its own copy of a published event.
    #[tokio::test]
    async fn local_transport_multiple_subscribers() {
        let transport = LocalTransport::new(16);
        let mut receivers = [transport.subscribe(), transport.subscribe()];

        publish_ok(
            &transport,
            CollaborationEvent::new("c1", "topic", "agent", CollaborationEventKind::NeedWork),
        )
        .await;

        for rx in &mut receivers {
            let event = rx.recv().await.unwrap();
            assert_eq!(event.correlation_id, "c1");
        }
    }

    /// Publishing without any subscriber is not an error.
    #[tokio::test]
    async fn local_transport_publish_with_no_subscribers_succeeds() {
        let transport = LocalTransport::new(16);
        let event =
            CollaborationEvent::new("c1", "topic", "agent", CollaborationEventKind::Completed);

        // No subscribers — publish should still succeed.
        let outcome = transport.publish(event).await;
        assert!(outcome.is_ok());
    }

    /// A transport built through `Default` delivers events like any other.
    #[tokio::test]
    async fn local_transport_default_capacity() {
        let transport = LocalTransport::default();
        let mut rx = transport.subscribe();

        publish_ok(
            &transport,
            CollaborationEvent::new("c1", "topic", "agent", CollaborationEventKind::WorkClaimed),
        )
        .await;

        let event = rx.recv().await.unwrap();
        assert_eq!(event.kind, CollaborationEventKind::WorkClaimed);
    }

    /// All event fields survive the trip through the transport unchanged.
    #[tokio::test]
    async fn local_transport_preserves_event_fields() {
        let transport = LocalTransport::new(16);
        let mut rx = transport.subscribe();

        let payload = serde_json::json!({ "tables": ["users", "orders"] });
        let original = CollaborationEvent::new(
            "corr-42",
            "database-schema",
            "db_engineer",
            CollaborationEventKind::FeedbackRequested,
        )
        .consumer("reviewer")
        .payload(payload.clone())
        .timestamp(1719000000000);

        publish_ok(&transport, original).await;

        let received = rx.recv().await.unwrap();
        assert_eq!(received.correlation_id, "corr-42");
        assert_eq!(received.topic, "database-schema");
        assert_eq!(received.producer, "db_engineer");
        assert_eq!(received.consumer.as_deref(), Some("reviewer"));
        assert_eq!(received.kind, CollaborationEventKind::FeedbackRequested);
        assert_eq!(received.payload, payload);
        assert_eq!(received.timestamp, 1719000000000);
    }

    /// Events arrive in the order in which they were published.
    #[tokio::test]
    async fn local_transport_event_ordering() {
        let transport = LocalTransport::new(16);
        let mut rx = transport.subscribe();

        for i in 0..5 {
            let event = CollaborationEvent::new(
                format!("c{i}"),
                "topic",
                "agent",
                CollaborationEventKind::NeedWork,
            );
            publish_ok(&transport, event).await;
        }

        for i in 0..5 {
            let event = rx.recv().await.unwrap();
            assert_eq!(event.correlation_id, format!("c{i}"));
        }
    }
}
336}