1use super::common::receive_from_stream;
2use crate::{Event, Fallible};
3use async_io::Async;
4use futures_lite::future::Boxed;
5use futures_lite::ready;
6use futures_lite::stream::Stream;
7use std::os::unix::net::UnixStream;
8use std::pin::Pin;
9use std::task::{Context, Poll};
10
11pub struct EventStream(Boxed<(Async<UnixStream>, Fallible<Event>)>);
12
13async fn receive(mut stream: Async<UnixStream>) -> (Async<UnixStream>, Fallible<Event>) {
14 let data = receive_from_stream(&mut stream).await;
15 (stream, data.and_then(Event::decode))
16}
17
18impl EventStream {
19 pub(super) fn new(stream: Async<UnixStream>) -> Self {
20 Self(Box::pin(receive(stream)))
21 }
22}
23
24impl Stream for EventStream {
25 type Item = Fallible<Event>;
26
27 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
28 let (stream, item) = ready!(self.0.as_mut().poll(cx));
29 self.0 = Box::pin(receive(stream));
30 Poll::Ready(Some(item))
31 }
32}