Skip to main content

autocore_std/ethercat/
sdo_client.rs

1//! Non-blocking SDO read/write client for EtherCAT devices.
2//!
3//! [`SdoClient`] wraps [`CommandClient`](crate::CommandClient) to provide an
4//! ergonomic, handle-based interface for runtime SDO (Service Data Object)
5//! operations over CoE (CANopen over EtherCAT). Create one per device, issue
6//! reads/writes from your control loop, and check results by handle on
7//! subsequent ticks.
8//!
9//! # When to use
10//!
11//! Use `SdoClient` for **runtime** SDO access — reading diagnostic registers,
12//! changing operating parameters on the fly, or any CoE transfer that happens
13//! after the cyclic loop is running. For SDOs that must be applied **before**
14//! the cyclic loop starts (e.g. setting `modes_of_operation`), use the
15//! `startup_sdo` array in `project.json` instead.
16//!
17//! # Topic format
18//!
19//! Requests are sent as IPC commands through the existing WebSocket channel.
20//! Topics are scoped to the device name configured in `project.json`:
21//!
22//! | Operation | Topic                              | Payload                                          |
23//! |-----------|------------------------------------|--------------------------------------------------|
24//! | Write     | `ethercat.{device}.sdo_write`      | `{"index": "0x6060", "sub": 0, "value": "0x01"}` |
25//! | Read      | `ethercat.{device}.sdo_read`       | `{"index": "0x6060", "sub": 0}`                  |
26//!
27//! # Usage with a state machine
28//!
29//! A typical pattern pairs `SdoClient` with [`StateMachine`](crate::fb::StateMachine)
30//! to fire an SDO write in one state, then advance on success:
31//!
32//! ```ignore
33//! use autocore_std::{ControlProgram, TickContext};
34//! use autocore_std::ethercat::{SdoClient, SdoResult};
35//! use autocore_std::fb::StateMachine;
36//! use serde_json::json;
37//! use std::time::Duration;
38//!
39//! pub struct MyProgram {
40//!     sm: StateMachine,
41//!     sdo: SdoClient,
42//!     write_tid: Option<u32>,
43//! }
44//!
45//! impl MyProgram {
46//!     pub fn new() -> Self {
47//!         Self {
48//!             sm: StateMachine::new(),
49//!             sdo: SdoClient::new("ClearPath_0"),
50//!             write_tid: None,
51//!         }
52//!     }
53//! }
54//!
55//! impl ControlProgram for MyProgram {
56//!     type Memory = GlobalMemory;
57//!
58//!     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
59//!         self.sm.call();
60//!         match self.sm.index {
61//!             // State 10: Send SDO write for modes_of_operation = PP
62//!             10 => {
63//!                 self.write_tid = Some(
64//!                     self.sdo.write(ctx.client, 0x6060, 0, json!(1))
65//!                 );
66//!                 self.sm.timeout_preset = Duration::from_secs(3);
67//!                 self.sm.index = 20;
68//!             }
69//!             // State 20: Wait for response
70//!             20 => {
71//!                 let tid = self.write_tid.unwrap();
72//!                 match self.sdo.result(ctx.client, tid, Duration::from_secs(3)) {
73//!                     SdoResult::Pending => { /* keep waiting */ }
74//!                     SdoResult::Ok(_) => {
75//!                         log::info!("modes_of_operation set to PP");
76//!                         self.sm.index = 30;
77//!                     }
78//!                     SdoResult::Err(e) => {
79//!                         log::error!("SDO write failed: {}", e);
80//!                         self.sm.set_error(1);
81//!                     }
82//!                     SdoResult::Timeout => {
83//!                         log::error!("SDO write timed out");
84//!                         self.sm.set_error(2);
85//!                     }
86//!                 }
87//!             }
88//!             // State 30: Done — continue with normal operation
89//!             30 => { /* ... */ }
90//!             _ => {}
91//!         }
92//!     }
93//! }
94//! ```
95//!
96//! # Reading an SDO
97//!
98//! ```ignore
99//! // Fire a read request
100//! let tid = sdo.read(ctx.client, 0x6064, 0); // Position Actual Value
101//!
102//! // On a later tick, check the result
103//! match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
104//!     SdoResult::Ok(data) => {
105//!         let position: i32 = serde_json::from_value(data).unwrap();
106//!         log::info!("Current position: {}", position);
107//!     }
108//!     SdoResult::Pending => { /* still waiting */ }
109//!     SdoResult::Err(e) => { log::error!("SDO read failed: {}", e); }
110//!     SdoResult::Timeout => { log::error!("SDO read timed out"); }
111//! }
112//! ```
113
114use std::collections::HashMap;
115use std::time::{Duration, Instant};
116
117use serde_json::{json, Value};
118
119use crate::command_client::CommandClient;
120
121/// Metadata for an in-flight SDO request.
122pub struct SdoRequest {
123    /// CoE object dictionary index (e.g. 0x6060).
124    pub index: u16,
125    /// CoE sub-index.
126    pub sub_index: u8,
127    /// Whether this is a read or write.
128    pub kind: SdoRequestKind,
129    /// When the request was sent (for timeout detection).
130    pub sent_at: Instant,
131}
132
133/// Discriminates SDO reads from writes.
134#[derive(Debug, Clone, Copy, PartialEq, Eq)]
135pub enum SdoRequestKind {
136    /// SDO read (CoE upload).
137    Read,
138    /// SDO write (CoE download).
139    Write,
140}
141
142/// Result of checking an in-flight SDO request.
143///
144/// Returned by [`SdoClient::result()`]. The caller should match on this each
145/// tick until the request resolves (i.e. is no longer [`Pending`](SdoResult::Pending)).
146///
147/// ```ignore
148/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
149///     SdoResult::Pending => { /* check again next tick */ }
150///     SdoResult::Ok(data) => { /* use data */ }
151///     SdoResult::Err(msg) => { log::error!("{}", msg); }
152///     SdoResult::Timeout => { log::error!("timed out"); }
153/// }
154/// ```
155#[derive(Debug, Clone)]
156pub enum SdoResult {
157    /// No response yet; check again next tick.
158    Pending,
159    /// Operation succeeded. Contains the response `data` field — the read
160    /// value for reads, or an empty/null value for writes.
161    Ok(Value),
162    /// The server (or EtherCAT master) reported an error. The string contains
163    /// the `error_message` from the response (e.g. `"SDO abort: 0x06090011"`).
164    Err(String),
165    /// No response arrived within the caller-specified deadline.
166    Timeout,
167}
168
169/// Non-blocking SDO client scoped to a single EtherCAT device.
170///
171/// Create one `SdoClient` per device in your control program struct. It holds
172/// a map of outstanding requests keyed by `transaction_id` (returned by
173/// [`CommandClient::send`]). Keep the returned handle and poll
174/// [`result()`](Self::result) each tick until it resolves.
175///
176/// # Example
177///
178/// ```ignore
179/// use autocore_std::ethercat::{SdoClient, SdoResult};
180/// use serde_json::json;
181/// use std::time::Duration;
182///
183/// let mut sdo = SdoClient::new("ClearPath_0");
184///
185/// // Issue an SDO write (from process_tick):
186/// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
187///
188/// // Check result on subsequent ticks:
189/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
190///     SdoResult::Pending => {}
191///     SdoResult::Ok(_) => { /* success */ }
192///     SdoResult::Err(e) => { log::error!("SDO error: {}", e); }
193///     SdoResult::Timeout => { log::error!("SDO timed out"); }
194/// }
195/// ```
196pub struct SdoClient {
197    device: String,
198    requests: HashMap<u32, SdoRequest>,
199}
200
201impl SdoClient {
202    /// Create a new client for the given device name.
203    ///
204    /// The `device` string must match the `name` field in the slave's
205    /// `project.json` configuration (e.g. `"ClearPath_0"`).
206    ///
207    /// ```ignore
208    /// let sdo = SdoClient::new("ClearPath_0");
209    /// ```
210    pub fn new(device: &str) -> Self {
211        Self {
212            device: device.to_string(),
213            requests: HashMap::new(),
214        }
215    }
216
217    /// Issue an SDO write (CoE download).
218    ///
219    /// Sends a command to topic `ethercat.{device}.sdo_write` with payload:
220    /// ```json
221    /// {"index": "0x6060", "sub": 0, "value": 1}
222    /// ```
223    ///
224    /// Returns a transaction handle for use with [`result()`](Self::result).
225    ///
226    /// # Arguments
227    ///
228    /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
229    /// * `index` — CoE object dictionary index (e.g. `0x6060`)
230    /// * `sub_index` — CoE sub-index (usually `0`)
231    /// * `value` — the value to write, as a [`serde_json::Value`]
232    ///
233    /// # Example
234    ///
235    /// ```ignore
236    /// // Set modes_of_operation to Profile Position (1)
237    /// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
238    /// ```
239    pub fn write(
240        &mut self,
241        client: &mut CommandClient,
242        index: u16,
243        sub_index: u8,
244        value: Value,
245    ) -> u32 {
246        let topic = format!("ethercat.{}.sdo_write", self.device);
247        let payload = json!({
248            "index": format!("0x{:04X}", index),
249            "sub": sub_index,
250            "value": value,
251        });
252        let tid = client.send(&topic, payload);
253
254        self.requests.insert(tid, SdoRequest {
255            index,
256            sub_index,
257            kind: SdoRequestKind::Write,
258            sent_at: Instant::now(),
259        });
260
261        tid
262    }
263
264    /// Issue an SDO read (CoE upload).
265    ///
266    /// Sends a command to topic `ethercat.{device}.sdo_read` with payload:
267    /// ```json
268    /// {"index": "0x6064", "sub": 0}
269    /// ```
270    ///
271    /// Returns a transaction handle for use with [`result()`](Self::result).
272    ///
273    /// # Arguments
274    ///
275    /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
276    /// * `index` — CoE object dictionary index (e.g. `0x6064`)
277    /// * `sub_index` — CoE sub-index (usually `0`)
278    ///
279    /// # Example
280    ///
281    /// ```ignore
282    /// // Read the actual position value
283    /// let tid = sdo.read(ctx.client, 0x6064, 0);
284    /// ```
285    pub fn read(
286        &mut self,
287        client: &mut CommandClient,
288        index: u16,
289        sub_index: u8,
290    ) -> u32 {
291        let topic = format!("ethercat.{}.sdo_read", self.device);
292        let payload = json!({
293            "index": format!("0x{:04X}", index),
294            "sub": sub_index,
295        });
296        let tid = client.send(&topic, payload);
297
298        self.requests.insert(tid, SdoRequest {
299            index,
300            sub_index,
301            kind: SdoRequestKind::Read,
302            sent_at: Instant::now(),
303        });
304
305        tid
306    }
307
308    /// Check the result of a previous SDO request.
309    ///
310    /// Call this each tick with the handle returned by [`write()`](Self::write)
311    /// or [`read()`](Self::read). The result is consumed (removed from the
312    /// internal map) once it resolves to [`Ok`](SdoResult::Ok),
313    /// [`Err`](SdoResult::Err), or [`Timeout`](SdoResult::Timeout).
314    ///
315    /// # Arguments
316    ///
317    /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
318    /// * `tid` — transaction handle returned by `write()` or `read()`
319    /// * `timeout` — maximum time to wait before returning [`SdoResult::Timeout`]
320    ///
321    /// # Example
322    ///
323    /// ```ignore
324    /// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
325    ///     SdoResult::Pending => { /* keep waiting */ }
326    ///     SdoResult::Ok(data) => {
327    ///         log::info!("SDO response: {}", data);
328    ///         sm.index = next_state;
329    ///     }
330    ///     SdoResult::Err(e) => {
331    ///         log::error!("SDO failed: {}", e);
332    ///         sm.set_error(1);
333    ///     }
334    ///     SdoResult::Timeout => {
335    ///         log::error!("SDO timed out");
336    ///         sm.set_error(2);
337    ///     }
338    /// }
339    /// ```
340    pub fn result(
341        &mut self,
342        client: &mut CommandClient,
343        tid: u32,
344        timeout: Duration,
345    ) -> SdoResult {
346        let req = match self.requests.get(&tid) {
347            Some(r) => r,
348            None => return SdoResult::Err("unknown transaction id".into()),
349        };
350
351        // Check for response from CommandClient
352        if let Some(resp) = client.take_response(tid) {
353            self.requests.remove(&tid);
354            if resp.success {
355                return SdoResult::Ok(resp.data);
356            } else {
357                return SdoResult::Err(resp.error_message);
358            }
359        }
360
361        // Check timeout
362        if req.sent_at.elapsed() > timeout {
363            self.requests.remove(&tid);
364            return SdoResult::Timeout;
365        }
366
367        SdoResult::Pending
368    }
369
370    /// Remove all requests that have been pending longer than `timeout`.
371    ///
372    /// Call periodically (e.g. once per second) to prevent the internal map
373    /// from growing unboundedly if callers forget to check results.
374    ///
375    /// # Example
376    ///
377    /// ```ignore
378    /// // At the end of process_tick, clean up anything older than 10s
379    /// self.sdo.drain_stale(ctx.client, Duration::from_secs(10));
380    /// ```
381    pub fn drain_stale(&mut self, client: &mut CommandClient, timeout: Duration) {
382        let stale_tids: Vec<u32> = self
383            .requests
384            .iter()
385            .filter(|(_, req)| req.sent_at.elapsed() > timeout)
386            .map(|(&tid, _)| tid)
387            .collect();
388
389        for tid in stale_tids {
390            self.requests.remove(&tid);
391            // Also consume the response from CommandClient if one arrived late
392            let _ = client.take_response(tid);
393        }
394    }
395
396    /// Number of in-flight SDO requests.
397    pub fn pending_count(&self) -> usize {
398        self.requests.len()
399    }
400}
401
402#[cfg(test)]
403mod tests {
404    use super::*;
405    use mechutil::ipc::CommandMessage;
406    use tokio::sync::mpsc;
407
408    /// Helper: create a CommandClient backed by test channels, returning
409    /// (client, response_sender, write_receiver).
410    fn test_client() -> (
411        CommandClient,
412        mpsc::UnboundedSender<CommandMessage>,
413        mpsc::UnboundedReceiver<String>,
414    ) {
415        let (write_tx, write_rx) = mpsc::unbounded_channel();
416        let (response_tx, response_rx) = mpsc::unbounded_channel();
417        let client = CommandClient::new(write_tx, response_rx);
418        (client, response_tx, write_rx)
419    }
420
421    #[test]
422    fn write_sends_correct_topic_and_payload() {
423        let (mut client, _resp_tx, mut write_rx) = test_client();
424        let mut sdo = SdoClient::new("ClearPath_0");
425
426        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
427
428        let msg_json = write_rx.try_recv().expect("should have sent a message");
429        let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();
430
431        assert_eq!(msg.transaction_id, tid);
432        assert_eq!(msg.topic, "ethercat.ClearPath_0.sdo_write");
433        assert_eq!(msg.data["index"], "0x6060");
434        assert_eq!(msg.data["sub"], 0);
435        assert_eq!(msg.data["value"], 1);
436        assert_eq!(sdo.pending_count(), 1);
437    }
438
439    #[test]
440    fn read_sends_correct_topic_and_payload() {
441        let (mut client, _resp_tx, mut write_rx) = test_client();
442        let mut sdo = SdoClient::new("ClearPath_0");
443
444        let tid = sdo.read(&mut client, 0x6064, 0);
445
446        let msg_json = write_rx.try_recv().expect("should have sent a message");
447        let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();
448
449        assert_eq!(msg.transaction_id, tid);
450        assert_eq!(msg.topic, "ethercat.ClearPath_0.sdo_read");
451        assert_eq!(msg.data["index"], "0x6064");
452        assert_eq!(msg.data["sub"], 0);
453        assert!(msg.data.get("value").is_none());
454    }
455
456    #[test]
457    fn result_returns_ok_on_success() {
458        let (mut client, resp_tx, _write_rx) = test_client();
459        let mut sdo = SdoClient::new("ClearPath_0");
460
461        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
462
463        // Simulate successful response
464        resp_tx
465            .send(CommandMessage::response(tid, json!(null)))
466            .unwrap();
467        client.poll();
468
469        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
470            SdoResult::Ok(data) => assert_eq!(data, json!(null)),
471            other => panic!("expected Ok, got {:?}", other),
472        }
473
474        // Consumed — no longer tracked
475        assert_eq!(sdo.pending_count(), 0);
476    }
477
478    #[test]
479    fn result_returns_err_on_failure() {
480        let (mut client, resp_tx, _write_rx) = test_client();
481        let mut sdo = SdoClient::new("ClearPath_0");
482
483        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
484
485        // Simulate error response
486        let mut err_resp = CommandMessage::response(tid, json!(null));
487        err_resp.success = false;
488        err_resp.error_message = "SDO abort: 0x06090011".into();
489        resp_tx.send(err_resp).unwrap();
490        client.poll();
491
492        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
493            SdoResult::Err(msg) => assert_eq!(msg, "SDO abort: 0x06090011"),
494            other => panic!("expected Err, got {:?}", other),
495        }
496    }
497
498    #[test]
499    fn result_returns_pending_while_waiting() {
500        let (mut client, _resp_tx, _write_rx) = test_client();
501        let mut sdo = SdoClient::new("ClearPath_0");
502
503        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
504        client.poll();
505
506        match sdo.result(&mut client, tid, Duration::from_secs(30)) {
507            SdoResult::Pending => {}
508            other => panic!("expected Pending, got {:?}", other),
509        }
510
511        // Still tracked
512        assert_eq!(sdo.pending_count(), 1);
513    }
514
515    #[test]
516    fn result_returns_timeout_when_deadline_exceeded() {
517        let (mut client, _resp_tx, _write_rx) = test_client();
518        let mut sdo = SdoClient::new("ClearPath_0");
519
520        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
521        client.poll();
522
523        // Zero timeout -> immediately expired
524        match sdo.result(&mut client, tid, Duration::ZERO) {
525            SdoResult::Timeout => {}
526            other => panic!("expected Timeout, got {:?}", other),
527        }
528
529        assert_eq!(sdo.pending_count(), 0);
530    }
531
532    #[test]
533    fn drain_stale_removes_old_requests() {
534        let (mut client, _resp_tx, _write_rx) = test_client();
535        let mut sdo = SdoClient::new("ClearPath_0");
536
537        sdo.write(&mut client, 0x6060, 0, json!(1));
538        sdo.read(&mut client, 0x6064, 0);
539        assert_eq!(sdo.pending_count(), 2);
540
541        // Zero timeout -> everything is stale
542        sdo.drain_stale(&mut client, Duration::ZERO);
543        assert_eq!(sdo.pending_count(), 0);
544    }
545
546    #[test]
547    fn multiple_concurrent_requests() {
548        let (mut client, resp_tx, _write_rx) = test_client();
549        let mut sdo = SdoClient::new("ClearPath_0");
550
551        let tid1 = sdo.write(&mut client, 0x6060, 0, json!(1));
552        let tid2 = sdo.read(&mut client, 0x6064, 0);
553        assert_eq!(sdo.pending_count(), 2);
554
555        // Only respond to the read
556        resp_tx
557            .send(CommandMessage::response(tid2, json!(12345)))
558            .unwrap();
559        client.poll();
560
561        // Read resolves, write still pending
562        match sdo.result(&mut client, tid2, Duration::from_secs(3)) {
563            SdoResult::Ok(v) => assert_eq!(v, json!(12345)),
564            other => panic!("expected Ok, got {:?}", other),
565        }
566        match sdo.result(&mut client, tid1, Duration::from_secs(30)) {
567            SdoResult::Pending => {}
568            other => panic!("expected Pending, got {:?}", other),
569        }
570        assert_eq!(sdo.pending_count(), 1);
571    }
572
573    #[test]
574    fn unknown_tid_returns_err() {
575        let (mut client, _resp_tx, _write_rx) = test_client();
576        let mut sdo = SdoClient::new("ClearPath_0");
577
578        match sdo.result(&mut client, 99999, Duration::from_secs(3)) {
579            SdoResult::Err(msg) => assert!(msg.contains("unknown")),
580            other => panic!("expected Err for unknown tid, got {:?}", other),
581        }
582    }
583}