vorago_shared_hal/uart/
rx_asynch.rs1use core::{cell::RefCell, convert::Infallible, future::Future, sync::atomic::Ordering};
18
19use arbitrary_int::prelude::*;
20use critical_section::Mutex;
21use embassy_sync::waitqueue::AtomicWaker;
22use embedded_io::ErrorType;
23use portable_atomic::AtomicBool;
24
25use super::{
26 Bank, Rx, UartErrors,
27 regs::{InterruptClear, MmioUart},
28};
29
30static UART_RX_WAKERS: [AtomicWaker; 2] = [const { AtomicWaker::new() }; 2];
31static RX_READ_ACTIVE: [AtomicBool; 2] = [const { AtomicBool::new(false) }; 2];
32static RX_HAS_DATA: [AtomicBool; 2] = [const { AtomicBool::new(false) }; 2];
33
34struct RxFuture {
35 id: Bank,
36}
37
38impl RxFuture {
39 pub fn new(rx: &mut Rx) -> Self {
40 RX_READ_ACTIVE[rx.id as usize].store(true, Ordering::Relaxed);
41 Self { id: rx.id }
42 }
43}
44
45impl Future for RxFuture {
46 type Output = Result<(), Infallible>;
47
48 fn poll(
49 self: core::pin::Pin<&mut Self>,
50 cx: &mut core::task::Context<'_>,
51 ) -> core::task::Poll<Self::Output> {
52 UART_RX_WAKERS[self.id as usize].register(cx.waker());
53 if RX_HAS_DATA[self.id as usize].load(Ordering::Relaxed) {
54 return core::task::Poll::Ready(Ok(()));
55 }
56 core::task::Poll::Pending
57 }
58}
59
60#[derive(Debug, Clone, Copy)]
61#[cfg_attr(feature = "defmt", derive(defmt::Format))]
62pub struct AsyncUartErrors {
63 pub queue_overflow: bool,
65 pub uart_errors: UartErrors,
67}
68
69fn on_interrupt_handle_rx_errors(uart: &mut MmioUart<'static>) -> Option<UartErrors> {
70 let rx_status = uart.read_rx_status();
71 if rx_status.overrun_error() || rx_status.framing_error() || rx_status.parity_error() {
72 let mut errors_val = UartErrors::default();
73
74 if rx_status.overrun_error() {
75 errors_val.overflow = true;
76 }
77 if rx_status.framing_error() {
78 errors_val.framing = true;
79 }
80 if rx_status.parity_error() {
81 errors_val.parity = true;
82 }
83 return Some(errors_val);
84 }
85 None
86}
87
88fn on_interrupt_rx_common_post_processing(
89 id: Bank,
90 rx_enabled: bool,
91 read_some_data: bool,
92) -> Option<UartErrors> {
93 let idx = id as usize;
94 if read_some_data {
95 RX_HAS_DATA[idx].store(true, Ordering::Relaxed);
96 if RX_READ_ACTIVE[idx].load(Ordering::Relaxed) {
97 UART_RX_WAKERS[idx].wake();
98 }
99 }
100
101 let mut errors = None;
102 let mut uart_regs = unsafe { id.steal_regs() };
103 if rx_enabled {
105 errors = on_interrupt_handle_rx_errors(&mut uart_regs);
106 }
107
108 uart_regs.write_irq_clr(
110 InterruptClear::builder()
111 .with_rx_overrun(true)
112 .with_tx_overrun(false)
113 .build(),
114 );
115 errors
116}
117
118pub fn on_interrupt_rx_overwriting(
124 bank: Bank,
125 prod: &mut heapless::spsc::Producer<u8>,
126 shared_consumer: &Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
127) -> Result<(), AsyncUartErrors> {
128 on_interrupt_rx_async_heapless_queue_overwriting(bank, prod, shared_consumer)
129}
130
131pub fn on_interrupt_rx_async_heapless_queue_overwriting(
132 bank: Bank,
133 prod: &mut heapless::spsc::Producer<u8>,
134 shared_consumer: &Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
135) -> Result<(), AsyncUartErrors> {
136 let uart_regs = unsafe { bank.steal_regs() };
137 let irq_status = uart_regs.read_irq_status();
138 let irq_enabled = uart_regs.read_irq_enabled();
139 let rx_enabled = irq_enabled.rx();
140 let mut read_some_data = false;
141 let mut queue_overflow = false;
142
143 if irq_status.rx() {
145 let available_bytes = uart_regs.read_rx_fifo_trigger().level().as_usize();
146
147 for _ in 0..available_bytes {
150 let byte = uart_regs.read_data().value();
151 if !prod.ready() {
152 queue_overflow = true;
153 critical_section::with(|cs| {
154 let mut cons_ref = shared_consumer.borrow(cs).borrow_mut();
155 cons_ref.as_mut().unwrap().dequeue();
156 });
157 }
158 prod.enqueue(byte).ok();
159 }
160 read_some_data = true;
161 }
162
163 if irq_status.rx_timeout() {
165 while uart_regs.read_rx_status().data_available() {
166 let byte = uart_regs.read_data().value();
168 if !prod.ready() {
169 queue_overflow = true;
170 critical_section::with(|cs| {
171 let mut cons_ref = shared_consumer.borrow(cs).borrow_mut();
172 cons_ref.as_mut().unwrap().dequeue();
173 });
174 }
175 prod.enqueue(byte).ok();
176 }
177 read_some_data = true;
178 }
179
180 let uart_errors = on_interrupt_rx_common_post_processing(bank, rx_enabled, read_some_data);
181 if uart_errors.is_some() || queue_overflow {
182 return Err(AsyncUartErrors {
183 queue_overflow,
184 uart_errors: uart_errors.unwrap_or_default(),
185 });
186 }
187 Ok(())
188}
189
190pub fn on_interrupt_rx(
194 bank: Bank,
195 prod: &mut heapless::spsc::Producer<'_, u8>,
196) -> Result<(), AsyncUartErrors> {
197 on_interrupt_rx_async_heapless_queue(bank, prod)
198}
199
200pub fn on_interrupt_rx_async_heapless_queue(
201 bank: Bank,
202 prod: &mut heapless::spsc::Producer<'_, u8>,
203) -> Result<(), AsyncUartErrors> {
204 let uart_regs = unsafe { bank.steal_regs() };
205 let irq_status = uart_regs.read_irq_status();
206 let irq_enabled = uart_regs.read_irq_enabled();
207 let rx_enabled = irq_enabled.rx();
208 let mut read_some_data = false;
209 let mut queue_overflow = false;
210
211 if irq_status.rx() {
213 let available_bytes = uart_regs.read_rx_fifo_trigger().level().as_usize();
214
215 for _ in 0..available_bytes {
218 let byte = uart_regs.read_data().value();
219 if !prod.ready() {
220 queue_overflow = true;
221 }
222 prod.enqueue(byte).ok();
223 }
224 read_some_data = true;
225 }
226
227 if irq_status.rx_timeout() {
229 while uart_regs.read_rx_status().data_available() {
230 let byte = uart_regs.read_data().value();
232 if !prod.ready() {
233 queue_overflow = true;
234 }
235 prod.enqueue(byte).ok();
236 }
237 read_some_data = true;
238 }
239
240 let uart_errors = on_interrupt_rx_common_post_processing(bank, rx_enabled, read_some_data);
241 if uart_errors.is_some() || queue_overflow {
242 return Err(AsyncUartErrors {
243 queue_overflow,
244 uart_errors: uart_errors.unwrap_or_default(),
245 });
246 }
247 Ok(())
248}
249
250struct ActiveReadGuard(usize);
251
252impl Drop for ActiveReadGuard {
253 fn drop(&mut self) {
254 RX_READ_ACTIVE[self.0].store(false, Ordering::Relaxed);
255 }
256}
257
258struct RxAsyncInner {
259 rx: Rx,
260 pub queue: heapless::spsc::Consumer<'static, u8>,
261}
262
263pub struct RxAsync(Option<RxAsyncInner>);
267
268impl ErrorType for RxAsync {
269 type Error = Infallible;
271}
272
273fn stop_async_rx(rx: &mut Rx) {
274 rx.disable_interrupts();
275 rx.disable();
276 rx.clear_fifo();
277}
278
279impl RxAsync {
280 pub fn new(mut rx: Rx, queue: heapless::spsc::Consumer<'static, u8>) -> Self {
285 rx.disable_interrupts();
286 rx.disable();
287 rx.clear_fifo();
288 critical_section::with(|_| {
290 #[cfg(feature = "vor1x")]
291 rx.enable_interrupts(true);
292 #[cfg(feature = "vor4x")]
293 rx.enable_interrupts(true, true);
294 rx.enable();
295 });
296 Self(Some(RxAsyncInner { rx, queue }))
297 }
298
299 pub fn stop(&mut self) {
300 stop_async_rx(&mut self.0.as_mut().unwrap().rx);
301 }
302
303 pub fn release(mut self) -> (Rx, heapless::spsc::Consumer<'static, u8>) {
304 self.stop();
305 let inner = self.0.take().unwrap();
306 (inner.rx, inner.queue)
307 }
308}
309
310impl Drop for RxAsync {
311 fn drop(&mut self) {
312 self.stop();
313 }
314}
315
316impl embedded_io_async::Read for RxAsync {
317 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
318 let inner = self.0.as_ref().unwrap();
319 if inner.queue.is_empty() {
322 RX_HAS_DATA[inner.rx.id as usize].store(false, Ordering::Relaxed);
323 }
324 let _guard = ActiveReadGuard(inner.rx.id as usize);
325 let mut handle_data_in_queue = |consumer: &mut heapless::spsc::Consumer<'static, u8>| {
326 let data_to_read = consumer.len().min(buf.len());
327 for byte in buf.iter_mut().take(data_to_read) {
328 *byte = unsafe { consumer.dequeue_unchecked() };
330 }
331 data_to_read
332 };
333 let mut_ref = self.0.as_mut().unwrap();
334 let fut = RxFuture::new(&mut mut_ref.rx);
335 let read_data = handle_data_in_queue(&mut mut_ref.queue);
337 if read_data > 0 {
338 return Ok(read_data);
339 }
340 let _ = fut.await;
342 Ok(handle_data_in_queue(&mut mut_ref.queue))
343 }
344}
345
346struct RxAsyncOverwritingInner {
347 rx: Rx,
348 pub shared_consumer: &'static Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
349}
350
351pub struct RxAsyncOverwriting(Option<RxAsyncOverwritingInner>);
356
357impl ErrorType for RxAsyncOverwriting {
358 type Error = Infallible;
360}
361
362impl RxAsyncOverwriting {
363 pub fn new(
369 mut rx: Rx,
370 shared_consumer: &'static Mutex<RefCell<Option<heapless::spsc::Consumer<'static, u8>>>>,
371 ) -> Self {
372 rx.disable_interrupts();
373 rx.disable();
374 rx.clear_fifo();
375 critical_section::with(|_| {
377 #[cfg(feature = "vor4x")]
378 rx.enable_interrupts(true, true);
379 #[cfg(feature = "vor1x")]
380 rx.enable_interrupts(true);
381 rx.enable();
382 });
383 Self(Some(RxAsyncOverwritingInner {
384 rx,
385 shared_consumer,
386 }))
387 }
388
389 pub fn stop(&mut self) {
390 stop_async_rx(&mut self.0.as_mut().unwrap().rx);
391 }
392
393 pub fn release(mut self) -> Rx {
394 self.stop();
395 let inner = self.0.take().unwrap();
396 inner.rx
397 }
398}
399
400impl Drop for RxAsyncOverwriting {
401 fn drop(&mut self) {
402 self.stop();
403 }
404}
405
406impl embedded_io_async::Read for RxAsyncOverwriting {
407 async fn read(&mut self, buf: &mut [u8]) -> Result<usize, Self::Error> {
408 let inner = self.0.as_ref().unwrap();
409 let id = inner.rx.id as usize;
410 critical_section::with(|cs| {
414 let queue = inner.shared_consumer.borrow(cs);
415 if queue.borrow().as_ref().unwrap().is_empty() {
416 RX_HAS_DATA[id].store(false, Ordering::Relaxed);
417 }
418 });
419 let _guard = ActiveReadGuard(id);
420 let mut handle_data_in_queue = |inner: &mut RxAsyncOverwritingInner| {
421 critical_section::with(|cs| {
422 let mut consumer_ref = inner.shared_consumer.borrow(cs).borrow_mut();
423 let consumer = consumer_ref.as_mut().unwrap();
424 let data_to_read = consumer.len().min(buf.len());
425 for byte in buf.iter_mut().take(data_to_read) {
426 *byte = unsafe { consumer.dequeue_unchecked() };
428 }
429 data_to_read
430 })
431 };
432 let fut = RxFuture::new(&mut self.0.as_mut().unwrap().rx);
433 let read_data = handle_data_in_queue(self.0.as_mut().unwrap());
435 if read_data > 0 {
436 return Ok(read_data);
437 }
438 let _ = fut.await;
440 let read_data = handle_data_in_queue(self.0.as_mut().unwrap());
441 Ok(read_data)
442 }
443}