capnp_futures/
read_stream.rs1use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use capnp::{message, Error};
26use futures_util::stream::Stream;
27use futures_util::AsyncRead;
28
29async fn read_next_message<R>(
30 mut reader: R,
31 options: message::ReaderOptions,
32) -> Result<(R, Option<message::Reader<capnp::serialize::OwnedSegments>>), Error>
33where
34 R: AsyncRead + Unpin,
35{
36 let m = crate::serialize::try_read_message(&mut reader, options).await?;
37 Ok((reader, m))
38}
39
40type ReadStreamResult<R> =
41 Result<(R, Option<message::Reader<capnp::serialize::OwnedSegments>>), Error>;
42
43#[must_use = "streams do nothing unless polled"]
45pub struct ReadStream<'a, R>
46where
47 R: AsyncRead + Unpin,
48{
49 options: message::ReaderOptions,
50 read: Pin<Box<dyn Future<Output = ReadStreamResult<R>> + 'a>>,
51}
52
53impl<R> Unpin for ReadStream<'_, R> where R: AsyncRead + Unpin {}
54
55impl<'a, R> ReadStream<'a, R>
56where
57 R: AsyncRead + Unpin + 'a,
58{
59 pub fn new(reader: R, options: message::ReaderOptions) -> Self {
60 ReadStream {
61 read: Box::pin(read_next_message(reader, options)),
62 options,
63 }
64 }
65}
66
67impl<'a, R> Stream for ReadStream<'a, R>
68where
69 R: AsyncRead + Unpin + 'a,
70{
71 type Item = Result<message::Reader<capnp::serialize::OwnedSegments>, Error>;
72
73 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
74 let (r, m) = match Future::poll(self.read.as_mut(), cx) {
75 Poll::Pending => return Poll::Pending,
76 Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
77 Poll::Ready(Ok(x)) => x,
78 };
79 self.read = Box::pin(read_next_message(r, self.options));
80 match m {
81 Some(message) => Poll::Ready(Some(Ok(message))),
82 None => Poll::Ready(None),
83 }
84 }
85}