zencan_client/bus_manager/
bus_manager.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::Mutex;
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc, time::Instant};
5
6use futures::future::join_all;
7use tokio::task::JoinHandle;
8use zencan_common::constants::object_ids::{
9    RPDO_COMM_BASE, RPDO_MAP_BASE, TPDO_COMM_BASE, TPDO_MAP_BASE,
10};
11use zencan_common::lss::{LssIdentity, LssState};
12use zencan_common::messages::{NmtCommand, NmtCommandSpecifier, NmtState, ZencanMessage};
13use zencan_common::node_id::ConfiguredNodeId;
14use zencan_common::sdo::AbortCode;
15use zencan_common::{
16    node_configuration::PdoConfig,
17    pdo::PdoMapping,
18    traits::{AsyncCanReceiver, AsyncCanSender},
19    CanId, NodeId,
20};
21
22use super::shared_sender::SharedSender;
23use crate::sdo_client::{SdoClient, SdoClientError};
24use crate::{LssError, LssMaster, RawAbortCode};
25
26use super::shared_receiver::{SharedReceiver, SharedReceiverChannel};
27
/// Information known about a single node on the bus
///
/// Optional fields are `None` until the corresponding piece of information has been learned
/// about the node.
#[derive(Debug, Clone)]
pub struct NodeInfo {
    /// The ID of the node this record describes
    pub node_id: u8,
    /// The identity of the node, if it has been read
    pub identity: Option<LssIdentity>,
    /// The device name reported by the node, if it has been read
    pub device_name: Option<String>,
    /// The software version reported by the node, if it has been read
    pub software_version: Option<String>,
    /// The hardware version reported by the node, if it has been read
    pub hardware_version: Option<String>,
    /// The time at which this record was created or last refreshed
    pub last_seen: Instant,
    /// The most recently observed NMT state of the node, if any
    pub nmt_state: Option<NmtState>,
}
38
39impl core::fmt::Display for NodeInfo {
40    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41        writeln!(
42            f,
43            "Node {}: {}",
44            self.node_id,
45            self.nmt_state
46                .map(|s| s.to_string())
47                .unwrap_or("Unknown State".into())
48        )?;
49        match self.identity {
50            Some(id) => writeln!(
51                f,
52                "    Identity vendor: {:X}, product: {:X}, revision: {:X}, serial: {:X}",
53                id.vendor_id, id.product_code, id.revision, id.serial
54            )?,
55            None => writeln!(f, "    Identity: Unknown")?,
56        }
57        writeln!(
58            f,
59            "    Device Name: '{}'",
60            self.device_name.as_deref().unwrap_or("Unknown")
61        )?;
62        writeln!(
63            f,
64            "    Versions: '{}' SW, '{}' HW",
65            self.software_version.as_deref().unwrap_or("Unknown"),
66            self.hardware_version.as_deref().unwrap_or("Unknown")
67        )?;
68        let age = Instant::now().duration_since(self.last_seen);
69        writeln!(f, "    Last Seen: {}s ago", age.as_secs())?;
70
71        Ok(())
72    }
73}
74
75impl NodeInfo {
76    pub fn new(node_id: u8) -> Self {
77        Self {
78            node_id,
79            last_seen: Instant::now(),
80            device_name: None,
81            identity: None,
82            software_version: None,
83            hardware_version: None,
84            nmt_state: None,
85        }
86    }
87
88    /// Update / merge new information about the node
89    pub fn update(&mut self, info: &NodeInfo) {
90        if info.device_name.is_some() {
91            self.device_name = info.device_name.clone();
92        }
93        if info.identity.is_some() {
94            self.identity = info.identity;
95        }
96        if info.software_version.is_some() {
97            self.software_version = info.software_version.clone();
98        }
99        if info.hardware_version.is_some() {
100            self.hardware_version = info.hardware_version.clone();
101        }
102        if info.nmt_state.is_some() {
103            self.nmt_state = info.nmt_state;
104        }
105        self.last_seen = Instant::now();
106    }
107}
108
109async fn scan_node<S: AsyncCanSender + Sync + Send>(
110    node_id: u8,
111    clients: &SdoClientMutex<S>,
112) -> Result<Option<NodeInfo>, SdoClientError> {
113    let mut sdo_client = clients.lock(node_id);
114    log::info!("Scanning Node {node_id}");
115
116    let identity = match sdo_client.read_identity().await {
117        Ok(id) => Some(id),
118        // A no response here is not really an error, it just indicates the node is not present
119        Err(SdoClientError::NoResponse) => {
120            log::info!("No response from node {node_id}");
121            return Ok(None);
122        }
123        Err(e) => return Err(e),
124    };
125    let device_name = match sdo_client.read_device_name().await {
126        Ok(s) => Some(s),
127        Err(e) => {
128            log::error!("SDO Abort Response scanning node {node_id} device name: {e:?}");
129            None
130        }
131    };
132    let software_version = match sdo_client.read_software_version().await {
133        Ok(s) => Some(s),
134        Err(e) => {
135            log::error!("SDO Abort Response scanning node {node_id} SW version: {e:?}");
136            None
137        }
138    };
139    let hardware_version = match sdo_client.read_hardware_version().await {
140        Ok(s) => Some(s),
141        Err(e) => {
142            log::error!("SDO Abort Response scanning node {node_id} HW version: {e:?}");
143            None
144        }
145    };
146    Ok(Some(NodeInfo {
147        node_id,
148        identity,
149        device_name,
150        software_version,
151        hardware_version,
152        nmt_state: None,
153        last_seen: Instant::now(),
154    }))
155}
156
/// Result struct for reading PDO configuration from a single node
///
/// Each list holds one entry per PDO found on the node, in the order they were read.
#[derive(Clone, Debug)]
pub struct PdoScanResult {
    /// List of TPDO configurations
    pub tpdos: Vec<PdoConfig>,
    /// List of RPDO configurations
    pub rpdos: Vec<PdoConfig>,
}
165
166async fn read_pdos<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
167    mut comm_base: u16,
168    mut mapping_base: u16,
169    client: &mut SdoClient<S, R>,
170) -> Result<Vec<PdoConfig>, SdoClientError> {
171    let mut result = Vec::new();
172
173    loop {
174        let _comm_max_sub = match client.read_u8(comm_base, 0).await {
175            Ok(val) => val,
176            // This error is expected; this means there are no more PDOs to read
177            Err(SdoClientError::ServerAbort {
178                index: _,
179                sub: _,
180                abort_code: RawAbortCode::Valid(AbortCode::NoSuchObject),
181            }) => break,
182            // Any other error is unexpected
183            Err(e) => {
184                return Err(e);
185            }
186        };
187
188        let cob_value = client.read_u32(comm_base, 1).await?;
189        let transmission_type = client.read_u8(comm_base, 2).await?;
190
191        let frame = (cob_value & (1 << 29)) != 0;
192        let rtr_disabled = (cob_value & (1 << 30)) != 0;
193        let enabled = (cob_value & (1 << 31)) == 0;
194
195        let cob_id = cob_value & 0x1FFFFFFF;
196        let cob_id = if frame {
197            CanId::extended(cob_id)
198        } else {
199            CanId::std((cob_id & 0x7ff) as u16)
200        };
201        let num_mappings = client.read_u8(mapping_base, 0).await?;
202        let mut mappings = Vec::new();
203        for i in 0..num_mappings {
204            let map_param = client.read_u32(mapping_base, i + 1).await?;
205            mappings.push(PdoMapping::from_object_value(map_param));
206        }
207
208        result.push(PdoConfig {
209            cob_id,
210            enabled,
211            rtr_disabled,
212            mappings,
213            transmission_type,
214        });
215        comm_base += 1;
216        mapping_base += 1;
217    }
218    Ok(result)
219}
220
221async fn read_rpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
222    client: &mut SdoClient<S, R>,
223) -> Result<Vec<PdoConfig>, SdoClientError> {
224    read_pdos(RPDO_COMM_BASE, RPDO_MAP_BASE, client).await
225}
226
227async fn read_tpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
228    client: &mut SdoClient<S, R>,
229) -> Result<Vec<PdoConfig>, SdoClientError> {
230    read_pdos(TPDO_COMM_BASE, TPDO_MAP_BASE, client).await
231}
232
233#[derive(Debug)]
234pub struct SdoClientGuard<'a, S, R>
235where
236    S: AsyncCanSender,
237    R: AsyncCanReceiver,
238{
239    _guard: std::sync::MutexGuard<'a, ()>,
240    client: SdoClient<S, R>,
241}
242
243impl<S, R> Deref for SdoClientGuard<'_, S, R>
244where
245    S: AsyncCanSender,
246    R: AsyncCanReceiver,
247{
248    type Target = SdoClient<S, R>;
249
250    fn deref(&self) -> &Self::Target {
251        &self.client
252    }
253}
254
255impl<S, R> DerefMut for SdoClientGuard<'_, S, R>
256where
257    S: AsyncCanSender,
258    R: AsyncCanReceiver,
259{
260    fn deref_mut(&mut self) -> &mut Self::Target {
261        &mut self.client
262    }
263}
264
265#[derive(Debug)]
266struct SdoClientMutex<S>
267where
268    S: AsyncCanSender + Sync,
269{
270    sender: SharedSender<S>,
271    receiver: SharedReceiver,
272    clients: HashMap<u8, Mutex<()>>,
273}
274
275impl<S> SdoClientMutex<S>
276where
277    S: AsyncCanSender + Sync,
278{
279    pub fn new(sender: SharedSender<S>, receiver: SharedReceiver) -> Self {
280        let mut clients = HashMap::new();
281        for i in 0u8..128 {
282            clients.insert(i, Mutex::new(()));
283        }
284
285        Self {
286            sender,
287            receiver,
288            clients,
289        }
290    }
291
292    pub fn lock(&self, id: u8) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
293        if !(1..=127).contains(&id) {
294            panic!("ID {} out of range", id);
295        }
296        let guard = self.clients.get(&id).unwrap().lock().unwrap();
297        let client = SdoClient::new_std(id, self.sender.clone(), self.receiver.create_rx());
298        SdoClientGuard {
299            _guard: guard,
300            client,
301        }
302    }
303}
304
305/// Manage a zencan bus
306#[derive(Debug)]
307pub struct BusManager<S: AsyncCanSender + Sync + Send> {
308    sender: SharedSender<S>,
309    receiver: SharedReceiver,
310    nodes: Arc<tokio::sync::Mutex<HashMap<u8, NodeInfo>>>,
311    sdo_clients: SdoClientMutex<S>,
312    _monitor_task: JoinHandle<()>,
313}
314
315impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
316    /// Create a new bus manager
317    ///
318    /// # Arguments
319    /// - `sender`: An object which implements [`AsyncCanSender`] to be used for sending messages to
320    ///   the bus
321    /// - `receiver`: An object which implements [`AsyncCanReceiver`] to be used for receiving
322    ///   messages from the bus
323    ///
324    /// When using socketcan, these can be created with [`crate::open_socketcan`]
325    pub fn new(sender: S, receiver: impl AsyncCanReceiver + Sync + 'static) -> Self {
326        let receiver = SharedReceiver::new(receiver);
327        let sender = SharedSender::new(Arc::new(tokio::sync::Mutex::new(sender)));
328        let sdo_clients = SdoClientMutex::new(sender.clone(), receiver.clone());
329
330        let mut state_rx = receiver.create_rx();
331        let nodes = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
332
333        let monitor_task = {
334            let nodes = nodes.clone();
335            tokio::spawn(async move {
336                loop {
337                    if let Ok(msg) = state_rx.recv().await {
338                        if let Ok(ZencanMessage::Heartbeat(heartbeat)) =
339                            ZencanMessage::try_from(msg)
340                        {
341                            let id_num = heartbeat.node;
342                            if let Ok(node_id) = NodeId::try_from(id_num) {
343                                let mut nodes = nodes.lock().await;
344                                if let std::collections::hash_map::Entry::Vacant(e) =
345                                    nodes.entry(id_num)
346                                {
347                                    e.insert(NodeInfo::new(node_id.raw()));
348                                } else {
349                                    let node = nodes.get_mut(&id_num).unwrap();
350                                    node.nmt_state = Some(heartbeat.state);
351                                    node.last_seen = Instant::now();
352                                }
353                            } else {
354                                log::warn!("Invalid heartbeat node ID {id_num} received");
355                            }
356                        }
357                    }
358                }
359            })
360        };
361
362        Self {
363            sender,
364            receiver,
365            sdo_clients,
366            nodes,
367            _monitor_task: monitor_task,
368        }
369    }
370
371    /// Get an SDO client for a particular node
372    ///
373    /// This function may block if another task is using the required SDO client, as it ensures
374    /// exclusive access to each node's SDO server.
375    pub fn sdo_client(
376        &self,
377        node_id: u8,
378    ) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
379        self.sdo_clients.lock(node_id)
380    }
381
382    /// Get a list of known nodes
383    pub async fn node_list(&self) -> Vec<NodeInfo> {
384        let node_map = self.nodes.lock().await;
385        let mut nodes = Vec::with_capacity(node_map.len());
386        for n in node_map.values() {
387            nodes.push(n.clone());
388        }
389
390        nodes.sort_by_key(|n| n.node_id);
391        nodes
392    }
393
394    /// Perform a scan of all possible node IDs
395    ///
396    /// Will find all configured devices, and read metadata from required objects, including:
397    /// - Identity
398    /// - Device Name
399    /// - Software Version
400    /// - Hardware Version
401    pub async fn scan_nodes(&mut self) -> Result<Vec<NodeInfo>, SdoClientError> {
402        const N_PARALLEL: usize = 10;
403
404        let ids = Vec::from_iter(1..128u8);
405        let mut nodes: Vec<NodeInfo> = Vec::new();
406
407        let mut chunks = Vec::new();
408        for chunk in ids.chunks(128 / N_PARALLEL) {
409            chunks.push(Vec::from_iter(chunk.iter().cloned()));
410        }
411
412        let mut futures = Vec::new();
413
414        for block in chunks {
415            futures.push(async {
416                let mut block_nodes = Vec::new();
417                for id in block {
418                    block_nodes.push(scan_node(id, &self.sdo_clients).await);
419                }
420                block_nodes
421            });
422        }
423
424        // Collect the results from batches. If any errors occurred, fail.
425        let results = join_all(futures).await;
426        for r in results {
427            let r: Result<Vec<Option<NodeInfo>>, SdoClientError> = r.into_iter().collect();
428            nodes.extend(r?.into_iter().flatten());
429        }
430
431        let mut node_map = self.nodes.lock().await;
432        // Update our nodes
433        for n in &nodes {
434            if let std::collections::hash_map::Entry::Vacant(e) = node_map.entry(n.node_id) {
435                e.insert(n.clone());
436            } else {
437                node_map.get_mut(&n.node_id).unwrap().update(n);
438            }
439        }
440
441        // Pull the just scanned nodes from the collection so that
442        // 1) We only included nodes which responded just now to the scan, but
443        // 2) we also display the latest NMT state for that node, which comes from the heartbeat
444        //    rather than the scan
445        Ok(nodes
446            .iter()
447            .map(|n| node_map.get(&n.node_id).unwrap().clone())
448            .collect())
449    }
450
451    /// Find all unconfigured devices on the bus
452    ///
453    /// The LSS fastscan protocol is used to identify devices which do not have an assigned node ID.
454    ///
455    /// Devices that do have a node ID can be found using [`scan_nodes`](Self::scan_nodes), or by
456    /// their heartbeat messages.
457    ///
458    /// After devices are found, they are all put back into waiting state
459    pub async fn lss_fastscan(&mut self, timeout: Duration) -> Vec<LssIdentity> {
460        let mut devices = Vec::new();
461        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
462
463        // Put all nodes into Waiting state
464        lss.set_global_mode(LssState::Waiting).await;
465
466        // Each time a device is completely identified, it goes into Configuring mode and will not
467        // respond to further scans. Once all devices are identified, the scan will return None.
468        while let Some(id) = lss.fast_scan(timeout).await {
469            devices.push(id);
470        }
471
472        lss.set_global_mode(LssState::Waiting).await;
473
474        devices
475    }
476
477    /// Activate a single LSS slave by its identity
478    ///
479    /// All nodes are put into Waiting mode via the global command, then the specified node is
480    /// activates. Will return `Ok(())` if the activated node acknowledges, or an Err otherwise.
481    ///
482    /// The identity consists of the four u32 values from the 0x1018 object, which should uniquely
483    /// identify a device on the bus. If they are not known, they can be found using
484    /// [`lss_fastscan()`](Self::lss_fastscan).
485    pub async fn lss_activate(&mut self, ident: LssIdentity) -> Result<(), LssError> {
486        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
487        lss.set_global_mode(LssState::Waiting).await;
488        lss.enter_config_by_identity(
489            ident.vendor_id,
490            ident.product_code,
491            ident.revision,
492            ident.serial,
493        )
494        .await
495    }
496
497    /// Set the node ID of LSS slave in Configuration mode
498    ///
499    /// It is required that one node has been put into Configuration mode already when this is
500    /// called, e.g. using [`lss_activate`](Self::lss_activate)
501    pub async fn lss_set_node_id(&mut self, node_id: NodeId) -> Result<(), LssError> {
502        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
503        lss.set_node_id(node_id).await?;
504        Ok(())
505    }
506
507    /// Command the node in Configuration mode to store its configuration
508    ///
509    /// It is required that one node has been put into Configuration mode already when this is
510    /// called, e.g. using [`lss_activate`](Self::lss_activate)
511    pub async fn lss_store_config(&mut self) -> Result<(), LssError> {
512        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
513        lss.store_config().await
514    }
515
516    /// Send a command to put all devices into the specified LSS state
517    pub async fn lss_set_global_mode(&mut self, mode: LssState) {
518        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
519        lss.set_global_mode(mode).await;
520    }
521
522    /// Send application reset command
523    ///
524    /// node - The node ID to command, or 0 to broadcast to all nodes
525    pub async fn nmt_reset_app(&mut self, node: u8) {
526        self.send_nmt_cmd(NmtCommandSpecifier::ResetApp, node).await
527    }
528
529    /// Send communications reset command
530    ///
531    /// node - The node ID to command, or 0 to broadcast to all nodes
532    pub async fn nmt_reset_comms(&mut self, node: u8) {
533        self.send_nmt_cmd(NmtCommandSpecifier::ResetComm, node)
534            .await
535    }
536
537    /// Send start operation command
538    ///
539    /// node - The node ID to command, or 0 to broadcast to all nodes
540    pub async fn nmt_start(&mut self, node: u8) {
541        self.send_nmt_cmd(NmtCommandSpecifier::Start, node).await
542    }
543
544    /// Send start operation command
545    ///
546    /// node - The node ID to command, or 0 to broadcast to all nodes
547    pub async fn nmt_stop(&mut self, node: u8) {
548        self.send_nmt_cmd(NmtCommandSpecifier::Stop, node).await
549    }
550
551    async fn send_nmt_cmd(&mut self, cmd: NmtCommandSpecifier, node: u8) {
552        let message = NmtCommand { cs: cmd, node };
553        self.sender.send(message.into()).await.ok();
554    }
555
556    /// Read the RPDO and TPDO configuration for the specified node
557    ///
558    /// node - The node ID to read from
559    pub async fn read_pdo_config(
560        &mut self,
561        node: ConfiguredNodeId,
562    ) -> Result<PdoScanResult, SdoClientError> {
563        let mut client = self.sdo_client(node.raw());
564
565        let tpdos = read_tpdo_config(&mut client).await?;
566        let rpdos = read_rpdo_config(&mut client).await?;
567
568        Ok(PdoScanResult { tpdos, rpdos })
569    }
570}