async_coap/
receive_as_stream.rs

1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use super::*;
17use futures::task::Context;
18use futures::task::Poll;
19use std::pin::Pin;
20
21/// A [`Stream`] that is created by [`LocalEndpointExt::receive_as_stream`].
22///
23/// [`Stream`]: futures::stream::Stream
24/// [`LocalEndpointExt::receive_as_stream`]: crate::LocalEndpointExt::receive_as_stream
25pub struct ReceiveAsStream<'a, LE, F> {
26    local_endpoint: &'a LE,
27    handler: F,
28    recv_future: Option<BoxFuture<'a, Result<(), Error>>>,
29}
30
31impl<'a, LE: core::fmt::Debug, F: core::fmt::Debug> core::fmt::Debug
32    for ReceiveAsStream<'a, LE, F>
33{
34    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
35        f.debug_struct("ReceiveAsStream")
36            .field("local_endpoint", self.local_endpoint)
37            .field("handler", &self.handler)
38            .field("recv_future", &self.recv_future.as_ref().map(|_| ""))
39            .finish()
40    }
41}
42
43impl<'a, LE, F> ReceiveAsStream<'a, LE, F>
44where
45    LE: LocalEndpoint,
46    F: FnMut(&LE::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
47{
48    pub(crate) fn new(local_endpoint: &'a LE, handler: F) -> ReceiveAsStream<'a, LE, F> {
49        let mut ret = ReceiveAsStream {
50            local_endpoint,
51            recv_future: None,
52            handler,
53        };
54        ret.update_recv_future();
55        return ret;
56    }
57
58    fn update_recv_future(&mut self) {
59        self.recv_future = Some(self.local_endpoint.receive(self.handler.clone()));
60    }
61
62    fn _poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(), Error>>> {
63        if let Some(recv_future) = self.recv_future.as_mut() {
64            match recv_future.poll_unpin(cx) {
65                Poll::Ready(Err(Error::IOError)) => {
66                    self.recv_future = None;
67                    Poll::Ready(Some(Err(Error::IOError)))
68                }
69                Poll::Ready(Err(Error::Cancelled)) => {
70                    self.recv_future = None;
71                    Poll::Ready(Some(Err(Error::Cancelled)))
72                }
73                Poll::Ready(_) => {
74                    self.update_recv_future();
75                    Poll::Ready(Some(Ok(())))
76                }
77                Poll::Pending => Poll::Pending,
78            }
79        } else {
80            Poll::Ready(None)
81        }
82    }
83}
84
85impl<'a, LE, F> Stream for ReceiveAsStream<'a, LE, F>
86where
87    LE: LocalEndpoint,
88    F: FnMut(&LE::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
89{
90    type Item = Result<(), Error>;
91
92    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93        self.get_mut()._poll_next_unpin(cx)
94    }
95}