// embassy_nrf/buffered_uarte/v1.rs

//! Async buffered UART driver.
//!
//! Note that discarding a future from a read or write operation may lead to losing
//! data. For example, when using `futures_util::future::select` and completion occurs
//! on the "other" future, you should capture the incomplete future and continue to use
//! it for the next read or write. This applies to all I/O, not just serial
//! communications.
//!
//! Please also see [crate::uarte] to understand when [BufferedUarte] should be used.
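//!
//! A minimal sketch of that pattern (illustrative only; assumes a `futures_util`
//! dependency and an `other_event()` future defined elsewhere):
//!
//! ```ignore
//! use core::pin::pin;
//! use futures_util::future::{select, Either};
//!
//! let mut buf = [0u8; 32];
//! // Pin the read future once so it can be kept alive across `select` calls.
//! let mut read_fut = pin!(uart.read(&mut buf));
//! loop {
//!     match select(read_fut, pin!(other_event())).await {
//!         Either::Left((res, _)) => {
//!             // The read completed; `res` is `Result<usize, Error>`.
//!             break;
//!         }
//!         Either::Right((_, unfinished_read)) => {
//!             // Keep the incomplete read and poll it again on the next
//!             // iteration; dropping it here could lose data.
//!             read_fut = unfinished_read;
//!         }
//!     }
//! }
//! ```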

use core::cmp::min;
use core::future::{Future, poll_fn};
use core::marker::PhantomData;
use core::slice;
use core::sync::atomic::{AtomicBool, AtomicU8, AtomicUsize, Ordering, compiler_fence};
use core::task::Poll;

use embassy_hal_internal::Peri;
use embassy_hal_internal::atomic_ring_buffer::RingBuffer;
use pac::uarte::vals;
// Re-export SVD variants to allow user to directly set values
pub use pac::uarte::vals::{Baudrate, ConfigParity as Parity};

use crate::gpio::{AnyPin, Pin as GpioPin};
use crate::interrupt::InterruptExt;
use crate::interrupt::typelevel::Interrupt;
use crate::ppi::{
    self, AnyConfigurableChannel, AnyGroup, Channel, ConfigurableChannel, Event, Group, Ppi, PpiGroup, Task,
};
use crate::timer::{Instance as TimerInstance, Timer};
use crate::uarte::{Config, Instance as UarteInstance, configure, configure_rx_pins, configure_tx_pins, drop_tx_rx};
use crate::{EASY_DMA_SIZE, interrupt, pac};

pub(crate) struct State {
    tx_buf: RingBuffer,
    tx_count: AtomicUsize,

    rx_buf: RingBuffer,
    rx_started: AtomicBool,
    rx_started_count: AtomicU8,
    rx_ended_count: AtomicU8,
    rx_ppi_ch: AtomicU8,
    rx_overrun: AtomicBool,
}

/// UART error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[non_exhaustive]
pub enum Error {
    /// Buffer Overrun
    Overrun,
}

impl State {
    pub(crate) const fn new() -> Self {
        Self {
            tx_buf: RingBuffer::new(),
            tx_count: AtomicUsize::new(0),

            rx_buf: RingBuffer::new(),
            rx_started: AtomicBool::new(false),
            rx_started_count: AtomicU8::new(0),
            rx_ended_count: AtomicU8::new(0),
            rx_ppi_ch: AtomicU8::new(0),
            rx_overrun: AtomicBool::new(false),
        }
    }
}

/// Interrupt handler.
pub struct InterruptHandler<U: UarteInstance> {
    _phantom: PhantomData<U>,
}

impl<U: UarteInstance> interrupt::typelevel::Handler<U::Interrupt> for InterruptHandler<U> {
    unsafe fn on_interrupt() {
        //trace!("irq: start");
        let r = U::regs();
        let ss = U::state();
        let s = U::buffered_state();

        if let Some(mut rx) = unsafe { s.rx_buf.try_writer() } {
            let buf_len = s.rx_buf.len();
            let half_len = buf_len / 2;

            if r.events_error().read() != 0 {
                r.events_error().write_value(0);
                let errs = r.errorsrc().read();
                r.errorsrc().write_value(errs);

                if errs.overrun() {
                    s.rx_overrun.store(true, Ordering::Release);
                    ss.rx_waker.wake();
                }
            }

            // Received some bytes, wake task.
            if r.inten().read().rxdrdy() && r.events_rxdrdy().read() != 0 {
                r.intenclr().write(|w| w.set_rxdrdy(true));
                r.events_rxdrdy().write_value(0);
                ss.rx_waker.wake();
            }

            if r.events_dma().rx().end().read() != 0 {
                //trace!("  irq_rx: endrx");
                r.events_dma().rx().end().write_value(0);

                let val = s.rx_ended_count.load(Ordering::Relaxed);
                s.rx_ended_count.store(val.wrapping_add(1), Ordering::Relaxed);
            }

            if r.events_dma().rx().ready().read() != 0 || !s.rx_started.load(Ordering::Relaxed) {
                //trace!("  irq_rx: rxstarted");
                let (ptr, len) = rx.push_buf();
                if len >= half_len {
                    r.events_dma().rx().ready().write_value(0);

                    //trace!("  irq_rx: starting second {:?}", half_len);

                    // Set up the DMA read
                    r.dma().rx().ptr().write_value(ptr as u32);
                    r.dma().rx().maxcnt().write(|w| w.set_maxcnt(half_len as _));

                    let chn = s.rx_ppi_ch.load(Ordering::Relaxed);

                    // Enable endrx -> startrx PPI channel.
                    // From this point on, if endrx happens, startrx is automatically fired.
                    ppi::regs().chenset().write(|w| w.0 = 1 << chn);

                    // It is possible that endrx happened BEFORE enabling the PPI. In this case
                    // the PPI channel doesn't trigger, and we'd hang. We have to detect this
                    // and manually start.

                    // check again in case endrx has happened between the last check and now.
                    if r.events_dma().rx().end().read() != 0 {
                        //trace!("  irq_rx: endrx");
                        r.events_dma().rx().end().write_value(0);

                        let val = s.rx_ended_count.load(Ordering::Relaxed);
                        s.rx_ended_count.store(val.wrapping_add(1), Ordering::Relaxed);
                    }

                    let rx_ended = s.rx_ended_count.load(Ordering::Relaxed);
                    let rx_started = s.rx_started_count.load(Ordering::Relaxed);
                    // If we started the same number of transfers as ended, the last rxend has
                    // already occurred.
                    let rxend_happened = rx_started == rx_ended;

                    // Check if the PPI channel is still enabled. The PPI channel disables itself
                    // when it fires, so if it's still enabled it hasn't fired.
                    let ppi_ch_enabled = ppi::regs().chen().read().ch(chn as _);

                    // if rxend happened, and the ppi channel hasn't fired yet, the rxend got missed.
                    // this condition also naturally matches if `!started`, needed to kickstart the DMA.
                    if rxend_happened && ppi_ch_enabled {
                        //trace!("manually starting.");

                        // disable the ppi ch, it's of no use anymore.
                        ppi::regs().chenclr().write(|w| w.set_ch(chn as _, true));

                        // manually start
                        r.tasks_dma().rx().start().write_value(1);
                    }

                    rx.push_done(half_len);

                    s.rx_started_count.store(rx_started.wrapping_add(1), Ordering::Relaxed);
                    s.rx_started.store(true, Ordering::Relaxed);
                } else {
                    //trace!("  irq_rx: rxstarted no buf");
                    r.intenclr().write(|w| w.set_dmarxready(true));
                }
            }
        }

        // =============================

        if let Some(mut tx) = unsafe { s.tx_buf.try_reader() } {
            // TX end
            if r.events_dma().tx().end().read() != 0 {
                r.events_dma().tx().end().write_value(0);

                let n = s.tx_count.load(Ordering::Relaxed);
                //trace!("  irq_tx: endtx {:?}", n);
                tx.pop_done(n);
                ss.tx_waker.wake();
                s.tx_count.store(0, Ordering::Relaxed);
            }

            // If not TXing, start.
            if s.tx_count.load(Ordering::Relaxed) == 0 {
                let (ptr, len) = tx.pop_buf();
                let len = len.min(EASY_DMA_SIZE);
                if len != 0 {
                    //trace!("  irq_tx: starting {:?}", len);
                    s.tx_count.store(len, Ordering::Relaxed);

                    // Set up the DMA write
                    r.dma().tx().ptr().write_value(ptr as u32);
                    r.dma().tx().maxcnt().write(|w| w.set_maxcnt(len as _));

                    // Start UARTE Transmit transaction
                    r.tasks_dma().tx().start().write_value(1);
                }
            }
        }

        //trace!("irq: end");
    }
}

/// Buffered UARTE driver.
pub struct BufferedUarte<'d> {
    tx: BufferedUarteTx<'d>,
    rx: BufferedUarteRx<'d>,
}

impl<'d> Unpin for BufferedUarte<'d> {}

impl<'d> BufferedUarte<'d> {
    /// Create a new BufferedUarte without hardware flow control.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
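    ///
    /// A minimal construction sketch (illustrative, not exact: assumes an nRF52-style
    /// target; `Irqs`, the `UARTE0_UART0` interrupt name, and the pin/channel choices
    /// are placeholders for your own setup):
    ///
    /// ```ignore
    /// use embassy_nrf::buffered_uarte::{self, BufferedUarte};
    /// use embassy_nrf::{bind_interrupts, peripherals, uarte};
    ///
    /// bind_interrupts!(struct Irqs {
    ///     UARTE0_UART0 => buffered_uarte::InterruptHandler<peripherals::UARTE0>;
    /// });
    ///
    /// let p = embassy_nrf::init(Default::default());
    /// let mut rx_buf = [0u8; 256]; // length must be even
    /// let mut tx_buf = [0u8; 256];
    /// let mut uart = BufferedUarte::new(
    ///     p.UARTE0, p.TIMER0, p.PPI_CH0, p.PPI_CH1, p.PPI_GROUP0,
    ///     p.P0_08, p.P0_06, Irqs, uarte::Config::default(),
    ///     &mut rx_buf, &mut tx_buf,
    /// );
    /// ```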
    #[allow(clippy::too_many_arguments)]
    pub fn new<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        txd: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            txd.into(),
            None,
            None,
            config,
            rx_buffer,
            tx_buffer,
        )
    }

    /// Create a new BufferedUarte with hardware flow control (RTS/CTS).
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_rtscts<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        txd: Peri<'d, impl GpioPin>,
        cts: Peri<'d, impl GpioPin>,
        rts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            txd.into(),
            Some(cts.into()),
            Some(rts.into()),
            config,
            rx_buffer,
            tx_buffer,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn new_inner<U: UarteInstance, T: TimerInstance>(
        peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        rts: Option<Peri<'d, AnyPin>>,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();

        configure(r, config, cts.is_some());

        let tx = BufferedUarteTx::new_innerer(unsafe { peri.clone_unchecked() }, txd, cts, tx_buffer);
        let rx = BufferedUarteRx::new_innerer(peri, timer, ppi_ch1, ppi_ch2, ppi_group, rxd, rts, rx_buffer);

        r.enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        irq.pend();
        unsafe { irq.enable() };

        state.tx_rx_refcount.store(2, Ordering::Relaxed);

        Self { tx, rx }
    }

    /// Adjust the baud rate to the provided value.
    pub fn set_baudrate(&mut self, baudrate: Baudrate) {
        self.tx.set_baudrate(baudrate);
    }

    /// Split the UART into reader and writer parts.
    ///
    /// This allows reading and writing concurrently from independent tasks.
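    ///
    /// A minimal usage sketch (task spawning elided; `uart` is an existing
    /// `BufferedUarte`):
    ///
    /// ```ignore
    /// let (mut rx, mut tx) = uart.split();
    /// // `rx` and `tx` can now be moved into separate tasks and used concurrently.
    /// ```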
    pub fn split(self) -> (BufferedUarteRx<'d>, BufferedUarteTx<'d>) {
        (self.rx, self.tx)
    }

    /// Split the UART into reader and writer parts, by reference.
    ///
    /// The returned halves borrow from `self`, so you can drop them and go back to using
    /// the "un-split" `self`. This allows temporarily splitting the UART.
    pub fn split_by_ref(&mut self) -> (&mut BufferedUarteRx<'d>, &mut BufferedUarteTx<'d>) {
        (&mut self.rx, &mut self.tx)
    }

    /// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        self.rx.read(buf).await
    }

    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
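    ///
    /// A minimal sketch of the `fill_buf`/`consume` pattern (`process` is a
    /// hypothetical helper returning how many bytes it consumed):
    ///
    /// ```ignore
    /// let data = uart.fill_buf().await?;
    /// let n = process(data);
    /// uart.consume(n);
    /// ```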
    pub async fn fill_buf(&mut self) -> Result<&[u8], Error> {
        self.rx.fill_buf().await
    }

    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
    pub fn consume(&mut self, amt: usize) {
        self.rx.consume(amt)
    }

    /// Write a buffer into this writer, returning how many bytes were written.
    pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.tx.write(buf).await
    }

    /// Try writing a buffer without waiting, returning how many bytes were written (zero if the buffer is full).
    pub fn try_write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.tx.try_write(buf)
    }

    /// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
    pub async fn flush(&mut self) -> Result<(), Error> {
        self.tx.flush().await
    }
}

/// Writer part of the buffered UARTE driver.
pub struct BufferedUarteTx<'d> {
    r: pac::uarte::Uarte,
    _irq: interrupt::Interrupt,
    state: &'static crate::uarte::State,
    buffered_state: &'static State,
    _p: PhantomData<&'d ()>,
}

impl<'d> BufferedUarteTx<'d> {
    /// Create a new BufferedUarteTx without hardware flow control.
    pub fn new<U: UarteInstance>(
        uarte: Peri<'d, U>,
        txd: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(uarte, txd.into(), None, config, tx_buffer)
    }

    /// Create a new BufferedUarteTx with hardware flow control (CTS).
    pub fn new_with_cts<U: UarteInstance>(
        uarte: Peri<'d, U>,
        txd: Peri<'d, impl GpioPin>,
        cts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(uarte, txd.into(), Some(cts.into()), config, tx_buffer)
    }

    fn new_inner<U: UarteInstance>(
        peri: Peri<'d, U>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();
        let _buffered_state = U::buffered_state();

        configure(r, config, cts.is_some());

        let this = Self::new_innerer(peri, txd, cts, tx_buffer);

        r.enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        irq.pend();
        unsafe { irq.enable() };

        state.tx_rx_refcount.store(1, Ordering::Relaxed);

        this
    }

    fn new_innerer<U: UarteInstance>(
        _peri: Peri<'d, U>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();
        let buffered_state = U::buffered_state();

        configure_tx_pins(r, txd, cts);

        // Initialize state
        buffered_state.tx_count.store(0, Ordering::Relaxed);
        let len = tx_buffer.len();
        unsafe { buffered_state.tx_buf.init(tx_buffer.as_mut_ptr(), len) };

        r.events_dma().tx().ready().write_value(0);

        // Enable interrupts
        r.intenset().write(|w| {
            w.set_dmatxend(true);
        });

        Self {
            r,
            _irq: irq,
            state,
            buffered_state,
            _p: PhantomData,
        }
    }

    /// Write a buffer into this writer, returning how many bytes were written.
    pub fn write<'a>(&'a mut self, buf: &'a [u8]) -> impl Future<Output = Result<usize, Error>> + 'a + use<'a, 'd> {
        poll_fn(move |cx| {
            //trace!("poll_write: {:?}", buf.len());
            let ss = self.state;
            let s = self.buffered_state;
            let mut tx = unsafe { s.tx_buf.writer() };

            let tx_buf = tx.push_slice();
            if tx_buf.is_empty() {
                //trace!("poll_write: pending");
                ss.tx_waker.register(cx.waker());
                return Poll::Pending;
            }

            let n = min(tx_buf.len(), buf.len());
            tx_buf[..n].copy_from_slice(&buf[..n]);
            tx.push_done(n);

            //trace!("poll_write: queued {:?}", n);

            compiler_fence(Ordering::SeqCst);
            self._irq.pend();

            Poll::Ready(Ok(n))
        })
    }

    /// Try writing a buffer without waiting, returning how many bytes were written (zero if the buffer is full).
    pub fn try_write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        //trace!("poll_write: {:?}", buf.len());
        let s = self.buffered_state;
        let mut tx = unsafe { s.tx_buf.writer() };

        let tx_buf = tx.push_slice();
        if tx_buf.is_empty() {
            return Ok(0);
        }

        let n = min(tx_buf.len(), buf.len());
        tx_buf[..n].copy_from_slice(&buf[..n]);
        tx.push_done(n);

        //trace!("poll_write: queued {:?}", n);

        compiler_fence(Ordering::SeqCst);
        self._irq.pend();

        Ok(n)
    }

    /// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
    pub fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + '_ {
        let ss = self.state;
        let s = self.buffered_state;
        poll_fn(move |cx| {
            //trace!("poll_flush");
            if !s.tx_buf.is_empty() {
                //trace!("poll_flush: pending");
                ss.tx_waker.register(cx.waker());
                return Poll::Pending;
            }

            Poll::Ready(Ok(()))
        })
    }

    /// Adjust the baud rate to the provided value.
    pub fn set_baudrate(&mut self, baudrate: Baudrate) {
        self.r.baudrate().write(|w| w.set_baudrate(baudrate));
    }
}

impl<'a> Drop for BufferedUarteTx<'a> {
    fn drop(&mut self) {
        let r = self.r;

        r.intenclr().write(|w| {
            w.set_txdrdy(true);
            w.set_dmatxready(true);
            w.set_txstopped(true);
        });
        r.events_txstopped().write_value(0);
        r.tasks_dma().tx().stop().write_value(1);
        while r.events_txstopped().read() == 0 {}

        let s = self.buffered_state;
        unsafe { s.tx_buf.deinit() }

        let s = self.state;
        drop_tx_rx(r, s);
    }
}

/// Reader part of the buffered UARTE driver.
pub struct BufferedUarteRx<'d> {
    r: pac::uarte::Uarte,
    state: &'static crate::uarte::State,
    buffered_state: &'static State,
    timer: Timer<'d>,
    _ppi_ch1: Ppi<'d, AnyConfigurableChannel, 1, 1>,
    _ppi_ch2: Ppi<'d, AnyConfigurableChannel, 1, 2>,
    _ppi_group: PpiGroup<'d, AnyGroup>,
    _p: PhantomData<&'d ()>,
}

impl<'d> BufferedUarteRx<'d> {
    /// Create a new BufferedUarteRx without hardware flow control.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        rxd: Peri<'d, impl GpioPin>,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            None,
            config,
            rx_buffer,
        )
    }

    /// Create a new BufferedUarteRx with hardware flow control (RTS).
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_rts<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        rts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            Some(rts.into()),
            config,
            rx_buffer,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn new_inner<U: UarteInstance, T: TimerInstance>(
        peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        rts: Option<Peri<'d, AnyPin>>,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();
        let _buffered_state = U::buffered_state();

        configure(r, config, rts.is_some());

        let this = Self::new_innerer(peri, timer, ppi_ch1, ppi_ch2, ppi_group, rxd, rts, rx_buffer);

        r.enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        irq.pend();
        unsafe { irq.enable() };

        state.tx_rx_refcount.store(1, Ordering::Relaxed);

        this
    }

    #[allow(clippy::too_many_arguments)]
    fn new_innerer<U: UarteInstance, T: TimerInstance>(
        _peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        rts: Option<Peri<'d, AnyPin>>,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        assert!(rx_buffer.len() % 2 == 0);

        let r = U::regs();
        let state = U::state();
        let buffered_state = U::buffered_state();

        configure_rx_pins(r, rxd, rts);

        // Initialize state
        buffered_state.rx_started_count.store(0, Ordering::Relaxed);
        buffered_state.rx_ended_count.store(0, Ordering::Relaxed);
        buffered_state.rx_started.store(false, Ordering::Relaxed);
        buffered_state.rx_overrun.store(false, Ordering::Relaxed);
        let rx_len = rx_buffer.len().min(EASY_DMA_SIZE * 2);
        unsafe { buffered_state.rx_buf.init(rx_buffer.as_mut_ptr(), rx_len) };

        // clear errors
        let errors = r.errorsrc().read();
        r.errorsrc().write_value(errors);

        r.events_dma().rx().ready().write_value(0);
        r.events_error().write_value(0);
        r.events_dma().rx().end().write_value(0);

        // Enable interrupts
        r.intenset().write(|w| {
            w.set_dmatxend(true);
            w.set_dmarxready(true);
            w.set_error(true);
            w.set_dmarxend(true);
        });

        // Configure byte counter.
        let timer = Timer::new_counter(timer);
        timer.cc(1).write(rx_len as u32 * 2);
        timer.cc(1).short_compare_clear();
        timer.clear();
        timer.start();

        let mut ppi_ch1 = Ppi::new_one_to_one(ppi_ch1, Event::from_reg(r.events_rxdrdy()), timer.task_count());
        ppi_ch1.enable();

        buffered_state
            .rx_ppi_ch
            .store(ppi_ch2.number() as u8, Ordering::Relaxed);
        let mut ppi_group = PpiGroup::new(ppi_group);
        let mut ppi_ch2 = Ppi::new_one_to_two(
            ppi_ch2,
            Event::from_reg(r.events_dma().rx().end()),
            Task::from_reg(r.tasks_dma().rx().start()),
            ppi_group.task_disable_all(),
        );
        ppi_ch2.disable();
        ppi_group.add_channel(&ppi_ch2);

        Self {
            r,
            state,
            buffered_state,
            timer,
            _ppi_ch1: ppi_ch1,
            _ppi_ch2: ppi_ch2,
            _ppi_group: ppi_group,
            _p: PhantomData,
        }
    }

    fn get_rxdrdy_counter(&self) -> usize {
        let s = self.buffered_state;
        let timer = &self.timer;

        // Read the RXDRDY counter.
        timer.cc(0).capture();
        let mut rxdrdy = timer.cc(0).read() as usize;
        //trace!("  rxdrdy count = {:?}", rxdrdy);

        // We've set a compare channel that resets the counter to 0 when it reaches `len*2`.
        // However, it's unclear if that's instant, or there's a small window where you can
        // still read `len()*2`.
        // This could happen if in one clock cycle the counter is updated, and in the next the
        // clear takes effect. The docs are very sparse, they just say "Task delays: After TIMER
        // is started, the CLEAR, COUNT, and STOP tasks are guaranteed to take effect within one
        // clock cycle of the PCLK16M." :shrug:
        // So, we wrap the counter ourselves, just in case.
        if rxdrdy > s.rx_buf.len() * 2 {
            rxdrdy = 0;
        }

        rxdrdy
    }

    /// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        let data = self.fill_buf().await?;
        let n = data.len().min(buf.len());
        buf[..n].copy_from_slice(&data[..n]);
        self.consume(n);
        Ok(n)
    }

    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
    pub fn fill_buf(&mut self) -> impl Future<Output = Result<&'_ [u8], Error>> {
        let r = self.r;
        let s = self.buffered_state;
        let ss = self.state;

        poll_fn(move |cx| {
            compiler_fence(Ordering::SeqCst);
            //trace!("poll_read");

            if s.rx_overrun.swap(false, Ordering::Acquire) {
                return Poll::Ready(Err(Error::Overrun));
            }

            let mut end = self.get_rxdrdy_counter();

            // This logic mirrors `atomic_ring_buffer::Reader::pop_buf()`
            let mut start = s.rx_buf.start.load(Ordering::Relaxed);
            let len = s.rx_buf.len();
            if start == end {
                //trace!("  empty");
                ss.rx_waker.register(cx.waker());
                r.intenset().write(|w| w.set_rxdrdy(true));
                return Poll::Pending;
            }

            if start >= len {
                start -= len
            }
            if end >= len {
                end -= len
            }

            let n = if end > start { end - start } else { len - start };
            assert!(n != 0);
            //trace!("  uarte ringbuf: pop_buf {:?}..{:?}", start, start + n);

            let buf = s.rx_buf.buf.load(Ordering::Relaxed);
            Poll::Ready(Ok(unsafe { slice::from_raw_parts(buf.add(start), n) }))
        })
    }

    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
    pub fn consume(&mut self, amt: usize) {
        if amt == 0 {
            return;
        }

        let s = self.buffered_state;
        let mut rx = unsafe { s.rx_buf.reader() };
        rx.pop_done(amt);
        self.r.intenset().write(|w| w.set_dmarxready(true));
    }

    /// We are ready to read if there is data in the buffer.
    fn read_ready(&self) -> Result<bool, Error> {
        let state = self.buffered_state;
        if state.rx_overrun.swap(false, Ordering::Acquire) {
            return Err(Error::Overrun);
        }

        let start = state.rx_buf.start.load(Ordering::Relaxed);
        let end = self.get_rxdrdy_counter();

        Ok(start != end)
    }
}

impl<'a> Drop for BufferedUarteRx<'a> {
    fn drop(&mut self) {
        self._ppi_group.disable_all();

        let r = self.r;

        self.timer.stop();

        r.intenclr().write(|w| {
            w.set_rxdrdy(true);
            w.set_dmarxready(true);
            w.set_rxto(true);
        });
        r.events_rxto().write_value(0);
        r.tasks_dma().rx().stop().write_value(1);
        while r.events_rxto().read() == 0 {}

        let s = self.buffered_state;
        unsafe { s.rx_buf.deinit() }

        let s = self.state;
        drop_tx_rx(r, s);
    }
}

mod _embedded_io {
    use super::*;

    impl embedded_io_async::Error for Error {
        fn kind(&self) -> embedded_io_async::ErrorKind {
            match *self {
                Error::Overrun => embedded_io_async::ErrorKind::OutOfMemory,
            }
        }
    }

    impl<'d> embedded_io_async::ErrorType for BufferedUarte<'d> {
        type Error = Error;
    }

    impl<'d> embedded_io_async::ErrorType for BufferedUarteRx<'d> {
        type Error = Error;
    }

    impl<'d> embedded_io_async::ErrorType for BufferedUarteTx<'d> {
        type Error = Error;
    }

    impl<'d> embedded_io_async::Read for BufferedUarte<'d> {
        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            self.read(buf).await
        }
    }

    impl<'d> embedded_io_async::Read for BufferedUarteRx<'d> {
        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            self.read(buf).await
        }
    }

    impl<'d> embedded_io_async::ReadReady for BufferedUarte<'d> {
        fn read_ready(&mut self) -> Result<bool, Self::Error> {
            self.rx.read_ready()
        }
    }

    impl<'d> embedded_io_async::ReadReady for BufferedUarteRx<'d> {
        fn read_ready(&mut self) -> Result<bool, Self::Error> {
            let state = self.buffered_state;
            Ok(!state.rx_buf.is_empty())
        }
    }

    impl<'d> embedded_io_async::BufRead for BufferedUarte<'d> {
        async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
            self.fill_buf().await
        }

        fn consume(&mut self, amt: usize) {
            self.consume(amt)
        }
    }

    impl<'d> embedded_io_async::BufRead for BufferedUarteRx<'d> {
        async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
            self.fill_buf().await
        }

        fn consume(&mut self, amt: usize) {
            self.consume(amt)
        }
    }

    impl<'d> embedded_io_async::Write for BufferedUarte<'d> {
        async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
            self.write(buf).await
        }

        async fn flush(&mut self) -> Result<(), Self::Error> {
            self.flush().await
        }
    }

    impl<'d> embedded_io_async::Write for BufferedUarteTx<'d> {
        async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
            self.write(buf).await
        }

        async fn flush(&mut self) -> Result<(), Self::Error> {
            self.flush().await
        }
    }
}