Skip to main content

adk_code/
a2a_compat.rs

1//! A2A-compatible event mapping boundary for collaboration transport.
2//!
3//! This module defines the [`CollaborationTransport`] trait that abstracts the
4//! transport layer for [`CollaborationEvent`]s, and provides a [`LocalTransport`]
5//! implementation backed by an in-process broadcast channel.
6//!
7//! ## A2A Concept Mapping
8//!
9//! The collaboration event model is designed to map cleanly onto the ADK A2A
10//! protocol (see `adk-server/src/a2a/types.rs`) for future remote specialist
11//! execution. The mapping is:
12//!
13//! | Collaboration Concept | A2A Concept |
14//! |---|---|
15//! | [`CollaborationEvent`] | A2A `Message` or `TaskStatusUpdateEvent` |
16//! | `correlation_id` | A2A `task_id` — links related events into one task |
17//! | `producer` | A2A agent card sender — the agent originating the event |
18//! | `consumer` | A2A agent card receiver — the intended recipient agent |
19//! | `kind` | A2A `TaskState` or message role (see kind mapping below) |
20//! | `payload` | A2A `Artifact` parts or `Message` parts (structured data) |
21//! | `topic` | A2A message metadata or artifact name |
22//! | `timestamp` | A2A event timestamp |
23//!
24//! ### Event Kind → A2A Task State Mapping
25//!
26//! | [`CollaborationEventKind`] | A2A `TaskState` | Notes |
27//! |---|---|---|
28//! | `NeedWork` | `Submitted` | A new task is submitted for another agent |
29//! | `WorkClaimed` | `Working` | The receiving agent has accepted the task |
30//! | `WorkPublished` | `Completed` + artifact | Task completed with output artifact |
31//! | `FeedbackRequested` | `InputRequired` | Agent needs input before continuing |
32//! | `FeedbackProvided` | `Submitted` (response) | Input provided as a new message |
33//! | `Blocked` | `InputRequired` + metadata | Agent blocked, needs external decision |
34//! | `Completed` | `Completed` (final) | Terminal state, no more updates |
35//!
36//! ### Transport Neutrality
37//!
38//! Phase 1 uses [`LocalTransport`] (in-process broadcast channel). The event
39//! envelope is transport-neutral: the same [`CollaborationEvent`] struct works
40//! whether the transport is local or remote. A future `A2aTransport` would
41//! serialize events into A2A JSON-RPC messages and route them over HTTP/SSE
42//! without changing the [`CollaborationTransport`] trait contract.
43//!
44//! ### Migration Path to A2A
45//!
46//! To add remote specialist execution later:
47//!
48//! 1. Implement `CollaborationTransport` backed by an A2A client
49//! 2. Map `CollaborationEvent` → A2A `Message` on publish
50//! 3. Map A2A `TaskStatusUpdateEvent` / `TaskArtifactUpdateEvent` → `CollaborationEvent` on receive
51//! 4. Use `correlation_id` as the A2A `task_id` for routing
52//! 5. Use `producer`/`consumer` to resolve agent card URLs
53//!
54//! The [`Workspace`] API remains unchanged — only the transport implementation
55//! swaps from local to remote.
56//!
57//! ## Example
58//!
59//! ```rust
60//! use adk_code::a2a_compat::{CollaborationTransport, LocalTransport};
61//! use adk_code::{CollaborationEvent, CollaborationEventKind};
62//!
63//! # async fn example() {
64//! let transport = LocalTransport::new(64);
65//! let mut receiver = transport.subscribe();
66//!
67//! let event = CollaborationEvent::new(
68//!     "corr-1", "api-routes", "backend", CollaborationEventKind::WorkPublished,
69//! );
70//! transport.publish(event).await.unwrap();
71//!
72//! let received = receiver.recv().await;
73//! assert!(received.is_some());
74//! # }
75//! ```
76
77use async_trait::async_trait;
78use tokio::sync::broadcast;
79
80use crate::{CollaborationEvent, ExecutionError};
81
82/// Async trait abstracting the collaboration event transport layer.
83///
84/// Implementations can be local (in-process broadcast) or remote (A2A over
85/// HTTP/SSE). The trait is intentionally minimal — publish and subscribe —
86/// so that higher-level semantics (correlation, wait/resume) stay in
87/// [`Workspace`](crate::Workspace).
88///
89/// # Example
90///
91/// ```rust
92/// use adk_code::a2a_compat::{CollaborationTransport, LocalTransport};
93/// use adk_code::{CollaborationEvent, CollaborationEventKind};
94///
95/// # async fn example() {
96/// let transport = LocalTransport::new(64);
97/// let event = CollaborationEvent::new(
98///     "c1", "topic", "agent", CollaborationEventKind::NeedWork,
99/// );
100/// transport.publish(event).await.unwrap();
101/// # }
102/// ```
103#[async_trait]
104pub trait CollaborationTransport: Send + Sync {
105    /// Publish a collaboration event to all subscribers.
106    async fn publish(&self, event: CollaborationEvent) -> Result<(), ExecutionError>;
107
108    /// Create a new receiver for collaboration events.
109    fn subscribe(&self) -> Box<dyn CollaborationReceiver>;
110}
111
112/// Async trait for receiving collaboration events from a transport.
113///
114/// Each receiver gets its own independent stream of events published
115/// after the subscription was created.
116#[async_trait]
117pub trait CollaborationReceiver: Send {
118    /// Receive the next collaboration event.
119    ///
120    /// Returns `None` if the transport is closed.
121    async fn recv(&mut self) -> Option<CollaborationEvent>;
122}
123
124/// In-process collaboration transport backed by a [`broadcast`] channel.
125///
126/// This is the phase 1 default transport. It keeps all collaboration local
127/// to the process while preserving the same [`CollaborationTransport`] trait
128/// contract that a future A2A-backed transport would implement.
129///
130/// # Example
131///
132/// ```rust
133/// use adk_code::a2a_compat::{CollaborationTransport, LocalTransport};
134/// use adk_code::{CollaborationEvent, CollaborationEventKind};
135///
136/// # async fn example() {
137/// let transport = LocalTransport::new(128);
138/// let mut rx = transport.subscribe();
139///
140/// transport.publish(CollaborationEvent::new(
141///     "c1", "topic", "agent", CollaborationEventKind::WorkPublished,
142/// )).await.unwrap();
143///
144/// let event = rx.recv().await.unwrap();
145/// assert_eq!(event.correlation_id, "c1");
146/// # }
147/// ```
148#[derive(Debug)]
149pub struct LocalTransport {
150    tx: broadcast::Sender<CollaborationEvent>,
151}
152
153impl LocalTransport {
154    /// Create a new local transport with the given channel capacity.
155    ///
156    /// The capacity determines how many events can be buffered before slow
157    /// subscribers start lagging. A reasonable default is 256.
158    pub fn new(capacity: usize) -> Self {
159        let (tx, _rx) = broadcast::channel(capacity);
160        Self { tx }
161    }
162}
163
164impl Default for LocalTransport {
165    fn default() -> Self {
166        Self::new(256)
167    }
168}
169
170#[async_trait]
171impl CollaborationTransport for LocalTransport {
172    async fn publish(&self, event: CollaborationEvent) -> Result<(), ExecutionError> {
173        // If no receivers are listening, send returns Err — that is fine,
174        // the event is simply not delivered (same as Workspace::publish).
175        let _ = self.tx.send(event);
176        Ok(())
177    }
178
179    fn subscribe(&self) -> Box<dyn CollaborationReceiver> {
180        Box::new(LocalReceiver { rx: self.tx.subscribe() })
181    }
182}
183
184/// Receiver side of a [`LocalTransport`].
185struct LocalReceiver {
186    rx: broadcast::Receiver<CollaborationEvent>,
187}
188
189#[async_trait]
190impl CollaborationReceiver for LocalReceiver {
191    async fn recv(&mut self) -> Option<CollaborationEvent> {
192        loop {
193            match self.rx.recv().await {
194                Ok(event) => return Some(event),
195                Err(broadcast::error::RecvError::Lagged(skipped)) => {
196                    tracing::warn!(
197                        skipped,
198                        "local transport receiver lagged, {skipped} events dropped"
199                    );
200                    continue;
201                }
202                Err(broadcast::error::RecvError::Closed) => return None,
203            }
204        }
205    }
206}
207
#[cfg(test)]
mod tests {
    use super::*;
    use crate::CollaborationEventKind;

    /// Build an event on the generic `"topic"` / `"agent"` pair shared by
    /// most tests below.
    fn sample_event(correlation_id: &str, kind: CollaborationEventKind) -> CollaborationEvent {
        CollaborationEvent::new(correlation_id, "topic", "agent", kind)
    }

    /// A single subscriber sees the event that was published.
    #[tokio::test]
    async fn local_transport_publish_and_receive() {
        let transport = LocalTransport::new(16);
        let mut rx = transport.subscribe();

        let outgoing = CollaborationEvent::new(
            "c1",
            "api-routes",
            "backend",
            CollaborationEventKind::WorkPublished,
        );
        let outcome = transport.publish(outgoing).await;
        outcome.unwrap();

        let incoming = rx.recv().await.unwrap();
        assert_eq!(incoming.correlation_id, "c1");
        assert_eq!(incoming.kind, CollaborationEventKind::WorkPublished);
    }

    /// Every subscriber gets its own copy of a published event.
    #[tokio::test]
    async fn local_transport_multiple_subscribers() {
        let transport = LocalTransport::new(16);
        let mut first = transport.subscribe();
        let mut second = transport.subscribe();

        let outgoing = sample_event("c1", CollaborationEventKind::NeedWork);
        transport.publish(outgoing).await.unwrap();

        let from_first = first.recv().await.unwrap();
        let from_second = second.recv().await.unwrap();
        assert_eq!(from_first.correlation_id, "c1");
        assert_eq!(from_second.correlation_id, "c1");
    }

    /// Publishing into a transport nobody listens to is not an error.
    #[tokio::test]
    async fn local_transport_publish_with_no_subscribers_succeeds() {
        let transport = LocalTransport::new(16);

        // Deliberately no call to `subscribe` before publishing.
        let outgoing = sample_event("c1", CollaborationEventKind::Completed);
        let result = transport.publish(outgoing).await;

        assert!(result.is_ok());
    }

    /// The `Default` transport is usable end to end.
    #[tokio::test]
    async fn local_transport_default_capacity() {
        let transport = LocalTransport::default();
        let mut rx = transport.subscribe();

        let outgoing = sample_event("c1", CollaborationEventKind::WorkClaimed);
        transport.publish(outgoing).await.unwrap();

        let incoming = rx.recv().await.unwrap();
        assert_eq!(incoming.kind, CollaborationEventKind::WorkClaimed);
    }

    /// Every field of the envelope survives the trip through the transport.
    #[tokio::test]
    async fn local_transport_preserves_event_fields() {
        let transport = LocalTransport::new(16);
        let mut rx = transport.subscribe();

        let tables = serde_json::json!({ "tables": ["users", "orders"] });
        let stamp = 1719000000000;

        let original = CollaborationEvent::new(
            "corr-42",
            "database-schema",
            "db_engineer",
            CollaborationEventKind::FeedbackRequested,
        )
        .consumer("reviewer")
        .payload(tables.clone())
        .timestamp(stamp);

        transport.publish(original).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received.correlation_id, "corr-42");
        assert_eq!(received.topic, "database-schema");
        assert_eq!(received.producer, "db_engineer");
        assert_eq!(received.consumer.as_deref(), Some("reviewer"));
        assert_eq!(received.kind, CollaborationEventKind::FeedbackRequested);
        assert_eq!(received.payload, tables);
        assert_eq!(received.timestamp, stamp);
    }

    /// Events come out in the order they were published.
    #[tokio::test]
    async fn local_transport_event_ordering() {
        let transport = LocalTransport::new(16);
        let mut rx = transport.subscribe();

        let ids: Vec<String> = (0..5).map(|i| format!("c{i}")).collect();

        for id in &ids {
            let outgoing = sample_event(id, CollaborationEventKind::NeedWork);
            transport.publish(outgoing).await.unwrap();
        }

        for id in &ids {
            let incoming = rx.recv().await.unwrap();
            assert_eq!(incoming.correlation_id, *id);
        }
    }
}