async_hal/can/receive/
receiver.rs1use super::Receive;
2use core::{
3 pin::Pin,
4 task::{Context, Poll},
5};
6use futures::{task::AtomicWaker, Stream};
7
8pub struct Receiver<R> {
9 receive: R,
10 waker: &'static AtomicWaker,
11}
12
13impl<R> Receiver<R> {
14 pub const fn new(receive: R, waker: &'static AtomicWaker) -> Self {
15 Self { receive, waker }
16 }
17}
18
19impl<R> Stream for Receiver<R>
20where
21 R: Receive + Unpin,
22{
23 type Item = Result<R::Frame, R::Error>;
24
25 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
26 match self.receive.receive() {
27 Ok(frame) => Poll::Ready(Some(Ok(frame))),
28 Err(nb::Error::WouldBlock) => {
29 self.waker.register(cx.waker());
30 Poll::Pending
31 }
32 Err(nb::Error::Other(error)) => Poll::Ready(Some(Err(error))),
33 }
34 }
35}
36
37pub struct DualReceiver<T, U> {
38 rx0: T,
39 rx1: U,
40 waker: &'static AtomicWaker,
41}
42
43impl<T, U> DualReceiver<T, U> {
44 pub const fn new(rx0: T, rx1: U, waker: &'static AtomicWaker) -> Self {
45 Self { rx0, rx1, waker }
46 }
47}
48
49impl<T, U> Stream for DualReceiver<T, U>
50where
51 T: Receive + Unpin,
52 U: Receive<Frame = T::Frame, Error = T::Error> + Unpin,
53{
54 type Item = Result<T::Frame, T::Error>;
55
56 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
57 match self.rx0.receive() {
58 Ok(frame) => Poll::Ready(Some(Ok(frame))),
59 Err(nb::Error::WouldBlock) => match self.rx1.receive() {
60 Ok(frame) => Poll::Ready(Some(Ok(frame))),
61 Err(nb::Error::WouldBlock) => {
62 self.waker.register(cx.waker());
63 Poll::Pending
64 }
65 Err(nb::Error::Other(error)) => Poll::Ready(Some(Err(error))),
66 },
67 Err(nb::Error::Other(error)) => Poll::Ready(Some(Err(error))),
68 }
69 }
70}