zencan_node/
node_mbox.rs

1//! Implements mailbox for receiving CAN messages
2use defmt_or_log::warn;
3use zencan_common::{
4    messages::{CanId, CanMessage},
5    AtomicCell,
6};
7
8use crate::{
9    lss_slave::LssReceiver, pdo::Pdo, priority_queue::PriorityQueue, sdo_server::SdoComms,
10};
11
12pub trait CanMessageQueue: Send + Sync {
13    fn push(&self, msg: CanMessage) -> Result<(), CanMessage>;
14
15    fn pop(&self) -> Option<CanMessage>;
16}
17
18impl<const N: usize> CanMessageQueue for PriorityQueue<N, CanMessage> {
19    fn push(&self, msg: CanMessage) -> Result<(), CanMessage> {
20        let prio = msg.id().raw();
21        self.push(prio, msg)
22    }
23
24    fn pop(&self) -> Option<CanMessage> {
25        self.pop()
26    }
27}
28
29/// A data structure to be shared between a receiving thread (e.g. a CAN controller IRQ) and the
30/// [`Node`](crate::Node) object.
31///
32/// Incoming messages should be passed to [NodeMbox::store_message].
33#[allow(missing_debug_implementations)]
34pub struct NodeMbox {
35    rx_pdos: &'static [Pdo],
36    tx_pdos: &'static [Pdo],
37    /// ID used for transmitting SDO server responses
38    sdo_tx_cob_id: AtomicCell<Option<CanId>>,
39    /// ID used for receiving SDO server requests
40    sdo_rx_cob_id: AtomicCell<Option<CanId>>,
41    sdo_comms: SdoComms,
42    nmt_mbox: AtomicCell<Option<CanMessage>>,
43    lss_receiver: LssReceiver,
44    sync_flag: AtomicCell<bool>,
45    process_notify_cb: AtomicCell<Option<&'static (dyn Fn() + Sync)>>,
46    transmit_notify_cb: AtomicCell<Option<&'static (dyn Fn() + Sync)>>,
47    tx_queue: &'static dyn CanMessageQueue,
48}
49
50impl NodeMbox {
51    /// Create a new NodeMbox
52    ///
53    /// # Args
54    ///
55    /// - `rx_pdos`: A slice of Pdo objects for all of the receive PDOs
56    pub const fn new(
57        rx_pdos: &'static [Pdo],
58        tx_pdos: &'static [Pdo],
59        tx_queue: &'static dyn CanMessageQueue,
60        sdo_buffer: &'static mut [u8],
61    ) -> Self {
62        let sdo_rx_cob_id = AtomicCell::new(None);
63        let sdo_tx_cob_id = AtomicCell::new(None);
64        let sdo_comms = SdoComms::new(sdo_buffer);
65        let nmt_mbox = AtomicCell::new(None);
66        let lss_receiver = LssReceiver::new();
67        let sync_flag = AtomicCell::new(false);
68        let process_notify_cb = AtomicCell::new(None);
69        let transmit_notify_cb = AtomicCell::new(None);
70        Self {
71            rx_pdos,
72            tx_pdos,
73            sdo_rx_cob_id,
74            sdo_tx_cob_id,
75            sdo_comms,
76            nmt_mbox,
77            lss_receiver,
78            sync_flag,
79            process_notify_cb,
80            transmit_notify_cb,
81            tx_queue,
82        }
83    }
84
85    /// Set a callback for notification when a message is received and requires processing.
86    ///
87    /// It must be static. Usually this will be a static fn, but in some circumstances, it may be
88    /// desirable to use Box::leak to pass a heap allocated closure instead.
89    pub fn set_process_notify_callback(&self, callback: &'static (dyn Fn() + Sync)) {
90        self.process_notify_cb.store(Some(callback));
91    }
92
93    fn process_notify(&self) {
94        if let Some(notify_cb) = self.process_notify_cb.load() {
95            notify_cb();
96        }
97    }
98
99    /// Set a callback for when new transmit messages are queued
100    ///
101    /// This will be called during process anytime new messages are ready to be queued
102    pub fn set_transmit_notify_callback(&self, callback: &'static (dyn Fn() + Sync)) {
103        self.transmit_notify_cb.store(Some(callback));
104    }
105
106    pub(crate) fn transmit_notify(&self) {
107        if let Some(notify_cb) = self.transmit_notify_cb.load() {
108            notify_cb();
109        }
110    }
111
112    pub(crate) fn set_sdo_rx_cob_id(&self, cob_id: Option<CanId>) {
113        self.sdo_rx_cob_id.store(cob_id);
114    }
115
116    pub(crate) fn set_sdo_tx_cob_id(&self, cob_id: Option<CanId>) {
117        self.sdo_tx_cob_id.store(cob_id);
118    }
119
120    pub(crate) fn sdo_comms(&self) -> &SdoComms {
121        &self.sdo_comms
122    }
123
124    pub(crate) fn read_nmt_mbox(&self) -> Option<CanMessage> {
125        self.nmt_mbox.take()
126    }
127
128    pub(crate) fn lss_receiver(&self) -> &LssReceiver {
129        &self.lss_receiver
130    }
131
132    pub(crate) fn read_sync_flag(&self) -> bool {
133        self.sync_flag.take()
134    }
135
136    /// Store a received CAN message
137    pub fn store_message(&self, msg: CanMessage) -> Result<(), CanMessage> {
138        let id = msg.id();
139        if id == zencan_common::messages::NMT_CMD_ID {
140            self.nmt_mbox.store(Some(msg));
141            self.process_notify();
142            return Ok(());
143        }
144
145        if id == zencan_common::messages::SYNC_ID {
146            self.sync_flag.store(true);
147            self.process_notify();
148            return Ok(());
149        }
150
151        if id == zencan_common::messages::LSS_REQ_ID {
152            if let Ok(lss_req) = msg.data().try_into() {
153                if self.lss_receiver.handle_req(lss_req) {
154                    self.process_notify();
155                }
156            } else {
157                warn!("Invalid LSS request");
158                return Err(msg);
159            }
160            return Ok(());
161        }
162
163        for rpdo in self.rx_pdos {
164            if !rpdo.valid() {
165                continue;
166            }
167            if id == rpdo.cob_id() {
168                let mut data = [0u8; 8];
169                data[0..msg.data().len()].copy_from_slice(msg.data());
170                rpdo.buffered_value.store(Some(data));
171                return Ok(());
172            }
173        }
174
175        if let Some(cob_id) = self.sdo_rx_cob_id.load() {
176            if id == cob_id {
177                self.sdo_comms.handle_req(msg.data());
178            }
179        }
180
181        Err(msg)
182    }
183
184    /// Get the next message ready for transmit
185    ///
186    /// Messages are prioritized as follows:
187    ///
188    /// - TPDOs first, if available, starting with TPDO0
189    /// - Other non-SDO messages (SYNC, LSS, NMT)
190    /// - SDO server responses    
191    pub fn next_transmit_message(&self) -> Option<CanMessage> {
192        for pdo in self.tx_pdos.iter() {
193            if let Some(buf) = pdo.buffered_value.take() {
194                return Some(CanMessage::new(pdo.cob_id(), &buf));
195            }
196        }
197
198        if let Some(msg) = self.tx_queue.pop() {
199            return Some(msg);
200        }
201
202        if let Some(msg) = self.sdo_comms.next_transmit_message() {
203            if let Some(id) = self.sdo_tx_cob_id.load() {
204                return Some(CanMessage::new(id, &msg));
205            }
206        }
207
208        None
209    }
210
211    /// Store a message for transmission in the general transmit queue
212    pub fn queue_transmit_message(&self, msg: CanMessage) -> Result<(), CanMessage> {
213        self.tx_queue.push(msg)
214    }
215}