zencan_client/bus_manager/
bus_manager.rs

1use std::ops::{Deref, DerefMut};
2use std::sync::Mutex;
3use std::time::Duration;
4use std::{collections::HashMap, sync::Arc, time::Instant};
5
6use futures::future::join_all;
7use tokio::task::JoinHandle;
8use zencan_common::constants::object_ids::{
9    RPDO_COMM_BASE, RPDO_MAP_BASE, TPDO_COMM_BASE, TPDO_MAP_BASE,
10};
11use zencan_common::lss::{LssIdentity, LssState};
12use zencan_common::messages::{NmtCommand, NmtCommandSpecifier, ZencanMessage};
13use zencan_common::nmt::NmtState;
14use zencan_common::node_id::ConfiguredNodeId;
15use zencan_common::sdo::AbortCode;
16use zencan_common::{
17    node_configuration::PdoConfig,
18    pdo::PdoMapping,
19    traits::{AsyncCanReceiver, AsyncCanSender},
20    CanId, NodeId,
21};
22
23use super::shared_sender::SharedSender;
24use crate::sdo_client::{SdoClient, SdoClientError};
25use crate::{LssError, LssMaster, RawAbortCode};
26
27use super::shared_receiver::{SharedReceiver, SharedReceiverChannel};
28
29#[derive(Debug, Clone)]
30pub struct NodeInfo {
31    pub node_id: u8,
32    pub identity: Option<LssIdentity>,
33    pub device_name: Option<String>,
34    pub software_version: Option<String>,
35    pub hardware_version: Option<String>,
36    pub last_seen: Instant,
37    pub nmt_state: Option<NmtState>,
38}
39
40impl core::fmt::Display for NodeInfo {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        writeln!(
43            f,
44            "Node {}: {}",
45            self.node_id,
46            self.nmt_state
47                .map(|s| s.to_string())
48                .unwrap_or("Unknown State".into())
49        )?;
50        match self.identity {
51            Some(id) => writeln!(
52                f,
53                "    Identity vendor: {:X}, product: {:X}, revision: {:X}, serial: {:X}",
54                id.vendor_id, id.product_code, id.revision, id.serial
55            )?,
56            None => writeln!(f, "    Identity: Unknown")?,
57        }
58        writeln!(
59            f,
60            "    Device Name: '{}'",
61            self.device_name.as_deref().unwrap_or("Unknown")
62        )?;
63        writeln!(
64            f,
65            "    Versions: '{}' SW, '{}' HW",
66            self.software_version.as_deref().unwrap_or("Unknown"),
67            self.hardware_version.as_deref().unwrap_or("Unknown")
68        )?;
69        let age = Instant::now().duration_since(self.last_seen);
70        writeln!(f, "    Last Seen: {}s ago", age.as_secs())?;
71
72        Ok(())
73    }
74}
75
76impl NodeInfo {
77    pub fn new(node_id: u8) -> Self {
78        Self {
79            node_id,
80            last_seen: Instant::now(),
81            device_name: None,
82            identity: None,
83            software_version: None,
84            hardware_version: None,
85            nmt_state: None,
86        }
87    }
88
89    /// Update / merge new information about the node
90    pub fn update(&mut self, info: &NodeInfo) {
91        if info.device_name.is_some() {
92            self.device_name = info.device_name.clone();
93        }
94        if info.identity.is_some() {
95            self.identity = info.identity;
96        }
97        if info.software_version.is_some() {
98            self.software_version = info.software_version.clone();
99        }
100        if info.hardware_version.is_some() {
101            self.hardware_version = info.hardware_version.clone();
102        }
103        if info.nmt_state.is_some() {
104            self.nmt_state = info.nmt_state;
105        }
106        self.last_seen = Instant::now();
107    }
108}
109
110async fn scan_node<S: AsyncCanSender + Sync + Send>(
111    node_id: u8,
112    clients: &SdoClientMutex<S>,
113) -> Result<Option<NodeInfo>, SdoClientError> {
114    let mut sdo_client = clients.lock(node_id);
115    log::info!("Scanning Node {node_id}");
116
117    let identity = match sdo_client.read_identity().await {
118        Ok(id) => Some(id),
119        // A no response here is not really an error, it just indicates the node is not present
120        Err(SdoClientError::NoResponse) => {
121            log::info!("No response from node {node_id}");
122            return Ok(None);
123        }
124        Err(e) => return Err(e),
125    };
126    let device_name = match sdo_client.read_device_name().await {
127        Ok(s) => Some(s),
128        Err(e) => {
129            log::error!("SDO Abort Response scanning node {node_id} device name: {e:?}");
130            None
131        }
132    };
133    let software_version = match sdo_client.read_software_version().await {
134        Ok(s) => Some(s),
135        Err(e) => {
136            log::error!("SDO Abort Response scanning node {node_id} SW version: {e:?}");
137            None
138        }
139    };
140    let hardware_version = match sdo_client.read_hardware_version().await {
141        Ok(s) => Some(s),
142        Err(e) => {
143            log::error!("SDO Abort Response scanning node {node_id} HW version: {e:?}");
144            None
145        }
146    };
147    Ok(Some(NodeInfo {
148        node_id,
149        identity,
150        device_name,
151        software_version,
152        hardware_version,
153        nmt_state: None,
154        last_seen: Instant::now(),
155    }))
156}
157
158/// Result struct for reading PDO configuration from a single node
159#[derive(Clone, Debug)]
160pub struct PdoScanResult {
161    /// List of TPDO configurations
162    pub tpdos: Vec<PdoConfig>,
163    /// List of RPDO configurations
164    pub rpdos: Vec<PdoConfig>,
165}
166
167async fn read_pdos<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
168    mut comm_base: u16,
169    mut mapping_base: u16,
170    client: &mut SdoClient<S, R>,
171) -> Result<Vec<PdoConfig>, SdoClientError> {
172    let mut result = Vec::new();
173
174    loop {
175        let _comm_max_sub = match client.read_u8(comm_base, 0).await {
176            Ok(val) => val,
177            // This error is expected; this means there are no more PDOs to read
178            Err(SdoClientError::ServerAbort {
179                index: _,
180                sub: _,
181                abort_code: RawAbortCode::Valid(AbortCode::NoSuchObject),
182            }) => break,
183            // Any other error is unexpected
184            Err(e) => {
185                return Err(e);
186            }
187        };
188
189        let cob_value = client.read_u32(comm_base, 1).await?;
190        let transmission_type = client.read_u8(comm_base, 2).await?;
191
192        let frame = (cob_value & (1 << 29)) != 0;
193        let rtr_disabled = (cob_value & (1 << 30)) != 0;
194        let enabled = (cob_value & (1 << 31)) == 0;
195
196        let cob_id = cob_value & 0x1FFFFFFF;
197        let cob_id = if frame {
198            CanId::extended(cob_id)
199        } else {
200            CanId::std((cob_id & 0x7ff) as u16)
201        };
202        let num_mappings = client.read_u8(mapping_base, 0).await?;
203        let mut mappings = Vec::new();
204        for i in 0..num_mappings {
205            let map_param = client.read_u32(mapping_base, i + 1).await?;
206            mappings.push(PdoMapping::from_object_value(map_param));
207        }
208
209        result.push(PdoConfig {
210            cob_id,
211            enabled,
212            rtr_disabled,
213            mappings,
214            transmission_type,
215        });
216        comm_base += 1;
217        mapping_base += 1;
218    }
219    Ok(result)
220}
221
222async fn read_rpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
223    client: &mut SdoClient<S, R>,
224) -> Result<Vec<PdoConfig>, SdoClientError> {
225    read_pdos(RPDO_COMM_BASE, RPDO_MAP_BASE, client).await
226}
227
228async fn read_tpdo_config<S: AsyncCanSender + Sync + Send, R: AsyncCanReceiver>(
229    client: &mut SdoClient<S, R>,
230) -> Result<Vec<PdoConfig>, SdoClientError> {
231    read_pdos(TPDO_COMM_BASE, TPDO_MAP_BASE, client).await
232}
233
234#[derive(Debug)]
235pub struct SdoClientGuard<'a, S, R>
236where
237    S: AsyncCanSender,
238    R: AsyncCanReceiver,
239{
240    _guard: std::sync::MutexGuard<'a, ()>,
241    client: SdoClient<S, R>,
242}
243
244impl<S, R> Deref for SdoClientGuard<'_, S, R>
245where
246    S: AsyncCanSender,
247    R: AsyncCanReceiver,
248{
249    type Target = SdoClient<S, R>;
250
251    fn deref(&self) -> &Self::Target {
252        &self.client
253    }
254}
255
256impl<S, R> DerefMut for SdoClientGuard<'_, S, R>
257where
258    S: AsyncCanSender,
259    R: AsyncCanReceiver,
260{
261    fn deref_mut(&mut self) -> &mut Self::Target {
262        &mut self.client
263    }
264}
265
266#[derive(Debug)]
267struct SdoClientMutex<S>
268where
269    S: AsyncCanSender + Sync,
270{
271    sender: SharedSender<S>,
272    receiver: SharedReceiver,
273    clients: HashMap<u8, Mutex<()>>,
274}
275
276impl<S> SdoClientMutex<S>
277where
278    S: AsyncCanSender + Sync,
279{
280    pub fn new(sender: SharedSender<S>, receiver: SharedReceiver) -> Self {
281        let mut clients = HashMap::new();
282        for i in 0u8..128 {
283            clients.insert(i, Mutex::new(()));
284        }
285
286        Self {
287            sender,
288            receiver,
289            clients,
290        }
291    }
292
293    pub fn lock(&self, id: u8) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
294        if !(1..=127).contains(&id) {
295            panic!("ID {} out of range", id);
296        }
297        let guard = self.clients.get(&id).unwrap().lock().unwrap();
298        let client = SdoClient::new_std(id, self.sender.clone(), self.receiver.create_rx());
299        SdoClientGuard {
300            _guard: guard,
301            client,
302        }
303    }
304}
305
306/// Manage a zencan bus
307#[derive(Debug)]
308pub struct BusManager<S: AsyncCanSender + Sync + Send> {
309    sender: SharedSender<S>,
310    receiver: SharedReceiver,
311    nodes: Arc<tokio::sync::Mutex<HashMap<u8, NodeInfo>>>,
312    sdo_clients: SdoClientMutex<S>,
313    _monitor_task: JoinHandle<()>,
314}
315
316impl<S: AsyncCanSender + Sync + Send> BusManager<S> {
317    /// Create a new bus manager
318    ///
319    /// # Arguments
320    /// - `sender`: An object which implements [`AsyncCanSender`] to be used for sending messages to
321    ///   the bus
322    /// - `receiver`: An object which implements [`AsyncCanReceiver`] to be used for receiving
323    ///   messages from the bus
324    ///
325    /// When using socketcan, these can be created with [`crate::open_socketcan`]
326    pub fn new(sender: S, receiver: impl AsyncCanReceiver + Sync + 'static) -> Self {
327        let receiver = SharedReceiver::new(receiver);
328        let sender = SharedSender::new(Arc::new(tokio::sync::Mutex::new(sender)));
329        let sdo_clients = SdoClientMutex::new(sender.clone(), receiver.clone());
330
331        let mut state_rx = receiver.create_rx();
332        let nodes = Arc::new(tokio::sync::Mutex::new(HashMap::new()));
333
334        let monitor_task = {
335            let nodes = nodes.clone();
336            tokio::spawn(async move {
337                loop {
338                    if let Ok(msg) = state_rx.recv().await {
339                        if let Ok(ZencanMessage::Heartbeat(heartbeat)) =
340                            ZencanMessage::try_from(msg)
341                        {
342                            let id_num = heartbeat.node;
343                            if let Ok(node_id) = NodeId::try_from(id_num) {
344                                let mut nodes = nodes.lock().await;
345                                if let std::collections::hash_map::Entry::Vacant(e) =
346                                    nodes.entry(id_num)
347                                {
348                                    e.insert(NodeInfo::new(node_id.raw()));
349                                } else {
350                                    let node = nodes.get_mut(&id_num).unwrap();
351                                    node.nmt_state = Some(heartbeat.state);
352                                    node.last_seen = Instant::now();
353                                }
354                            } else {
355                                log::warn!("Invalid heartbeat node ID {id_num} received");
356                            }
357                        }
358                    }
359                }
360            })
361        };
362
363        Self {
364            sender,
365            receiver,
366            sdo_clients,
367            nodes,
368            _monitor_task: monitor_task,
369        }
370    }
371
372    /// Get an SDO client for a particular node
373    ///
374    /// This function may block if another task is using the required SDO client, as it ensures
375    /// exclusive access to each node's SDO server.
376    pub fn sdo_client(
377        &self,
378        node_id: u8,
379    ) -> SdoClientGuard<'_, SharedSender<S>, SharedReceiverChannel> {
380        self.sdo_clients.lock(node_id)
381    }
382
383    /// Get a list of known nodes
384    pub async fn node_list(&self) -> Vec<NodeInfo> {
385        let node_map = self.nodes.lock().await;
386        let mut nodes = Vec::with_capacity(node_map.len());
387        for n in node_map.values() {
388            nodes.push(n.clone());
389        }
390
391        nodes.sort_by_key(|n| n.node_id);
392        nodes
393    }
394
395    /// Perform a scan of all possible node IDs
396    ///
397    /// Will find all configured devices, and read metadata from required objects, including:
398    /// - Identity
399    /// - Device Name
400    /// - Software Version
401    /// - Hardware Version
402    pub async fn scan_nodes(&mut self) -> Result<Vec<NodeInfo>, SdoClientError> {
403        const N_PARALLEL: usize = 10;
404
405        let ids = Vec::from_iter(1..128u8);
406        let mut nodes: Vec<NodeInfo> = Vec::new();
407
408        let mut chunks = Vec::new();
409        for chunk in ids.chunks(128 / N_PARALLEL) {
410            chunks.push(Vec::from_iter(chunk.iter().cloned()));
411        }
412
413        let mut futures = Vec::new();
414
415        for block in chunks {
416            futures.push(async {
417                let mut block_nodes = Vec::new();
418                for id in block {
419                    block_nodes.push(scan_node(id, &self.sdo_clients).await);
420                }
421                block_nodes
422            });
423        }
424
425        // Collect the results from batches. If any errors occurred, fail.
426        let results = join_all(futures).await;
427        for r in results {
428            let r: Result<Vec<Option<NodeInfo>>, SdoClientError> = r.into_iter().collect();
429            nodes.extend(r?.into_iter().flatten());
430        }
431
432        let mut node_map = self.nodes.lock().await;
433        // Update our nodes
434        for n in &nodes {
435            if let std::collections::hash_map::Entry::Vacant(e) = node_map.entry(n.node_id) {
436                e.insert(n.clone());
437            } else {
438                node_map.get_mut(&n.node_id).unwrap().update(n);
439            }
440        }
441
442        // Pull the just scanned nodes from the collection so that
443        // 1) We only included nodes which responded just now to the scan, but
444        // 2) we also display the latest NMT state for that node, which comes from the heartbeat
445        //    rather than the scan
446        Ok(nodes
447            .iter()
448            .map(|n| node_map.get(&n.node_id).unwrap().clone())
449            .collect())
450    }
451
452    /// Find all unconfigured devices on the bus
453    ///
454    /// The LSS fastscan protocol is used to identify devices which do not have an assigned node ID.
455    ///
456    /// Devices that do have a node ID can be found using [`scan_nodes`](Self::scan_nodes), or by
457    /// their heartbeat messages.
458    ///
459    /// After devices are found, they are all put back into waiting state
460    pub async fn lss_fastscan(&mut self, timeout: Duration) -> Vec<LssIdentity> {
461        let mut devices = Vec::new();
462        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
463
464        // Put all nodes into Waiting state
465        lss.set_global_mode(LssState::Waiting).await;
466
467        // Each time a device is completely identified, it goes into Configuring mode and will not
468        // respond to further scans. Once all devices are identified, the scan will return None.
469        while let Some(id) = lss.fast_scan(timeout).await {
470            devices.push(id);
471        }
472
473        lss.set_global_mode(LssState::Waiting).await;
474
475        devices
476    }
477
478    /// Activate a single LSS slave by its identity
479    ///
480    /// All nodes are put into Waiting mode via the global command, then the specified node is
481    /// activates. Will return `Ok(())` if the activated node acknowledges, or an Err otherwise.
482    ///
483    /// The identity consists of the four u32 values from the 0x1018 object, which should uniquely
484    /// identify a device on the bus. If they are not known, they can be found using
485    /// [`lss_fastscan()`](Self::lss_fastscan).
486    pub async fn lss_activate(&mut self, ident: LssIdentity) -> Result<(), LssError> {
487        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
488        lss.set_global_mode(LssState::Waiting).await;
489        lss.enter_config_by_identity(
490            ident.vendor_id,
491            ident.product_code,
492            ident.revision,
493            ident.serial,
494        )
495        .await
496    }
497
498    /// Set the node ID of LSS slave in Configuration mode
499    ///
500    /// It is required that one node has been put into Configuration mode already when this is
501    /// called, e.g. using [`lss_activate`](Self::lss_activate)
502    pub async fn lss_set_node_id(&mut self, node_id: NodeId) -> Result<(), LssError> {
503        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
504        lss.set_node_id(node_id).await?;
505        Ok(())
506    }
507
508    /// Command the node in Configuration mode to store its configuration
509    ///
510    /// It is required that one node has been put into Configuration mode already when this is
511    /// called, e.g. using [`lss_activate`](Self::lss_activate)
512    pub async fn lss_store_config(&mut self) -> Result<(), LssError> {
513        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
514        lss.store_config().await
515    }
516
517    /// Send a command to put all devices into the specified LSS state
518    pub async fn lss_set_global_mode(&mut self, mode: LssState) {
519        let mut lss = LssMaster::new(self.sender.clone(), self.receiver.create_rx());
520        lss.set_global_mode(mode).await;
521    }
522
523    /// Send application reset command
524    ///
525    /// node - The node ID to command, or 0 to broadcast to all nodes
526    pub async fn nmt_reset_app(&mut self, node: u8) {
527        self.send_nmt_cmd(NmtCommandSpecifier::ResetApp, node).await
528    }
529
530    /// Send communications reset command
531    ///
532    /// node - The node ID to command, or 0 to broadcast to all nodes
533    pub async fn nmt_reset_comms(&mut self, node: u8) {
534        self.send_nmt_cmd(NmtCommandSpecifier::ResetComm, node)
535            .await
536    }
537
538    /// Send start operation command
539    ///
540    /// node - The node ID to command, or 0 to broadcast to all nodes
541    pub async fn nmt_start(&mut self, node: u8) {
542        self.send_nmt_cmd(NmtCommandSpecifier::Start, node).await
543    }
544
545    /// Send start operation command
546    ///
547    /// node - The node ID to command, or 0 to broadcast to all nodes
548    pub async fn nmt_stop(&mut self, node: u8) {
549        self.send_nmt_cmd(NmtCommandSpecifier::Stop, node).await
550    }
551
552    async fn send_nmt_cmd(&mut self, cmd: NmtCommandSpecifier, node: u8) {
553        let message = NmtCommand { cs: cmd, node };
554        self.sender.send(message.into()).await.ok();
555    }
556
557    /// Read the RPDO and TPDO configuration for the specified node
558    ///
559    /// node - The node ID to read from
560    pub async fn read_pdo_config(
561        &mut self,
562        node: ConfiguredNodeId,
563    ) -> Result<PdoScanResult, SdoClientError> {
564        let mut client = self.sdo_client(node.raw());
565
566        let tpdos = read_tpdo_config(&mut client).await?;
567        let rpdos = read_rpdo_config(&mut client).await?;
568
569        Ok(PdoScanResult { tpdos, rpdos })
570    }
571}