zencan_client/bus_manager/
bus_manager.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::Mutex;
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc, time::Instant};
5
6use futures::future::join_all;
7use tokio::task::JoinHandle;
8use zencan_common::constants::object_ids::{
9    RPDO_COMM_BASE, RPDO_MAP_BASE, TPDO_COMM_BASE, TPDO_MAP_BASE,
10};
11use zencan_common::lss::{LssIdentity, LssState};
12use zencan_common::messages::{NmtCommand, NmtCommandSpecifier, NmtState, ZencanMessage};
13use zencan_common::node_id::ConfiguredNodeId;
14use zencan_common::sdo::AbortCode;
15use zencan_common::CanId;
16use zencan_common::{
17    traits::{AsyncCanReceiver, AsyncCanSender},
18    NodeId,
19};
20
21use super::shared_sender::SharedSender;
22use crate::sdo_client::{SdoClient, SdoClientError};
23use crate::{LssError, LssMaster, PdoConfig, PdoMapping, RawAbortCode};
24
25use super::shared_receiver::{SharedReceiver, SharedReceiverChannel};
26
27#[derive(Debug, Clone)]
28pub struct NodeInfo {
29    pub node_id: u8,
30    pub identity: Option<LssIdentity>,
31    pub device_name: Option<String>,
32    pub software_version: Option<String>,
33    pub hardware_version: Option<String>,
34    pub last_seen: Instant,
35    pub nmt_state: Option<NmtState>,
36}
37
38impl core::fmt::Display for NodeInfo {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        writeln!(
41            f,
42            "Node {}: {}",
43            self.node_id,
44            self.nmt_state
45                .map(|s| s.to_string())
46                .unwrap_or("Unknown State".into())
47        )?;
48        match self.identity {
49            Some(id) => writeln!(
50                f,
51                "    Identity vendor: {:X}, product: {:X}, revision: {:X}, serial: {:X}",
52                id.vendor_id, id.product_code, id.revision, id.serial
53            )?,
54            None => writeln!(f, "    Identity: Unknown")?,
55        }
56        writeln!(
57            f,
58            "    Device Name: '{}'",
59            self.device_name.as_deref().unwrap_or("Unknown")
60        )?;
61        writeln!(
62            f,
63            "    Versions: '{}' SW, '{}' HW",
64            self.software_version.as_deref().unwrap_or("Unknown"),
65            self.hardware_version.as_deref().unwrap_or("Unknown")
66        )?;
67        let age = Instant::now().duration_since(self.last_seen);
68        writeln!(f, "    Last Seen: {}s ago", age.as_secs())?;
69
70        Ok(())
71    }
72}
73
74impl NodeInfo {
75    pub fn new(node_id: u8) -> Self {
76        Self {
77            node_id,
78            last_seen: Instant::now(),
79            device_name: None,
80            identity: None,
81            software_version: None,
82            hardware_version: None,
83            nmt_state: None,
84        }
85    }
86
87    /// Update / merge new information about the node
88    pub fn update(&mut self, info: &NodeInfo) {
89        if info.device_name.is_some() {
90            self.device_name = info.device_name.clone();
91        }
92        if info.identity.is_some() {
93            self.identity = info.identity;
94        }
95        if info.software_version.is_some() {
96            self.software_version = info.software_version.clone();
97        }
98        if info.hardware_version.is_some() {
99            self.hardware_version = info.hardware_version.clone();
100        }
101        if info.nmt_state.is_some() {
102            self.nmt_state = info.nmt_state;
103        }
104        self.last_seen = Instant::now();
105    }
106}
107
108async fn scan_node<S: AsyncCanSender + Sync + Send>(
109    node_id: u8,
110    clients: &SdoClientMutex<S>,
111) -> Option<NodeInfo> {
112    let mut sdo_client = clients.lock(node_id);
113    log::info!("Scanning Node {node_id}");
114    let identity = match sdo_client.read_identity().await {
115        Ok(id) => Some(id),
116        Err(SdoClientError::NoResponse) => {
117            log::info!("No response from node {node_id}");
118            return None;
119        }
120        Err(e) => {
121            // A server responded, but we failed to read identity. An unexpected situation, as all
122            // nodes should implement the identity object
123            log::error!("SDO Abort Response scanning node {node_id} identity: {e:?}");
124            None
125        }
126    };
127    let device_name = match sdo_client.read_device_name().await {
128        Ok(s) => Some(s),
129        Err(e) => {
130            log::error!("SDO Abort Response scanning node {node_id} device name: {e:?}");
131            None
132        }
133    };
134    let software_version = match sdo_client.read_software_version().await {
135        Ok(s) => Some(s),
136        Err(e) => {
137            log::error!("SDO Abort Response scanning node {node_id} SW version: {e:?}");
138            None
139        }
140    };
141    let hardware_version = match sdo_client.read_hardware_version().await {
142        Ok(s) => Some(s),
143        Err(e) => {
144            log::error!("SDO Abort Response scanning node {node_id} HW version: {e:?}");
145            None
146        }
147    };
148    Some(NodeInfo {
149        node_id,
150        identity,
151        device_name,
152        software_version,
153        hardware_version,
154        nmt_state: None,
155        last_seen: Instant::now(),
156    })
157}
158
159/// Result struct for reading PDO configuration from a single node
160#[derive(Clone, Debug)]
161pub struct PdoScanResult {
162    /// List of TPDO configurations
163    pub tpdos: Vec<PdoConfig>,
164    /// List of RPDO configurations
165    pub rpdos: Vec<PdoConfig>,
166}
167
168async fn read_pdos<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
169    mut comm_base: u16,
170    mut mapping_base: u16,
171    client: &mut SdoClient<S, R>,
172) -> Result<Vec<PdoConfig>, SdoClientError> {
173    let mut result = Vec::new();
174
175    loop {
176        let _comm_max_sub = match client.read_u8(comm_base, 0).await {
177            Ok(val) => val,
178            // This error is expected; this means there are no more PDOs to read
179            Err(SdoClientError::ServerAbort {
180                index: _,
181                sub: _,
182                abort_code: RawAbortCode::Valid(AbortCode::NoSuchObject),
183            }) => break,
184            // Any other error is unexpected
185            Err(e) => {
186                return Err(e);
187            }
188        };
189
190        let cob_value = client.read_u32(comm_base, 1).await?;
191        let transmission_type = client.read_u8(comm_base, 2).await?;
192
193        let frame = (cob_value & (1 << 29)) != 0;
194        let enabled = (cob_value & (1 << 31)) == 0;
195        let cob_id = cob_value & 0x1FFFFFFF;
196        let cob = if frame {
197            CanId::extended(cob_id)
198        } else {
199            CanId::std((cob_id & 0x7ff) as u16)
200        };
201        let num_mappings = client.read_u8(mapping_base, 0).await?;
202        let mut mappings = Vec::new();
203        for i in 0..num_mappings {
204            let map_param = client.read_u32(mapping_base, i + 1).await?;
205            mappings.push(PdoMapping::from_object_value(map_param));
206        }
207
208        result.push(PdoConfig {
209            cob,
210            enabled,
211            mappings,
212            transmission_type,
213        });
214        comm_base += 1;
215        mapping_base += 1;
216    }
217    Ok(result)
218}
219
220async fn read_rpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
221    client: &mut SdoClient<S, R>,
222) -> Result<Vec<PdoConfig>, SdoClientError> {
223    read_pdos(RPDO_COMM_BASE, RPDO_MAP_BASE, client).await
224}
225
226async fn read_tpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
227    client: &mut SdoClient<S, R>,
228) -> Result<Vec<PdoConfig>, SdoClientError> {
229    read_pdos(TPDO_COMM_BASE, TPDO_MAP_BASE, client).await
230}
231
232#[derive(Debug)]
233pub struct SdoClientGuard<'a, S, R>
234where
235    S: AsyncCanSender,
236    R: AsyncCanReceiver,
237{
238    _guard: std::sync::MutexGuard<'a, ()>,
239    client: SdoClient<S, R>,
240}
241
242impl<S, R> Deref for SdoClientGuard<'_, S, R>
243where
244    S: AsyncCanSender,
245    R: AsyncCanReceiver,
246{
247    type Target = SdoClient<S, R>;
248
249    fn deref(&self) -> &Self::Target {
250        &self.client
251    }
252}
253
254impl<S, R> DerefMut for SdoClientGuard<'_, S, R>
255where
256    S: AsyncCanSender,
257    R: AsyncCanReceiver,
258{
259    fn deref_mut(&mut self) -> &mut Self::Target {
260        &mut self.client
261    }
262}
263
264#[derive(Debug)]
265struct SdoClientMutex<S>
266where
267    S: AsyncCanSender + Sync,
268{
269    sender: SharedSender<S>,
270    receiver: SharedReceiver,
271    clients: HashMap<u8, Mutex<()>>,
272}
273
274impl<S> SdoClientMutex<S>
275where
276    S: AsyncCanSender + Sync,
277{
278    pub fn new(sender: SharedSender<S>, receiver: SharedReceiver) -> Self {
279        let mut clients = HashMap::new();
280        for i in 0u8..128 {
281            clients.insert(i, Mutex::new(()));
282        }
283
284        Self {
285            sender,
286            receiver,
287            clients,
288        }
289    }
290
291    pub fn lock(&self, id: u8) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
292        if !(1..=127).contains(&id) {
293            panic!("ID {} out of range", id);
294        }
295        let guard = self.clients.get(&id).unwrap().lock().unwrap();
296        let client = SdoClient::new_std(id, self.sender.clone(), self.receiver.create_rx());
297        SdoClientGuard {
298            _guard: guard,
299            client,
300        }
301    }
302}
303
304/// Manage a zencan bus
305#[derive(Debug)]
306pub struct BusManager<S: AsyncCanSender + Sync + Send> {
307    sender: SharedSender<S>,
308    receiver: SharedReceiver,
309    nodes: Arc<tokio::sync::Mutex<HashMap<u8, NodeInfo>>>,
310    sdo_clients: SdoClientMutex<S>,
311    _monitor_task: JoinHandle<()>,
312}
313
314impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
315    /// Create a new bus manager
316    ///
317    /// # Arguments
318    /// - `sender`: An object which implements [`AsyncCanSender`] to be used for sending messages to
319    ///   the bus
320    /// - `receiver`: An object which implements [`AsyncCanReceiver`] to be used for receiving
321    ///   messages from the bus
322    ///
323    /// When using socketcan, these can be created with [`crate::open_socketcan`]
324    pub fn new(sender: S, receiver: impl AsyncCanReceiver + Sync + 'static) -> Self {
325        let receiver = SharedReceiver::new(receiver);
326        let sender = SharedSender::new(Arc::new(tokio::sync::Mutex::new(sender)));
327        let sdo_clients = SdoClientMutex::new(sender.clone(), receiver.clone());
328
329        let mut state_rx = receiver.create_rx();
330        let nodes = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
331
332        let monitor_task = {
333            let nodes = nodes.clone();
334            tokio::spawn(async move {
335                loop {
336                    if let Ok(msg) = state_rx.recv().await {
337                        if let Ok(ZencanMessage::Heartbeat(heartbeat)) =
338                            ZencanMessage::try_from(msg)
339                        {
340                            let id_num = heartbeat.node;
341                            if let Ok(node_id) = NodeId::try_from(id_num) {
342                                let mut nodes = nodes.lock().await;
343                                if let std::collections::hash_map::Entry::Vacant(e) =
344                                    nodes.entry(id_num)
345                                {
346                                    e.insert(NodeInfo::new(node_id.raw()));
347                                } else {
348                                    let node = nodes.get_mut(&id_num).unwrap();
349                                    node.nmt_state = Some(heartbeat.state);
350                                    node.last_seen = Instant::now();
351                                }
352                            } else {
353                                log::warn!("Invalid heartbeat node ID {id_num} received");
354                            }
355                        }
356                    }
357                }
358            })
359        };
360
361        Self {
362            sender,
363            receiver,
364            sdo_clients,
365            nodes,
366            _monitor_task: monitor_task,
367        }
368    }
369
370    /// Get an SDO client for a particular node
371    ///
372    /// This function may block if another task is using the required SDO client, as it ensures
373    /// exclusive access to each node's SDO server.
374    pub fn sdo_client(
375        &self,
376        node_id: u8,
377    ) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
378        self.sdo_clients.lock(node_id)
379    }
380
381    /// Get a list of known nodes
382    pub async fn node_list(&self) -> Vec<NodeInfo> {
383        let node_map = self.nodes.lock().await;
384        let mut nodes = Vec::with_capacity(node_map.len());
385        for n in node_map.values() {
386            nodes.push(n.clone());
387        }
388
389        nodes.sort_by_key(|n| n.node_id);
390        nodes
391    }
392
393    /// Perform a scan of all possible node IDs
394    ///
395    /// Will find all configured devices, and read metadata from required objects, including:
396    /// - Identity
397    /// - Device Name
398    /// - Software Version
399    /// - Hardware Version
400    pub async fn scan_nodes(&mut self) -> Vec<NodeInfo> {
401        const N_PARALLEL: usize = 10;
402
403        let ids = Vec::from_iter(1..128u8);
404        let mut nodes = Vec::new();
405
406        let mut chunks = Vec::new();
407        for chunk in ids.chunks(128 / N_PARALLEL) {
408            chunks.push(Vec::from_iter(chunk.iter().cloned()));
409        }
410
411        let mut futures = Vec::new();
412
413        for block in chunks {
414            futures.push(async {
415                let mut block_nodes = Vec::new();
416                for id in block {
417                    block_nodes.push(scan_node(id, &self.sdo_clients).await);
418                }
419                block_nodes
420            });
421        }
422
423        let results = join_all(futures).await;
424        for r in results {
425            nodes.extend(r.into_iter().flatten());
426        }
427
428        let mut node_map = self.nodes.lock().await;
429        // Update our nodes
430        for n in &nodes {
431            if let std::collections::hash_map::Entry::Vacant(e) = node_map.entry(n.node_id) {
432                e.insert(n.clone());
433            } else {
434                node_map.get_mut(&n.node_id).unwrap().update(n);
435            }
436        }
437
438        // Pull the just scanned nodes from the collection so that
439        // 1) We only included nodes which responded just now to the scan, but
440        // 2) we also display the latest NMT state for that node, which comes from the heartbeat
441        //    rather than the scan
442        nodes
443            .iter()
444            .map(|n| node_map.get(&n.node_id).unwrap().clone())
445            .collect()
446    }
447
448    /// Find all unconfigured devices on the bus
449    ///
450    /// The LSS fastscan protocol is used to identify devices which do not have an assigned node ID.
451    ///
452    /// Devices that do have a node ID can be found using [`scan_nodes`](Self::scan_nodes), or by
453    /// their heartbeat messages.
454    ///
455    /// After devices are found, they are all put back into waiting state
456    pub async fn lss_fastscan(&mut self, timeout: Duration) -> Vec<LssIdentity> {
457        let mut devices = Vec::new();
458        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
459
460        // Put all nodes into Waiting state
461        lss.set_global_mode(LssState::Waiting).await;
462
463        // Each time a device is completely identified, it goes into Configuring mode and will not
464        // respond to further scans. Once all devices are identified, the scan will return None.
465        while let Some(id) = lss.fast_scan(timeout).await {
466            devices.push(id);
467        }
468
469        lss.set_global_mode(LssState::Waiting).await;
470
471        devices
472    }
473
474    /// Activate a single LSS slave by its identity
475    ///
476    /// All nodes are put into Waiting mode via the global command, then the specified node is
477    /// activates. Will return `Ok(())` if the activated node acknowledges, or an Err otherwise.
478    ///
479    /// The identity consists of the four u32 values from the 0x1018 object, which should uniquely
480    /// identify a device on the bus. If they are not known, they can be found using
481    /// [`lss_fastscan()`](Self::lss_fastscan).
482    pub async fn lss_activate(&mut self, ident: LssIdentity) -> Result<(), LssError> {
483        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
484        lss.set_global_mode(LssState::Waiting).await;
485        lss.enter_config_by_identity(
486            ident.vendor_id,
487            ident.product_code,
488            ident.revision,
489            ident.serial,
490        )
491        .await
492    }
493
494    /// Set the node ID of LSS slave in Configuration mode
495    ///
496    /// It is required that one node has been put into Configuration mode already when this is
497    /// called, e.g. using [`lss_activate`](Self::lss_activate)
498    pub async fn lss_set_node_id(&mut self, node_id: NodeId) -> Result<(), LssError> {
499        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
500        lss.set_node_id(node_id).await?;
501        Ok(())
502    }
503
504    /// Command the node in Configuration mode to store its configuration
505    ///
506    /// It is required that one node has been put into Configuration mode already when this is
507    /// called, e.g. using [`lss_activate`](Self::lss_activate)
508    pub async fn lss_store_config(&mut self) -> Result<(), LssError> {
509        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
510        lss.store_config().await
511    }
512
513    /// Send a command to put all devices into the specified LSS state
514    pub async fn lss_set_global_mode(&mut self, mode: LssState) {
515        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
516        lss.set_global_mode(mode).await;
517    }
518
519    /// Send application reset command
520    ///
521    /// node - The node ID to command, or 0 to broadcast to all nodes
522    pub async fn nmt_reset_app(&mut self, node: u8) {
523        self.send_nmt_cmd(NmtCommandSpecifier::ResetApp, node).await
524    }
525
526    /// Send communications reset command
527    ///
528    /// node - The node ID to command, or 0 to broadcast to all nodes
529    pub async fn nmt_reset_comms(&mut self, node: u8) {
530        self.send_nmt_cmd(NmtCommandSpecifier::ResetComm, node)
531            .await
532    }
533
534    /// Send start operation command
535    ///
536    /// node - The node ID to command, or 0 to broadcast to all nodes
537    pub async fn nmt_start(&mut self, node: u8) {
538        self.send_nmt_cmd(NmtCommandSpecifier::Start, node).await
539    }
540
541    /// Send start operation command
542    ///
543    /// node - The node ID to command, or 0 to broadcast to all nodes
544    pub async fn nmt_stop(&mut self, node: u8) {
545        self.send_nmt_cmd(NmtCommandSpecifier::Stop, node).await
546    }
547
548    async fn send_nmt_cmd(&mut self, cmd: NmtCommandSpecifier, node: u8) {
549        let message = NmtCommand { cs: cmd, node };
550        self.sender.send(message.into()).await.ok();
551    }
552
553    /// Read the RPDO and TPDO configuration for the specified node
554    ///
555    /// node - The node ID to read from
556    pub async fn read_pdo_config(
557        &mut self,
558        node: ConfiguredNodeId,
559    ) -> Result<PdoScanResult, SdoClientError> {
560        let mut client = self.sdo_client(node.raw());
561
562        let tpdos = read_tpdo_config(&mut client).await?;
563        let rpdos = read_rpdo_config(&mut client).await?;
564
565        Ok(PdoScanResult { tpdos, rpdos })
566    }
567}