vorago_shared_hal/uart/
rx_asynch.rs

//! # Async UART reception functionality.
//!
//! This module provides the [RxAsync] and [RxAsyncOverwriting] structs, which both implement the
//! [embedded_io_async::Read] trait.
//! This trait allows for asynchronous reception of data streams. Please note that this module does
//! not specify/declare the interrupt handlers which must be provided for async support to work.
//! However, it provides two interrupt handlers:
//!
//! - [on_interrupt_rx]
//! - [on_interrupt_rx_overwriting]
//!
//! The first one is used for the [RxAsync] struct, while the second one is used with the
//! [RxAsyncOverwriting] struct. The latter will overwrite old values in the used ring buffer
//! when it is full.
//!
//! Error handling is performed in the user interrupt handler by checking the [AsyncUartErrors]
//! structure returned by the interrupt handlers.
17use core::{cell::RefCell, convert::Infallible, future::Future, sync::atomic::Ordering};
18
19use arbitrary_int::prelude::*;
20use critical_section::Mutex;
21use embassy_sync::waitqueue::AtomicWaker;
22use embedded_io::ErrorType;
23use portable_atomic::AtomicBool;
24
25use super::{
26    Bank, Rx, UartErrors,
27    regs::{InterruptClear, MmioUart},
28};
29
// One waker slot per UART bank (indexed by `Bank as usize`); the interrupt handlers use
// these to wake the task currently awaiting RX data.
static UART_RX_WAKERS: [AtomicWaker; 2] = [const { AtomicWaker::new() }; 2];
// True while a read future is active for the bank, so the interrupt handler only wakes
// the executor when somebody is actually waiting. Cleared by [ActiveReadGuard].
static RX_READ_ACTIVE: [AtomicBool; 2] = [const { AtomicBool::new(false) }; 2];
// Set by the interrupt handlers when new data was queued; cleared by the readers before
// awaiting so they do not miss the next data notification.
static RX_HAS_DATA: [AtomicBool; 2] = [const { AtomicBool::new(false) }; 2];
33
/// Future returned by the async read implementations; resolves once RX data is available.
struct RxFuture {
    // Bank identifier used to index the per-UART waker and flag arrays.
    id: Bank,
}
37
impl RxFuture {
    /// Create the future and mark the bank as having an active reader, so the
    /// interrupt handler knows to wake the executor when data arrives.
    pub fn new(rx: &mut Rx) -> Self {
        RX_READ_ACTIVE[rx.id as usize].store(true, Ordering::Relaxed);
        Self { id: rx.id }
    }
}
44
45impl Future for RxFuture {
46    type Output = Result<(), Infallible>;
47
48    fn poll(
49        self: core::pin::Pin<&mut Self>,
50        cx: &mut core::task::Context<'_>,
51    ) -> core::task::Poll<Self::Output> {
52        UART_RX_WAKERS[self.id as usize].register(cx.waker());
53        if RX_HAS_DATA[self.id as usize].load(Ordering::Relaxed) {
54            return core::task::Poll::Ready(Ok(()));
55        }
56        core::task::Poll::Pending
57    }
58}
59
/// Errors reported by the asynchronous RX interrupt handlers.
#[derive(Debug, Clone, Copy)]
#[cfg_attr(feature = "defmt", derive(defmt::Format))]
pub struct AsyncUartErrors {
    /// Queue has overflowed, data might have been lost.
    pub queue_overflow: bool,
    /// UART errors.
    pub uart_errors: UartErrors,
}
68
69fn on_interrupt_handle_rx_errors(uart: &mut MmioUart<'static>) -> Option<UartErrors> {
70    let rx_status = uart.read_rx_status();
71    if rx_status.overrun_error() || rx_status.framing_error() || rx_status.parity_error() {
72        let mut errors_val = UartErrors::default();
73
74        if rx_status.overrun_error() {
75            errors_val.overflow = true;
76        }
77        if rx_status.framing_error() {
78            errors_val.framing = true;
79        }
80        if rx_status.parity_error() {
81            errors_val.parity = true;
82        }
83        return Some(errors_val);
84    }
85    None
86}
87
88fn on_interrupt_rx_common_post_processing(
89    id: Bank,
90    rx_enabled: bool,
91    read_some_data: bool,
92) -> Option<UartErrors> {
93    let idx = id as usize;
94    if read_some_data {
95        RX_HAS_DATA[idx].store(true, Ordering::Relaxed);
96        if RX_READ_ACTIVE[idx].load(Ordering::Relaxed) {
97            UART_RX_WAKERS[idx].wake();
98        }
99    }
100
101    let mut errors = None;
102    let mut uart_regs = unsafe { id.steal_regs() };
103    // Check for RX errors
104    if rx_enabled {
105        errors = on_interrupt_handle_rx_errors(&mut uart_regs);
106    }
107
108    // Clear the interrupt status bits
109    uart_regs.write_irq_clr(
110        InterruptClear::builder()
111            .with_rx_overrun(true)
112            .with_tx_overrun(false)
113            .build(),
114    );
115    errors
116}
117
118/// Interrupt handler with overwriting behaviour when the ring buffer is full.
119///
120/// Should be called in the user interrupt handler to enable
121/// asynchronous reception. This variant will overwrite old data in the ring buffer in case
122/// the ring buffer is full.
123pub fn on_interrupt_rx_overwriting(
124    bank: Bank,
125    prod: &mut heapless::spsc::Producer<u8>,
126    shared_consumer: &Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
127) -> Result<(), AsyncUartErrors> {
128    on_interrupt_rx_async_heapless_queue_overwriting(bank, prod, shared_consumer)
129}
130
131pub fn on_interrupt_rx_async_heapless_queue_overwriting(
132    bank: Bank,
133    prod: &mut heapless::spsc::Producer<u8>,
134    shared_consumer: &Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
135) -> Result<(), AsyncUartErrors> {
136    let uart_regs = unsafe { bank.steal_regs() };
137    let irq_status = uart_regs.read_irq_status();
138    let irq_enabled = uart_regs.read_irq_enabled();
139    let rx_enabled = irq_enabled.rx();
140    let mut read_some_data = false;
141    let mut queue_overflow = false;
142
143    // Half-Full interrupt. We have a guaranteed amount of data we can read.
144    if irq_status.rx() {
145        let available_bytes = uart_regs.read_rx_fifo_trigger().level().as_usize();
146
147        // If this interrupt bit is set, the trigger level is available at the very least.
148        // Read everything as fast as possible
149        for _ in 0..available_bytes {
150            let byte = uart_regs.read_data().value();
151            if !prod.ready() {
152                queue_overflow = true;
153                critical_section::with(|cs| {
154                    let mut cons_ref = shared_consumer.borrow(cs).borrow_mut();
155                    cons_ref.as_mut().unwrap().dequeue();
156                });
157            }
158            prod.enqueue(byte).ok();
159        }
160        read_some_data = true;
161    }
162
163    // Timeout, empty the FIFO completely.
164    if irq_status.rx_timeout() {
165        while uart_regs.read_rx_status().data_available() {
166            // While there is data in the FIFO, write it into the reception buffer
167            let byte = uart_regs.read_data().value();
168            if !prod.ready() {
169                queue_overflow = true;
170                critical_section::with(|cs| {
171                    let mut cons_ref = shared_consumer.borrow(cs).borrow_mut();
172                    cons_ref.as_mut().unwrap().dequeue();
173                });
174            }
175            prod.enqueue(byte).ok();
176        }
177        read_some_data = true;
178    }
179
180    let uart_errors = on_interrupt_rx_common_post_processing(bank, rx_enabled, read_some_data);
181    if uart_errors.is_some() || queue_overflow {
182        return Err(AsyncUartErrors {
183            queue_overflow,
184            uart_errors: uart_errors.unwrap_or_default(),
185        });
186    }
187    Ok(())
188}
189
/// Interrupt handler for asynchronous RX operations.
///
/// Should be called in the user interrupt handler to enable asynchronous reception.
/// Received bytes are pushed into `prod`; if the queue is full, new bytes are dropped
/// (see [on_interrupt_rx_overwriting] for the overwriting variant).
pub fn on_interrupt_rx(
    bank: Bank,
    prod: &mut heapless::spsc::Producer<'_, u8>,
) -> Result<(), AsyncUartErrors> {
    on_interrupt_rx_async_heapless_queue(bank, prod)
}
199
200pub fn on_interrupt_rx_async_heapless_queue(
201    bank: Bank,
202    prod: &mut heapless::spsc::Producer<'_, u8>,
203) -> Result<(), AsyncUartErrors> {
204    let uart_regs = unsafe { bank.steal_regs() };
205    let irq_status = uart_regs.read_irq_status();
206    let irq_enabled = uart_regs.read_irq_enabled();
207    let rx_enabled = irq_enabled.rx();
208    let mut read_some_data = false;
209    let mut queue_overflow = false;
210
211    // Half-Full interrupt. We have a guaranteed amount of data we can read.
212    if irq_status.rx() {
213        let available_bytes = uart_regs.read_rx_fifo_trigger().level().as_usize();
214
215        // If this interrupt bit is set, the trigger level is available at the very least.
216        // Read everything as fast as possible
217        for _ in 0..available_bytes {
218            let byte = uart_regs.read_data().value();
219            if !prod.ready() {
220                queue_overflow = true;
221            }
222            prod.enqueue(byte).ok();
223        }
224        read_some_data = true;
225    }
226
227    // Timeout, empty the FIFO completely.
228    if irq_status.rx_timeout() {
229        while uart_regs.read_rx_status().data_available() {
230            // While there is data in the FIFO, write it into the reception buffer
231            let byte = uart_regs.read_data().value();
232            if !prod.ready() {
233                queue_overflow = true;
234            }
235            prod.enqueue(byte).ok();
236        }
237        read_some_data = true;
238    }
239
240    let uart_errors = on_interrupt_rx_common_post_processing(bank, rx_enabled, read_some_data);
241    if uart_errors.is_some() || queue_overflow {
242        return Err(AsyncUartErrors {
243            queue_overflow,
244            uart_errors: uart_errors.unwrap_or_default(),
245        });
246    }
247    Ok(())
248}
249
/// RAII guard that clears the "read active" flag of a bank when an async read
/// completes or is cancelled (dropped).
struct ActiveReadGuard(usize);

impl Drop for ActiveReadGuard {
    fn drop(&mut self) {
        RX_READ_ACTIVE[self.0].store(false, Ordering::Relaxed);
    }
}
257
/// Owned state of [RxAsync]: the receiver half and the consumer side of the data queue.
struct RxAsyncInner {
    rx: Rx,
    pub queue: heapless::spsc::Consumer<'static, u8>,
}

/// Core data structure to allow asynchronous UART reception.
///
/// If the ring buffer becomes full, data will be lost.
pub struct RxAsync(Option<RxAsyncInner>);

impl ErrorType for RxAsync {
    /// Error reporting is done using the result of the interrupt functions.
    type Error = Infallible;
}
272
/// Bring the receiver into a quiescent state: disable its interrupts, disable the
/// receiver itself and flush any stale bytes from the FIFO.
fn stop_async_rx(rx: &mut Rx) {
    rx.disable_interrupts();
    rx.disable();
    rx.clear_fifo();
}
278
279impl RxAsync {
280    /// Create a new asynchronous receiver.
281    ///
282    /// The passed [heapless::spsc::Consumer] will be used to asynchronously receive data which
283    /// is filled by the interrupt handler [on_interrupt_rx].
284    pub fn new(mut rx: Rx, queue: heapless::spsc::Consumer<'static, u8>) -> Self {
285        rx.disable_interrupts();
286        rx.disable();
287        rx.clear_fifo();
288        // Enable those together.
289        critical_section::with(|_| {
290            #[cfg(feature = "vor1x")]
291            rx.enable_interrupts(true);
292            #[cfg(feature = "vor4x")]
293            rx.enable_interrupts(true, true);
294            rx.enable();
295        });
296        Self(Some(RxAsyncInner { rx, queue }))
297    }
298
299    pub fn stop(&mut self) {
300        stop_async_rx(&mut self.0.as_mut().unwrap().rx);
301    }
302
303    pub fn release(mut self) -> (Rx, heapless::spsc::Consumer<'static, u8>) {
304        self.stop();
305        let inner = self.0.take().unwrap();
306        (inner.rx, inner.queue)
307    }
308}
309
310impl Drop for RxAsync {
311    fn drop(&mut self) {
312        self.stop();
313    }
314}
315
impl embedded_io_async::Read for RxAsync {
    /// Asynchronously read received bytes into `buf`.
    ///
    /// Returns as soon as at least one byte is available; while the queue is
    /// empty the task yields until the interrupt handler wakes it.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        let inner = self.0.as_ref().unwrap();
        // Need to wait for the IRQ to read data and set this flag. If the queue is not
        // empty, we can read data immediately.
        if inner.queue.is_empty() {
            RX_HAS_DATA[inner.rx.id as usize].store(false, Ordering::Relaxed);
        }
        // Clears RX_READ_ACTIVE again when this read completes or is cancelled.
        let _guard = ActiveReadGuard(inner.rx.id as usize);
        // Drains up to `buf.len()` bytes from the consumer into `buf`.
        let mut handle_data_in_queue = |consumer: &mut heapless::spsc::Consumer<'static, u8>| {
            let data_to_read = consumer.len().min(buf.len());
            for byte in buf.iter_mut().take(data_to_read) {
                // We own the consumer and we checked that the amount of data is guaranteed to be available.
                *byte = unsafe { consumer.dequeue_unchecked() };
            }
            data_to_read
        };
        let mut_ref = self.0.as_mut().unwrap();
        // Create the future first: it marks the read as active so the interrupt
        // handler will wake us once data arrives.
        let fut = RxFuture::new(&mut mut_ref.rx);
        // Data is available, so read that data immediately.
        let read_data = handle_data_in_queue(&mut mut_ref.queue);
        if read_data > 0 {
            return Ok(read_data);
        }
        // Await data.
        let _ = fut.await;
        Ok(handle_data_in_queue(&mut mut_ref.queue))
    }
}
345
/// Owned state of [RxAsyncOverwriting]: the receiver half and a reference to the
/// consumer that is shared with the interrupt handler.
struct RxAsyncOverwritingInner {
    rx: Rx,
    pub shared_consumer: &'static Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
}

/// Core data structure to allow asynchronous UART reception.
///
/// If the ring buffer becomes full, the oldest data will be overwritten when using the
/// [on_interrupt_rx_overwriting] interrupt handlers.
pub struct RxAsyncOverwriting(Option<RxAsyncOverwritingInner>);

impl ErrorType for RxAsyncOverwriting {
    /// Error reporting is done using the result of the interrupt functions.
    type Error = Infallible;
}
361
362impl RxAsyncOverwriting {
363    /// Create a new asynchronous receiver.
364    ///
365    /// The passed shared [heapless::spsc::Consumer] will be used to asynchronously receive data
366    /// which is filled by the interrupt handler. The shared property allows using it in the
367    /// interrupt handler to overwrite old data.
368    pub fn new(
369        mut rx: Rx,
370        shared_consumer: &'static Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
371    ) -> Self {
372        rx.disable_interrupts();
373        rx.disable();
374        rx.clear_fifo();
375        // Enable those together.
376        critical_section::with(|_| {
377            #[cfg(feature = "vor4x")]
378            rx.enable_interrupts(true, true);
379            #[cfg(feature = "vor1x")]
380            rx.enable_interrupts(true);
381            rx.enable();
382        });
383        Self(Some(RxAsyncOverwritingInner {
384            rx,
385            shared_consumer,
386        }))
387    }
388
389    pub fn stop(&mut self) {
390        stop_async_rx(&mut self.0.as_mut().unwrap().rx);
391    }
392
393    pub fn release(mut self) -> Rx {
394        self.stop();
395        let inner = self.0.take().unwrap();
396        inner.rx
397    }
398}
399
400impl Drop for RxAsyncOverwriting {
401    fn drop(&mut self) {
402        self.stop();
403    }
404}
405
impl embedded_io_async::Read for RxAsyncOverwriting {
    /// Asynchronously read received bytes into `buf`.
    ///
    /// Returns as soon as at least one byte is available; while the queue is
    /// empty the task yields until the interrupt handler wakes it. All access to
    /// the shared consumer happens inside critical sections because the
    /// interrupt handler may dequeue from it concurrently.
    async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
        let inner = self.0.as_ref().unwrap();
        let id = inner.rx.id as usize;
        // Need to wait for the IRQ to read data and set this flag. If the queue is not
        // empty, we can read data immediately.

        critical_section::with(|cs| {
            let queue = inner.shared_consumer.borrow(cs);
            if queue.borrow().as_ref().unwrap().is_empty() {
                RX_HAS_DATA[id].store(false, Ordering::Relaxed);
            }
        });
        // Clears RX_READ_ACTIVE again when this read completes or is cancelled.
        let _guard = ActiveReadGuard(id);
        // Drains up to `buf.len()` bytes from the shared consumer into `buf`,
        // inside a critical section to exclude the interrupt handler.
        let mut handle_data_in_queue = |inner: &mut RxAsyncOverwritingInner| {
            critical_section::with(|cs| {
                let mut consumer_ref = inner.shared_consumer.borrow(cs).borrow_mut();
                let consumer = consumer_ref.as_mut().unwrap();
                let data_to_read = consumer.len().min(buf.len());
                for byte in buf.iter_mut().take(data_to_read) {
                    // We own the consumer and we checked that the amount of data is guaranteed to be available.
                    *byte = unsafe { consumer.dequeue_unchecked() };
                }
                data_to_read
            })
        };
        // Create the future first: it marks the read as active so the interrupt
        // handler will wake us once data arrives.
        let fut = RxFuture::new(&mut self.0.as_mut().unwrap().rx);
        // Data is available, so read that data immediately.
        let read_data = handle_data_in_queue(self.0.as_mut().unwrap());
        if read_data > 0 {
            return Ok(read_data);
        }
        // Await data.
        let _ = fut.await;
        let read_data = handle_data_in_queue(self.0.as_mut().unwrap());
        Ok(read_data)
    }
}