1use as_slice::AsMutSlice;
4use core::{
5 convert::Infallible,
6 future::Future,
7 pin::Pin,
8 task::{Context, Poll, Waker},
9};
10use embedded_dma::{StaticReadBuffer, StaticWriteBuffer};
11use futures::{
12 sink::{Sink, SinkExt},
13 stream::{FusedStream, Stream},
14};
15use stm32f1xx_hal::{
16 dma::{self, CircBuffer, CircReadDma, Event, Half, Transfer, TransferPayload, WriteDma, R},
17 serial::{RxDma1, RxDma2, RxDma3, TxDma1, TxDma2, TxDma3},
18};
19
20#[must_use = "futures do nothing unless you `.await` or poll them"]
25pub struct TransferFuture<T>(Option<T>);
26
27impl<T> TransferFuture<T> {
28 fn from_listening(transfer: T) -> Self {
30 Self(Some(transfer))
31 }
32}
33
34macro_rules! transfer_future {
35 ($(
36 $USARTX:ident: ($INT:ident, $TxDmaX:ty),
37 )+) => {
38 $(
39 impl<BUF> Future for TransferFuture<Transfer<R, BUF, $TxDmaX>>
40 where
41 BUF: Unpin,
42 {
43 type Output = (BUF, $TxDmaX);
44
45 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
46 let transfer = self.0.as_mut().expect("polled after completion");
47 if transfer.is_done() {
48 Poll::Ready(self.0.take().unwrap().wait())
49 } else {
50 waker_interrupt!($INT, cx.waker().clone());
51 Poll::Pending
52 }
53 }
54 }
55 )+
56 }
57}
58
59transfer_future!(
60 USART1: (DMA1_CHANNEL4, TxDma1),
61 USART2: (DMA1_CHANNEL7, TxDma2),
62 USART3: (DMA1_CHANNEL2, TxDma3),
63);
64
65#[must_use = "sinks do nothing unless polled"]
77pub struct TxSink<'a, BUF, PAYLOAD: TransferPayload>(Option<TxSinkState<'a, BUF, PAYLOAD>>);
78
79enum TxSinkState<'a, BUF, PAYLOAD: TransferPayload> {
80 Ready {
81 buf: &'a mut BUF,
82 tx: PAYLOAD,
83 },
84 Sending {
85 transfer: TransferFuture<Transfer<R, &'a mut BUF, PAYLOAD>>,
86 },
87}
88
89impl<'a, BUF, PAYLOAD> TxSink<'a, BUF, PAYLOAD>
90where
91 TxSink<'a, BUF, PAYLOAD>: Sink<BUF, Error = Infallible>,
92 PAYLOAD: Unpin + TransferPayload,
93{
94 pub async fn release(mut self) -> (&'a mut BUF, PAYLOAD) {
96 self.close().await.unwrap();
98 match self.0.unwrap() {
99 TxSinkState::Ready { buf, tx } => (buf, tx),
100 _ => unreachable!("invalid state after closing"),
101 }
102 }
103}
104
105impl<BUF, PAYLOAD> Sink<BUF> for TxSink<'static, BUF, PAYLOAD>
106where
107 &'static mut BUF: StaticReadBuffer<Word = u8>,
108 PAYLOAD: WriteDma<&'static mut BUF, u8> + Unpin,
109 TransferFuture<Transfer<R, &'static mut BUF, PAYLOAD>>:
110 Future<Output = (&'static mut BUF, PAYLOAD)>,
111{
112 type Error = Infallible;
113
114 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
115 self.poll_flush(cx)
116 }
117
118 fn start_send(mut self: Pin<&mut Self>, item: BUF) -> Result<(), Self::Error> {
119 let this = self.0.take().unwrap();
120 match this {
121 TxSinkState::Ready { tx, buf } => {
122 *buf = item;
123 let transfer = TransferFuture::from_listening(tx.write(buf));
124 self.0 = Some(TxSinkState::Sending { transfer });
125 Ok(())
126 }
127 TxSinkState::Sending { .. } => panic!("started sending before polled ready"),
128 }
129 }
130
131 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
132 match &mut self.0.as_mut().unwrap() {
133 TxSinkState::Ready { .. } => Poll::Ready(Ok(())),
134 TxSinkState::Sending { transfer } => match Pin::new(transfer).poll(cx) {
135 Poll::Pending => Poll::Pending,
136 Poll::Ready((buf, tx)) => {
137 self.0 = Some(TxSinkState::Ready { tx, buf });
138 Poll::Ready(Ok(()))
139 }
140 },
141 }
142 }
143
144 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
145 self.poll_flush(cx)
146 }
147}
148
149macro_rules! tx_sink {
150 ($(
151 $TxSinkX:ident: ($TxDmaX:ty),
152 )+) => {
153 $(
154 pub type $TxSinkX<'a, BUF> = TxSink<'a, BUF, $TxDmaX>;
156
157 impl<'a, BUF> $TxSinkX<'a, BUF> {
158 pub fn new(buf: &'a mut BUF, mut tx: $TxDmaX) -> Self {
160 tx.channel.listen(Event::TransferComplete);
161 Self(Some(TxSinkState::Ready {
162 buf,
163 tx,
164 }))
165 }
166 }
167 )+
168 }
169}
170
171tx_sink!(TxSink1: (TxDma1), TxSink2: (TxDma2), TxSink3: (TxDma3),);
172
173#[must_use = "streams do nothing unless polled"]
185pub struct RxStream<BUF, PAYLOAD>
186where
187 BUF: 'static,
188{
189 circ_buffer: CircBuffer<BUF, PAYLOAD>,
190 last_read_half: Half,
191}
192
193macro_rules! rx_stream {
194 ($(
195 $RxStreamX:ident: ($INT:ident, $rxdma:ty),
196 )+) => {
197 $(
198 pub type $RxStreamX<BUF> = RxStream<BUF, $rxdma>;
200
201 impl<BUF> $RxStreamX<BUF>
202 where
203 &'static mut [BUF; 2]: StaticWriteBuffer<Word = u8>,
204 {
205 pub fn new(buf: &'static mut [BUF; 2], mut rx: $rxdma) -> Self
207 where
208 BUF: AsMutSlice<Element = u8>,
209 {
210 rx.channel.listen(Event::HalfTransfer);
211 rx.channel.listen(Event::TransferComplete);
212 Self {
213 circ_buffer: rx.circ_read(buf),
214 last_read_half: Half::Second,
215 }
216 }
217
218 pub fn release(self) -> (&'static mut [BUF; 2], $rxdma) {
220 self.circ_buffer.stop()
221 }
222 }
223
224 impl<BUF> Stream for $RxStreamX<BUF>
225 where
226 BUF: Clone,
227 {
228 type Item = Result<BUF, dma::Error>;
229
230 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
231 let last_read_half = self.last_read_half;
232 let res = self.circ_buffer.peek(|buf, half| {
233 if half == last_read_half {
234 None
235 } else {
236 Some((buf.clone(), half))
237 }
238 });
239
240 match res {
241 Ok(Some((buf, half))) => {
242 self.last_read_half = half;
243 Poll::Ready(Some(Ok(buf)))
244 }
245 Ok(None) => {
246 waker_interrupt!($INT, cx.waker().clone());
247 Poll::Pending
248 }
249 Err(err) => Poll::Ready(Some(Err(err))),
250 }
251 }
252
253 fn size_hint(&self) -> (usize, Option<usize>) {
254 (usize::MAX, None)
255 }
256 }
257
258
259 impl<BUF> FusedStream for $RxStreamX<BUF>
260 where
261 BUF: Clone,
262 {
263 fn is_terminated(&self) -> bool {
264 false
265 }
266 }
267 )+
268 }
269}
270
271rx_stream!(
272 RxStream1: (DMA1_CHANNEL5, RxDma1),
273 RxStream2: (DMA1_CHANNEL6, RxDma2),
274 RxStream3: (DMA1_CHANNEL3, RxDma3),
275);