embassy_nrf/
buffered_uarte.rs

1//! Async buffered UART driver.
2//!
3//! Note that discarding a future from a read or write operation may lead to losing
4//! data. For example, when using `futures_util::future::select` and completion occurs
5//! on the "other" future, you should capture the incomplete future and continue to use
6//! it for the next read or write. This pattern is a consideration for all IO, and not
7//! just serial communications.
8//!
9//! Please also see [crate::uarte] to understand when [BufferedUarte] should be used.
10
11use core::cmp::min;
12use core::future::{poll_fn, Future};
13use core::marker::PhantomData;
14use core::slice;
15use core::sync::atomic::{compiler_fence, AtomicBool, AtomicU8, AtomicUsize, Ordering};
16use core::task::Poll;
17
18use embassy_hal_internal::atomic_ring_buffer::RingBuffer;
19use embassy_hal_internal::Peri;
20use pac::uarte::vals;
21// Re-export SVD variants to allow user to directly set values
22pub use pac::uarte::vals::{Baudrate, ConfigParity as Parity};
23
24use crate::gpio::{AnyPin, Pin as GpioPin};
25use crate::interrupt::typelevel::Interrupt;
26use crate::interrupt::InterruptExt;
27use crate::ppi::{
28    self, AnyConfigurableChannel, AnyGroup, Channel, ConfigurableChannel, Event, Group, Ppi, PpiGroup, Task,
29};
30use crate::timer::{Instance as TimerInstance, Timer};
31use crate::uarte::{configure, configure_rx_pins, configure_tx_pins, drop_tx_rx, Config, Instance as UarteInstance};
32use crate::{interrupt, pac, EASY_DMA_SIZE};
33
/// State shared between the buffered driver halves and the interrupt handler.
pub(crate) struct State {
    /// Ring buffer holding bytes queued for transmission.
    tx_buf: RingBuffer,
    /// Length of the TX DMA transfer currently in flight; 0 means no transfer is running.
    tx_count: AtomicUsize,

    /// Ring buffer that the RX DMA transfers are pointed at.
    rx_buf: RingBuffer,
    /// Set once the interrupt handler has set up the first RX DMA transfer.
    rx_started: AtomicBool,
    /// Wrapping count of RX DMA transfers set up by the interrupt handler.
    rx_started_count: AtomicU8,
    /// Wrapping count of ENDRX events observed by the interrupt handler.
    rx_ended_count: AtomicU8,
    /// Number of the PPI channel that links ENDRX to STARTRX.
    rx_ppi_ch: AtomicU8,
}
44
/// UART error.
///
/// The enum currently has no variants, so no value of it can exist. It is marked
/// `#[non_exhaustive]`, which leaves room to add variants later.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
#[non_exhaustive]
pub enum Error {
    // No errors for now
}
52
53impl State {
54    pub(crate) const fn new() -> Self {
55        Self {
56            tx_buf: RingBuffer::new(),
57            tx_count: AtomicUsize::new(0),
58
59            rx_buf: RingBuffer::new(),
60            rx_started: AtomicBool::new(false),
61            rx_started_count: AtomicU8::new(0),
62            rx_ended_count: AtomicU8::new(0),
63            rx_ppi_ch: AtomicU8::new(0),
64        }
65    }
66}
67
/// Interrupt handler.
///
/// The constructors of the buffered driver types require a binding of the interrupt of
/// UARTE instance `U` to this handler.
pub struct InterruptHandler<U: UarteInstance> {
    // Zero-sized marker tying the handler to `U` without storing a value of it.
    _phantom: PhantomData<U>,
}
72
impl<U: UarteInstance> interrupt::typelevel::Handler<U::Interrupt> for InterruptHandler<U> {
    /// Service the UARTE interrupt for instance `U`.
    ///
    /// The RX section runs only if a writer for the RX ring buffer can be obtained, and the TX
    /// section only if a reader for the TX ring buffer can be obtained. RX handling clears
    /// error flags (panicking on overrun), wakes the reader on RXDRDY, counts ENDRX events and
    /// sets up the next half-buffer DMA transfer. TX handling retires the finished transfer
    /// and starts the next one if data is queued.
    unsafe fn on_interrupt() {
        //trace!("irq: start");
        let r = U::regs();
        let ss = U::state();
        let s = U::buffered_state();

        if let Some(mut rx) = unsafe { s.rx_buf.try_writer() } {
            // Each RX DMA transfer covers half of the ring buffer.
            let buf_len = s.rx_buf.len();
            let half_len = buf_len / 2;

            if r.events_error().read() != 0 {
                r.events_error().write_value(0);
                // Read the error flags and write the same value back to the register.
                let errs = r.errorsrc().read();
                r.errorsrc().write_value(errs);

                if errs.overrun() {
                    panic!("BufferedUarte overrun");
                }
            }

            // Received some bytes, wake task.
            // The RXDRDY interrupt is disabled again here; `fill_buf` re-enables it when it
            // finds the buffer empty.
            if r.inten().read().rxdrdy() && r.events_rxdrdy().read() != 0 {
                r.intenclr().write(|w| w.set_rxdrdy(true));
                r.events_rxdrdy().write_value(0);
                ss.rx_waker.wake();
            }

            if r.events_endrx().read() != 0 {
                //trace!("  irq_rx: endrx");
                r.events_endrx().write_value(0);

                let val = s.rx_ended_count.load(Ordering::Relaxed);
                s.rx_ended_count.store(val.wrapping_add(1), Ordering::Relaxed);
            }

            // Set up the next transfer when the previous one has started, or when no transfer
            // has ever been started.
            if r.events_rxstarted().read() != 0 || !s.rx_started.load(Ordering::Relaxed) {
                //trace!("  irq_rx: rxstarted");
                let (ptr, len) = rx.push_buf();
                if len >= half_len {
                    r.events_rxstarted().write_value(0);

                    //trace!("  irq_rx: starting second {:?}", half_len);

                    // Set up the DMA read
                    r.rxd().ptr().write_value(ptr as u32);
                    r.rxd().maxcnt().write(|w| w.set_maxcnt(half_len as _));

                    let chn = s.rx_ppi_ch.load(Ordering::Relaxed);

                    // Enable endrx -> startrx PPI channel.
                    // From this point on, if endrx happens, startrx is automatically fired.
                    ppi::regs().chenset().write(|w| w.0 = 1 << chn);

                    // It is possible that endrx happened BEFORE enabling the PPI. In this case
                    // the PPI channel doesn't trigger, and we'd hang. We have to detect this
                    // and manually start.

                    // check again in case endrx has happened between the last check and now.
                    if r.events_endrx().read() != 0 {
                        //trace!("  irq_rx: endrx");
                        r.events_endrx().write_value(0);

                        let val = s.rx_ended_count.load(Ordering::Relaxed);
                        s.rx_ended_count.store(val.wrapping_add(1), Ordering::Relaxed);
                    }

                    let rx_ended = s.rx_ended_count.load(Ordering::Relaxed);
                    let rx_started = s.rx_started_count.load(Ordering::Relaxed);

                    // If we started the same amount of transfers as ended, the last rxend has
                    // already occurred.
                    let rxend_happened = rx_started == rx_ended;

                    // Check if the PPI channel is still enabled. The PPI channel disables itself
                    // when it fires, so if it's still enabled it hasn't fired.
                    let ppi_ch_enabled = ppi::regs().chen().read().ch(chn as _);

                    // if rxend happened, and the ppi channel hasn't fired yet, the rxend got missed.
                    // this condition also naturally matches if `!started`, needed to kickstart the DMA.
                    if rxend_happened && ppi_ch_enabled {
                        //trace!("manually starting.");

                        // disable the ppi ch, it's of no use anymore.
                        ppi::regs().chenclr().write(|w| w.set_ch(chn as _, true));

                        // manually start
                        r.tasks_startrx().write_value(1);
                    }

                    // Hand the half buffer that was just given to the DMA over to the ring buffer.
                    rx.push_done(half_len);

                    s.rx_started_count.store(rx_started.wrapping_add(1), Ordering::Relaxed);
                    s.rx_started.store(true, Ordering::Relaxed);
                } else {
                    // Less than half a buffer is free: mask RXSTARTED. `consume` turns it back
                    // on after it frees space.
                    //trace!("  irq_rx: rxstarted no buf");
                    r.intenclr().write(|w| w.set_rxstarted(true));
                }
            }
        }

        // =============================

        if let Some(mut tx) = unsafe { s.tx_buf.try_reader() } {
            // TX end
            if r.events_endtx().read() != 0 {
                r.events_endtx().write_value(0);

                // Retire the bytes of the finished transfer and wake any waiting writer/flusher.
                let n = s.tx_count.load(Ordering::Relaxed);
                //trace!("  irq_tx: endtx {:?}", n);
                tx.pop_done(n);
                ss.tx_waker.wake();
                s.tx_count.store(0, Ordering::Relaxed);
            }

            // If not TXing, start.
            if s.tx_count.load(Ordering::Relaxed) == 0 {
                let (ptr, len) = tx.pop_buf();
                // A single transfer is capped at EASY_DMA_SIZE bytes.
                let len = len.min(EASY_DMA_SIZE);
                if len != 0 {
                    //trace!("  irq_tx: starting {:?}", len);
                    s.tx_count.store(len, Ordering::Relaxed);

                    // Set up the DMA write
                    r.txd().ptr().write_value(ptr as u32);
                    r.txd().maxcnt().write(|w| w.set_maxcnt(len as _));

                    // Start UARTE Transmit transaction
                    r.tasks_starttx().write_value(1);
                }
            }
        }

        //trace!("irq: end");
    }
}
209
/// Buffered UARTE driver.
///
/// Bundles a transmit half and a receive half; see [`BufferedUarte::split`] to use them
/// separately.
pub struct BufferedUarte<'d> {
    /// Transmit half.
    tx: BufferedUarteTx<'d>,
    /// Receive half.
    rx: BufferedUarteRx<'d>,
}
215
// Explicitly mark the driver as `Unpin`.
impl<'d> Unpin for BufferedUarte<'d> {}
217
218impl<'d> BufferedUarte<'d> {
    /// Create a new BufferedUarte without hardware flow control.
    ///
    /// The PPI channels, PPI group and pins are converted with `.into()` and passed to the
    /// shared constructor, with `None` for both CTS and RTS.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        txd: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            txd.into(),
            // No CTS pin.
            None,
            // No RTS pin.
            None,
            config,
            rx_buffer,
            tx_buffer,
        )
    }
253
    /// Create a new BufferedUarte with hardware flow control (RTS/CTS)
    ///
    /// Same as [`Self::new`], except that the `cts` and `rts` pins are passed on to the shared
    /// constructor.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_rtscts<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        txd: Peri<'d, impl GpioPin>,
        cts: Peri<'d, impl GpioPin>,
        rts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            txd.into(),
            Some(cts.into()),
            Some(rts.into()),
            config,
            rx_buffer,
            tx_buffer,
        )
    }
290
    /// Shared constructor behind [`Self::new`] and [`Self::new_with_rtscts`].
    ///
    /// Configures the peripheral, builds both halves, enables the peripheral and its
    /// interrupt, and sets the shared TX/RX refcount to 2.
    #[allow(clippy::too_many_arguments)]
    fn new_inner<U: UarteInstance, T: TimerInstance>(
        peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        rts: Option<Peri<'d, AnyPin>>,
        config: Config,
        rx_buffer: &'d mut [u8],
        tx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();

        // The third argument is presumably the hardware-flow-control switch — confirm against
        // `crate::uarte::configure`.
        configure(r, config, cts.is_some());

        // Both halves take a handle to the same peripheral, hence the unchecked clone.
        let tx = BufferedUarteTx::new_innerer(unsafe { peri.clone_unchecked() }, txd, cts, tx_buffer);
        let rx = BufferedUarteRx::new_innerer(peri, timer, ppi_ch1, ppi_ch2, ppi_group, rxd, rts, rx_buffer);

        r.enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        // Pend before enabling so the handler runs once right away; its RX path uses that
        // first run to kickstart the DMA.
        irq.pend();
        unsafe { irq.enable() };

        // Two halves (TX and RX) are alive.
        state.tx_rx_refcount.store(2, Ordering::Relaxed);

        Self { tx, rx }
    }
323
    /// Adjust the baud rate to the provided value.
    ///
    /// Forwards to the TX half, which writes the peripheral's baud rate register.
    pub fn set_baudrate(&mut self, baudrate: Baudrate) {
        self.tx.set_baudrate(baudrate);
    }
328
329    /// Split the UART in reader and writer parts.
330    ///
331    /// This allows reading and writing concurrently from independent tasks.
332    pub fn split(self) -> (BufferedUarteRx<'d>, BufferedUarteTx<'d>) {
333        (self.rx, self.tx)
334    }
335
336    /// Split the UART in reader and writer parts, by reference.
337    ///
338    /// The returned halves borrow from `self`, so you can drop them and go back to using
339    /// the "un-split" `self`. This allows temporarily splitting the UART.
340    pub fn split_by_ref(&mut self) -> (&mut BufferedUarteRx<'d>, &mut BufferedUarteTx<'d>) {
341        (&mut self.rx, &mut self.tx)
342    }
343
    /// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
    ///
    /// Delegates to the RX half.
    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
        self.rx.read(buf).await
    }
348
    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
    ///
    /// Delegates to the RX half. Call [`Self::consume`] afterwards to mark bytes as used.
    pub async fn fill_buf(&mut self) -> Result<&[u8], Error> {
        self.rx.fill_buf().await
    }
353
    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
    ///
    /// Delegates to the RX half.
    pub fn consume(&mut self, amt: usize) {
        self.rx.consume(amt)
    }
358
    /// Write a buffer into this writer, returning how many bytes were written.
    ///
    /// Delegates to the TX half. The returned count may be smaller than `buf.len()`.
    pub async fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.tx.write(buf).await
    }
363
    /// Try writing a buffer without waiting, returning how many bytes were written.
    ///
    /// Delegates to the TX half, which returns `Ok(0)` when no buffer space is available.
    pub fn try_write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.tx.try_write(buf)
    }
368
    /// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
    ///
    /// Delegates to the TX half, which waits until the TX ring buffer is empty.
    pub async fn flush(&mut self) -> Result<(), Error> {
        self.tx.flush().await
    }
373}
374
/// Writer part of the buffered UARTE driver.
pub struct BufferedUarteTx<'d> {
    /// Register block of the UARTE instance.
    r: pac::uarte::Uarte,
    /// Interrupt of the UARTE instance; pended after data is queued so the handler can start
    /// a transfer.
    _irq: interrupt::Interrupt,
    /// Shared (unbuffered) UARTE state: TX waker and TX/RX refcount.
    state: &'static crate::uarte::State,
    /// State shared with the buffered interrupt handler.
    buffered_state: &'static State,
    /// Ties the driver to the `'d` lifetime of the peripherals and buffer it was built from.
    _p: PhantomData<&'d ()>,
}
383
384impl<'d> BufferedUarteTx<'d> {
    /// Create a new BufferedUarteTx without hardware flow control.
    ///
    /// `tx_buffer` becomes the backing storage of the TX ring buffer.
    pub fn new<U: UarteInstance>(
        uarte: Peri<'d, U>,
        txd: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(uarte, txd.into(), None, config, tx_buffer)
    }
395
    /// Create a new BufferedUarteTx with hardware flow control (CTS).
    ///
    /// `tx_buffer` becomes the backing storage of the TX ring buffer.
    pub fn new_with_cts<U: UarteInstance>(
        uarte: Peri<'d, U>,
        txd: Peri<'d, impl GpioPin>,
        cts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(uarte, txd.into(), Some(cts.into()), config, tx_buffer)
    }
411
    /// Shared constructor for a stand-alone TX half.
    ///
    /// Configures the peripheral, sets up the TX side, enables the peripheral and its
    /// interrupt, and sets the shared TX/RX refcount to 1.
    fn new_inner<U: UarteInstance>(
        peri: Peri<'d, U>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        config: Config,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();
        // Not used further in this function.
        let _buffered_state = U::buffered_state();

        configure(r, config, cts.is_some());

        let this = Self::new_innerer(peri, txd, cts, tx_buffer);

        r.enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        // Pend before enabling so the handler runs once as soon as the interrupt is enabled.
        irq.pend();
        unsafe { irq.enable() };

        // Only the TX half is alive.
        state.tx_rx_refcount.store(1, Ordering::Relaxed);

        this
    }
436
    /// Set up the TX side only: pins, ring buffer and interrupt sources.
    ///
    /// Does not configure or enable the peripheral itself and does not touch the refcount;
    /// the callers do that.
    fn new_innerer<U: UarteInstance>(
        _peri: Peri<'d, U>,
        txd: Peri<'d, AnyPin>,
        cts: Option<Peri<'d, AnyPin>>,
        tx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();
        let buffered_state = U::buffered_state();

        configure_tx_pins(r, txd, cts);

        // Initialize state
        // No transfer in flight, and the ring buffer is backed by the caller's `tx_buffer`.
        buffered_state.tx_count.store(0, Ordering::Relaxed);
        let len = tx_buffer.len();
        unsafe { buffered_state.tx_buf.init(tx_buffer.as_mut_ptr(), len) };

        r.events_txstarted().write_value(0);

        // Enable interrupts
        // ENDTX is the event the interrupt handler uses to retire a finished transfer.
        r.intenset().write(|w| {
            w.set_endtx(true);
        });

        Self {
            r,
            _irq: irq,
            state,
            buffered_state,
            _p: PhantomData,
        }
    }
470
471    /// Write a buffer into this writer, returning how many bytes were written.
472    pub fn write<'a>(&'a mut self, buf: &'a [u8]) -> impl Future<Output = Result<usize, Error>> + 'a + use<'a, 'd> {
473        poll_fn(move |cx| {
474            //trace!("poll_write: {:?}", buf.len());
475            let ss = self.state;
476            let s = self.buffered_state;
477            let mut tx = unsafe { s.tx_buf.writer() };
478
479            let tx_buf = tx.push_slice();
480            if tx_buf.is_empty() {
481                //trace!("poll_write: pending");
482                ss.tx_waker.register(cx.waker());
483                return Poll::Pending;
484            }
485
486            let n = min(tx_buf.len(), buf.len());
487            tx_buf[..n].copy_from_slice(&buf[..n]);
488            tx.push_done(n);
489
490            //trace!("poll_write: queued {:?}", n);
491
492            compiler_fence(Ordering::SeqCst);
493            self._irq.pend();
494
495            Poll::Ready(Ok(n))
496        })
497    }
498
499    /// Try writing a buffer without waiting, returning how many bytes were written.
500    pub fn try_write(&mut self, buf: &[u8]) -> Result<usize, Error> {
501        //trace!("poll_write: {:?}", buf.len());
502        let s = self.buffered_state;
503        let mut tx = unsafe { s.tx_buf.writer() };
504
505        let tx_buf = tx.push_slice();
506        if tx_buf.is_empty() {
507            return Ok(0);
508        }
509
510        let n = min(tx_buf.len(), buf.len());
511        tx_buf[..n].copy_from_slice(&buf[..n]);
512        tx.push_done(n);
513
514        //trace!("poll_write: queued {:?}", n);
515
516        compiler_fence(Ordering::SeqCst);
517        self._irq.pend();
518
519        Ok(n)
520    }
521
522    /// Flush this output stream, ensuring that all intermediately buffered contents reach their destination.
523    pub fn flush(&mut self) -> impl Future<Output = Result<(), Error>> + '_ {
524        let ss = self.state;
525        let s = self.buffered_state;
526        poll_fn(move |cx| {
527            //trace!("poll_flush");
528            if !s.tx_buf.is_empty() {
529                //trace!("poll_flush: pending");
530                ss.tx_waker.register(cx.waker());
531                return Poll::Pending;
532            }
533
534            Poll::Ready(Ok(()))
535        })
536    }
537
    /// Adjust the baud rate to the provided value.
    ///
    /// Writes `baudrate` to the peripheral's baud rate register.
    pub fn set_baudrate(&mut self, baudrate: Baudrate) {
        self.r.baudrate().write(|w| w.set_baudrate(baudrate));
    }
542}
543
/// Stops transmission and releases the TX resources when the TX half is dropped.
impl<'a> Drop for BufferedUarteTx<'a> {
    fn drop(&mut self) {
        let r = self.r;

        // Mask the TXDRDY, TXSTARTED and TXSTOPPED interrupts.
        r.intenclr().write(|w| {
            w.set_txdrdy(true);
            w.set_txstarted(true);
            w.set_txstopped(true);
        });
        // Trigger STOPTX and busy-wait until the TXSTOPPED event is set.
        r.events_txstopped().write_value(0);
        r.tasks_stoptx().write_value(1);
        while r.events_txstopped().read() == 0 {}

        // Detach the ring buffer from the caller's storage.
        let s = self.buffered_state;
        unsafe { s.tx_buf.deinit() }

        // Shared teardown with the RX half (see `crate::uarte::drop_tx_rx`).
        let s = self.state;
        drop_tx_rx(r, s);
    }
}
564
/// Reader part of the buffered UARTE driver.
pub struct BufferedUarteRx<'d> {
    /// Register block of the UARTE instance.
    r: pac::uarte::Uarte,
    /// Shared (unbuffered) UARTE state: RX waker and TX/RX refcount.
    state: &'static crate::uarte::State,
    /// State shared with the buffered interrupt handler.
    buffered_state: &'static State,
    /// Timer in counter mode, incremented via PPI on every RXDRDY event.
    timer: Timer<'d>,
    /// PPI channel RXDRDY -> timer COUNT.
    _ppi_ch1: Ppi<'d, AnyConfigurableChannel, 1, 1>,
    /// PPI channel ENDRX -> STARTRX plus the group's disable-all task.
    _ppi_ch2: Ppi<'d, AnyConfigurableChannel, 1, 2>,
    /// PPI group that `_ppi_ch2` is added to.
    _ppi_group: PpiGroup<'d, AnyGroup>,
    /// Ties the driver to the `'d` lifetime of the peripherals and buffer it was built from.
    _p: PhantomData<&'d ()>,
}
576
577impl<'d> BufferedUarteRx<'d> {
    /// Create a new BufferedUarteRx without hardware flow control.
    ///
    /// Note that `_irq` comes before `rxd` in this constructor's parameter list.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        rxd: Peri<'d, impl GpioPin>,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            // No RTS pin.
            None,
            config,
            rx_buffer,
        )
    }
607
    /// Create a new BufferedUarteRx with hardware flow control (RTS).
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_rts<U: UarteInstance, T: TimerInstance>(
        uarte: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, impl ConfigurableChannel>,
        ppi_ch2: Peri<'d, impl ConfigurableChannel>,
        ppi_group: Peri<'d, impl Group>,
        rxd: Peri<'d, impl GpioPin>,
        rts: Peri<'d, impl GpioPin>,
        _irq: impl interrupt::typelevel::Binding<U::Interrupt, InterruptHandler<U>> + 'd,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        Self::new_inner(
            uarte,
            timer,
            ppi_ch1.into(),
            ppi_ch2.into(),
            ppi_group.into(),
            rxd.into(),
            Some(rts.into()),
            config,
            rx_buffer,
        )
    }
638
    /// Shared constructor for a stand-alone RX half.
    ///
    /// Configures the peripheral, sets up the RX side, enables the peripheral and its
    /// interrupt, and sets the shared TX/RX refcount to 1.
    #[allow(clippy::too_many_arguments)]
    fn new_inner<U: UarteInstance, T: TimerInstance>(
        peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        rts: Option<Peri<'d, AnyPin>>,
        config: Config,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        let r = U::regs();
        let irq = U::Interrupt::IRQ;
        let state = U::state();
        // Not used further in this function.
        let _buffered_state = U::buffered_state();

        configure(r, config, rts.is_some());

        let this = Self::new_innerer(peri, timer, ppi_ch1, ppi_ch2, ppi_group, rxd, rts, rx_buffer);

        r.enable().write(|w| w.set_enable(vals::Enable::ENABLED));
        // Pend before enabling so the handler runs once right away; its RX path uses that
        // first run to kickstart the DMA.
        irq.pend();
        unsafe { irq.enable() };

        // Only the RX half is alive.
        state.tx_rx_refcount.store(1, Ordering::Relaxed);

        this
    }
668
    /// Set up the RX side only: pins, ring buffer, interrupt sources, byte-counting timer and
    /// PPI wiring.
    ///
    /// Does not configure or enable the peripheral itself and does not touch the refcount;
    /// the callers do that.
    ///
    /// # Panics
    ///
    /// Panics if `rx_buffer.len()` is odd.
    #[allow(clippy::too_many_arguments)]
    fn new_innerer<U: UarteInstance, T: TimerInstance>(
        _peri: Peri<'d, U>,
        timer: Peri<'d, T>,
        ppi_ch1: Peri<'d, AnyConfigurableChannel>,
        ppi_ch2: Peri<'d, AnyConfigurableChannel>,
        ppi_group: Peri<'d, AnyGroup>,
        rxd: Peri<'d, AnyPin>,
        rts: Option<Peri<'d, AnyPin>>,
        rx_buffer: &'d mut [u8],
    ) -> Self {
        // The interrupt handler receives into half of the buffer at a time.
        assert!(rx_buffer.len() % 2 == 0);

        let r = U::regs();
        let state = U::state();
        let buffered_state = U::buffered_state();

        configure_rx_pins(r, rxd, rts);

        // Initialize state
        buffered_state.rx_started_count.store(0, Ordering::Relaxed);
        buffered_state.rx_ended_count.store(0, Ordering::Relaxed);
        buffered_state.rx_started.store(false, Ordering::Relaxed);
        // Only the first `EASY_DMA_SIZE * 2` bytes of `rx_buffer` are used, so each half fits
        // in one DMA transfer.
        let rx_len = rx_buffer.len().min(EASY_DMA_SIZE * 2);
        unsafe { buffered_state.rx_buf.init(rx_buffer.as_mut_ptr(), rx_len) };

        // clear errors
        let errors = r.errorsrc().read();
        r.errorsrc().write_value(errors);

        r.events_rxstarted().write_value(0);
        r.events_error().write_value(0);
        r.events_endrx().write_value(0);

        // Enable interrupts
        // NOTE(review): ENDTX is enabled here as well as in the TX setup — confirm intent.
        r.intenset().write(|w| {
            w.set_endtx(true);
            w.set_rxstarted(true);
            w.set_error(true);
            w.set_endrx(true);
        });

        // Configure byte counter.
        // The counter is cleared when it reaches `rx_len * 2`; `fill_buf` reads it as the
        // ring buffer's end index.
        let timer = Timer::new_counter(timer);
        timer.cc(1).write(rx_len as u32 * 2);
        timer.cc(1).short_compare_clear();
        timer.clear();
        timer.start();

        // Every RXDRDY event increments the counter.
        let mut ppi_ch1 = Ppi::new_one_to_one(ppi_ch1, Event::from_reg(r.events_rxdrdy()), timer.task_count());
        ppi_ch1.enable();

        // The interrupt handler needs the channel number to enable/disable it by hand.
        buffered_state
            .rx_ppi_ch
            .store(ppi_ch2.number() as u8, Ordering::Relaxed);
        let mut ppi_group = PpiGroup::new(ppi_group);
        // ENDRX restarts reception and, through the group's disable-all task, turns this
        // channel off again so it fires once per arming.
        let mut ppi_ch2 = Ppi::new_one_to_two(
            ppi_ch2,
            Event::from_reg(r.events_endrx()),
            Task::from_reg(r.tasks_startrx()),
            ppi_group.task_disable_all(),
        );
        // Starts disabled; the interrupt handler enables it after setting up a transfer.
        ppi_ch2.disable();
        ppi_group.add_channel(&ppi_ch2);

        Self {
            r,
            state,
            buffered_state,
            timer,
            _ppi_ch1: ppi_ch1,
            _ppi_ch2: ppi_ch2,
            _ppi_group: ppi_group,
            _p: PhantomData,
        }
    }
745
746    /// Pull some bytes from this source into the specified buffer, returning how many bytes were read.
747    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
748        let data = self.fill_buf().await?;
749        let n = data.len().min(buf.len());
750        buf[..n].copy_from_slice(&data[..n]);
751        self.consume(n);
752        Ok(n)
753    }
754
755    /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty.
756    pub fn fill_buf(&mut self) -> impl Future<Output = Result<&'_ [u8], Error>> {
757        let r = self.r;
758        let s = self.buffered_state;
759        let ss = self.state;
760        let timer = &self.timer;
761        poll_fn(move |cx| {
762            compiler_fence(Ordering::SeqCst);
763            //trace!("poll_read");
764
765            // Read the RXDRDY counter.
766            timer.cc(0).capture();
767            let mut end = timer.cc(0).read() as usize;
768            //trace!("  rxdrdy count = {:?}", end);
769
770            // We've set a compare channel that resets the counter to 0 when it reaches `len*2`.
771            // However, it's unclear if that's instant, or there's a small window where you can
772            // still read `len()*2`.
773            // This could happen if in one clock cycle the counter is updated, and in the next the
774            // clear takes effect. The docs are very sparse, they just say "Task delays: After TIMER
775            // is started, the CLEAR, COUNT, and STOP tasks are guaranteed to take effect within one
776            // clock cycle of the PCLK16M." :shrug:
777            // So, we wrap the counter ourselves, just in case.
778            if end > s.rx_buf.len() * 2 {
779                end = 0
780            }
781
782            // This logic mirrors `atomic_ring_buffer::Reader::pop_buf()`
783            let mut start = s.rx_buf.start.load(Ordering::Relaxed);
784            let len = s.rx_buf.len();
785            if start == end {
786                //trace!("  empty");
787                ss.rx_waker.register(cx.waker());
788                r.intenset().write(|w| w.set_rxdrdy(true));
789                return Poll::Pending;
790            }
791
792            if start >= len {
793                start -= len
794            }
795            if end >= len {
796                end -= len
797            }
798
799            let n = if end > start { end - start } else { len - start };
800            assert!(n != 0);
801            //trace!("  uarte ringbuf: pop_buf {:?}..{:?}", start, start + n);
802
803            let buf = s.rx_buf.buf.load(Ordering::Relaxed);
804            Poll::Ready(Ok(unsafe { slice::from_raw_parts(buf.add(start), n) }))
805        })
806    }
807
808    /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`.
809    pub fn consume(&mut self, amt: usize) {
810        if amt == 0 {
811            return;
812        }
813
814        let s = self.buffered_state;
815        let mut rx = unsafe { s.rx_buf.reader() };
816        rx.pop_done(amt);
817        self.r.intenset().write(|w| w.set_rxstarted(true));
818    }
819
820    /// we are ready to read if there is data in the buffer
821    fn read_ready(&self) -> Result<bool, Error> {
822        let state = self.buffered_state;
823        Ok(!state.rx_buf.is_empty())
824    }
825}
826
/// Stops reception and releases the RX resources when the RX half is dropped.
impl<'a> Drop for BufferedUarteRx<'a> {
    fn drop(&mut self) {
        // Disable the channels of the PPI group (the ENDRX -> STARTRX channel was added to it).
        self._ppi_group.disable_all();

        let r = self.r;

        // Stop the byte counter.
        self.timer.stop();

        // Mask the RXDRDY, RXSTARTED and RXTO interrupts.
        r.intenclr().write(|w| {
            w.set_rxdrdy(true);
            w.set_rxstarted(true);
            w.set_rxto(true);
        });
        // Trigger STOPRX and busy-wait until the RXTO event is set.
        r.events_rxto().write_value(0);
        r.tasks_stoprx().write_value(1);
        while r.events_rxto().read() == 0 {}

        // Detach the ring buffer from the caller's storage.
        let s = self.buffered_state;
        unsafe { s.rx_buf.deinit() }

        // Shared teardown with the TX half (see `crate::uarte::drop_tx_rx`).
        let s = self.state;
        drop_tx_rx(r, s);
    }
}
851
/// `embedded_io_async` trait implementations for the buffered driver types.
///
/// Apart from the RX-half `ReadReady` impl, which checks the ring buffer directly, every
/// method forwards to the inherent method or helper of the driver type.
mod _embedded_io {
    use super::*;

    impl embedded_io_async::Error for Error {
        fn kind(&self) -> embedded_io_async::ErrorKind {
            // `Error` has no variants, so there is nothing to map.
            match *self {}
        }
    }

    impl<'d> embedded_io_async::ErrorType for BufferedUarte<'d> {
        type Error = Error;
    }

    impl<'d> embedded_io_async::ErrorType for BufferedUarteRx<'d> {
        type Error = Error;
    }

    impl<'d> embedded_io_async::ErrorType for BufferedUarteTx<'d> {
        type Error = Error;
    }

    impl<'d> embedded_io_async::Read for BufferedUarte<'d> {
        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            self.read(buf).await
        }
    }

    impl<'d> embedded_io_async::Read for BufferedUarteRx<'d> {
        async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
            self.read(buf).await
        }
    }

    impl<'d> embedded_io_async::ReadReady for BufferedUarte<'d> {
        fn read_ready(&mut self) -> Result<bool, Self::Error> {
            // Uses the RX half's private `read_ready` helper.
            self.rx.read_ready()
        }
    }

    impl<'d> embedded_io_async::ReadReady for BufferedUarteRx<'d> {
        fn read_ready(&mut self) -> Result<bool, Self::Error> {
            // Same check as the inherent `BufferedUarteRx::read_ready` helper.
            let state = self.buffered_state;
            Ok(!state.rx_buf.is_empty())
        }
    }

    impl<'d> embedded_io_async::BufRead for BufferedUarte<'d> {
        async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
            self.fill_buf().await
        }

        fn consume(&mut self, amt: usize) {
            self.consume(amt)
        }
    }

    impl<'d> embedded_io_async::BufRead for BufferedUarteRx<'d> {
        async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> {
            self.fill_buf().await
        }

        fn consume(&mut self, amt: usize) {
            self.consume(amt)
        }
    }

    impl<'d> embedded_io_async::Write for BufferedUarte<'d> {
        async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
            self.write(buf).await
        }

        async fn flush(&mut self) -> Result<(), Self::Error> {
            self.flush().await
        }
    }

    impl<'d> embedded_io_async::Write for BufferedUarteTx<'d> {
        async fn write(&mut self, buf: &[u8]) -> Result<usize, Self::Error> {
            self.write(buf).await
        }

        async fn flush(&mut self) -> Result<(), Self::Error> {
            self.flush().await
        }
    }
}