zencan_client/
sdo_client.rs

1use std::time::Duration;
2
3use snafu::Snafu;
4use zencan_common::{
5    constants::{object_ids, values::SAVE_CMD},
6    lss::LssIdentity,
7    messages::CanId,
8    node_configuration::PdoConfig,
9    pdo::PdoMapping,
10    sdo::{AbortCode, BlockSegment, SdoRequest, SdoResponse},
11    traits::{AsyncCanReceiver, AsyncCanSender, CanSendError as _},
12    CanMessage, TimeDifference, TimeOfDay,
13};
14
/// Response timeout given to every newly constructed [`SdoClient`].
///
/// It can be overridden per client with [`SdoClient::set_timeout`].
const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(150);
16
/// A wrapper around the AbortCode enum to allow for unknown values
///
/// Although the library should "know" all the abort codes, it is possible to receive other values
/// and this allows those to be captured and exposed.
///
/// A raw `u32` can be classified into one of the two variants through the `From<u32>`
/// implementation.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum RawAbortCode {
    /// A recognized abort code
    Valid(AbortCode),
    /// An unrecognized abort code, carrying the raw 32-bit value
    Unknown(u32),
}
28
29impl std::fmt::Display for RawAbortCode {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        match self {
32            RawAbortCode::Valid(abort_code) => write!(f, "{abort_code:?}"),
33            RawAbortCode::Unknown(code) => write!(f, "{code:X}"),
34        }
35    }
36}
37
38impl From<u32> for RawAbortCode {
39    fn from(value: u32) -> Self {
40        match AbortCode::try_from(value) {
41            Ok(code) => Self::Valid(code),
42            Err(_) => Self::Unknown(value),
43        }
44    }
45}
46
/// Error returned by [`SdoClient`] methods
#[derive(Clone, Debug, PartialEq, Snafu)]
pub enum SdoClientError {
    /// Timeout while awaiting an expected response
    ///
    /// Also returned when reading from the receiver itself fails while waiting.
    NoResponse,
    /// Received a response that could not be interpreted
    MalformedResponse,
    /// Received a valid SdoResponse, but with an unexpected command specifier
    #[snafu(display("Unexpected SDO response. Expected {expecting}, got {response:?}"))]
    UnexpectedResponse {
        /// The type of response which was expected
        expecting: String,
        /// The response which was received
        response: SdoResponse,
    },
    /// Received a ServerAbort response from the node
    #[snafu(display("Received abort accessing object 0x{index:X}sub{sub}: {abort_code}"))]
    ServerAbort {
        /// Index of the SDO access which was aborted
        index: u16,
        /// Sub index of the SDO access which was aborted
        sub: u8,
        /// Reason for the abort
        abort_code: RawAbortCode,
    },
    /// Received a response with the wrong toggle bit
    ///
    /// The client sends an abort to the server before reporting this error.
    ToggleNotAlternated,
    /// Received a response with a different index/sub value than was requested
    #[snafu(display("Received object 0x{:x}sub{} after requesting 0x{:x}sub{}",
        received.0, received.1, expected.0, expected.1))]
    MismatchedObjectIndex {
        /// The object ID which was expected to be echoed back, as `(index, sub)`
        expected: (u16, u8),
        /// The received object ID, as `(index, sub)`
        received: (u16, u8),
    },
    /// An SDO upload response had a size that did not match the expected size
    ///
    /// Returned by the typed upload helpers when the uploaded byte count differs from the size of
    /// the requested type.
    UnexpectedSize,
    /// Failed to write a message to the socket
    #[snafu(display("Failed to send CAN message: {message}"))]
    SocketSendFailed {
        /// A string describing the error reason
        message: String,
    },
    /// An SDO server shrunk the block size while requesting retransmission
    ///
    /// Hopefully no node will ever do this, but it's a possible corner case, since servers are
    /// allowed to change the block size between each block, and can request resend of part of a
    /// block by not acknowledging all segments.
    BlockSizeChangedTooSmall,
    /// The CRC on a block upload did not match
    CrcMismatch,
}
100
/// Module-local shorthand for results whose error type is [`SdoClientError`].
type Result<T> = std::result::Result<T, SdoClientError>;
102
/// Convenience macro for expecting a particular variant of a response and erroring on abort or
/// unexpected variant
///
/// Usage: `match_response!(resp, "ExpectedName", Pattern => expr, ...)`.
///
/// * `resp` must be an identifier bound to an `SdoResponse`.
/// * `"ExpectedName"` is only used for the error message of the fallback arm.
/// * Each `Pattern => expr` becomes a match arm; the macro evaluates to the value of the arm that
///   matched. Arm expressions may be blocks or plain expressions, and a trailing comma is allowed.
///
/// Two arms are appended after the caller's arms. Both `return` from the *enclosing function*,
/// which therefore must return this module's `Result`:
///
/// * `SdoResponse::Abort` returns a `ServerAbort` error carrying the index, sub and abort code.
/// * Any other response returns an `UnexpectedResponse` error.
macro_rules! match_response {
    ($resp:ident, $expecting:literal, $($match:pat => $code:expr),* $(,)?) => {
        match $resp {
            // Always terminate caller arms with a comma so non-block expressions are accepted too
            $($match => $code,)*
            SdoResponse::Abort {
                index,
                sub,
                abort_code,
            } => {
                return ServerAbortSnafu {
                    index,
                    sub,
                    abort_code,
                }
                .fail()
            }
            _ => {
                return UnexpectedResponseSnafu {
                    expecting: $expecting,
                    response: $resp,
                }
                .fail()
            }
        }
    };
}
131
132use paste::paste;
/// Generate the typed accessor methods for one little-endian primitive type.
///
/// For a type `T` this expands (via `paste!`) to four methods on the enclosing `impl`:
/// `upload_T` / `read_T` to read a value and `download_T` / `write_T` to write one. The `read_`
/// and `write_` variants are plain aliases of the `upload_` and `download_` variants.
macro_rules! access_methods {
    ($type: ty) => {

        paste! {
            #[doc = concat!("Read a ", stringify!($type), " sub object from the SDO server\n\n")]
            #[doc = concat!("This is an alias for upload_", stringify!($type), " for a more intuitive API")]
            pub async fn [<read_ $type>](&mut self, index: u16, sub: u8) -> Result<$type> {
                self.[<upload_ $type>](index, sub).await
            }

            #[doc = concat!("Read a ", stringify!($type), " sub object from the SDO server\n\n")]
            #[doc = concat!("Fails with `UnexpectedSize` if the uploaded data is not exactly the size of a ", stringify!($type))]
            pub async fn [<upload_ $type>](&mut self, index: u16, sub: u8) -> Result<$type> {
                let data = self.upload(index, sub).await?;
                // Converting the Vec into a fixed-size array doubles as the length check
                let bytes = <[u8; std::mem::size_of::<$type>()]>::try_from(data)
                    .map_err(|_| UnexpectedSizeSnafu.build())?;
                Ok($type::from_le_bytes(bytes))
            }

            #[doc = concat!("Write a ", stringify!($type), " sub object on the SDO server\n\n")]
            #[doc = concat!("This is an alias for download_", stringify!($type), " for a more intuitive API")]
            pub async fn [<write_ $type>](&mut self, index: u16, sub: u8, value: $type) -> Result<()> {
                self.[<download_ $type>](index, sub, value).await
            }

            #[doc = concat!("Write a ", stringify!($type), " sub object on the SDO server")]
            pub async fn [<download_ $type>](&mut self, index: u16, sub: u8, value: $type) -> Result<()> {
                let data = value.to_le_bytes();
                self.download(index, sub, &data).await
            }
        }
    };
}
166
#[derive(Debug)]
/// A client for accessing a node's SDO server
///
/// A single server can talk to a single client at a time.
///
/// `S` is the type used to send CAN frames and `R` the type used to receive them.
pub struct SdoClient<S, R> {
    /// COB ID used for frames sent to the server
    req_cob_id: CanId,
    /// COB ID the server's responses are expected on; other received frames are ignored
    resp_cob_id: CanId,
    /// How long to wait for each server response
    timeout: Duration,
    /// Sink for outgoing CAN frames
    sender: S,
    /// Source of incoming CAN frames
    receiver: R,
}
178
179impl<S: AsyncCanSender, R: AsyncCanReceiver> SdoClient<S, R> {
180    /// Create a new SdoClient using a node ID
181    ///
182    /// Nodes have a default SDO server, which uses a COB ID based on the node ID. This is a
183    /// shortcut to create a client that that default SDO server.
184    ///
185    /// It is possible for nodes to have other SDO servers on other COB IDs, and clients for these
186    /// can be created using [`Self::new()`]
187    pub fn new_std(server_node_id: u8, sender: S, receiver: R) -> Self {
188        let req_cob_id = CanId::Std(0x600 + server_node_id as u16);
189        let resp_cob_id = CanId::Std(0x580 + server_node_id as u16);
190        Self::new(req_cob_id, resp_cob_id, sender, receiver)
191    }
192
    /// Create a new SdoClient from request and response COB IDs
    ///
    /// Requests are sent with `req_cob_id`; only frames received with `resp_cob_id` are treated
    /// as responses. The response timeout starts out at the module default and can be changed
    /// with [`Self::set_timeout()`].
    pub fn new(req_cob_id: CanId, resp_cob_id: CanId, sender: S, receiver: R) -> Self {
        Self {
            req_cob_id,
            resp_cob_id,
            timeout: DEFAULT_RESPONSE_TIMEOUT,
            sender,
            receiver,
        }
    }
203
    /// Set the timeout for waiting on SDO server responses
    ///
    /// The new value applies to every response awaited after this call.
    pub fn set_timeout(&mut self, timeout: Duration) {
        self.timeout = timeout;
    }
208
    /// Get the current timeout for waiting on SDO server responses
    ///
    /// Returns the value last passed to [`Self::set_timeout()`], or the default if it was never
    /// changed.
    pub fn get_timeout(&self) -> Duration {
        self.timeout
    }
213
214    async fn send(&mut self, data: [u8; 8]) -> Result<()> {
215        let frame = CanMessage::new(self.req_cob_id, &data);
216        let mut tries = 3;
217        loop {
218            match self.sender.send(frame).await {
219                Ok(()) => return Ok(()),
220                Err(e) => {
221                    tries -= 1;
222                    tokio::time::sleep(Duration::from_millis(5)).await;
223                    if tries == 0 {
224                        return SocketSendFailedSnafu {
225                            message: e.message(),
226                        }
227                        .fail();
228                    }
229                }
230            }
231        }
232    }
233
234    /// Write data to a sub-object on the SDO server
235    pub async fn download(&mut self, index: u16, sub: u8, data: &[u8]) -> Result<()> {
236        if data.len() <= 4 {
237            // Do an expedited transfer
238            self.send(SdoRequest::expedited_download(index, sub, data).to_bytes())
239                .await?;
240
241            let resp = self.wait_for_response().await?;
242            match_response!(
243                resp,
244                "ConfirmDownload",
245                SdoResponse::ConfirmDownload { index: _, sub: _ } => {
246                    Ok(()) // Success!
247                }
248            )
249        } else {
250            self.send(
251                SdoRequest::initiate_download(index, sub, Some(data.len() as u32)).to_bytes(),
252            )
253            .await?;
254
255            let resp = self.wait_for_response().await?;
256            match_response!(
257                resp,
258                "ConfirmDownload",
259                SdoResponse::ConfirmDownload { index: _, sub: _ } => { }
260            );
261
262            let mut toggle = false;
263            // Send segments
264            let total_segments = data.len().div_ceil(7);
265            for n in 0..total_segments {
266                let last_segment = n == total_segments - 1;
267                let segment_size = (data.len() - n * 7).min(7);
268                let seg_msg = SdoRequest::download_segment(
269                    toggle,
270                    last_segment,
271                    &data[n * 7..n * 7 + segment_size],
272                );
273                self.send(seg_msg.to_bytes()).await?;
274                let resp = self.wait_for_response().await?;
275                match_response!(
276                    resp,
277                    "ConfirmDownloadSegment",
278                    SdoResponse::ConfirmDownloadSegment { t } => {
279                        // Fail if toggle value doesn't match
280                        if t != toggle {
281                            let abort_msg =
282                                SdoRequest::abort(index, sub, AbortCode::ToggleNotAlternated);
283
284                            self.send(abort_msg.to_bytes())
285                                .await?;
286                            return ToggleNotAlternatedSnafu.fail();
287                        }
288                        // Otherwise, carry on
289                    }
290                );
291                toggle = !toggle;
292            }
293            Ok(())
294        }
295    }
296
297    /// Read a sub-object on the SDO server
298    pub async fn upload(&mut self, index: u16, sub: u8) -> Result<Vec<u8>> {
299        let mut read_buf = Vec::new();
300
301        self.send(SdoRequest::initiate_upload(index, sub).to_bytes())
302            .await?;
303
304        let resp = self.wait_for_response().await?;
305
306        let expedited = match_response!(
307            resp,
308            "ConfirmUpload",
309            SdoResponse::ConfirmUpload {
310                n,
311                e,
312                s,
313                index: _,
314                sub: _,
315                data,
316            } => {
317                if e {
318                    let mut len = 0;
319                    if s {
320                        len = 4 - n as usize;
321                    }
322                    read_buf.extend_from_slice(&data[0..len]);
323                }
324                e
325            }
326        );
327
328        if !expedited {
329            // Read segments
330            let mut toggle = false;
331            loop {
332                self.send(SdoRequest::upload_segment_request(toggle).to_bytes())
333                    .await?;
334
335                let resp = self.wait_for_response().await?;
336                match_response!(
337                    resp,
338                    "UploadSegment",
339                    SdoResponse::UploadSegment { t, n, c, data } => {
340                        if t != toggle {
341                            self.send(
342                                    SdoRequest::abort(index, sub, AbortCode::ToggleNotAlternated)
343                                        .to_bytes(),
344                                )
345                                .await?;
346                            return ToggleNotAlternatedSnafu.fail();
347                        }
348                        read_buf.extend_from_slice(&data[0..7 - n as usize]);
349                        if c {
350                            // Transfer complete
351                            break;
352                        }
353                    }
354                );
355                toggle = !toggle;
356            }
357        }
358        Ok(read_buf)
359    }
360
361    /// Perform a block download to transfer data to an object
362    ///
363    /// Block downloads are more efficient for large amounts of data, but may not be supported by
364    /// all devices.
365    pub async fn block_download(&mut self, index: u16, sub: u8, data: &[u8]) -> Result<()> {
366        self.send(
367            SdoRequest::InitiateBlockDownload {
368                cc: true, // CRC supported
369                s: true,  // size specified
370                index,
371                sub,
372                size: data.len() as u32,
373            }
374            .to_bytes(),
375        )
376        .await?;
377
378        let resp = self.wait_for_response().await?;
379
380        let (crc_enabled, mut blksize) = match_response!(
381            resp,
382            "ConfirmBlockDownload",
383            SdoResponse::ConfirmBlockDownload {
384                sc,
385                index: resp_index,
386                sub: resp_sub,
387                blksize,
388            } => {
389                if index != resp_index || sub != resp_sub {
390                    return MismatchedObjectIndexSnafu {
391                        expected: (index, sub),
392                        received: (resp_index, resp_sub),
393                    }
394                    .fail();
395                }
396                (sc, blksize)
397            }
398        );
399
400        let mut seqnum = 1;
401        let mut last_block_start = 0;
402        let mut segment_num = 0;
403        let total_segments = data.len().div_ceil(7);
404
405        while segment_num < total_segments {
406            let segment_start = segment_num * 7;
407            let segment_len = (data.len() - segment_start).min(7);
408            // Is this the last segment?
409            let c = segment_start + segment_len == data.len();
410            let mut segment_data = [0; 7];
411            segment_data[0..segment_len]
412                .copy_from_slice(&data[segment_start..segment_start + segment_len]);
413
414            // Send the segment
415            let segment = BlockSegment {
416                c,
417                seqnum,
418                data: segment_data,
419            };
420            self.send(segment.to_bytes()).await?;
421
422            // Expect a confirmation message after blksize segments are sent, or after sending the
423            // complete flag
424            if c || seqnum == blksize {
425                let resp = self.wait_for_response().await?;
426                match_response!(
427                    resp,
428                    "ConfirmBlock",
429                    SdoResponse::ConfirmBlock {
430                        ackseq,
431                        blksize: new_blksize,
432                    } => {
433                        if ackseq == blksize {
434                            // All segments are acknowledged. Block accepted
435                            seqnum = 1;
436                            segment_num += 1;
437                            last_block_start = segment_num;
438                        } else {
439                            // Missing segments. Resend all segments after ackseq
440                            seqnum = ackseq;
441                            segment_num = last_block_start + ackseq as usize;
442                            // The spec says the block size given by the server can change between
443                            // blocks. What should a client do if it is going to resend a block, and
444                            // the server sets the block size smaller than the already delivered
445                            // segments? This shouldn't happen I think, but, it's possible.
446                            // zencan-node based nodes won't do it, but there are other devices out
447                            // there.
448                            if new_blksize < seqnum {
449                                return BlockSizeChangedTooSmallSnafu.fail();
450                            }
451                        }
452                        blksize = new_blksize;
453                    }
454                );
455            } else {
456                seqnum += 1;
457                segment_num += 1;
458            }
459        }
460
461        // End the download
462        let crc = if crc_enabled {
463            crc16::State::<crc16::XMODEM>::calculate(data)
464        } else {
465            0
466        };
467
468        let n = ((7 - data.len() % 7) % 7) as u8;
469
470        self.send(SdoRequest::EndBlockDownload { n, crc }.to_bytes())
471            .await?;
472
473        let resp = self.wait_for_response().await?;
474        match_response!(
475            resp,
476            "ConfirmBlockDownloadEnd",
477            SdoResponse::ConfirmBlockDownloadEnd => { Ok(()) }
478        )
479    }
480
481    /// Perform a block upload of data from the node
482    pub async fn block_upload(&mut self, index: u16, sub: u8) -> Result<Vec<u8>> {
483        const CRC_SUPPORTED: bool = true;
484        const BLKSIZE: u8 = 127;
485        const PST: u8 = 0;
486        self.send(
487            SdoRequest::initiate_block_upload(index, sub, CRC_SUPPORTED, BLKSIZE, PST).to_bytes(),
488        )
489        .await?;
490
491        let resp = self.wait_for_response().await?;
492
493        let server_supports_crc = match_response!(
494            resp,
495            "ConfirmBlockUpload",
496            SdoResponse::ConfirmBlockUpload { sc, s: _, index: _, sub: _, size: _ } => {sc}
497        );
498
499        self.send(SdoRequest::StartBlockUpload.to_bytes()).await?;
500
501        let mut rx_data = Vec::new();
502        let last_segment;
503        loop {
504            let segment = self.wait_for_block_segment().await?;
505            rx_data.extend_from_slice(&segment.data);
506            if !segment.c && segment.seqnum == BLKSIZE {
507                // Finished sub block, but not yet done. Confirm this sub block and expect more
508                self.send(
509                    SdoRequest::ConfirmBlock {
510                        ackseq: BLKSIZE,
511                        blksize: BLKSIZE,
512                    }
513                    .to_bytes(),
514                )
515                .await?;
516            }
517            if segment.c {
518                last_segment = segment.seqnum;
519                break;
520            }
521        }
522
523        // NOTE: Ignoring the possibility of dropped messages here. Should check seqno to make sure
524        // all blocks are received.
525        self.send(
526            SdoRequest::ConfirmBlock {
527                ackseq: last_segment,
528                blksize: BLKSIZE,
529            }
530            .to_bytes(),
531        )
532        .await?;
533
534        let resp = self.wait_for_response().await?;
535        let (n, crc) = match_response!(
536            resp,
537            "BlockUploadEnd",
538            SdoResponse::BlockUploadEnd { n, crc } => {(n, crc)}
539        );
540
541        // Drop the n invalid data bytes
542        rx_data.resize(rx_data.len() - n as usize, 0);
543
544        if server_supports_crc {
545            let computed_crc = crc16::State::<crc16::XMODEM>::calculate(&rx_data);
546            if crc != computed_crc {
547                self.send(SdoRequest::abort(index, sub, AbortCode::CrcError).to_bytes())
548                    .await?;
549                return Err(SdoClientError::CrcMismatch);
550            }
551        }
552
553        self.send(SdoRequest::EndBlockUpload.to_bytes()).await?;
554
555        Ok(rx_data)
556    }
557
    // Generate the typed read_/upload_/write_/download_ accessors for every primitive type which
    // offers little-endian byte conversions
    access_methods!(f64);
    access_methods!(f32);
    access_methods!(u64);
    access_methods!(u32);
    access_methods!(u16);
    access_methods!(u8);
    access_methods!(i64);
    access_methods!(i32);
    access_methods!(i16);
    access_methods!(i8);
568
    /// Write to a TimeOfDay object on the SDO server
    ///
    /// The value is converted to its little-endian byte representation and written with
    /// [`Self::download()`].
    pub async fn download_time_of_day(
        &mut self,
        index: u16,
        sub: u8,
        data: TimeOfDay,
    ) -> Result<()> {
        let data = data.to_le_bytes();
        self.download(index, sub, &data).await
    }
579
580    /// Write to a TimeOfDay object on the SDO server
581    ///
582    /// Alias for `download_time_of_day`. This is a convenience function to allow for a more intuitive API.
583    pub async fn write_time_of_day(&mut self, index: u16, sub: u8, data: TimeOfDay) -> Result<()> {
584        let data = data.to_le_bytes();
585        self.download(index, sub, &data).await
586    }
587
    /// Write to a TimeDifference object on the SDO server
    ///
    /// The value is converted to its little-endian byte representation and written with
    /// [`Self::download()`].
    pub async fn download_time_difference(
        &mut self,
        index: u16,
        sub: u8,
        data: TimeDifference,
    ) -> Result<()> {
        let data = data.to_le_bytes();
        self.download(index, sub, &data).await
    }
598
599    /// Write to a TimeDifference object on the SDO server
600    ///
601    /// Alias for `download_time_difference`. This is a convenience function to allow for a more intuitive API.
602    pub async fn write_time_difference(
603        &mut self,
604        index: u16,
605        sub: u8,
606        data: TimeDifference,
607    ) -> Result<()> {
608        let data = data.to_le_bytes();
609        self.download(index, sub, &data).await
610    }
611
612    /// Read a string from the SDO server
613    pub async fn upload_utf8(&mut self, index: u16, sub: u8) -> Result<String> {
614        let data = self.upload(index, sub).await?;
615        Ok(String::from_utf8_lossy(&data).into())
616    }
    /// Read a string from the SDO server
    ///
    /// Alias for `upload_utf8`
    pub async fn read_utf8(&mut self, index: u16, sub: u8) -> Result<String> {
        self.upload_utf8(index, sub).await
    }
621
622    /// Read a TimeOfDay object from the SDO server
623    pub async fn upload_time_of_day(&mut self, index: u16, sub: u8) -> Result<TimeOfDay> {
624        let data = self.upload(index, sub).await?;
625        if data.len() != TimeOfDay::SIZE {
626            UnexpectedSizeSnafu.fail()
627        } else {
628            Ok(TimeOfDay::from_le_bytes(data.try_into().unwrap()))
629        }
630    }
631
    /// Read a TimeOfDay object from the SDO server
    ///
    /// Alias for `upload_time_of_day`. This is a convenience function to allow for a more intuitive
    /// API. Errors are passed through unchanged.
    pub async fn read_time_of_day(&mut self, index: u16, sub: u8) -> Result<TimeOfDay> {
        self.upload_time_of_day(index, sub).await
    }
639
640    /// Read a TimeOfDay object from the SDO server
641    pub async fn upload_time_difference(&mut self, index: u16, sub: u8) -> Result<TimeDifference> {
642        let data = self.upload(index, sub).await?;
643        if data.len() != TimeDifference::SIZE {
644            UnexpectedSizeSnafu.fail()
645        } else {
646            Ok(TimeDifference::from_le_bytes(data.try_into().unwrap()))
647        }
648    }
649
    /// Read a TimeDifference object from the SDO server
    ///
    /// Alias for `upload_time_difference`. This is a convenience function to allow for a more
    /// intuitive API.
    pub async fn read_time_difference(&mut self, index: u16, sub: u8) -> Result<TimeDifference> {
        self.upload_time_difference(index, sub).await
    }
657
658    /// Read an object as a visible string
659    ///
660    /// It will be read and assumed to contain valid UTF8 characters
661    pub async fn read_visible_string(&mut self, index: u16, sub: u8) -> Result<String> {
662        let bytes = self.upload(index, sub).await?;
663        Ok(String::from_utf8_lossy(&bytes).into())
664    }
665
666    /// Read the identity object
667    ///
668    /// All nodes should implement this object
669    pub async fn read_identity(&mut self) -> Result<LssIdentity> {
670        let vendor_id = self.upload_u32(object_ids::IDENTITY, 1).await?;
671        let product_code = self.upload_u32(object_ids::IDENTITY, 2).await?;
672        let revision_number = self.upload_u32(object_ids::IDENTITY, 3).await?;
673        let serial = self.upload_u32(object_ids::IDENTITY, 4).await?;
674        Ok(LssIdentity::new(
675            vendor_id,
676            product_code,
677            revision_number,
678            serial,
679        ))
680    }
681
    /// Write object 0x1010sub1 to command all objects be saved
    ///
    /// Writes the `SAVE_CMD` value as a `u32` to sub-index 1 of the save-objects object.
    pub async fn save_objects(&mut self) -> Result<()> {
        self.download_u32(object_ids::SAVE_OBJECTS, 1, SAVE_CMD)
            .await
    }
687
    /// Read the device name object
    ///
    /// All nodes should implement this object
    ///
    /// Reads sub-index 0 of the device name object as a visible string.
    pub async fn read_device_name(&mut self) -> Result<String> {
        self.read_visible_string(object_ids::DEVICE_NAME, 0).await
    }
694
    /// Read the software version object
    ///
    /// All nodes should implement this object
    ///
    /// Reads sub-index 0 of the software version object as a visible string.
    pub async fn read_software_version(&mut self) -> Result<String> {
        self.read_visible_string(object_ids::SOFTWARE_VERSION, 0)
            .await
    }
702
    /// Read the hardware version object
    ///
    /// All nodes should implement this object
    ///
    /// Reads sub-index 0 of the hardware version object as a visible string.
    pub async fn read_hardware_version(&mut self) -> Result<String> {
        self.read_visible_string(object_ids::HARDWARE_VERSION, 0)
            .await
    }
710
    /// Configure a transmit PDO on the device
    ///
    /// This is a convenience function to write the PDO comm and mapping objects based on a
    /// [`PdoConfig`].
    ///
    /// `pdo_num` is zero-based: it is added to the base indices 0x1800 (communication
    /// parameters) and 0x1a00 (mapping parameters). No range check is performed on it.
    pub async fn configure_tpdo(&mut self, pdo_num: usize, cfg: &PdoConfig) -> Result<()> {
        let comm_index = 0x1800 + pdo_num as u16;
        let mapping_index = 0x1a00 + pdo_num as u16;
        self.store_pdo(comm_index, mapping_index, cfg).await
    }
720
    /// Configure a receive PDO on the device
    ///
    /// This is a convenience function to write the PDO comm and mapping objects based on a
    /// [`PdoConfig`].
    ///
    /// `pdo_num` is zero-based: it is added to the base indices 0x1400 (communication
    /// parameters) and 0x1600 (mapping parameters). No range check is performed on it.
    pub async fn configure_rpdo(&mut self, pdo_num: usize, cfg: &PdoConfig) -> Result<()> {
        let comm_index = 0x1400 + pdo_num as u16;
        let mapping_index = 0x1600 + pdo_num as u16;
        self.store_pdo(comm_index, mapping_index, cfg).await
    }
730
731    async fn store_pdo(
732        &mut self,
733        comm_index: u16,
734        mapping_index: u16,
735        cfg: &PdoConfig,
736    ) -> Result<()> {
737        assert!(cfg.mappings.len() < 0x40);
738        for (i, m) in cfg.mappings.iter().enumerate() {
739            let mapping_value = m.to_object_value();
740            self.write_u32(mapping_index, (i + 1) as u8, mapping_value)
741                .await?;
742        }
743
744        let num_mappings = cfg.mappings.len() as u8;
745        self.write_u8(mapping_index, 0, num_mappings).await?;
746
747        let mut cob_value = cfg.cob_id.raw() & 0x1FFFFFFF;
748        if !cfg.enabled {
749            cob_value |= 1 << 31;
750        }
751        if cfg.cob_id.is_extended() {
752            cob_value |= 1 << 29;
753        }
754        self.write_u8(comm_index, 2, cfg.transmission_type).await?;
755        self.write_u32(comm_index, 1, cob_value).await?;
756
757        Ok(())
758    }
759
    /// Read the configuration of an RPDO from the node
    ///
    /// `pdo_num` is zero-based: it is added to the base indices 0x1400 (communication
    /// parameters) and 0x1600 (mapping parameters). No range check is performed on it.
    pub async fn read_rpdo_config(&mut self, pdo_num: usize) -> Result<PdoConfig> {
        let comm_index = 0x1400 + pdo_num as u16;
        let mapping_index = 0x1600 + pdo_num as u16;
        self.read_pdo_config(comm_index, mapping_index).await
    }
766
    /// Read the configuration of a TPDO from the node
    ///
    /// `pdo_num` is zero-based: it is added to the base indices 0x1800 (communication
    /// parameters) and 0x1a00 (mapping parameters). No range check is performed on it.
    pub async fn read_tpdo_config(&mut self, pdo_num: usize) -> Result<PdoConfig> {
        let comm_index = 0x1800 + pdo_num as u16;
        let mapping_index = 0x1a00 + pdo_num as u16;
        self.read_pdo_config(comm_index, mapping_index).await
    }
773
774    async fn read_pdo_config(&mut self, comm_index: u16, mapping_index: u16) -> Result<PdoConfig> {
775        let cob_word = self.read_u32(comm_index, 1).await?;
776        let transmission_type = self.read_u8(comm_index, 2).await?;
777        let num_mappings = self.read_u8(mapping_index, 0).await?;
778        let mut mappings = Vec::with_capacity(num_mappings as usize);
779        for i in 0..num_mappings {
780            let mapping_raw = self.read_u32(mapping_index, i + 1).await?;
781            mappings.push(PdoMapping::from_object_value(mapping_raw));
782        }
783        let enabled = cob_word & (1 << 31) == 0;
784        let rtr_disabled = cob_word & (1 << 30) != 0;
785        let extended = cob_word & (1 << 29) != 0;
786        let cob_id = cob_word & 0x1FFFFFFF;
787        let cob_id = if extended {
788            CanId::extended(cob_id)
789        } else {
790            CanId::std(cob_id as u16)
791        };
792        Ok(PdoConfig {
793            cob_id,
794            enabled,
795            rtr_disabled,
796            mappings,
797            transmission_type,
798        })
799    }
800
801    async fn wait_for_block_segment(&mut self) -> Result<BlockSegment> {
802        let wait_until = tokio::time::Instant::now() + self.timeout;
803        loop {
804            match tokio::time::timeout_at(wait_until, self.receiver.recv()).await {
805                // Err indicates the timeout elapsed, so return
806                Err(_) => return NoResponseSnafu.fail(),
807                // Message was recieved. If it is the resp, return. Otherwise, keep waiting
808                Ok(Ok(msg)) => {
809                    if msg.id == self.resp_cob_id {
810                        return msg
811                            .data()
812                            .try_into()
813                            .map_err(|_| MalformedResponseSnafu.build());
814                    }
815                }
816                // Recv returned an error
817                Ok(Err(e)) => {
818                    log::error!("Error reading from socket: {e:?}");
819                    return NoResponseSnafu.fail();
820                }
821            }
822        }
823    }
824
825    async fn wait_for_response(&mut self) -> Result<SdoResponse> {
826        let wait_until = tokio::time::Instant::now() + self.timeout;
827        loop {
828            match tokio::time::timeout_at(wait_until, self.receiver.recv()).await {
829                // Err indicates the timeout elapsed, so return
830                Err(_) => return NoResponseSnafu.fail(),
831                // Message was recieved. If it is the resp, return. Otherwise, keep waiting
832                Ok(Ok(msg)) => {
833                    if msg.id == self.resp_cob_id {
834                        return msg.try_into().map_err(|_| MalformedResponseSnafu.build());
835                    }
836                }
837                // Recv returned an error
838                Ok(Err(e)) => {
839                    log::error!("Error reading from socket: {e:?}");
840                    return NoResponseSnafu.fail();
841                }
842            }
843        }
844    }
845}