1use core::cell::RefCell;
2use core::future::{pending, poll_fn};
3use core::marker::PhantomData;
4use core::task::{Poll, Waker};
5use embassy_futures::select::{Either, Either4, select, select4};
6use embassy_stm32::Peri;
7use embassy_stm32::can::Instance as EmbassyInstance;
8use embassy_stm32::can::{RxPin, TxPin};
9use embassy_stm32::interrupt::InterruptExt;
10use embassy_stm32::interrupt::typelevel::Binding;
11use embassy_stm32::interrupt::typelevel::{Handler, Interrupt};
12use embassy_stm32::{gpio, rcc};
13use embassy_sync::blocking_mutex::Mutex;
14use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex;
15use embassy_sync::{channel::Channel, waitqueue::WakerRegistration};
16use embassy_time::{Instant, Timer};
17use emcyphal_core::{NodeId, PrioritySet, SubjectId};
18use emcyphal_driver::link::{self, FilterUpdate};
19use emcyphal_driver::{frame, time};
20
21use crate::config::FrameFormat;
22use crate::utils::{MailboxPriorityMap, PriorityMap, TxMailboxIdx, TxMailboxSet};
23use crate::{config, message_ram as ram, raw};
24
25pub const SUBJECT_SLOT_COUNT: usize = raw::MESSAGE_FILTER_COUNT * 2;
27
28pub trait SealedInstance: EmbassyInstance {
29 fn atomic_methods() -> raw::AtomicMethods;
30 unsafe fn make_registers<'a>() -> raw::Registers<'a>;
31 fn info() -> &'static Info;
32}
33
34#[allow(private_bounds)]
35pub trait Instance: SealedInstance {}
36
37struct State {
38 _pins: [gpio::Flex<'static>; 2],
39 control: Option<raw::Control<'static>>,
40 rx_trigger: WakerRegistration,
41 tx_unpend_mask: TxMailboxSet,
42 tx_trigger: WakerRegistration,
43 ref_counter: u8,
44}
45
46pub struct Info {
47 interrupts: [embassy_stm32::interrupt::Interrupt; 2],
48 state: Mutex<CriticalSectionRawMutex, RefCell<Option<State>>>,
49 loop_back: Channel<CriticalSectionRawMutex, frame::Frame, 1>,
50}
51
52impl Info {
53 pub const fn new<I: EmbassyInstance>() -> Self {
54 Self {
55 interrupts: [I::IT0Interrupt::IRQ, I::IT1Interrupt::IRQ],
56 state: Mutex::new(RefCell::new(None)),
57 loop_back: Channel::new(),
58 }
59 }
60
61 fn wake_rx(&self) {
62 self.state.lock(|cell| {
63 let mut slot = cell.borrow_mut();
64 if let Some(state) = slot.as_mut() {
65 state.rx_trigger.wake();
66 }
67 });
68 }
69 fn wake_tx(&self, pending_mailboxes: TxMailboxSet) {
70 self.state.lock(|cell| {
71 let mut slot = cell.borrow_mut();
72 if let Some(state) = slot.as_mut()
73 && (pending_mailboxes & state.tx_unpend_mask).is_empty()
74 {
75 state.tx_trigger.wake();
76 }
77 });
78 }
79}
80
81struct InfoRef<'a>(&'a Info);
82
83impl<'a> InfoRef<'a> {
84 fn create(info: &'a Info, pins: [gpio::Flex<'a>; 2]) -> Option<Self> {
85 info.state.lock(|cell| {
86 let mut slot = cell.borrow_mut();
87 if slot.is_none() {
88 *slot = Some(State {
89 _pins: pins.map(|pin| unsafe { core::mem::transmute(pin) }),
91 control: Default::default(),
92 rx_trigger: Default::default(),
93 tx_unpend_mask: TxMailboxSet::ALL,
94 tx_trigger: Default::default(),
95 ref_counter: 1,
96 });
97 Some(Self(info))
98 } else {
99 None
100 }
101 })
102 }
103
104 fn loop_back(&self) -> &Channel<CriticalSectionRawMutex, frame::Frame, 1> {
105 &self.0.loop_back
106 }
107
108 fn register_rx_waker(&self, w: &Waker) {
109 self.0.state.lock(|cell| {
110 let mut slot = cell.borrow_mut();
111 let state = unwrap!(slot.as_mut());
112 state.rx_trigger.register(w);
113 });
114 }
115
116 fn register_tx_waker(&self, w: &Waker, unpend_mask: TxMailboxSet) {
117 self.0.state.lock(|cell| {
118 let mut slot = cell.borrow_mut();
119 let state = unwrap!(slot.as_mut());
120 state.tx_unpend_mask = unpend_mask;
121 state.tx_trigger.register(w);
122 });
123 }
124
125 fn set_control(&self, control: raw::Control<'a>) {
126 let control =
128 unsafe { core::mem::transmute::<raw::Control<'_>, raw::Control<'_>>(control) };
129 self.0.state.lock(|cell| {
130 let mut slot = cell.borrow_mut();
131 let state = unwrap!(slot.as_mut());
132 state.control = Some(control);
133 });
134 }
135}
136
137impl<'a> Clone for InfoRef<'a> {
138 fn clone(&self) -> Self {
139 self.0.state.lock(|cell| {
140 let mut slot = cell.borrow_mut();
141 let state = unwrap!(slot.as_mut());
142 state.ref_counter += 1;
143 });
144 Self(self.0)
145 }
146}
147
148impl<'a> Drop for InfoRef<'a> {
149 fn drop(&mut self) {
150 self.0.state.lock(|cell| {
151 let mut slot = cell.borrow_mut();
152 let state = unwrap!(slot.as_mut());
153 state.ref_counter -= 1;
154 if state.ref_counter == 0 {
155 if let Some(control) = state.control.as_mut() {
156 control.stop();
157 }
158 *slot = None;
159 for interrupt in self.0.interrupts {
160 interrupt.disable();
161 }
162 }
163
164 });
166 }
167}
168
169pub struct IT0InterruptHandler<T: Instance> {
173 _phantom: PhantomData<T>,
174}
175
176impl<T: Instance> Handler<T::IT0Interrupt> for IT0InterruptHandler<T> {
177 unsafe fn on_interrupt() {
178 T::atomic_methods().clear_rx_interrupts();
179 T::info().wake_rx();
180 }
181}
182
183pub struct IT1InterruptHandler<T: Instance> {
187 _phantom: PhantomData<T>,
188}
189
190impl<T: Instance> Handler<T::IT1Interrupt> for IT1InterruptHandler<T> {
191 unsafe fn on_interrupt() {
192 T::atomic_methods().clear_tx_interrupts();
193 let pending_mailboxes = T::atomic_methods().tx_pending();
194 T::info().wake_tx(pending_mailboxes);
195 }
196}
197
198pub struct Driver<'a> {
199 regs: Option<raw::Registers<'a>>,
200 info: InfoRef<'a>,
201 frame_format: config::FrameFormat,
202 timestamp_source: config::TimestampSource,
203}
204
205impl<'a> Driver<'a> {
206 pub fn new<T: Instance>(
210 _instance: Peri<'a, T>,
211 rx: Peri<'a, impl RxPin<T>>,
212 tx: Peri<'a, impl TxPin<T>>,
213 _irqs: impl Binding<T::IT0Interrupt, IT0InterruptHandler<T>>
214 + Binding<T::IT1Interrupt, IT1InterruptHandler<T>>
215 + 'a,
216 config: config::Config,
217 ) -> Self {
218 let rx_af_num = rx.af_num();
219 let mut rx_pin = gpio::Flex::new(rx);
220 rx_pin.set_as_af_unchecked(rx_af_num, gpio::AfType::input(gpio::Pull::None));
221
222 let tx_af_num = tx.af_num();
223 let mut tx_pin = gpio::Flex::new(tx);
224 tx_pin.set_as_af_unchecked(
225 tx_af_num,
226 gpio::AfType::output(gpio::OutputType::PushPull, gpio::Speed::VeryHigh),
227 );
228
229 let info =
230 InfoRef::create(T::info(), [rx_pin, tx_pin]).expect("Peripheral state is occupied");
231
232 rcc::enable_and_reset::<T>();
233
234 let mut regs = unsafe { T::make_registers() };
236 regs.configure(&config);
237
238 unsafe {
239 T::IT0Interrupt::unpend(); T::IT0Interrupt::enable();
241
242 T::IT1Interrupt::unpend(); T::IT1Interrupt::enable();
244 }
245
246 Self {
247 regs: Some(regs),
248 info,
249 frame_format: config.frame_format,
250 timestamp_source: config.timestamp_source,
251 }
252 }
253
254 pub fn start(
258 mut self,
259 access: link::Link<'a>,
260 ) -> (RxFilterRunner<'a>, RxRunner<'a>, TxRunner<'a>) {
261 let regs = unwrap!(self.regs.take());
262 let (mut control, filters, rx_fifo, tx_queue, tx_event_fifo) = regs.split();
263 let (channel_rx_filter, channel_rx, channel_tx) = access.split();
264
265 control.start();
266
267 let timestamp_offset = match self.timestamp_source {
269 config::TimestampSource::System => None,
270 config::TimestampSource::Internal(_) => {
271 let (instant, counter) = critical_section::with(|_| {
272 (time::Instant::now(), control.internal_timestamp_counter())
274 });
275 Some(counter.wrapping_sub(instant.as_ticks() as u16))
276 }
277 config::TimestampSource::ExternalTIM3 => {
278 let (instant, counter) = critical_section::with(|_| {
279 (time::Instant::now(), control.external_timestamp_counter())
281 });
282 Some(counter.wrapping_sub(instant.as_ticks() as u16))
283 }
284 };
285
286 self.info.set_control(control);
287
288 let rx_filter_runner = RxFilterRunner {
289 link: channel_rx_filter,
290 filters,
291 _info: self.info.clone(),
292 };
293 let rx_runner = RxRunner {
294 link: channel_rx,
295 rx_fifo,
296 info: self.info.clone(),
297 frame_format: self.frame_format,
298 timestamp_offset,
299 };
300 let tx_runner = TxRunner {
301 link: channel_tx,
302 tx_queue,
303 tx_event_fifo,
304 info: self.info.clone(),
305 frame_format: self.frame_format,
306 timestamp_offset,
307 };
308
309 (rx_filter_runner, rx_runner, tx_runner)
310 }
311}
312
313pub struct RxFilterRunner<'a> {
317 link: link::RxFilter<'a>,
318 filters: raw::Filters<'a>,
319 _info: InfoRef<'a>,
320}
321
322type MsgFilter = [Option<SubjectId>; 2];
323
324impl<'a> RxFilterRunner<'a> {
325 pub async fn run(&mut self) -> ! {
326 let mut msg_filters = self.load_message_filters();
327 let mut srv_filter = self.load_service_filter();
328
329 loop {
330 let request = self.link.pop().await;
331 match request {
332 FilterUpdate::AddSubject(subject) => self.add_subject(&mut msg_filters, subject),
333 FilterUpdate::RemoveSubjectRange(range) => {
334 self.remove_subject_range(&mut msg_filters, range)
335 }
336
337 FilterUpdate::AddDestination(node) => self.add_destination(&mut srv_filter, node),
338 FilterUpdate::RemoveDestinationRange(range) => {
339 self.remove_destination_range(&mut srv_filter, range)
340 }
341 }
342 }
343 }
344
345 fn load_message_filters(&mut self) -> [MsgFilter; raw::MESSAGE_FILTER_COUNT] {
346 let mut filters: [MsgFilter; raw::MESSAGE_FILTER_COUNT] = Default::default();
347 for (i, filter) in filters.iter_mut().enumerate() {
348 *filter = self.filters.get_message_filter(i);
349 }
350 filters
351 }
352
353 fn add_subject(&mut self, filters: &mut [MsgFilter], subject: SubjectId) {
354 let slots = filters.as_flattened_mut();
355 let slot_idx = slots
356 .iter()
357 .position(Option::is_none)
358 .expect("No slots left");
359 slots[slot_idx] = Some(subject);
360 let filter_idx = slot_idx / 2;
361 self.filters
362 .set_message_filter(filter_idx, filters[filter_idx]);
363 }
364
365 fn remove_subject_range(&mut self, filters: &mut [MsgFilter], range: [SubjectId; 2]) {
366 for (i, filter) in filters.iter_mut().enumerate() {
367 let mut update_filter = false;
368 for slot in filter
369 .iter_mut()
370 .filter(|slot| slot.is_some_and(|val| range[0] <= val && val <= range[1]))
371 {
372 *slot = None;
373 update_filter = true;
374 }
375 if update_filter {
376 self.filters.set_message_filter(i, *filter);
377 }
378 }
379 }
380
381 fn load_service_filter(&mut self) -> Option<NodeId> {
382 self.filters.get_service_filter()
383 }
384
385 fn add_destination(&mut self, filter: &mut Option<NodeId>, node: NodeId) {
386 assert!(filter.is_none(), "No slot left");
387 *filter = Some(node);
388 self.filters.set_service_filter(*filter);
389 }
390
391 fn remove_destination_range(&mut self, filter: &mut Option<NodeId>, range: [NodeId; 2]) {
392 if filter.is_some_and(|val| range[0] <= val && val <= range[1]) {
393 *filter = None;
394 self.filters.set_service_filter(*filter);
395 }
396 }
397}
398
399pub struct RxRunner<'a> {
403 link: link::Rx<'a>,
404 rx_fifo: raw::RxFifo<'a>,
405 info: InfoRef<'a>,
406 frame_format: FrameFormat,
407 timestamp_offset: Option<u16>,
408}
409
410impl<'a> RxRunner<'a> {
411 pub async fn run(&mut self) -> ! {
412 loop {
413 let frame = match select(
414 self.info.loop_back().receive(),
416 Self::pop_rx(&mut self.rx_fifo, &self.info),
417 )
418 .await
419 {
420 Either::First(frame) => frame,
421 Either::Second(raw_frame) => {
422 let now = time::Instant::now();
423 let timestamp = if let Some(offset) = self.timestamp_offset {
424 make_timestamp(now, raw_frame.timestamp.wrapping_sub(offset))
425 } else {
426 now
427 };
428
429 frame::Frame {
430 header: raw_frame.header,
431 data: raw_frame.data,
432 timestamp,
433 loop_back: false,
434 }
435 }
436 };
437 self.link.push(frame, self.frame_format.mtu()).await;
438 }
439 }
440
441 async fn pop_rx(rx_fifo: &mut raw::RxFifo<'a>, info: &InfoRef<'a>) -> raw::RawFrame {
442 poll_fn(|cx| {
443 info.register_rx_waker(cx.waker());
444
445 for i in 0..ram::RX_FIFOS_MAX {
447 if let Some(frame) = rx_fifo.pop(i.into()) {
448 return Poll::Ready(frame);
449 }
450 }
451 Poll::Pending
452 })
453 .await
454 }
455}
456
457pub struct TxRunner<'a> {
461 link: link::Tx<'a>,
462 tx_queue: raw::TxQueue<'a>,
463 tx_event_fifo: raw::TxEventFifo<'a>,
464 info: InfoRef<'a>,
465 frame_format: config::FrameFormat,
466 timestamp_offset: Option<u16>,
467}
468
469impl<'a> TxRunner<'a> {
470 pub async fn run(&mut self) -> ! {
471 let mut tx_queue = TxQueue {
472 raw_queue: &mut self.tx_queue,
473 info: &self.info,
474 };
475 let mut frames: PriorityMap<frame::Frame> = Default::default();
476 let mut pending_frames: MailboxPriorityMap = Default::default();
477 let mut loop_back_pending: PrioritySet = Default::default();
478
479 loop {
480 let pending_mailboxes = tx_queue.pending();
482 let mut completed_mailboxes = TxMailboxSet::NONE;
483 while let Some(event) = self.tx_event_fifo.pop() {
484 let idx = unwrap!(TxMailboxIdx::new(event.marker));
485 completed_mailboxes.insert(idx);
486 let priority = unwrap!(pending_frames.remove_by_mailbox(idx));
487 if frames[priority].loop_back {
488 let now = Instant::now();
489 let timestamp = if let Some(offset) = self.timestamp_offset {
490 make_timestamp(now, event.timestamp.wrapping_sub(offset))
491 } else {
492 now
493 };
494 frames[priority].timestamp = timestamp;
495 loop_back_pending.insert(priority);
496 } else {
497 frames.remove(priority);
498 }
499 }
500 let free_mailboxes = !pending_mailboxes | completed_mailboxes;
501 for idx in free_mailboxes & !completed_mailboxes {
502 pending_frames.remove_by_mailbox(idx);
503 }
504
505 let occupied_priorities = frames.keys();
506 let queued_priorities = occupied_priorities & !loop_back_pending;
507
508 match select4(
510 async {
511 if let Some(deadline) = queued_priorities
512 .into_iter()
513 .map(|p| frames[p].timestamp)
514 .min()
515 {
516 if deadline > time::Instant::now() {
518 Timer::at(deadline).await
519 }
520 } else {
521 pending().await
522 }
523 },
524 async {
525 if let Some(priority) = loop_back_pending.first() {
526 self.info.loop_back().send(frames[priority].clone()).await;
527 priority
528 } else {
529 pending().await
530 }
531 },
532 self.link.pop(!occupied_priorities, self.frame_format.mtu()),
533 async {
534 if let Some(priority) = queued_priorities.first() {
535 if !pending_frames.contains_priority(priority) {
536 let idx = free_mailboxes
537 .first()
538 .expect("One mailbox must always be free");
539 unwrap!(tx_queue.add(
540 idx,
541 &frames[priority],
542 self.frame_format,
543 idx.into()
544 ));
545 tx_queue.cancel(!TxMailboxSet::new_eq(idx));
546 assert_ne!(
547 tx_queue.pending(),
548 TxMailboxSet::ALL,
549 "At least one mailbox should get canceled immediately"
550 );
551 unwrap!(pending_frames.insert(idx, priority));
552 }
553 let idx = unwrap!(pending_frames.get_by_priority(priority));
554 tx_queue.wait_for_unpend(TxMailboxSet::new_eq(idx)).await
555 } else {
556 pending().await
557 }
558 },
559 )
560 .await
561 {
562 Either4::First(()) => {
563 let now = time::Instant::now();
564 for priority in queued_priorities {
565 if frames[priority].timestamp <= now {
566 unwrap!(frames.remove(priority));
567 if let Some(idx) = pending_frames.remove_by_priority(priority) {
568 tx_queue.cancel(TxMailboxSet::new_eq(idx));
569 }
570 }
571 }
572 }
573 Either4::Second(priority) => {
574 loop_back_pending.remove(priority);
575 unwrap!(frames.remove(priority));
576 }
577 Either4::Third(frame) => {
578 if frame.header.source.is_some() {
580 let priority = frame.header.priority;
581 unwrap!(frames.insert(priority, frame));
582 }
583 }
584 Either4::Fourth(()) => {}
585 }
586 }
587 }
588}
589
590struct TxQueue<'a, 'b> {
591 raw_queue: &'a mut raw::TxQueue<'b>,
592 info: &'a InfoRef<'b>,
593}
594
595impl<'a, 'b> TxQueue<'a, 'b> {
596 pub fn pending(&self) -> TxMailboxSet {
597 self.raw_queue.pending()
598 }
599
600 pub fn add(
601 &mut self,
602 index: TxMailboxIdx,
603 frame: &frame::Frame,
604 frame_format: config::FrameFormat,
605 marker: u8,
606 ) -> Result<(), ()> {
607 self.raw_queue.add(index, frame, frame_format, marker)
608 }
609
610 pub fn cancel(&mut self, mailboxes: TxMailboxSet) {
611 self.raw_queue.cancel(mailboxes);
612 }
613
614 pub async fn wait_for_unpend(&mut self, mailboxes: TxMailboxSet) {
615 poll_fn(|cx| {
616 self.info.register_tx_waker(cx.waker(), mailboxes);
617 self.raw_queue.enable_interrupt(mailboxes);
618
619 if (self.pending() & mailboxes).is_empty() {
621 Poll::Ready(())
622 } else {
623 Poll::Pending
624 }
625 })
626 .await;
627 }
628}
629
630impl<'a, 'b> Drop for TxQueue<'a, 'b> {
631 fn drop(&mut self) {
632 self.raw_queue.cancel(TxMailboxSet::ALL);
633 }
634}
635
636fn make_timestamp(epoch: time::Instant, counter: u16) -> Instant {
639 let offset = (epoch.as_ticks() as u16).wrapping_sub(counter);
640 Instant::from_ticks(epoch.as_ticks().saturating_sub(offset.into()))
641}