1use std::ops::{Deref, DerefMut};
2use std::sync::Mutex;
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc, time::Instant};
5
6use futures::future::join_all;
7use tokio::task::JoinHandle;
8use zencan_common::constants::object_ids::{
9 RPDO_COMM_BASE, RPDO_MAP_BASE, TPDO_COMM_BASE, TPDO_MAP_BASE,
10};
11use zencan_common::lss::{LssIdentity, LssState};
12use zencan_common::messages::{NmtCommand, NmtCommandSpecifier, ZencanMessage};
13use zencan_common::nmt::NmtState;
14use zencan_common::node_id::ConfiguredNodeId;
15use zencan_common::sdo::AbortCode;
16use zencan_common::{
17 node_configuration::PdoConfig,
18 pdo::PdoMapping,
19 traits::{AsyncCanReceiver, AsyncCanSender},
20 CanId, NodeId,
21};
22
23use super::shared_sender::SharedSender;
24use crate::sdo_client::{SdoClient, SdoClientError};
25use crate::{LssError, LssMaster, RawAbortCode};
26
27use super::shared_receiver::{SharedReceiver, SharedReceiverChannel};
28
29#[derive(Debug, Clone)]
30pub struct NodeInfo {
31 pub node_id: u8,
32 pub identity: Option<LssIdentity>,
33 pub device_name: Option<String>,
34 pub software_version: Option<String>,
35 pub hardware_version: Option<String>,
36 pub last_seen: Instant,
37 pub nmt_state: Option<NmtState>,
38}
39
40impl core::fmt::Display for NodeInfo {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 writeln!(
43 f,
44 "Node {}: {}",
45 self.node_id,
46 self.nmt_state
47 .map(|s| s.to_string())
48 .unwrap_or("Unknown State".into())
49 )?;
50 match self.identity {
51 Some(id) => writeln!(
52 f,
53 " Identity vendor: {:X}, product: {:X}, revision: {:X}, serial: {:X}",
54 id.vendor_id, id.product_code, id.revision, id.serial
55 )?,
56 None => writeln!(f, " Identity: Unknown")?,
57 }
58 writeln!(
59 f,
60 " Device Name: '{}'",
61 self.device_name.as_deref().unwrap_or("Unknown")
62 )?;
63 writeln!(
64 f,
65 " Versions: '{}' SW, '{}' HW",
66 self.software_version.as_deref().unwrap_or("Unknown"),
67 self.hardware_version.as_deref().unwrap_or("Unknown")
68 )?;
69 let age = Instant::now().duration_since(self.last_seen);
70 writeln!(f, " Last Seen: {}s ago", age.as_secs())?;
71
72 Ok(())
73 }
74}
75
76impl NodeInfo {
77 pub fn new(node_id: u8) -> Self {
78 Self {
79 node_id,
80 last_seen: Instant::now(),
81 device_name: None,
82 identity: None,
83 software_version: None,
84 hardware_version: None,
85 nmt_state: None,
86 }
87 }
88
89 pub fn update(&mut self, info: &NodeInfo) {
91 if info.device_name.is_some() {
92 self.device_name = info.device_name.clone();
93 }
94 if info.identity.is_some() {
95 self.identity = info.identity;
96 }
97 if info.software_version.is_some() {
98 self.software_version = info.software_version.clone();
99 }
100 if info.hardware_version.is_some() {
101 self.hardware_version = info.hardware_version.clone();
102 }
103 if info.nmt_state.is_some() {
104 self.nmt_state = info.nmt_state;
105 }
106 self.last_seen = Instant::now();
107 }
108}
109
110async fn scan_node<S: AsyncCanSender + Sync + Send>(
111 node_id: u8,
112 clients: &SdoClientMutex<S>,
113) -> Result<Option<NodeInfo>, SdoClientError> {
114 let mut sdo_client = clients.lock(node_id);
115 log::info!("Scanning Node {node_id}");
116
117 let identity = match sdo_client.read_identity().await {
118 Ok(id) => Some(id),
119 Err(SdoClientError::NoResponse) => {
121 log::info!("No response from node {node_id}");
122 return Ok(None);
123 }
124 Err(e) => return Err(e),
125 };
126 let device_name = match sdo_client.read_device_name().await {
127 Ok(s) => Some(s),
128 Err(e) => {
129 log::error!("SDO Abort Response scanning node {node_id} device name: {e:?}");
130 None
131 }
132 };
133 let software_version = match sdo_client.read_software_version().await {
134 Ok(s) => Some(s),
135 Err(e) => {
136 log::error!("SDO Abort Response scanning node {node_id} SW version: {e:?}");
137 None
138 }
139 };
140 let hardware_version = match sdo_client.read_hardware_version().await {
141 Ok(s) => Some(s),
142 Err(e) => {
143 log::error!("SDO Abort Response scanning node {node_id} HW version: {e:?}");
144 None
145 }
146 };
147 Ok(Some(NodeInfo {
148 node_id,
149 identity,
150 device_name,
151 software_version,
152 hardware_version,
153 nmt_state: None,
154 last_seen: Instant::now(),
155 }))
156}
157
158#[derive(Clone, Debug)]
160pub struct PdoScanResult {
161 pub tpdos: Vec<PdoConfig>,
163 pub rpdos: Vec<PdoConfig>,
165}
166
167async fn read_pdos<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
168 mut comm_base: u16,
169 mut mapping_base: u16,
170 client: &mut SdoClient<S, R>,
171) -> Result<Vec<PdoConfig>, SdoClientError> {
172 let mut result = Vec::new();
173
174 loop {
175 let _comm_max_sub = match client.read_u8(comm_base, 0).await {
176 Ok(val) => val,
177 Err(SdoClientError::ServerAbort {
179 index: _,
180 sub: _,
181 abort_code: RawAbortCode::Valid(AbortCode::NoSuchObject),
182 }) => break,
183 Err(e) => {
185 return Err(e);
186 }
187 };
188
189 let cob_value = client.read_u32(comm_base, 1).await?;
190 let transmission_type = client.read_u8(comm_base, 2).await?;
191
192 let frame = (cob_value & (1 << 29)) != 0;
193 let rtr_disabled = (cob_value & (1 << 30)) != 0;
194 let enabled = (cob_value & (1 << 31)) == 0;
195
196 let cob_id = cob_value & 0x1FFFFFFF;
197 let cob_id = if frame {
198 CanId::extended(cob_id)
199 } else {
200 CanId::std((cob_id & 0x7ff) as u16)
201 };
202 let num_mappings = client.read_u8(mapping_base, 0).await?;
203 let mut mappings = Vec::new();
204 for i in 0..num_mappings {
205 let map_param = client.read_u32(mapping_base, i + 1).await?;
206 mappings.push(PdoMapping::from_object_value(map_param));
207 }
208
209 result.push(PdoConfig {
210 cob_id,
211 enabled,
212 rtr_disabled,
213 mappings,
214 transmission_type,
215 });
216 comm_base += 1;
217 mapping_base += 1;
218 }
219 Ok(result)
220}
221
222async fn read_rpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
223 client: &mut SdoClient<S, R>,
224) -> Result<Vec<PdoConfig>, SdoClientError> {
225 read_pdos(RPDO_COMM_BASE, RPDO_MAP_BASE, client).await
226}
227
228async fn read_tpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
229 client: &mut SdoClient<S, R>,
230) -> Result<Vec<PdoConfig>, SdoClientError> {
231 read_pdos(TPDO_COMM_BASE, TPDO_MAP_BASE, client).await
232}
233
234#[derive(Debug)]
235pub struct SdoClientGuard<'a, S, R>
236where
237 S: AsyncCanSender,
238 R: AsyncCanReceiver,
239{
240 _guard: std::sync::MutexGuard<'a, ()>,
241 client: SdoClient<S, R>,
242}
243
244impl<S, R> Deref for SdoClientGuard<'_, S, R>
245where
246 S: AsyncCanSender,
247 R: AsyncCanReceiver,
248{
249 type Target = SdoClient<S, R>;
250
251 fn deref(&self) -> &Self::Target {
252 &self.client
253 }
254}
255
256impl<S, R> DerefMut for SdoClientGuard<'_, S, R>
257where
258 S: AsyncCanSender,
259 R: AsyncCanReceiver,
260{
261 fn deref_mut(&mut self) -> &mut Self::Target {
262 &mut self.client
263 }
264}
265
266#[derive(Debug)]
267struct SdoClientMutex<S>
268where
269 S: AsyncCanSender + Sync,
270{
271 sender: SharedSender<S>,
272 receiver: SharedReceiver,
273 clients: HashMap<u8, Mutex<()>>,
274}
275
276impl<S> SdoClientMutex<S>
277where
278 S: AsyncCanSender + Sync,
279{
280 pub fn new(sender: SharedSender<S>, receiver: SharedReceiver) -> Self {
281 let mut clients = HashMap::new();
282 for i in 0u8..128 {
283 clients.insert(i, Mutex::new(()));
284 }
285
286 Self {
287 sender,
288 receiver,
289 clients,
290 }
291 }
292
293 pub fn lock(&self, id: u8) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
294 if !(1..=127).contains(&id) {
295 panic!("ID {} out of range", id);
296 }
297 let guard = self.clients.get(&id).unwrap().lock().unwrap();
298 let client = SdoClient::new_std(id, self.sender.clone(), self.receiver.create_rx());
299 SdoClientGuard {
300 _guard: guard,
301 client,
302 }
303 }
304}
305
306#[derive(Debug)]
308pub struct BusManager<S: AsyncCanSender + Sync + Send> {
309 sender: SharedSender<S>,
310 receiver: SharedReceiver,
311 nodes: Arc<tokio::sync::Mutex<HashMap<u8, NodeInfo>>>,
312 sdo_clients: SdoClientMutex<S>,
313 _monitor_task: JoinHandle<()>,
314}
315
316impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
317 pub fn new(sender: S, receiver: impl AsyncCanReceiver + Sync + 'static) -> Self {
327 let receiver = SharedReceiver::new(receiver);
328 let sender = SharedSender::new(Arc::new(tokio::sync::Mutex::new(sender)));
329 let sdo_clients = SdoClientMutex::new(sender.clone(), receiver.clone());
330
331 let mut state_rx = receiver.create_rx();
332 let nodes = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
333
334 let monitor_task = {
335 let nodes = nodes.clone();
336 tokio::spawn(async move {
337 loop {
338 if let Ok(msg) = state_rx.recv().await {
339 if let Ok(ZencanMessage::Heartbeat(heartbeat)) =
340 ZencanMessage::try_from(msg)
341 {
342 let id_num = heartbeat.node;
343 if let Ok(node_id) = NodeId::try_from(id_num) {
344 let mut nodes = nodes.lock().await;
345 if let std::collections::hash_map::Entry::Vacant(e) =
346 nodes.entry(id_num)
347 {
348 e.insert(NodeInfo::new(node_id.raw()));
349 } else {
350 let node = nodes.get_mut(&id_num).unwrap();
351 node.nmt_state = Some(heartbeat.state);
352 node.last_seen = Instant::now();
353 }
354 } else {
355 log::warn!("Invalid heartbeat node ID {id_num} received");
356 }
357 }
358 }
359 }
360 })
361 };
362
363 Self {
364 sender,
365 receiver,
366 sdo_clients,
367 nodes,
368 _monitor_task: monitor_task,
369 }
370 }
371
372 pub fn sdo_client(
377 &self,
378 node_id: u8,
379 ) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
380 self.sdo_clients.lock(node_id)
381 }
382
383 pub async fn node_list(&self) -> Vec<NodeInfo> {
385 let node_map = self.nodes.lock().await;
386 let mut nodes = Vec::with_capacity(node_map.len());
387 for n in node_map.values() {
388 nodes.push(n.clone());
389 }
390
391 nodes.sort_by_key(|n| n.node_id);
392 nodes
393 }
394
395 pub async fn scan_nodes(&mut self) -> Result<Vec<NodeInfo>, SdoClientError> {
403 const N_PARALLEL: usize = 10;
404
405 let ids = Vec::from_iter(1..128u8);
406 let mut nodes: Vec<NodeInfo> = Vec::new();
407
408 let mut chunks = Vec::new();
409 for chunk in ids.chunks(128 / N_PARALLEL) {
410 chunks.push(Vec::from_iter(chunk.iter().cloned()));
411 }
412
413 let mut futures = Vec::new();
414
415 for block in chunks {
416 futures.push(async {
417 let mut block_nodes = Vec::new();
418 for id in block {
419 block_nodes.push(scan_node(id, &self.sdo_clients).await);
420 }
421 block_nodes
422 });
423 }
424
425 let results = join_all(futures).await;
427 for r in results {
428 let r: Result<Vec<Option<NodeInfo>>, SdoClientError> = r.into_iter().collect();
429 nodes.extend(r?.into_iter().flatten());
430 }
431
432 let mut node_map = self.nodes.lock().await;
433 for n in &nodes {
435 if let std::collections::hash_map::Entry::Vacant(e) = node_map.entry(n.node_id) {
436 e.insert(n.clone());
437 } else {
438 node_map.get_mut(&n.node_id).unwrap().update(n);
439 }
440 }
441
442 Ok(nodes
447 .iter()
448 .map(|n| node_map.get(&n.node_id).unwrap().clone())
449 .collect())
450 }
451
452 pub async fn lss_fastscan(&mut self, timeout: Duration) -> Vec<LssIdentity> {
461 let mut devices = Vec::new();
462 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
463
464 lss.set_global_mode(LssState::Waiting).await;
466
467 while let Some(id) = lss.fast_scan(timeout).await {
470 devices.push(id);
471 }
472
473 lss.set_global_mode(LssState::Waiting).await;
474
475 devices
476 }
477
478 pub async fn lss_activate(&mut self, ident: LssIdentity) -> Result<(), LssError> {
487 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
488 lss.set_global_mode(LssState::Waiting).await;
489 lss.enter_config_by_identity(
490 ident.vendor_id,
491 ident.product_code,
492 ident.revision,
493 ident.serial,
494 )
495 .await
496 }
497
498 pub async fn lss_set_node_id(&mut self, node_id: NodeId) -> Result<(), LssError> {
503 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
504 lss.set_node_id(node_id).await?;
505 Ok(())
506 }
507
508 pub async fn lss_store_config(&mut self) -> Result<(), LssError> {
513 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
514 lss.store_config().await
515 }
516
517 pub async fn lss_set_global_mode(&mut self, mode: LssState) {
519 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
520 lss.set_global_mode(mode).await;
521 }
522
523 pub async fn nmt_reset_app(&mut self, node: u8) {
527 self.send_nmt_cmd(NmtCommandSpecifier::ResetApp, node).await
528 }
529
530 pub async fn nmt_reset_comms(&mut self, node: u8) {
534 self.send_nmt_cmd(NmtCommandSpecifier::ResetComm, node)
535 .await
536 }
537
538 pub async fn nmt_start(&mut self, node: u8) {
542 self.send_nmt_cmd(NmtCommandSpecifier::Start, node).await
543 }
544
545 pub async fn nmt_stop(&mut self, node: u8) {
549 self.send_nmt_cmd(NmtCommandSpecifier::Stop, node).await
550 }
551
552 async fn send_nmt_cmd(&mut self, cmd: NmtCommandSpecifier, node: u8) {
553 let message = NmtCommand { cs: cmd, node };
554 self.sender.send(message.into()).await.ok();
555 }
556
557 pub async fn read_pdo_config(
561 &mut self,
562 node: ConfiguredNodeId,
563 ) -> Result<PdoScanResult, SdoClientError> {
564 let mut client = self.sdo_client(node.raw());
565
566 let tpdos = read_tpdo_config(&mut client).await?;
567 let rpdos = read_rpdo_config(&mut client).await?;
568
569 Ok(PdoScanResult { tpdos, rpdos })
570 }
571}