async_coap/
send_as_stream.rs

1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16use super::*;
17
18use crate::send_desc::SendDesc;
19use futures::channel::mpsc::{Receiver, Sender};
20use futures::task::Context;
21use futures::task::Poll;
22use pin_utils::unsafe_pinned;
23use std::marker::PhantomData;
24use std::ops::Bound;
25use std::pin::Pin;
26
27/// A [`Stream`] that is created by [`LocalEndpointExt::send_as_stream`],
28/// [`RemoteEndpointExt::send_as_stream`], and [`RemoteEndpointExt::send_to_as_stream`].
29///
30/// [`Stream`]: futures::stream::Stream
31/// [`LocalEndpointExt::send_as_stream`]: crate::LocalEndpointExt::send_as_stream
32/// [`RemoteEndpointExt::send_as_stream`]: crate::RemoteEndpointExt::send_as_stream
33/// [`RemoteEndpointExt::send_to_as_stream`]: crate::RemoteEndpointExt::send_to_as_stream
34pub struct SendAsStream<'a, R: Send> {
35    pub(crate) receiver: Receiver<Result<R, Error>>,
36    pub(crate) send_future: BoxFuture<'a, Result<R, Error>>,
37}
38
39impl<'a, R: Send + core::fmt::Debug> core::fmt::Debug for SendAsStream<'a, R> {
40    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
41        f.debug_struct("SendAsStream")
42            .field("receiver", &self.receiver)
43            .field("send_future", &"")
44            .finish()
45    }
46}
47
48impl<'a, R: Send> SendAsStream<'a, R> {
49    unsafe_pinned!(send_future: BoxFuture<'a, Result<R, Error>>);
50    unsafe_pinned!(receiver: futures::channel::mpsc::Receiver<Result<R, Error>>);
51}
52
53impl<'a, R: Send> Stream for SendAsStream<'a, R> {
54    type Item = Result<R, Error>;
55
56    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57        match self.as_mut().receiver().poll_next(cx) {
58            Poll::Ready(None) => Poll::Ready(None),
59            from_receiver => match self.as_mut().send_future().poll(cx) {
60                Poll::Ready(Ok(_)) => Poll::Ready(None),
61                Poll::Ready(Err(Error::ResponseTimeout)) | Poll::Ready(Err(Error::Cancelled)) => {
62                    Poll::Ready(None)
63                }
64                Poll::Ready(Err(x)) => Poll::Ready(Some(Err(x))),
65                Poll::Pending => from_receiver,
66            },
67        }
68    }
69}
70
71#[derive(Debug)]
72pub(crate) struct SendAsStreamDesc<SD, IC, R>
73where
74    SD: SendDesc<IC, R>,
75    IC: InboundContext,
76    R: Send,
77{
78    inner: SD,
79    sender: Sender<Result<R, Error>>,
80    phantom: PhantomData<IC>,
81}
82
83impl<SD, IC, R> SendAsStreamDesc<SD, IC, R>
84where
85    SD: SendDesc<IC, R>,
86    IC: InboundContext,
87    R: Send,
88{
89    pub(crate) fn new(inner: SD, sender: Sender<Result<R, Error>>) -> SendAsStreamDesc<SD, IC, R> {
90        SendAsStreamDesc {
91            inner,
92            sender,
93            phantom: PhantomData,
94        }
95    }
96}
97
98impl<SD, IC, R> SendDesc<IC, R> for SendAsStreamDesc<SD, IC, R>
99where
100    SD: SendDesc<IC, R>,
101    IC: InboundContext,
102    R: Send,
103{
104    send_desc_passthru_timing!(inner);
105    send_desc_passthru_options!(inner);
106    send_desc_passthru_payload!(inner);
107    send_desc_passthru_supports_option!(inner);
108
109    fn handler(&mut self, context: Result<&IC, Error>) -> Result<ResponseStatus<R>, Error> {
110        match self.inner.handler(context)? {
111            ResponseStatus::Done(x) => {
112                if let Some(err) = self.sender.start_send(Ok(x)).err() {
113                    if err.is_full() {
114                        Err(Error::OutOfSpace)
115                    } else {
116                        Err(Error::Cancelled)
117                    }
118                } else {
119                    Ok(ResponseStatus::Continue)
120                }
121            }
122            response_status => Ok(response_status),
123        }
124    }
125}