// embassy_nrf/buffered_uarte.rs

1//! Async buffered UART driver.
2//!
3//! Note that discarding a future from a read or write operation may lead to losing
4//! data. For example, when using `futures_util::future::select` and completion occurs
5//! on the "other" future, you should capture the incomplete future and continue to use
6//! it for the next read or write. This pattern is a consideration for all IO, and not
7//! just serial communications.
8//!
9//! Please also see [crate::uarte] to understand when [BufferedUarte] should be used.
10
11use core::cmp::min;
12use core::future::{poll_fn, Future};
13use core::marker::PhantomData;
14use core::slice;
15use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU8, AtomicUsize, Ordering};
16use core::task::Poll;
17
18use embassy_hal_internal::atomic_ring_buffer::RingBuffer;
19use embassy_hal_internal::Peri;
20use pac::uarte::vals;
21// Re-export SVD variants to allow user to directly set values
22pub use pac::uarte::vals::{Baudrate, ConfigParity as Parity};
23
24use crate::gpio::{AnyPin, Pin as GpioPin};
25use crate::interrupt::typelevel::Interrupt;
26use crate::ppi::{
27    self, AnyConfigurableChannel, AnyGroup, Channel, ConfigurableChannel, Event, Group, Ppi, PpiGroup, Task,
28};
29use crate::timer::{Instance as TimerInstance, Timer};
30use crate::uarte::{configure, configure_rx_pins, configure_tx_pins, drop_tx_rx, Config, Instance as UarteInstance};
31use crate::{interrupt, pac, EASY_DMA_SIZE};
32
// Driver-internal shared state, accessed from both task context and the
// interrupt handler (hence the atomics).
pub(crate) struct State {
    // Ring buffer holding bytes queued for transmission.
    tx_buf: RingBuffer,
    // Number of bytes handed to the in-flight TX DMA transfer; 0 when idle.
    tx_count: AtomicUsize,

    // Ring buffer the RX DMA writes received bytes into.
    rx_buf: RingBuffer,
    // Whether the first RX DMA transfer has been started.
    rx_started: AtomicBool,
    // Wrapping counts of started vs. ended RX DMA transfers; compared in the
    // interrupt handler to detect a missed ENDRX -> STARTRX PPI trigger.
    rx_started_count: AtomicU8,
    rx_ended_count: AtomicU8,
    // Number of the PPI channel used to chain ENDRX -> STARTRX.
    rx_ppi_ch: AtomicU8,
}
43
/// UART error.
///
/// Currently uninhabited: this driver reports no recoverable errors (an RX
/// overrun panics in the interrupt handler instead of being returned). The
/// enum is `#[non_exhaustive]` so variants can be added later without
/// breaking callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[non_exhaustive]
pub enum Error {
    // No errors for now
}
51
52impl State {
53    pub(crate) const fn new() -> Self {
54        Self {
55            tx_buf: RingBuffer::new(),
56            tx_count: AtomicUsize::new(0),
57
58            rx_buf: RingBuffer::new(),
59            rx_started: AtomicBool::new(false),
60            rx_started_count: AtomicU8::new(0),
61            rx_ended_count: AtomicU8::new(0),
62            rx_ppi_ch: AtomicU8::new(0),
63        }
64    }
65}
66
/// Interrupt handler.
///
/// Services the UARTE interrupt for instance `U`, moving data between the
/// hardware and the driver's TX/RX ring buffers. Bound to the instance's
/// interrupt by the `_irq` binding argument of the constructors.
pub struct InterruptHandler<U: UarteInstance> {
    // Zero-sized; ties the handler to its UARTE instance at the type level.
    _phantom: PhantomData<U>,
}
71
impl<U: UarteInstance> interrupt::typelevel::Handler<U::Interrupt> for InterruptHandler<U> {
    // ISR: refills the RX DMA in ring-buffer halves, detects missed ENDRX
    // events, and starts/completes TX DMA transfers. The exact ordering of
    // event-clear and PPI register accesses below is load-bearing — do not
    // reorder.
    unsafe fn on_interrupt() {
        //trace!("irq: start");
        let r = U::regs();
        let ss = U::state();
        let s = U::buffered_state();

        // RX half. Skipped when the RX ring buffer is not initialized
        // (TX-only usage) or a writer is already held.
        if let Some(mut rx) = unsafe { s.rx_buf.try_writer() } {
            let buf_len = s.rx_buf.len();
            // DMA transfers are done in buffer halves, ping-pong style.
            let half_len = buf_len / 2;

            if r.events_error().read() != 0 {
                r.events_error().write_value(0);
                // Writing the read value back clears the latched error flags.
                let errs = r.errorsrc().read();
                r.errorsrc().write_value(errs);

                if errs.overrun() {
                    panic!("BufferedUarte overrun");
                }
            }

            // Received some bytes, wake task.
            if r.inten().read().rxdrdy() && r.events_rxdrdy().read() != 0 {
                // Disarm RXDRDY until `fill_buf` re-enables it, so we don't
                // take an interrupt per received byte.
                r.intenclr().write(|w| w.set_rxdrdy(true));
                r.events_rxdrdy().write_value(0);
                ss.rx_waker.wake();
            }

            if r.events_endrx().read() != 0 {
                //trace!("  irq_rx: endrx");
                r.events_endrx().write_value(0);

                let val = s.rx_ended_count.load(Ordering::Relaxed);
                s.rx_ended_count.store(val.wrapping_add(1), Ordering::Relaxed);
            }

            // A transfer has started (or none ever has): queue up the next
            // half-buffer DMA if there's room for it.
            if r.events_rxstarted().read() != 0 || !s.rx_started.load(Ordering::Relaxed) {
                //trace!("  irq_rx: rxstarted");
                let (ptr, len) = rx.push_buf();
                if len >= half_len {
                    r.events_rxstarted().write_value(0);

                    //trace!("  irq_rx: starting second {:?}", half_len);

                    // Set up the DMA read
                    r.rxd().ptr().write_value(ptr as u32);
                    r.rxd().maxcnt().write(|w| w.set_maxcnt(half_len as _));

                    let chn = s.rx_ppi_ch.load(Ordering::Relaxed);

                    // Enable endrx -> startrx PPI channel.
                    // From this point on, if endrx happens, startrx is automatically fired.
                    ppi::regs().chenset().write(|w| w.0 = 1 << chn);

                    // It is possible that endrx happened BEFORE enabling the PPI. In this case
                    // the PPI channel doesn't trigger, and we'd hang. We have to detect this
                    // and manually start.

                    // check again in case endrx has happened between the last check and now.
                    if r.events_endrx().read() != 0 {
                        //trace!("  irq_rx: endrx");
                        r.events_endrx().write_value(0);

                        let val = s.rx_ended_count.load(Ordering::Relaxed);
                        s.rx_ended_count.store(val.wrapping_add(1), Ordering::Relaxed);
                    }

                    let rx_ended = s.rx_ended_count.load(Ordering::Relaxed);
                    let rx_started = s.rx_started_count.load(Ordering::Relaxed);

                    // If we started the same amount of transfers as ended, the last rxend has
                    // already occured.
                    let rxend_happened = rx_started == rx_ended;

                    // Check if the PPI channel is still enabled. The PPI channel disables itself
                    // when it fires, so if it's still enabled it hasn't fired.
                    let ppi_ch_enabled = ppi::regs().chen().read().ch(chn as _);

                    // if rxend happened, and the ppi channel hasn't fired yet, the rxend got missed.
                    // this condition also naturally matches if `!started`, needed to kickstart the DMA.
                    if rxend_happened && ppi_ch_enabled {
                        //trace!("manually starting.");

                        // disable the ppi ch, it's of no use anymore.
                        ppi::regs().chenclr().write(|w| w.set_ch(chn as _, true));

                        // manually start
                        r.tasks_startrx().write_value(1);
                    }

                    rx.push_done(half_len);

                    s.rx_started_count.store(rx_started.wrapping_add(1), Ordering::Relaxed);
                    s.rx_started.store(true, Ordering::Relaxed);
                } else {
                    // No room for a half-buffer transfer; stop taking RXSTARTED
                    // interrupts until `consume` frees space and re-arms them.
                    //trace!("  irq_rx: rxstarted no buf");
                    r.intenclr().write(|w| w.set_rxstarted(true));
                }
            }
        }

        // =============================

        // TX half. Skipped when the TX ring buffer is not initialized
        // (RX-only usage) or a reader is already held.
        if let Some(mut tx) = unsafe { s.tx_buf.try_reader() } {
            // TX end
            if r.events_endtx().read() != 0 {
                r.events_endtx().write_value(0);

                // Release the bytes the finished DMA transfer consumed.
                let n = s.tx_count.load(Ordering::Relaxed);
                //trace!("  irq_tx: endtx {:?}", n);
                tx.pop_done(n);
                ss.tx_waker.wake();
                s.tx_count.store(0, Ordering::Relaxed);
            }

            // If not TXing, start.
            if s.tx_count.load(Ordering::Relaxed) == 0 {
                let (ptr, len) = tx.pop_buf();
                // A single EasyDMA transfer is limited to EASY_DMA_SIZE bytes.
                let len = len.min(EASY_DMA_SIZE);
                if len != 0 {
                    //trace!("  irq_tx: starting {:?}", len);
                    s.tx_count.store(len, Ordering::Relaxed);

                    // Set up the DMA write
                    r.txd().ptr().write_value(ptr as u32);
                    r.txd().maxcnt().write(|w| w.set_maxcnt(len as _));

                    // Start UARTE Transmit transaction
                    r.tasks_starttx().write_value(1);
                }
            }
        }

        //trace!("irq: end");
    }
}
208
/// Buffered UARTE driver.
pub struct BufferedUarte<'d, U: UarteInstance, T: TimerInstance> {
    // Writer half; owns the TX ring buffer lifecycle.
    tx: BufferedUarteTx<'d, U>,
    // Reader half; owns the RX ring buffer, the byte-counting timer and the
    // PPI channels.
    rx: BufferedUarteRx<'d, U, T>,
}

// Explicitly mark the driver as Unpin: its futures borrow it via &mut and it
// may be moved freely between polls.
impl<'d, U: UarteInstance, T: TimerInstance> Unpin for BufferedUarte<'d, U, T> {}
216
impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarte<'d, U, T> {
    /// Create a new BufferedUarte without hardware flow control.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        txd: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            txd.into(),
            None,
            None,
            config,
            rx_buffer,
            tx_buffer,
        )
    }

    /// Create a new BufferedUarte with hardware flow control (RTS/CTS)
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_rtscts(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        txd: Peri<'d, impl GpioPin>,
        cts: Peri<'d, impl GpioPin>,
        rts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            txd.into(),
            Some(cts.into()),
            Some(rts.into()),
            config,
            rx_buffer,
            tx_buffer,
        )
    }

    // Shared constructor body. Hardware init order matters here: configure,
    // build both halves, then enable the peripheral and the interrupt.
    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        rts: Option<Peri<'d, AnyPin>>,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        configure(U::regs(), config, cts.is_some());

        // Initialize both halves before enabling anything so all shared state
        // is set up when the first interrupt runs.
        let tx = BufferedUarteTx::new_innerer(unsafe { peri.clone_unchecked() }, txd, cts, tx_buffer);
        let rx = BufferedUarteRx::new_innerer(peri, timer, ppi_ch1, ppi_ch2, ppi_group, rxd, rts, rx_buffer);

        U::regs().enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        // Pend once so the ISR runs immediately and kicks off the initial RX/TX setup.
        U::Interrupt::pend();
        unsafe { U::Interrupt::enable() };

        // Two live halves; each half's Drop decrements via `drop_tx_rx`.
        U::state().tx_rx_refcount.store(2, Ordering::Relaxed);

        Self { tx, rx }
    }

    /// Adjust the baud rate to the provided value.
    pub fn set_baudrate(&mut self, baudrate: Baudrate) {
        let r = U::regs();
        r.baudrate().write(|w| w.set_baudrate(baudrate));
    }

    /// Split the UART in reader and writer parts.
    ///
    /// This allows reading and writing concurrently from independent tasks.
    pub fn split(self) -> (BufferedUarteRx<'d, U, T>, BufferedUarteTx<'d, U>) {
        (self.rx, self.tx)
    }

    /// Split the UART in reader and writer parts, by reference.
    ///
    /// The returned halves borrow from `self`, so you can drop them and go back to using
    /// the "un-split" `self`. This allows temporarily splitting the UART.
    pub fn split_by_ref(&mut self) -> (&mut BufferedUarteRx<'d, U, T>, &mut BufferedUarteTx<'d, U>) {
        (&mut self.rx, &mut self.tx)
    }

    /// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        self.rx.read(buf).await
    }

    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
    pub async fn fill_buf(&mut self) -> Result<&[u8], Error> {
        self.rx.fill_buf().await
    }

    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
    pub fn consume(&mut self, amt: usize) {
        self.rx.consume(amt)
    }

    /// Write a buffer into this writer, returning how many bytes were written.
    pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.tx.write(buf).await
    }

    /// Try writing a buffer without waiting, returning how many bytes were written.
    pub fn try_write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.tx.try_write(buf)
    }

    /// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
    pub async fn flush(&mut self) -> Result<(), Error> {
        self.tx.flush().await
    }
}
370
/// Writer (TX) part of the buffered UARTE driver.
pub struct BufferedUarteTx<'d, U: UarteInstance> {
    // Keeps the UARTE peripheral reserved while the writer half is alive.
    _peri: Peri<'d, U>,
}
375
impl<'d, U: UarteInstance> BufferedUarteTx<'d, U> {
    /// Create a new BufferedUarteTx without hardware flow control.
    pub fn new(
        uarte: Peri<'d, U>,
        txd: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(uarte, txd.into(), None, config, tx_buffer)
    }

    /// Create a new BufferedUarteTx with CTS hardware flow control.
    pub fn new_with_cts(
        uarte: Peri<'d, U>,
        txd: Peri<'d, impl GpioPin>,
        cts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(uarte, txd.into(), Some(cts.into()), config, tx_buffer)
    }

    // Standalone-TX constructor body: configure, init, then enable peripheral
    // and interrupt. Refcount is 1 because only the TX half exists.
    fn new_inner(
        peri: Peri<'d, U>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        configure(U::regs(), config, cts.is_some());

        let this = Self::new_innerer(peri, txd, cts, tx_buffer);

        U::regs().enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        U::Interrupt::pend();
        unsafe { U::Interrupt::enable() };

        U::state().tx_rx_refcount.store(1, Ordering::Relaxed);

        this
    }

    // Pin setup, ring-buffer init and interrupt unmasking, shared with the
    // combined `BufferedUarte` constructor (which enables the peripheral itself).
    fn new_innerer(
        peri: Peri<'d, U>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();

        configure_tx_pins(r, txd, cts);

        // Initialize state
        let s = U::buffered_state();
        s.tx_count.store(0, Ordering::Relaxed);
        let len = tx_buffer.len();
        // SAFETY-relevant: the ring buffer borrows `tx_buffer` for 'd; it is
        // deinitialized in Drop.
        unsafe { s.tx_buf.init(tx_buffer.as_mut_ptr(), len) };

        r.events_txstarted().write_value(0);

        // Enable interrupts
        r.intenset().write(|w| {
            w.set_endtx(true);
        });

        Self { _peri: peri }
    }

    /// Write a buffer into this writer, returning how many bytes were written.
    pub fn write<'a>(&'a mut self, buf: &'a [u8]) -> impl Future<Output = Result<usize, Error>> + 'a {
        poll_fn(move |cx| {
            //trace!("poll_write: {:?}", buf.len());
            let ss = U::state();
            let s = U::buffered_state();
            let mut tx = unsafe { s.tx_buf.writer() };

            let tx_buf = tx.push_slice();
            if tx_buf.is_empty() {
                // Ring buffer full: wait until the ISR pops some bytes and
                // wakes us.
                //trace!("poll_write: pending");
                ss.tx_waker.register(cx.waker());
                return Poll::Pending;
            }

            let n = min(tx_buf.len(), buf.len());
            tx_buf[..n].copy_from_slice(&buf[..n]);
            tx.push_done(n);

            //trace!("poll_write: queued {:?}", n);

            // Make the copied bytes visible before pending the interrupt,
            // which starts the DMA if idle.
            compiler_fence(Ordering::SeqCst);
            U::Interrupt::pend();

            Poll::Ready(Ok(n))
        })
    }

    /// Try writing a buffer without waiting, returning how many bytes were written.
    ///
    /// Returns `Ok(0)` when the internal buffer is full.
    pub fn try_write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        //trace!("poll_write: {:?}", buf.len());
        let s = U::buffered_state();
        let mut tx = unsafe { s.tx_buf.writer() };

        let tx_buf = tx.push_slice();
        if tx_buf.is_empty() {
            return Ok(0);
        }

        let n = min(tx_buf.len(), buf.len());
        tx_buf[..n].copy_from_slice(&buf[..n]);
        tx.push_done(n);

        //trace!("poll_write: queued {:?}", n);

        compiler_fence(Ordering::SeqCst);
        U::Interrupt::pend();

        Ok(n)
    }

    /// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
    pub fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + '_ {
        poll_fn(move |cx| {
            //trace!("poll_flush");
            let ss = U::state();
            let s = U::buffered_state();
            // Done once the ISR has drained the TX ring buffer completely.
            if !s.tx_buf.is_empty() {
                //trace!("poll_flush: pending");
                ss.tx_waker.register(cx.waker());
                return Poll::Pending;
            }

            Poll::Ready(Ok(()))
        })
    }
}
517
impl<'a, U: UarteInstance> Drop for BufferedUarteTx<'a, U> {
    // Stops the transmitter, busy-waits for it to halt, releases the TX
    // buffer and drops the peripheral refcount.
    fn drop(&mut self) {
        let r = U::regs();

        r.intenclr().write(|w| {
            w.set_txdrdy(true);
            w.set_txstarted(true);
            w.set_txstopped(true);
        });
        r.events_txstopped().write_value(0);
        r.tasks_stoptx().write_value(1);
        // Blocking wait: the borrowed tx_buffer must not be released while
        // DMA may still read from it.
        while r.events_txstopped().read() == 0 {}

        let s = U::buffered_state();
        unsafe { s.tx_buf.deinit() }

        let s = U::state();
        drop_tx_rx(r, s);
    }
}
538
/// Reader part of the buffered UARTE driver.
pub struct BufferedUarteRx<'d, U: UarteInstance, T: TimerInstance> {
    // Keeps the UARTE peripheral reserved while the reader half is alive.
    _peri: Peri<'d, U>,
    // Counter timer tracking RXDRDY events (received-byte count).
    timer: Timer<'d, T>,
    // PPI: RXDRDY event -> timer COUNT task.
    _ppi_ch1: Ppi<'d, AnyConfigurableChannel, 1, 1>,
    // PPI: ENDRX event -> STARTRX task + group-disable task.
    _ppi_ch2: Ppi<'d, AnyConfigurableChannel, 1, 2>,
    // PPI group used so ch2 disables itself after firing once.
    _ppi_group: PpiGroup<'d, AnyGroup>,
}
547
impl<'d, U: UarteInstance, T: TimerInstance> BufferedUarteRx<'d, U, T> {
    /// Create a new BufferedUarte without hardware flow control.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        rxd: Peri<'d, impl GpioPin>,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            None,
            config,
            rx_buffer,
        )
    }

    /// Create a new BufferedUarte with hardware flow control (RTS/CTS)
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_rts(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        rts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            Some(rts.into()),
            config,
            rx_buffer,
        )
    }

    // Standalone-RX constructor body: configure, init, then enable peripheral
    // and interrupt. Refcount is 1 because only the RX half exists.
    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        rts: Option<Peri<'d, AnyPin>>,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        configure(U::regs(), config, rts.is_some());

        let this = Self::new_innerer(peri, timer, ppi_ch1, ppi_ch2, ppi_group, rxd, rts, rx_buffer);

        U::regs().enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        U::Interrupt::pend();
        unsafe { U::Interrupt::enable() };

        U::state().tx_rx_refcount.store(1, Ordering::Relaxed);

        this
    }

    // Pin setup, ring-buffer/timer/PPI init, shared with the combined
    // `BufferedUarte` constructor (which enables the peripheral itself).
    #[allow(clippy::too_many_arguments)]
    fn new_innerer(
        peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        rts: Option<Peri<'d, AnyPin>>,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        // The buffer is consumed in two equal DMA halves, so it must split evenly.
        assert!(rx_buffer.len() % 2 == 0);

        let r = U::regs();

        configure_rx_pins(r, rxd, rts);

        // Initialize state
        let s = U::buffered_state();
        s.rx_started_count.store(0, Ordering::Relaxed);
        s.rx_ended_count.store(0, Ordering::Relaxed);
        s.rx_started.store(false, Ordering::Relaxed);
        // Cap so each half-buffer transfer fits in a single EasyDMA transaction.
        let rx_len = rx_buffer.len().min(EASY_DMA_SIZE * 2);
        unsafe { s.rx_buf.init(rx_buffer.as_mut_ptr(), rx_len) };

        // clear errors
        let errors = r.errorsrc().read();
        r.errorsrc().write_value(errors);

        r.events_rxstarted().write_value(0);
        r.events_error().write_value(0);
        r.events_endrx().write_value(0);

        // Enable interrupts
        r.intenset().write(|w| {
            w.set_endtx(true);
            w.set_rxstarted(true);
            w.set_error(true);
            w.set_endrx(true);
        });

        // Configure byte counter: counts RXDRDY events via PPI and wraps at
        // rx_len*2 (compare-clear short), mirroring the ring buffer indices
        // used by `fill_buf`.
        let timer = Timer::new_counter(timer);
        timer.cc(1).write(rx_len as u32 * 2);
        timer.cc(1).short_compare_clear();
        timer.clear();
        timer.start();

        let mut ppi_ch1 = Ppi::new_one_to_one(ppi_ch1, Event::from_reg(r.events_rxdrdy()), timer.task_count());
        ppi_ch1.enable();

        // Record the channel number so the ISR can enable/inspect ch2.
        s.rx_ppi_ch.store(ppi_ch2.number() as u8, Ordering::Relaxed);
        let mut ppi_group = PpiGroup::new(ppi_group);
        // ENDRX fires STARTRX (next half) and disables the whole group, so
        // the chain triggers at most once per arming by the ISR.
        let mut ppi_ch2 = Ppi::new_one_to_two(
            ppi_ch2,
            Event::from_reg(r.events_endrx()),
            Task::from_reg(r.tasks_startrx()),
            ppi_group.task_disable_all(),
        );
        ppi_ch2.disable();
        ppi_group.add_channel(&ppi_ch2);

        Self {
            _peri: peri,
            timer,
            _ppi_ch1: ppi_ch1,
            _ppi_ch2: ppi_ch2,
            _ppi_group: ppi_group,
        }
    }

    /// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        let data = self.fill_buf().await?;
        let n = data.len().min(buf.len());
        buf[..n].copy_from_slice(&data[..n]);
        self.consume(n);
        Ok(n)
    }

    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
    pub fn fill_buf(&mut self) -> impl Future<Output = Result<&'_ [u8], Error>> {
        poll_fn(move |cx| {
            compiler_fence(Ordering::SeqCst);
            //trace!("poll_read");

            let r = U::regs();
            let s = U::buffered_state();
            let ss = U::state();

            // Read the RXDRDY counter.
            // The counter is the "end" index of the ring buffer: it advances
            // as the DMA writes bytes, independently of ENDRX interrupts.
            T::regs().tasks_capture(0).write_value(1);
            let mut end = T::regs().cc(0).read() as usize;
            //trace!("  rxdrdy count = {:?}", end);

            // We've set a compare channel that resets the counter to 0 when it reaches `len*2`.
            // However, it's unclear if that's instant, or there's a small window where you can
            // still read `len()*2`.
            // This could happen if in one clock cycle the counter is updated, and in the next the
            // clear takes effect. The docs are very sparse, they just say "Task delays: After TIMER
            // is started, the CLEAR, COUNT, and STOP tasks are guaranteed to take effect within one
            // clock cycle of the PCLK16M." :shrug:
            // So, we wrap the counter ourselves, just in case.
            if end > s.rx_buf.len() * 2 {
                end = 0
            }

            // This logic mirrors `atomic_ring_buffer::Reader::pop_buf()`
            let mut start = s.rx_buf.start.load(Ordering::Relaxed);
            let len = s.rx_buf.len();
            if start == end {
                //trace!("  empty");
                // No data: arm RXDRDY so the ISR wakes us on the next byte.
                ss.rx_waker.register(cx.waker());
                r.intenset().write(|w| w.set_rxdrdy(true));
                return Poll::Pending;
            }

            // Indices run 0..len*2 to distinguish full from empty; fold them
            // into 0..len for slicing.
            if start >= len {
                start -= len
            }
            if end >= len {
                end -= len
            }

            // Return the contiguous chunk up to either `end` or the physical
            // end of the buffer (wrapped data is returned on the next call).
            let n = if end > start { end - start } else { len - start };
            assert!(n != 0);
            //trace!("  uarte ringbuf: pop_buf {:?}..{:?}", start, start + n);

            let buf = s.rx_buf.buf.load(Ordering::Relaxed);
            Poll::Ready(Ok(unsafe { slice::from_raw_parts(buf.add(start), n) }))
        })
    }

    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
    pub fn consume(&mut self, amt: usize) {
        if amt == 0 {
            return;
        }

        let s = U::buffered_state();
        let mut rx = unsafe { s.rx_buf.reader() };
        rx.pop_done(amt);
        // Re-arm RXSTARTED: freeing space may allow the ISR to queue the
        // next half-buffer DMA transfer.
        U::regs().intenset().write(|w| w.set_rxstarted(true));
    }

    /// we are ready to read if there is data in the buffer
    fn read_ready() -> Result<bool, Error> {
        let state = U::buffered_state();
        Ok(!state.rx_buf.is_empty())
    }
}
786
impl<'a, U: UarteInstance, T: TimerInstance> Drop for BufferedUarteRx<'a, U, T> {
    // Stops the receiver and byte-count timer, busy-waits for RX to halt,
    // releases the RX buffer and drops the peripheral refcount.
    fn drop(&mut self) {
        // Disable the ENDRX -> STARTRX chain first so no new transfer starts.
        self._ppi_group.disable_all();

        let r = U::regs();

        self.timer.stop();

        r.intenclr().write(|w| {
            w.set_rxdrdy(true);
            w.set_rxstarted(true);
            w.set_rxto(true);
        });
        r.events_rxto().write_value(0);
        r.tasks_stoprx().write_value(1);
        // Blocking wait: the borrowed rx_buffer must not be released while
        // DMA may still write into it.
        while r.events_rxto().read() == 0 {}

        let s = U::buffered_state();
        unsafe { s.rx_buf.deinit() }

        let s = U::state();
        drop_tx_rx(r, s);
    }
}
811
812mod _embedded_io {
813    use super::*;
814
815    impl embedded_io_async::Error for Error {
816        fn kind(&self) -> embedded_io_async::ErrorKind {
817            match *self {}
818        }
819    }
820
821    impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::ErrorType for BufferedUarte<'d, U, T> {
822        type Error = Error;
823    }
824
825    impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::ErrorType for BufferedUarteRx<'d, U, T> {
826        type Error = Error;
827    }
828
829    impl<'d, U: UarteInstance> embedded_io_async::ErrorType for BufferedUarteTx<'d, U> {
830        type Error = Error;
831    }
832
833    impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::Read for BufferedUarte<'d, U, T> {
834        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
835            self.read(buf).await
836        }
837    }
838
839    impl<'d: 'd, U: UarteInstance, T: TimerInstance> embedded_io_async::Read for BufferedUarteRx<'d, U, T> {
840        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
841            self.read(buf).await
842        }
843    }
844
845    impl<'d, U: UarteInstance, T: TimerInstance + 'd> embedded_io_async::ReadReady for BufferedUarte<'d, U, T> {
846        fn read_ready(&mut self) -> Result<bool, Self::Error> {
847            BufferedUarteRx::<'d, U, T>::read_ready()
848        }
849    }
850
851    impl<'d, U: UarteInstance, T: TimerInstance + 'd> embedded_io_async::ReadReady for BufferedUarteRx<'d, U, T> {
852        fn read_ready(&mut self) -> Result<bool, Self::Error> {
853            Self::read_ready()
854        }
855    }
856
857    impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::BufRead for BufferedUarte<'d, U, T> {
858        async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
859            self.fill_buf().await
860        }
861
862        fn consume(&mut self, amt: usize) {
863            self.consume(amt)
864        }
865    }
866
867    impl<'d: 'd, U: UarteInstance, T: TimerInstance> embedded_io_async::BufRead for BufferedUarteRx<'d, U, T> {
868        async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
869            self.fill_buf().await
870        }
871
872        fn consume(&mut self, amt: usize) {
873            self.consume(amt)
874        }
875    }
876
877    impl<'d, U: UarteInstance, T: TimerInstance> embedded_io_async::Write for BufferedUarte<'d, U, T> {
878        async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
879            self.write(buf).await
880        }
881
882        async fn flush(&mut self) -> Result<(), Self::Error> {
883            self.flush().await
884        }
885    }
886
887    impl<'d: 'd, U: UarteInstance> embedded_io_async::Write for BufferedUarteTx<'d, U> {
888        async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
889            self.write(buf).await
890        }
891
892        async fn flush(&mut self) -> Result<(), Self::Error> {
893            self.flush().await
894        }
895    }
896}