async_coap/
send_as_stream.rs1use super::*;
17
18use crate::send_desc::SendDesc;
19use futures::channel::mpsc::{Receiver, Sender};
20use futures::task::Context;
21use futures::task::Poll;
22use pin_utils::unsafe_pinned;
23use std::marker::PhantomData;
24use std::ops::Bound;
25use std::pin::Pin;
26
27pub struct SendAsStream<'a, R: Send> {
35 pub(crate) receiver: Receiver<Result<R, Error>>,
36 pub(crate) send_future: BoxFuture<'a, Result<R, Error>>,
37}
38
39impl<'a, R: Send + core::fmt::Debug> core::fmt::Debug for SendAsStream<'a, R> {
40 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
41 f.debug_struct("SendAsStream")
42 .field("receiver", &self.receiver)
43 .field("send_future", &"")
44 .finish()
45 }
46}
47
48impl<'a, R: Send> SendAsStream<'a, R> {
49 unsafe_pinned!(send_future: BoxFuture<'a, Result<R, Error>>);
50 unsafe_pinned!(receiver: futures::channel::mpsc::Receiver<Result<R, Error>>);
51}
52
53impl<'a, R: Send> Stream for SendAsStream<'a, R> {
54 type Item = Result<R, Error>;
55
56 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
57 match self.as_mut().receiver().poll_next(cx) {
58 Poll::Ready(None) => Poll::Ready(None),
59 from_receiver => match self.as_mut().send_future().poll(cx) {
60 Poll::Ready(Ok(_)) => Poll::Ready(None),
61 Poll::Ready(Err(Error::ResponseTimeout)) | Poll::Ready(Err(Error::Cancelled)) => {
62 Poll::Ready(None)
63 }
64 Poll::Ready(Err(x)) => Poll::Ready(Some(Err(x))),
65 Poll::Pending => from_receiver,
66 },
67 }
68 }
69}
70
71#[derive(Debug)]
72pub(crate) struct SendAsStreamDesc<SD, IC, R>
73where
74 SD: SendDesc<IC, R>,
75 IC: InboundContext,
76 R: Send,
77{
78 inner: SD,
79 sender: Sender<Result<R, Error>>,
80 phantom: PhantomData<IC>,
81}
82
83impl<SD, IC, R> SendAsStreamDesc<SD, IC, R>
84where
85 SD: SendDesc<IC, R>,
86 IC: InboundContext,
87 R: Send,
88{
89 pub(crate) fn new(inner: SD, sender: Sender<Result<R, Error>>) -> SendAsStreamDesc<SD, IC, R> {
90 SendAsStreamDesc {
91 inner,
92 sender,
93 phantom: PhantomData,
94 }
95 }
96}
97
98impl<SD, IC, R> SendDesc<IC, R> for SendAsStreamDesc<SD, IC, R>
99where
100 SD: SendDesc<IC, R>,
101 IC: InboundContext,
102 R: Send,
103{
104 send_desc_passthru_timing!(inner);
105 send_desc_passthru_options!(inner);
106 send_desc_passthru_payload!(inner);
107 send_desc_passthru_supports_option!(inner);
108
109 fn handler(&mut self, context: Result<&IC, Error>) -> Result<ResponseStatus<R>, Error> {
110 match self.inner.handler(context)? {
111 ResponseStatus::Done(x) => {
112 if let Some(err) = self.sender.start_send(Ok(x)).err() {
113 if err.is_full() {
114 Err(Error::OutOfSpace)
115 } else {
116 Err(Error::Cancelled)
117 }
118 } else {
119 Ok(ResponseStatus::Continue)
120 }
121 }
122 response_status => Ok(response_status),
123 }
124 }
125}