capnp_futures/
read_stream.rs

1// Copyright (c) 2016 Sandstorm Development Group, Inc. and contributors
2//
3// Permission is hereby granted, free of charge, to any person obtaining a copy
4// of this software and associated documentation files (the "Software"), to deal
5// in the Software without restriction, including without limitation the rights
6// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
7// copies of the Software, and to permit persons to whom the Software is
8// furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
14// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
18// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
19// THE SOFTWARE.
20
21use std::future::Future;
22use std::pin::Pin;
23use std::task::{Context, Poll};
24
25use capnp::{message, Error};
26use futures_util::stream::Stream;
27use futures_util::AsyncRead;
28
29async fn read_next_message<R>(
30    mut reader: R,
31    options: message::ReaderOptions,
32) -> Result<(R, Option<message::Reader<capnp::serialize::OwnedSegments>>), Error>
33where
34    R: AsyncRead + Unpin,
35{
36    let m = crate::serialize::try_read_message(&mut reader, options).await?;
37    Ok((reader, m))
38}
39
/// Outcome of a single read attempt: on success, the reader paired with the
/// message that was read (if any); on failure, the error.
type ReadStreamResult<R> =
    Result<(R, Option<message::Reader<capnp::serialize::OwnedSegments>>), Error>;
42
43/// An incoming sequence of messages.
44#[must_use = "streams do nothing unless polled"]
45pub struct ReadStream<'a, R>
46where
47    R: AsyncRead + Unpin,
48{
49    options: message::ReaderOptions,
50    read: Pin<Box<dyn Future<Output = ReadStreamResult<R>> + 'a>>,
51}
52
53impl<R> Unpin for ReadStream<'_, R> where R: AsyncRead + Unpin {}
54
55impl<'a, R> ReadStream<'a, R>
56where
57    R: AsyncRead + Unpin + 'a,
58{
59    pub fn new(reader: R, options: message::ReaderOptions) -> Self {
60        ReadStream {
61            read: Box::pin(read_next_message(reader, options)),
62            options,
63        }
64    }
65}
66
67impl<'a, R> Stream for ReadStream<'a, R>
68where
69    R: AsyncRead + Unpin + 'a,
70{
71    type Item = Result<message::Reader<capnp::serialize::OwnedSegments>, Error>;
72
73    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
74        let (r, m) = match Future::poll(self.read.as_mut(), cx) {
75            Poll::Pending => return Poll::Pending,
76            Poll::Ready(Err(e)) => return Poll::Ready(Some(Err(e))),
77            Poll::Ready(Ok(x)) => x,
78        };
79        self.read = Box::pin(read_next_message(r, self.options));
80        match m {
81            Some(message) => Poll::Ready(Some(Ok(message))),
82            None => Poll::Ready(None),
83        }
84    }
85}