1use std::ops::{Deref, DerefMut};
2use std::sync::Mutex;
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc, time::Instant};
5
6use futures::future::join_all;
7use tokio::task::JoinHandle;
8use zencan_common::constants::object_ids::{
9 RPDO_COMM_BASE, RPDO_MAP_BASE, TPDO_COMM_BASE, TPDO_MAP_BASE,
10};
11use zencan_common::lss::{LssIdentity, LssState};
12use zencan_common::messages::{NmtCommand, NmtCommandSpecifier, NmtState, ZencanMessage};
13use zencan_common::node_id::ConfiguredNodeId;
14use zencan_common::sdo::AbortCode;
15use zencan_common::CanId;
16use zencan_common::{
17 traits::{AsyncCanReceiver, AsyncCanSender},
18 NodeId,
19};
20
21use super::shared_sender::SharedSender;
22use crate::sdo_client::{SdoClient, SdoClientError};
23use crate::{LssError, LssMaster, PdoConfig, PdoMapping, RawAbortCode};
24
25use super::shared_receiver::{SharedReceiver, SharedReceiverChannel};
26
27#[derive(Debug, Clone)]
28pub struct NodeInfo {
29 pub node_id: u8,
30 pub identity: Option<LssIdentity>,
31 pub device_name: Option<String>,
32 pub software_version: Option<String>,
33 pub hardware_version: Option<String>,
34 pub last_seen: Instant,
35 pub nmt_state: Option<NmtState>,
36}
37
38impl core::fmt::Display for NodeInfo {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 writeln!(
41 f,
42 "Node {}: {}",
43 self.node_id,
44 self.nmt_state
45 .map(|s| s.to_string())
46 .unwrap_or("Unknown State".into())
47 )?;
48 match self.identity {
49 Some(id) => writeln!(
50 f,
51 " Identity vendor: {:X}, product: {:X}, revision: {:X}, serial: {:X}",
52 id.vendor_id, id.product_code, id.revision, id.serial
53 )?,
54 None => writeln!(f, " Identity: Unknown")?,
55 }
56 writeln!(
57 f,
58 " Device Name: '{}'",
59 self.device_name.as_deref().unwrap_or("Unknown")
60 )?;
61 writeln!(
62 f,
63 " Versions: '{}' SW, '{}' HW",
64 self.software_version.as_deref().unwrap_or("Unknown"),
65 self.hardware_version.as_deref().unwrap_or("Unknown")
66 )?;
67 let age = Instant::now().duration_since(self.last_seen);
68 writeln!(f, " Last Seen: {}s ago", age.as_secs())?;
69
70 Ok(())
71 }
72}
73
74impl NodeInfo {
75 pub fn new(node_id: u8) -> Self {
76 Self {
77 node_id,
78 last_seen: Instant::now(),
79 device_name: None,
80 identity: None,
81 software_version: None,
82 hardware_version: None,
83 nmt_state: None,
84 }
85 }
86
87 pub fn update(&mut self, info: &NodeInfo) {
89 if info.device_name.is_some() {
90 self.device_name = info.device_name.clone();
91 }
92 if info.identity.is_some() {
93 self.identity = info.identity;
94 }
95 if info.software_version.is_some() {
96 self.software_version = info.software_version.clone();
97 }
98 if info.hardware_version.is_some() {
99 self.hardware_version = info.hardware_version.clone();
100 }
101 if info.nmt_state.is_some() {
102 self.nmt_state = info.nmt_state;
103 }
104 self.last_seen = Instant::now();
105 }
106}
107
108async fn scan_node<S: AsyncCanSender + Sync + Send>(
109 node_id: u8,
110 clients: &SdoClientMutex<S>,
111) -> Option<NodeInfo> {
112 let mut sdo_client = clients.lock(node_id);
113 log::info!("Scanning Node {node_id}");
114 let identity = match sdo_client.read_identity().await {
115 Ok(id) => Some(id),
116 Err(SdoClientError::NoResponse) => {
117 log::info!("No response from node {node_id}");
118 return None;
119 }
120 Err(e) => {
121 log::error!("SDO Abort Response scanning node {node_id} identity: {e:?}");
124 None
125 }
126 };
127 let device_name = match sdo_client.read_device_name().await {
128 Ok(s) => Some(s),
129 Err(e) => {
130 log::error!("SDO Abort Response scanning node {node_id} device name: {e:?}");
131 None
132 }
133 };
134 let software_version = match sdo_client.read_software_version().await {
135 Ok(s) => Some(s),
136 Err(e) => {
137 log::error!("SDO Abort Response scanning node {node_id} SW version: {e:?}");
138 None
139 }
140 };
141 let hardware_version = match sdo_client.read_hardware_version().await {
142 Ok(s) => Some(s),
143 Err(e) => {
144 log::error!("SDO Abort Response scanning node {node_id} HW version: {e:?}");
145 None
146 }
147 };
148 Some(NodeInfo {
149 node_id,
150 identity,
151 device_name,
152 software_version,
153 hardware_version,
154 nmt_state: None,
155 last_seen: Instant::now(),
156 })
157}
158
159#[derive(Clone, Debug)]
161pub struct PdoScanResult {
162 pub tpdos: Vec<PdoConfig>,
164 pub rpdos: Vec<PdoConfig>,
166}
167
168async fn read_pdos<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
169 mut comm_base: u16,
170 mut mapping_base: u16,
171 client: &mut SdoClient<S, R>,
172) -> Result<Vec<PdoConfig>, SdoClientError> {
173 let mut result = Vec::new();
174
175 loop {
176 let _comm_max_sub = match client.read_u8(comm_base, 0).await {
177 Ok(val) => val,
178 Err(SdoClientError::ServerAbort {
180 index: _,
181 sub: _,
182 abort_code: RawAbortCode::Valid(AbortCode::NoSuchObject),
183 }) => break,
184 Err(e) => {
186 return Err(e);
187 }
188 };
189
190 let cob_value = client.read_u32(comm_base, 1).await?;
191 let transmission_type = client.read_u8(comm_base, 2).await?;
192
193 let frame = (cob_value & (1 << 29)) != 0;
194 let enabled = (cob_value & (1 << 31)) == 0;
195 let cob_id = cob_value & 0x1FFFFFFF;
196 let cob = if frame {
197 CanId::extended(cob_id)
198 } else {
199 CanId::std((cob_id & 0x7ff) as u16)
200 };
201 let num_mappings = client.read_u8(mapping_base, 0).await?;
202 let mut mappings = Vec::new();
203 for i in 0..num_mappings {
204 let map_param = client.read_u32(mapping_base, i + 1).await?;
205 mappings.push(PdoMapping::from_object_value(map_param));
206 }
207
208 result.push(PdoConfig {
209 cob,
210 enabled,
211 mappings,
212 transmission_type,
213 });
214 comm_base += 1;
215 mapping_base += 1;
216 }
217 Ok(result)
218}
219
220async fn read_rpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
221 client: &mut SdoClient<S, R>,
222) -> Result<Vec<PdoConfig>, SdoClientError> {
223 read_pdos(RPDO_COMM_BASE, RPDO_MAP_BASE, client).await
224}
225
226async fn read_tpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
227 client: &mut SdoClient<S, R>,
228) -> Result<Vec<PdoConfig>, SdoClientError> {
229 read_pdos(TPDO_COMM_BASE, TPDO_MAP_BASE, client).await
230}
231
232#[derive(Debug)]
233pub struct SdoClientGuard<'a, S, R>
234where
235 S: AsyncCanSender,
236 R: AsyncCanReceiver,
237{
238 _guard: std::sync::MutexGuard<'a, ()>,
239 client: SdoClient<S, R>,
240}
241
242impl<S, R> Deref for SdoClientGuard<'_, S, R>
243where
244 S: AsyncCanSender,
245 R: AsyncCanReceiver,
246{
247 type Target = SdoClient<S, R>;
248
249 fn deref(&self) -> &Self::Target {
250 &self.client
251 }
252}
253
254impl<S, R> DerefMut for SdoClientGuard<'_, S, R>
255where
256 S: AsyncCanSender,
257 R: AsyncCanReceiver,
258{
259 fn deref_mut(&mut self) -> &mut Self::Target {
260 &mut self.client
261 }
262}
263
264#[derive(Debug)]
265struct SdoClientMutex<S>
266where
267 S: AsyncCanSender + Sync,
268{
269 sender: SharedSender<S>,
270 receiver: SharedReceiver,
271 clients: HashMap<u8, Mutex<()>>,
272}
273
274impl<S> SdoClientMutex<S>
275where
276 S: AsyncCanSender + Sync,
277{
278 pub fn new(sender: SharedSender<S>, receiver: SharedReceiver) -> Self {
279 let mut clients = HashMap::new();
280 for i in 0u8..128 {
281 clients.insert(i, Mutex::new(()));
282 }
283
284 Self {
285 sender,
286 receiver,
287 clients,
288 }
289 }
290
291 pub fn lock(&self, id: u8) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
292 if !(1..=127).contains(&id) {
293 panic!("ID {} out of range", id);
294 }
295 let guard = self.clients.get(&id).unwrap().lock().unwrap();
296 let client = SdoClient::new_std(id, self.sender.clone(), self.receiver.create_rx());
297 SdoClientGuard {
298 _guard: guard,
299 client,
300 }
301 }
302}
303
304#[derive(Debug)]
306pub struct BusManager<S: AsyncCanSender + Sync + Send> {
307 sender: SharedSender<S>,
308 receiver: SharedReceiver,
309 nodes: Arc<tokio::sync::Mutex<HashMap<u8, NodeInfo>>>,
310 sdo_clients: SdoClientMutex<S>,
311 _monitor_task: JoinHandle<()>,
312}
313
314impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
315 pub fn new(sender: S, receiver: impl AsyncCanReceiver + Sync + 'static) -> Self {
325 let receiver = SharedReceiver::new(receiver);
326 let sender = SharedSender::new(Arc::new(tokio::sync::Mutex::new(sender)));
327 let sdo_clients = SdoClientMutex::new(sender.clone(), receiver.clone());
328
329 let mut state_rx = receiver.create_rx();
330 let nodes = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
331
332 let monitor_task = {
333 let nodes = nodes.clone();
334 tokio::spawn(async move {
335 loop {
336 if let Ok(msg) = state_rx.recv().await {
337 if let Ok(ZencanMessage::Heartbeat(heartbeat)) =
338 ZencanMessage::try_from(msg)
339 {
340 let id_num = heartbeat.node;
341 if let Ok(node_id) = NodeId::try_from(id_num) {
342 let mut nodes = nodes.lock().await;
343 if let std::collections::hash_map::Entry::Vacant(e) =
344 nodes.entry(id_num)
345 {
346 e.insert(NodeInfo::new(node_id.raw()));
347 } else {
348 let node = nodes.get_mut(&id_num).unwrap();
349 node.nmt_state = Some(heartbeat.state);
350 node.last_seen = Instant::now();
351 }
352 } else {
353 log::warn!("Invalid heartbeat node ID {id_num} received");
354 }
355 }
356 }
357 }
358 })
359 };
360
361 Self {
362 sender,
363 receiver,
364 sdo_clients,
365 nodes,
366 _monitor_task: monitor_task,
367 }
368 }
369
370 pub fn sdo_client(
375 &self,
376 node_id: u8,
377 ) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
378 self.sdo_clients.lock(node_id)
379 }
380
381 pub async fn node_list(&self) -> Vec<NodeInfo> {
383 let node_map = self.nodes.lock().await;
384 let mut nodes = Vec::with_capacity(node_map.len());
385 for n in node_map.values() {
386 nodes.push(n.clone());
387 }
388
389 nodes.sort_by_key(|n| n.node_id);
390 nodes
391 }
392
393 pub async fn scan_nodes(&mut self) -> Vec<NodeInfo> {
401 const N_PARALLEL: usize = 10;
402
403 let ids = Vec::from_iter(1..128u8);
404 let mut nodes = Vec::new();
405
406 let mut chunks = Vec::new();
407 for chunk in ids.chunks(128 / N_PARALLEL) {
408 chunks.push(Vec::from_iter(chunk.iter().cloned()));
409 }
410
411 let mut futures = Vec::new();
412
413 for block in chunks {
414 futures.push(async {
415 let mut block_nodes = Vec::new();
416 for id in block {
417 block_nodes.push(scan_node(id, &self.sdo_clients).await);
418 }
419 block_nodes
420 });
421 }
422
423 let results = join_all(futures).await;
424 for r in results {
425 nodes.extend(r.into_iter().flatten());
426 }
427
428 let mut node_map = self.nodes.lock().await;
429 for n in &nodes {
431 if let std::collections::hash_map::Entry::Vacant(e) = node_map.entry(n.node_id) {
432 e.insert(n.clone());
433 } else {
434 node_map.get_mut(&n.node_id).unwrap().update(n);
435 }
436 }
437
438 nodes
443 .iter()
444 .map(|n| node_map.get(&n.node_id).unwrap().clone())
445 .collect()
446 }
447
448 pub async fn lss_fastscan(&mut self, timeout: Duration) -> Vec<LssIdentity> {
457 let mut devices = Vec::new();
458 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
459
460 lss.set_global_mode(LssState::Waiting).await;
462
463 while let Some(id) = lss.fast_scan(timeout).await {
466 devices.push(id);
467 }
468
469 lss.set_global_mode(LssState::Waiting).await;
470
471 devices
472 }
473
474 pub async fn lss_activate(&mut self, ident: LssIdentity) -> Result<(), LssError> {
483 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
484 lss.set_global_mode(LssState::Waiting).await;
485 lss.enter_config_by_identity(
486 ident.vendor_id,
487 ident.product_code,
488 ident.revision,
489 ident.serial,
490 )
491 .await
492 }
493
494 pub async fn lss_set_node_id(&mut self, node_id: NodeId) -> Result<(), LssError> {
499 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
500 lss.set_node_id(node_id).await?;
501 Ok(())
502 }
503
504 pub async fn lss_store_config(&mut self) -> Result<(), LssError> {
509 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
510 lss.store_config().await
511 }
512
513 pub async fn lss_set_global_mode(&mut self, mode: LssState) {
515 let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
516 lss.set_global_mode(mode).await;
517 }
518
519 pub async fn nmt_reset_app(&mut self, node: u8) {
523 self.send_nmt_cmd(NmtCommandSpecifier::ResetApp, node).await
524 }
525
526 pub async fn nmt_reset_comms(&mut self, node: u8) {
530 self.send_nmt_cmd(NmtCommandSpecifier::ResetComm, node)
531 .await
532 }
533
534 pub async fn nmt_start(&mut self, node: u8) {
538 self.send_nmt_cmd(NmtCommandSpecifier::Start, node).await
539 }
540
541 pub async fn nmt_stop(&mut self, node: u8) {
545 self.send_nmt_cmd(NmtCommandSpecifier::Stop, node).await
546 }
547
548 async fn send_nmt_cmd(&mut self, cmd: NmtCommandSpecifier, node: u8) {
549 let message = NmtCommand { cs: cmd, node };
550 self.sender.send(message.into()).await.ok();
551 }
552
553 pub async fn read_pdo_config(
557 &mut self,
558 node: ConfiguredNodeId,
559 ) -> Result<PdoScanResult, SdoClientError> {
560 let mut client = self.sdo_client(node.raw());
561
562 let tpdos = read_tpdo_config(&mut client).await?;
563 let rpdos = read_rpdo_config(&mut client).await?;
564
565 Ok(PdoScanResult { tpdos, rpdos })
566 }
567}