Skip to main content

a2a_ao/transport/
sse.rs

1//! SSE (Server-Sent Events) transport for A2A streaming.
2//!
3//! Used for real-time task updates via `SendStreamingMessage` and `SubscribeToTask`.
4
5use futures::Stream;
6use pin_project_lite::pin_project;
7use std::pin::Pin;
8use std::task::{Context, Poll};
9
10use crate::error::A2AError;
11use crate::task::TaskEvent;
12
13// A stream of task events received via SSE.
14pin_project! {
15    pub struct TaskEventStream {
16        #[pin]
17        inner: Pin<Box<dyn Stream<Item = Result<TaskEvent, A2AError>> + Send>>,
18    }
19}
20
21impl TaskEventStream {
22    /// Create a new TaskEventStream from an SSE connection.
23    pub fn new(inner: Pin<Box<dyn Stream<Item = Result<TaskEvent, A2AError>> + Send>>) -> Self {
24        Self { inner }
25    }
26}
27
28impl Stream for TaskEventStream {
29    type Item = Result<TaskEvent, A2AError>;
30
31    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
32        self.project().inner.poll_next(cx)
33    }
34}
35
36/// Parse an SSE event data line into a TaskEvent.
37pub fn parse_sse_event(data: &str) -> Result<TaskEvent, A2AError> {
38    serde_json::from_str(data)
39        .map_err(|e| A2AError::StreamingError(format!("Failed to parse SSE event: {e}")))
40}
41
/// String constants naming the SSE event types used for task streaming.
pub mod event_types {
    /// The task's state changed.
    pub const STATE_CHANGED: &str = "state_changed";

    /// A new message was added to the task.
    pub const MESSAGE_ADDED: &str = "message_added";

    /// A new artifact was produced.
    pub const ARTIFACT_ADDED: &str = "artifact_added";

    /// A partial chunk of artifact data.
    pub const ARTIFACT_CHUNK: &str = "artifact_chunk";

    /// The stream has ended.
    pub const DONE: &str = "done";
}