use crate::message::EmergentMessage;
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc;
/// Receiving end of a message channel.
///
/// Wraps a Tokio `mpsc::Receiver` carrying [`EmergentMessage`] values so that
/// callers interact with this type rather than with the raw channel half.
pub struct MessageStream {
// Underlying bounded-channel receiver that all messages are read from.
receiver: mpsc::Receiver<EmergentMessage>,
}
impl MessageStream {
pub(crate) fn new(receiver: mpsc::Receiver<EmergentMessage>) -> Self {
Self { receiver }
}
pub async fn next(&mut self) -> Option<EmergentMessage> {
self.receiver.recv().await
}
pub fn try_next(&mut self) -> Option<EmergentMessage> {
self.receiver.try_recv().ok()
}
pub fn close(&mut self) {
self.receiver.close();
}
}
/// Lets a `MessageStream` be driven through the `futures::Stream` interface.
impl Stream for MessageStream {
    type Item = EmergentMessage;

    /// Polls the wrapped receiver for its next message and forwards the result
    /// unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `get_mut` is available because `MessageStream` is `Unpin`; the
        // receiver's `poll_recv` only needs a plain mutable reference.
        let this = self.get_mut();
        this.receiver.poll_recv(cx)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Sends one message through a channel, checks it comes out of the stream
    /// with the expected type, then verifies the stream ends after the sender
    /// is dropped.
    #[tokio::test]
    async fn test_message_stream() -> Result<(), Box<dyn std::error::Error>> {
        let (sender, receiver) = mpsc::channel(16);
        let mut stream = MessageStream::new(receiver);

        let outgoing = EmergentMessage::new("test.event").with_payload(json!({"key": "value"}));
        sender.send(outgoing).await?;

        let incoming = match stream.next().await {
            Some(message) => message,
            None => return Err("stream ended unexpectedly".into()),
        };
        assert_eq!(incoming.message_type.as_str(), "test.event");

        // With the only sender gone, the stream must report completion.
        drop(sender);
        let after_drop = stream.next().await;
        assert!(after_drop.is_none());

        Ok(())
    }
}