Skip to main content

emergent_client/
stream.rs

1//! Message stream for receiving pushed messages.
2
3use crate::message::EmergentMessage;
4use futures::Stream;
5use std::pin::Pin;
6use std::task::{Context, Poll};
7use tokio::sync::mpsc;
8
9/// An async stream of messages received from subscriptions.
10///
11/// This is returned by `subscribe()` on `EmergentHandler` and `EmergentSink`.
12/// Implements [`futures::Stream`] for use with [`futures::StreamExt`] combinators.
13///
14/// # Example
15///
16/// ```rust,ignore
17/// use futures::StreamExt;
18///
19/// let mut stream = handler.subscribe(["timer.tick"]).await?;
20///
21/// // Basic iteration
22/// while let Some(msg) = stream.next().await {
23///     println!("Received: {}", msg.message_type);
24/// }
25///
26/// // Or use StreamExt combinators
27/// stream
28///     .filter(|msg| futures::future::ready(msg.message_type.starts_with("timer.")))
29///     .for_each(|msg| async move { println!("{:?}", msg) })
30///     .await;
31/// ```
32pub struct MessageStream {
33    /// The receiver channel for incoming messages.
34    receiver: mpsc::Receiver<EmergentMessage>,
35}
36
37impl MessageStream {
38    /// Create a new message stream from a receiver channel.
39    pub(crate) fn new(receiver: mpsc::Receiver<EmergentMessage>) -> Self {
40        Self { receiver }
41    }
42
43    /// Receive the next message from the stream.
44    ///
45    /// Returns `None` if the stream is closed.
46    pub async fn next(&mut self) -> Option<EmergentMessage> {
47        self.receiver.recv().await
48    }
49
50    /// Try to receive the next message without blocking.
51    ///
52    /// Returns `None` if no message is available.
53    pub fn try_next(&mut self) -> Option<EmergentMessage> {
54        self.receiver.try_recv().ok()
55    }
56
57    /// Close the stream.
58    pub fn close(&mut self) {
59        self.receiver.close();
60    }
61}
62
63impl Stream for MessageStream {
64    type Item = EmergentMessage;
65
66    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67        Pin::new(&mut self.receiver).poll_recv(cx)
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use super::*;
74    use serde_json::json;
75
76    #[tokio::test]
77    async fn test_message_stream() -> Result<(), Box<dyn std::error::Error>> {
78        let (tx, rx) = mpsc::channel(16);
79        let mut stream = MessageStream::new(rx);
80
81        // Send a message
82        let msg = EmergentMessage::new("test.event").with_payload(json!({"key": "value"}));
83        tx.send(msg).await?;
84
85        // Receive the message
86        let received = stream.next().await.ok_or("stream ended unexpectedly")?;
87        assert_eq!(received.message_type.as_str(), "test.event");
88
89        // Close and verify stream ends
90        drop(tx);
91        assert!(stream.next().await.is_none());
92        Ok(())
93    }
94}