async_stm32f1xx/
serial.rs

1//! [`Stream`]/[`Sink`]-based abstractions for DMA-based Serial Communication (USART).
2
3use as_slice::AsMutSlice;
4use core::{
5    convert::Infallible,
6    future::Future,
7    pin::Pin,
8    task::{Context, Poll, Waker},
9};
10use embedded_dma::{StaticReadBuffer, StaticWriteBuffer};
11use futures::{
12    sink::{Sink, SinkExt},
13    stream::{FusedStream, Stream},
14};
15use stm32f1xx_hal::{
16    dma::{self, CircBuffer, CircReadDma, Event, Half, Transfer, TransferPayload, WriteDma, R},
17    serial::{RxDma1, RxDma2, RxDma3, TxDma1, TxDma2, TxDma3},
18};
19
20/// A [`Future`] driving a [`Transfer`].
21///
22/// You can not use this directly.
23/// Use [`TxSink`] instead.
24#[must_use = "futures do nothing unless you `.await` or poll them"]
25pub struct TransferFuture<T>(Option<T>);
26
27impl<T> TransferFuture<T> {
28    /// Creates a TransferFuture, the DMA channel of which must be listen to [`Event::TransferComplete`].
29    fn from_listening(transfer: T) -> Self {
30        Self(Some(transfer))
31    }
32}
33
34macro_rules! transfer_future {
35    ($(
36        $USARTX:ident: ($INT:ident, $TxDmaX:ty),
37    )+) => {
38        $(
39            impl<BUF> Future for TransferFuture<Transfer<R, BUF, $TxDmaX>>
40            where
41                BUF: Unpin,
42            {
43                type Output = (BUF, $TxDmaX);
44
45                fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46                    let transfer = self.0.as_mut().expect("polled after completion");
47                    if transfer.is_done() {
48                        Poll::Ready(self.0.take().unwrap().wait())
49                    } else {
50                        waker_interrupt!($INT, cx.waker().clone());
51                        Poll::Pending
52                    }
53                }
54            }
55        )+
56    }
57}
58
59transfer_future!(
60    USART1: (DMA1_CHANNEL4, TxDma1),
61    USART2: (DMA1_CHANNEL7, TxDma2),
62    USART3: (DMA1_CHANNEL2, TxDma3),
63);
64
65/// A [`Sink`]-based asynchronous abstraction over a DMA transmitter.
66///
67/// # Examples
68///
69/// ```
70/// let mut tx_sink = TxSink3::new(tx_buf, tx.with_dma(channels.2));
71/// // Spams "01234567"
72/// loop {
73///     tx_sink.send(*b"01234567").await.unwrap();
74/// }
75/// ```
76#[must_use = "sinks do nothing unless polled"]
77pub struct TxSink<'a, BUF, PAYLOAD: TransferPayload>(Option<TxSinkState<'a, BUF, PAYLOAD>>);
78
79enum TxSinkState<'a, BUF, PAYLOAD: TransferPayload> {
80    Ready {
81        buf: &'a mut BUF,
82        tx: PAYLOAD,
83    },
84    Sending {
85        transfer: TransferFuture<Transfer<R, &'a mut BUF, PAYLOAD>>,
86    },
87}
88
89impl<'a, BUF, PAYLOAD> TxSink<'a, BUF, PAYLOAD>
90where
91    TxSink<'a, BUF, PAYLOAD>: Sink<BUF, Error = Infallible>,
92    PAYLOAD: Unpin + TransferPayload,
93{
94    /// Releases the buffer and payload peripheral.
95    pub async fn release(mut self) -> (&'a mut BUF, PAYLOAD) {
96        // Unwrapping: TxSink is infallible
97        self.close().await.unwrap();
98        match self.0.unwrap() {
99            TxSinkState::Ready { buf, tx } => (buf, tx),
100            _ => unreachable!("invalid state after closing"),
101        }
102    }
103}
104
105impl<BUF, PAYLOAD> Sink<BUF> for TxSink<'static, BUF, PAYLOAD>
106where
107    &'static mut BUF: StaticReadBuffer<Word = u8>,
108    PAYLOAD: WriteDma<&'static mut BUF, u8> + Unpin,
109    TransferFuture<Transfer<R, &'static mut BUF, PAYLOAD>>:
110        Future<Output = (&'static mut BUF, PAYLOAD)>,
111{
112    type Error = Infallible;
113
114    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115        self.poll_flush(cx)
116    }
117
118    fn start_send(mut self: Pin<&mut Self>, item: BUF) -> Result<(), Self::Error> {
119        let this = self.0.take().unwrap();
120        match this {
121            TxSinkState::Ready { tx, buf } => {
122                *buf = item;
123                let transfer = TransferFuture::from_listening(tx.write(buf));
124                self.0 = Some(TxSinkState::Sending { transfer });
125                Ok(())
126            }
127            TxSinkState::Sending { .. } => panic!("started sending before polled ready"),
128        }
129    }
130
131    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
132        match &mut self.0.as_mut().unwrap() {
133            TxSinkState::Ready { .. } => Poll::Ready(Ok(())),
134            TxSinkState::Sending { transfer } => match Pin::new(transfer).poll(cx) {
135                Poll::Pending => Poll::Pending,
136                Poll::Ready((buf, tx)) => {
137                    self.0 = Some(TxSinkState::Ready { tx, buf });
138                    Poll::Ready(Ok(()))
139                }
140            },
141        }
142    }
143
144    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145        self.poll_flush(cx)
146    }
147}
148
149macro_rules! tx_sink {
150    ($(
151        $TxSinkX:ident: ($TxDmaX:ty),
152    )+) => {
153        $(
154            /// A type shorthand for specifying different DMA channels easily.
155            pub type $TxSinkX<'a, BUF> = TxSink<'a, BUF, $TxDmaX>;
156
157            impl<'a, BUF> $TxSinkX<'a, BUF> {
158                /// Creates a new [`TxSink`] from the specified buffer and DMA transmitter.
159                pub fn new(buf: &'a mut BUF, mut tx: $TxDmaX) -> Self {
160                    tx.channel.listen(Event::TransferComplete);
161                    Self(Some(TxSinkState::Ready {
162                        buf,
163                        tx,
164                    }))
165                }
166            }
167        )+
168    }
169}
170
171tx_sink!(TxSink1: (TxDma1), TxSink2: (TxDma2), TxSink3: (TxDma3),);
172
173/// A [`Stream`]-based asynchronous abstraction over a DMA receiver.
174///
175/// # Examples
176///
177/// ```
178/// let mut tx_sink = TxSink3::new(tx_buf, tx.with_dma(channels.2));
179/// let mut rx_stream = RxStream3::new(rx_buf, rx.with_dma(channels.3));
180/// // Echoes USART3, by sending all items from the infinite RxStream
181/// tx_sink.send_all(&mut rx_stream).await?;
182/// unreachable!("rx_stream is empty");
183/// ```
184#[must_use = "streams do nothing unless polled"]
185pub struct RxStream<BUF, PAYLOAD>
186where
187    BUF: 'static,
188{
189    circ_buffer: CircBuffer<BUF, PAYLOAD>,
190    last_read_half: Half,
191}
192
193macro_rules! rx_stream {
194    ($(
195        $RxStreamX:ident: ($INT:ident, $rxdma:ty),
196    )+) => {
197        $(
198            /// A type shorthand for specifying different DMA channels easily.
199            pub type $RxStreamX<BUF> = RxStream<BUF, $rxdma>;
200
201            impl<BUF> $RxStreamX<BUF>
202            where
203                &'static mut [BUF; 2]: StaticWriteBuffer<Word = u8>,
204            {
205                /// Creates a new [`RxStream`] from the specified buffers and DMA transmitter.
206                pub fn new(buf: &'static mut [BUF; 2], mut rx: $rxdma) -> Self
207                where
208                    BUF: AsMutSlice<Element = u8>,
209                {
210                    rx.channel.listen(Event::HalfTransfer);
211                    rx.channel.listen(Event::TransferComplete);
212                    Self {
213                        circ_buffer: rx.circ_read(buf),
214                        last_read_half: Half::Second,
215                    }
216                }
217
218                /// Releases the buffers and DMA transmitter.
219                pub fn release(self) -> (&'static mut [BUF; 2], $rxdma) {
220                    self.circ_buffer.stop()
221                }
222            }
223
224            impl<BUF> Stream for $RxStreamX<BUF>
225            where
226                BUF: Clone,
227            {
228                type Item = Result<BUF, dma::Error>;
229
230                fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231                    let last_read_half = self.last_read_half;
232                    let res = self.circ_buffer.peek(|buf, half| {
233                        if half == last_read_half {
234                            None
235                        } else {
236                            Some((buf.clone(), half))
237                        }
238                    });
239
240                    match res {
241                        Ok(Some((buf, half))) => {
242                            self.last_read_half = half;
243                            Poll::Ready(Some(Ok(buf)))
244                        }
245                        Ok(None) => {
246                            waker_interrupt!($INT, cx.waker().clone());
247                            Poll::Pending
248                        }
249                        Err(err) => Poll::Ready(Some(Err(err))),
250                    }
251                }
252
253                fn size_hint(&self) -> (usize, Option<usize>) {
254                    (usize::MAX, None)
255                }
256            }
257
258
259            impl<BUF> FusedStream for $RxStreamX<BUF>
260            where
261                BUF: Clone,
262            {
263                fn is_terminated(&self) -> bool {
264                    false
265                }
266            }
267        )+
268    }
269}
270
271rx_stream!(
272    RxStream1: (DMA1_CHANNEL5, RxDma1),
273    RxStream2: (DMA1_CHANNEL6, RxDma2),
274    RxStream3: (DMA1_CHANNEL3, RxDma3),
275);