1use std::ops::{Deref, DerefMut};
2use std::sync::Mutex;
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc, time::Instant};
5
6use futures::future::join_all;
7use tokio::task::JoinHandle;
8use zencan_common::constants::object_ids::{
9 RPDO_COMM_BASE, RPDO_MAP_BASE, TPDO_COMM_BASE, TPDO_MAP_BASE,
10};
11use zencan_common::lss::{LssIdentity, LssState};
12use zencan_common::messages::{NmtCommand, NmtCommandSpecifier, NmtState, ZencanMessage};
13use zencan_common::node_id::ConfiguredNodeId;
14use zencan_common::sdo::AbortCode;
15use zencan_common::{
16 node_configuration::PdoConfig,
17 pdo::PdoMapping,
18 traits::{AsyncCanReceiver, AsyncCanSender},
19 CanId, NodeId,
20};
21
22use super::shared_sender::SharedSender;
23use crate::sdo_client::{SdoClient, SdoClientError};
24use crate::{LssError, LssMaster, RawAbortCode};
25
26use super::shared_receiver::{SharedReceiver, SharedReceiverChannel};
27
28#[derive(Debug, Clone)]
29pub struct NodeInfo {
30 pub node_id: u8,
31 pub identity: Option<LssIdentity>,
32 pub device_name: Option<String>,
33 pub software_version: Option<String>,
34 pub hardware_version: Option<String>,
35 pub last_seen: Instant,
36 pub nmt_state: Option<NmtState>,
37}
38
39impl core::fmt::Display for NodeInfo {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 writeln!(
42 f,
43 "Node {}: {}",
44 self.node_id,
45 self.nmt_state
46 .map(|s| s.to_string())
47 .unwrap_or("Unknown State".into())
48 )?;
49 match self.identity {
50 Some(id) => writeln!(
51 f,
52 " Identity vendor: {:X}, product: {:X}, revision: {:X}, serial: {:X}",
53 id.vendor_id, id.product_code, id.revision, id.serial
54 )?,
55 None => writeln!(f, " Identity: Unknown")?,
56 }
57 writeln!(
58 f,
59 " Device Name: '{}'",
60 self.device_name.as_deref().unwrap_or("Unknown")
61 )?;
62 writeln!(
63 f,
64 " Versions: '{}' SW, '{}' HW",
65 self.software_version.as_deref().unwrap_or("Unknown"),
66 self.hardware_version.as_deref().unwrap_or("Unknown")
67 )?;
68 let age = Instant::now().duration_since(self.last_seen);
69 writeln!(f, " Last Seen: {}s ago", age.as_secs())?;
70
71 Ok(())
72 }
73}
74
75impl NodeInfo {
76 pub fn new(node_id: u8) -> Self {
77 Self {
78 node_id,
79 last_seen: Instant::now(),
80 device_name: None,
81 identity: None,
82 software_version: None,
83 hardware_version: None,
84 nmt_state: None,
85 }
86 }
87
88 pub fn update(&mut self, info: &NodeInfo) {
90 if info.device_name.is_some() {
91 self.device_name = info.device_name.clone();
92 }
93 if info.identity.is_some() {
94 self.identity = info.identity;
95 }
96 if info.software_version.is_some() {
97 self.software_version = info.software_version.clone();
98 }
99 if info.hardware_version.is_some() {
100 self.hardware_version = info.hardware_version.clone();
101 }
102 if info.nmt_state.is_some() {
103 self.nmt_state = info.nmt_state;
104 }
105 self.last_seen = Instant::now();
106 }
107}
108
109async fn scan_node<S: AsyncCanSender + Sync + Send>(
110 node_id: u8,
111 clients: &SdoClientMutex<S>,
112) -> Result<Option<NodeInfo>, SdoClientError> {
113 let mut sdo_client = clients.lock(node_id);
114 log::info!("Scanning Node {node_id}");
115
116 let identity = match sdo_client.read_identity().await {
117 Ok(id) => Some(id),
118 Err(SdoClientError::NoResponse) => {
120 log::info!("No response from node {node_id}");
121 return Ok(None);
122 }
123 Err(e) => return Err(e),
124 };
125 let device_name = match sdo_client.read_device_name().await {
126 Ok(s) => Some(s),
127 Err(e) => {
128 log::error!("SDO Abort Response scanning node {node_id} device name: {e:?}");
129 None
130 }
131 };
132 let software_version = match sdo_client.read_software_version().await {
133 Ok(s) => Some(s),
134 Err(e) => {
135 log::error!("SDO Abort Response scanning node {node_id} SW version: {e:?}");
136 None
137 }
138 };
139 let hardware_version = match sdo_client.read_hardware_version().await {
140 Ok(s) => Some(s),
141 Err(e) => {
142 log::error!("SDO Abort Response scanning node {node_id} HW version: {e:?}");
143 None
144 }
145 };
146 Ok(Some(NodeInfo {
147 node_id,
148 identity,
149 device_name,
150 software_version,
151 hardware_version,
152 nmt_state: None,
153 last_seen: Instant::now(),
154 }))
155}
156
157#[derive(Clone, Debug)]
159pub struct PdoScanResult {
160 pub tpdos: Vec<PdoConfig>,
162 pub rpdos: Vec<PdoConfig>,
164}
165
166async fn read_pdos<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
167 mut comm_base: u16,
168 mut mapping_base: u16,
169 client: &mut SdoClient<S, R>,
170) -> Result<Vec<PdoConfig>, SdoClientError> {
171 let mut result = Vec::new();
172
173 loop {
174 let _comm_max_sub = match client.read_u8(comm_base, 0).await {
175 Ok(val) => val,
176 Err(SdoClientError::ServerAbort {
178 index: _,
179 sub: _,
180 abort_code: RawAbortCode::Valid(AbortCode::NoSuchObject),
181 }) => break,
182 Err(e) => {
184 return Err(e);
185 }
186 };
187
188 let cob_value = client.read_u32(comm_base, 1).await?;
189 let transmission_type = client.read_u8(comm_base, 2).await?;
190
191 let frame = (cob_value & (1 << 29)) != 0;
192 let rtr_disabled = (cob_value & (1 << 30)) != 0;
193 let enabled = (cob_value & (1 << 31)) == 0;
194
195 let cob_id = cob_value & 0x1FFFFFFF;
196 let cob_id = if frame {
197 CanId::extended(cob_id)
198 } else {
199 CanId::std((cob_id & 0x7ff) as u16)
200 };
201 let num_mappings = client.read_u8(mapping_base, 0).await?;
202 let mut mappings = Vec::new();
203 for i in 0..num_mappings {
204 let map_param = client.read_u32(mapping_base, i + 1).await?;
205 mappings.push(PdoMapping::from_object_value(map_param));
206 }
207
208 result.push(PdoConfig {
209 cob_id,
210 enabled,
211 rtr_disabled,
212 mappings,
213 transmission_type,
214 });
215 comm_base += 1;
216 mapping_base += 1;
217 }
218 Ok(result)
219}
220
221async fn read_rpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
222 client: &mut SdoClient<S, R>,
223) -> Result<Vec<PdoConfig>, SdoClientError> {
224 read_pdos(RPDO_COMM_BASE, RPDO_MAP_BASE, client).await
225}
226
227async fn read_tpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
228 client: &mut SdoClient<S, R>,
229) -> Result<Vec<PdoConfig>, SdoClientError> {
230 read_pdos(TPDO_COMM_BASE, TPDO_MAP_BASE, client).await
231}
232
233#[derive(Debug)]
234pub struct SdoClientGuard<'a, S, R>
235where
236 S: AsyncCanSender,
237 R: AsyncCanReceiver,
238{
239 _guard: std::sync::MutexGuard<'a, ()>,
240 client: SdoClient<S, R>,
241}
242
243impl<S, R> Deref for SdoClientGuard<'_, S, R>
244where
245 S: AsyncCanSender,
246 R: AsyncCanReceiver,
247{
248 type Target = SdoClient<S, R>;
249
250 fn deref(&self) -> &Self::Target {
251 &self.client
252 }
253}
254
255impl<S, R> DerefMut for SdoClientGuard<'_, S, R>
256where
257 S: AsyncCanSender,
258 R: AsyncCanReceiver,
259{
260 fn deref_mut(&mut self) -> &mut Self::Target {
261 &mut self.client
262 }
263}
264
265#[derive(Debug)]
266struct SdoClientMutex<S>
267where
268 S: AsyncCanSender + Sync,
269{
270 sender: SharedSender<S>,
271 receiver: SharedReceiver,
272 clients: HashMap<u8, Mutex<()>>,
273}
274
275impl<S> SdoClientMutex<S>
276where
277 S: AsyncCanSender + Sync,
278{
279 pub fn new(sender: SharedSender<S>, receiver: SharedReceiver) -> Self {
280 let mut clients = HashMap::new();
281 for i in 0u8..128 {
282 clients.insert(i, Mutex::new(()));
283 }
284
285 Self {
286 sender,
287 receiver,
288 clients,
289 }
290 }
291
292 pub fn lock(&self, id: u8) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
293 if !(1..=127).contains(&id) {
294 panic!("ID {} out of range", id);
295 }
296 let guard = self.clients.get(&id).unwrap().lock().unwrap();
297 let client = SdoClient::new_std(id, self.sender.clone(), self.receiver.create_rx());
298 SdoClientGuard {
299 _guard: guard,
300 client,
301 }
302 }
303}
304
305#[derive(Debug)]
307pub struct BusManager<S: AsyncCanSender + Sync + Send> {
308 sender: SharedSender<S>,
309 receiver: SharedReceiver,
310 nodes: Arc<tokio::sync::Mutex<HashMap<u8, NodeInfo>>>,
311 sdo_clients: SdoClientMutex<S>,
312 _monitor_task: JoinHandle<()>,
313}
314
315impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
316 pub fn new(sender: S, receiver: impl AsyncCanReceiver + Sync + 'static) -> Self {
326 let receiver = SharedReceiver::new(receiver);
327 let sender = SharedSender::new(Arc::new(tokio::sync::Mutex::new(sender)));
328 let sdo_clients = SdoClientMutex::new(sender.clone(), receiver.clone());
329
330 let mut state_rx = receiver.create_rx();
331 let nodes = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
332
333 let monitor_task = {
334 let nodes = nodes.clone();
335 tokio::spawn(async move {
336 loop {
337 if let Ok(msg) = state_rx.recv().await {
338 if let Ok(ZencanMessage::Heartbeat(heartbeat)) =
339 ZencanMessage::try_from(msg)
340 {
341 let id_num = heartbeat.node;
342 if let Ok(node_id) = NodeId::try_from(id_num) {
343 let mut nodes = nodes.lock().await;
344 if let std::collections::hash_map::Entry::Vacant(e) =
345 nodes.entry(id_num)
346 {
347 e.insert(NodeInfo::new(node_id.raw()));
348 } else {
349 let node = nodes.get_mut(&id_num).unwrap();
350 node.nmt_state = Some(heartbeat.state);
351 node.last_seen = Instant::now();
352 }
353 } else {
354 log::warn!("Invalid heartbeat node ID {id_num} received");
355 }
356 }
357 }
358 }
359 })
360 };
361
362 Self {
363 sender,
364 receiver,
365 sdo_clients,
366 nodes,
367 _monitor_task: monitor_task,
368 }
369 }
370
371 pub fn sdo_client(
376 &self,
377 node_id: u8,
378 ) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
379 self.sdo_clients.lock(node_id)
380 }
381
382 pub async fn node_list(&self) -> Vec<NodeInfo> {
384 let node_map = self.nodes.lock().await;
385 let mut nodes = Vec::with_capacity(node_map.len());
386 for n in node_map.values() {
387 nodes.push(n.clone());
388 }
389
390 nodes.sort_by_key(|n| n.node_id);
391 nodes
392 }
393
394 pub async fn scan_nodes(&mut self) -> Result<Vec<NodeInfo>, SdoClientError> {
402 const N_PARALLEL: usize = 10;
403
404 let ids = Vec::from_iter(1..128u8);
405 let mut nodes: Vec<NodeInfo> = Vec::new();
406
407 let mut chunks = Vec::new();
408 for chunk in ids.chunks(128 / N_PARALLEL) {
409 chunks.push(Vec::from_iter(chunk.iter().cloned()));
410 }
411
412 let mut futures = Vec::new();
413
414 for block in chunks {
415 futures.push(async {
416 let mut block_nodes = Vec::new();
417 for id in block {
418 block_nodes.push(scan_node(id, &self.sdo_clients).await);
419 }
420 block_nodes
421 });
422 }
423
424 let results = join_all(futures).await;
426 for r in results {
427 let r: Result<Vec<Option<NodeInfo>>, SdoClientError> = r.into_iter().collect();
428 nodes.extend(r?.into_iter().flatten());
429 }
430
431 let mut node_map = self.nodes.lock().await;
432 for n in &nodes {
434 if let std::collections::hash_map::Entry::Vacant(e) = node_map.entry(n.node_id) {
435 e.insert(n.clone());
436 } else {
437 node_map.get_mut(&n.node_id).unwrap().update(n);
438 }
439 }
440
441 Ok(nodes
446 .iter()
447 .map(|n| node_map.get(&n.node_id).unwrap().clone())
448 .collect())
449 }
450
451 pub async fn lss_fastscan(&mut self, timeout: Duration) -> Vec<LssIdentity> {
460 let mut devices = Vec::new();
461 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
462
463 lss.set_global_mode(LssState::Waiting).await;
465
466 while let Some(id) = lss.fast_scan(timeout).await {
469 devices.push(id);
470 }
471
472 lss.set_global_mode(LssState::Waiting).await;
473
474 devices
475 }
476
477 pub async fn lss_activate(&mut self, ident: LssIdentity) -> Result<(), LssError> {
486 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
487 lss.set_global_mode(LssState::Waiting).await;
488 lss.enter_config_by_identity(
489 ident.vendor_id,
490 ident.product_code,
491 ident.revision,
492 ident.serial,
493 )
494 .await
495 }
496
497 pub async fn lss_set_node_id(&mut self, node_id: NodeId) -> Result<(), LssError> {
502 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
503 lss.set_node_id(node_id).await?;
504 Ok(())
505 }
506
507 pub async fn lss_store_config(&mut self) -> Result<(), LssError> {
512 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
513 lss.store_config().await
514 }
515
516 pub async fn lss_set_global_mode(&mut self, mode: LssState) {
518 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
519 lss.set_global_mode(mode).await;
520 }
521
522 pub async fn nmt_reset_app(&mut self, node: u8) {
526 self.send_nmt_cmd(NmtCommandSpecifier::ResetApp, node).await
527 }
528
529 pub async fn nmt_reset_comms(&mut self, node: u8) {
533 self.send_nmt_cmd(NmtCommandSpecifier::ResetComm, node)
534 .await
535 }
536
537 pub async fn nmt_start(&mut self, node: u8) {
541 self.send_nmt_cmd(NmtCommandSpecifier::Start, node).await
542 }
543
544 pub async fn nmt_stop(&mut self, node: u8) {
548 self.send_nmt_cmd(NmtCommandSpecifier::Stop, node).await
549 }
550
551 async fn send_nmt_cmd(&mut self, cmd: NmtCommandSpecifier, node: u8) {
552 let message = NmtCommand { cs: cmd, node };
553 self.sender.send(message.into()).await.ok();
554 }
555
556 pub async fn read_pdo_config(
560 &mut self,
561 node: ConfiguredNodeId,
562 ) -> Result<PdoScanResult, SdoClientError> {
563 let mut client = self.sdo_client(node.raw());
564
565 let tpdos = read_tpdo_config(&mut client).await?;
566 let rpdos = read_rpdo_config(&mut client).await?;
567
568 Ok(PdoScanResult { tpdos, rpdos })
569 }
570}