async_coap/
receive_as_stream.rs1use super::*;
17use futures::task::Context;
18use futures::task::Poll;
19use std::pin::Pin;
20
21pub struct ReceiveAsStream<'a, LE, F> {
26 local_endpoint: &'a LE,
27 handler: F,
28 recv_future: Option<BoxFuture<'a, Result<(), Error>>>,
29}
30
31impl<'a, LE: core::fmt::Debug, F: core::fmt::Debug> core::fmt::Debug
32 for ReceiveAsStream<'a, LE, F>
33{
34 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
35 f.debug_struct("ReceiveAsStream")
36 .field("local_endpoint", self.local_endpoint)
37 .field("handler", &self.handler)
38 .field("recv_future", &self.recv_future.as_ref().map(|_| ""))
39 .finish()
40 }
41}
42
43impl<'a, LE, F> ReceiveAsStream<'a, LE, F>
44where
45 LE: LocalEndpoint,
46 F: FnMut(&LE::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
47{
48 pub(crate) fn new(local_endpoint: &'a LE, handler: F) -> ReceiveAsStream<'a, LE, F> {
49 let mut ret = ReceiveAsStream {
50 local_endpoint,
51 recv_future: None,
52 handler,
53 };
54 ret.update_recv_future();
55 return ret;
56 }
57
58 fn update_recv_future(&mut self) {
59 self.recv_future = Some(self.local_endpoint.receive(self.handler.clone()));
60 }
61
62 fn _poll_next_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<(), Error>>> {
63 if let Some(recv_future) = self.recv_future.as_mut() {
64 match recv_future.poll_unpin(cx) {
65 Poll::Ready(Err(Error::IOError)) => {
66 self.recv_future = None;
67 Poll::Ready(Some(Err(Error::IOError)))
68 }
69 Poll::Ready(Err(Error::Cancelled)) => {
70 self.recv_future = None;
71 Poll::Ready(Some(Err(Error::Cancelled)))
72 }
73 Poll::Ready(_) => {
74 self.update_recv_future();
75 Poll::Ready(Some(Ok(())))
76 }
77 Poll::Pending => Poll::Pending,
78 }
79 } else {
80 Poll::Ready(None)
81 }
82 }
83}
84
85impl<'a, LE, F> Stream for ReceiveAsStream<'a, LE, F>
86where
87 LE: LocalEndpoint,
88 F: FnMut(&LE::RespondableInboundContext) -> Result<(), Error> + 'a + Clone + Unpin + Send,
89{
90 type Item = Result<(), Error>;
91
92 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93 self.get_mut()._poll_next_unpin(cx)
94 }
95}