1use defmt_or_log::warn;
3use zencan_common::{
4 messages::{CanId, CanMessage},
5 AtomicCell,
6};
7
8use crate::{
9 lss_slave::LssReceiver, pdo::Pdo, priority_queue::PriorityQueue, sdo_server::SdoComms,
10};
11
12pub trait CanMessageQueue: Send + Sync {
13 fn push(&self, msg: CanMessage) -> Result<(), CanMessage>;
14
15 fn pop(&self) -> Option<CanMessage>;
16}
17
18impl<const N: usize> CanMessageQueue for PriorityQueue<N, CanMessage> {
19 fn push(&self, msg: CanMessage) -> Result<(), CanMessage> {
20 let prio = msg.id().raw();
21 self.push(prio, msg)
22 }
23
24 fn pop(&self) -> Option<CanMessage> {
25 self.pop()
26 }
27}
28
29#[allow(missing_debug_implementations)]
34pub struct NodeMbox {
35 rx_pdos: &'static [Pdo],
36 tx_pdos: &'static [Pdo],
37 sdo_tx_cob_id: AtomicCell<Option<CanId>>,
39 sdo_rx_cob_id: AtomicCell<Option<CanId>>,
41 sdo_comms: SdoComms,
42 nmt_mbox: AtomicCell<Option<CanMessage>>,
43 lss_receiver: LssReceiver,
44 sync_flag: AtomicCell<bool>,
45 process_notify_cb: AtomicCell<Option<&'static (dyn Fn() + Sync)>>,
46 transmit_notify_cb: AtomicCell<Option<&'static (dyn Fn() + Sync)>>,
47 tx_queue: &'static dyn CanMessageQueue,
48}
49
50impl NodeMbox {
51 pub const fn new(
57 rx_pdos: &'static [Pdo],
58 tx_pdos: &'static [Pdo],
59 tx_queue: &'static dyn CanMessageQueue,
60 sdo_buffer: &'static mut [u8],
61 ) -> Self {
62 let sdo_rx_cob_id = AtomicCell::new(None);
63 let sdo_tx_cob_id = AtomicCell::new(None);
64 let sdo_comms = SdoComms::new(sdo_buffer);
65 let nmt_mbox = AtomicCell::new(None);
66 let lss_receiver = LssReceiver::new();
67 let sync_flag = AtomicCell::new(false);
68 let process_notify_cb = AtomicCell::new(None);
69 let transmit_notify_cb = AtomicCell::new(None);
70 Self {
71 rx_pdos,
72 tx_pdos,
73 sdo_rx_cob_id,
74 sdo_tx_cob_id,
75 sdo_comms,
76 nmt_mbox,
77 lss_receiver,
78 sync_flag,
79 process_notify_cb,
80 transmit_notify_cb,
81 tx_queue,
82 }
83 }
84
85 pub fn set_process_notify_callback(&self, callback: &'static (dyn Fn() + Sync)) {
90 self.process_notify_cb.store(Some(callback));
91 }
92
93 fn process_notify(&self) {
94 if let Some(notify_cb) = self.process_notify_cb.load() {
95 notify_cb();
96 }
97 }
98
99 pub fn set_transmit_notify_callback(&self, callback: &'static (dyn Fn() + Sync)) {
103 self.transmit_notify_cb.store(Some(callback));
104 }
105
106 pub(crate) fn transmit_notify(&self) {
107 if let Some(notify_cb) = self.transmit_notify_cb.load() {
108 notify_cb();
109 }
110 }
111
112 pub(crate) fn set_sdo_rx_cob_id(&self, cob_id: Option<CanId>) {
113 self.sdo_rx_cob_id.store(cob_id);
114 }
115
116 pub(crate) fn set_sdo_tx_cob_id(&self, cob_id: Option<CanId>) {
117 self.sdo_tx_cob_id.store(cob_id);
118 }
119
120 pub(crate) fn sdo_comms(&self) -> &SdoComms {
121 &self.sdo_comms
122 }
123
124 pub(crate) fn read_nmt_mbox(&self) -> Option<CanMessage> {
125 self.nmt_mbox.take()
126 }
127
128 pub(crate) fn lss_receiver(&self) -> &LssReceiver {
129 &self.lss_receiver
130 }
131
132 pub(crate) fn read_sync_flag(&self) -> bool {
133 self.sync_flag.take()
134 }
135
136 pub fn store_message(&self, msg: CanMessage) -> Result<(), CanMessage> {
138 let id = msg.id();
139 if id == zencan_common::messages::NMT_CMD_ID {
140 self.nmt_mbox.store(Some(msg));
141 self.process_notify();
142 return Ok(());
143 }
144
145 if id == zencan_common::messages::SYNC_ID {
146 self.sync_flag.store(true);
147 self.process_notify();
148 return Ok(());
149 }
150
151 if id == zencan_common::messages::LSS_REQ_ID {
152 if let Ok(lss_req) = msg.data().try_into() {
153 if self.lss_receiver.handle_req(lss_req) {
154 self.process_notify();
155 }
156 } else {
157 warn!("Invalid LSS request");
158 return Err(msg);
159 }
160 return Ok(());
161 }
162
163 for rpdo in self.rx_pdos {
164 if !rpdo.valid() {
165 continue;
166 }
167 if id == rpdo.cob_id() {
168 let mut data = [0u8; 8];
169 data[0..msg.data().len()].copy_from_slice(msg.data());
170 rpdo.buffered_value.store(Some(data));
171 return Ok(());
172 }
173 }
174
175 if let Some(cob_id) = self.sdo_rx_cob_id.load() {
176 if id == cob_id {
177 self.sdo_comms.handle_req(msg.data());
178 }
179 }
180
181 Err(msg)
182 }
183
184 pub fn next_transmit_message(&self) -> Option<CanMessage> {
192 for pdo in self.tx_pdos.iter() {
193 if let Some(buf) = pdo.buffered_value.take() {
194 return Some(CanMessage::new(pdo.cob_id(), &buf));
195 }
196 }
197
198 if let Some(msg) = self.tx_queue.pop() {
199 return Some(msg);
200 }
201
202 if let Some(msg) = self.sdo_comms.next_transmit_message() {
203 if let Some(id) = self.sdo_tx_cob_id.load() {
204 return Some(CanMessage::new(id, &msg));
205 }
206 }
207
208 None
209 }
210
211 pub fn queue_transmit_message(&self, msg: CanMessage) -> Result<(), CanMessage> {
213 self.tx_queue.push(msg)
214 }
215}