// autocore_std/ethercat/sdo_client.rs

1//! Non-blocking SDO read/write client for EtherCAT devices.
2//!
3//! [`SdoClient`] wraps [`CommandClient`](crate::CommandClient) to provide an
4//! ergonomic, handle-based interface for runtime SDO (Service Data Object)
5//! operations over CoE (CANopen over EtherCAT). Create one per device, issue
6//! reads/writes from your control loop, and check results by handle on
7//! subsequent ticks.
8//!
9//! # When to use
10//!
11//! Use `SdoClient` for **runtime** SDO access — reading diagnostic registers,
12//! changing operating parameters on the fly, or any CoE transfer that happens
13//! after the cyclic loop is running. For SDOs that must be applied **before**
14//! the cyclic loop starts (e.g. setting `modes_of_operation`), use the
15//! `startup_sdo` array in `project.json` instead.
16//!
17//! # Topic format
18//!
//! Requests are sent as IPC commands through the existing WebSocket channel.
//! Both operations use a fixed topic; the target device (the name configured
//! in `project.json`) is carried in the payload's `device` field:
21//!
22//! | Operation | Topic                              | Payload                                          |
23//! |-----------|------------------------------------|--------------------------------------------------|
24//! | Write     | `ethercat.write_sdo`               | `{"device": "...", "index": "0x6060", "sub": 0, "value": "0x01"}` |
25//! | Read      | `ethercat.read_sdo`                | `{"device": "...", "index": "0x6060", "sub": 0}`                 |
26//!
27//! # Usage with a state machine
28//!
29//! A typical pattern pairs `SdoClient` with [`StateMachine`](crate::fb::StateMachine)
30//! to fire an SDO write in one state, then advance on success:
31//!
32//! ```ignore
33//! use autocore_std::{ControlProgram, TickContext};
34//! use autocore_std::ethercat::{SdoClient, SdoResult};
35//! use autocore_std::fb::StateMachine;
36//! use serde_json::json;
37//! use std::time::Duration;
38//!
39//! pub struct MyProgram {
40//!     sm: StateMachine,
41//!     sdo: SdoClient,
42//!     write_tid: Option<u32>,
43//! }
44//!
45//! impl MyProgram {
46//!     pub fn new() -> Self {
47//!         Self {
48//!             sm: StateMachine::new(),
49//!             sdo: SdoClient::new("ClearPath_0"),
50//!             write_tid: None,
51//!         }
52//!     }
53//! }
54//!
55//! impl ControlProgram for MyProgram {
56//!     type Memory = GlobalMemory;
57//!
58//!     fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
59//!         self.sm.call();
60//!         match self.sm.index {
61//!             // State 10: Send SDO write for modes_of_operation = PP
62//!             10 => {
63//!                 self.write_tid = Some(
64//!                     self.sdo.write(ctx.client, 0x6060, 0, json!(1))
65//!                 );
66//!                 self.sm.timeout_preset = Duration::from_secs(3);
67//!                 self.sm.index = 20;
68//!             }
69//!             // State 20: Wait for response
70//!             20 => {
71//!                 let tid = self.write_tid.unwrap();
72//!                 match self.sdo.result(ctx.client, tid, Duration::from_secs(3)) {
73//!                     SdoResult::Pending => { /* keep waiting */ }
74//!                     SdoResult::Ok(_) => {
75//!                         log::info!("modes_of_operation set to PP");
76//!                         self.sm.index = 30;
77//!                     }
78//!                     SdoResult::Err(e) => {
79//!                         log::error!("SDO write failed: {}", e);
80//!                         self.sm.set_error(1);
81//!                     }
82//!                     SdoResult::Timeout => {
83//!                         log::error!("SDO write timed out");
84//!                         self.sm.set_error(2);
85//!                     }
86//!                 }
87//!             }
88//!             // State 30: Done — continue with normal operation
89//!             30 => { /* ... */ }
90//!             _ => {}
91//!         }
92//!     }
93//! }
94//! ```
95//!
96//! # Reading an SDO
97//!
98//! ```ignore
99//! // Fire a read request
100//! let tid = sdo.read(ctx.client, 0x6064, 0); // Position Actual Value
101//!
102//! // On a later tick, check the result
103//! match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
104//!     SdoResult::Ok(data) => {
105//!         let position: i32 = serde_json::from_value(data).unwrap();
106//!         log::info!("Current position: {}", position);
107//!     }
108//!     SdoResult::Pending => { /* still waiting */ }
109//!     SdoResult::Err(e) => { log::error!("SDO read failed: {}", e); }
110//!     SdoResult::Timeout => { log::error!("SDO read timed out"); }
111//! }
112//! ```
113
114use std::collections::HashMap;
115use std::time::{Duration, Instant};
116
117use serde_json::{json, Value};
118
119use crate::command_client::CommandClient;
120
/// Metadata for an in-flight SDO request.
///
/// One of these is recorded for every request that has been sent and not yet
/// resolved. It is `Debug` and `Clone` so that it can be logged and copied
/// out of the tracking map.
#[derive(Debug, Clone)]
pub struct SdoRequest {
    /// CoE object dictionary index (e.g. 0x6060).
    pub index: u16,
    /// CoE sub-index.
    pub sub_index: u8,
    /// Whether this is a read or write.
    pub kind: SdoRequestKind,
    /// When the request was sent (for timeout detection).
    pub sent_at: Instant,
}

/// Discriminates SDO reads from writes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SdoRequestKind {
    /// SDO read (CoE upload).
    Read,
    /// SDO write (CoE download).
    Write,
}
141
142/// Result of checking an in-flight SDO request.
143///
144/// Returned by [`SdoClient::result()`]. The caller should match on this each
145/// tick until the request resolves (i.e. is no longer [`Pending`](SdoResult::Pending)).
146///
147/// ```ignore
148/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
149///     SdoResult::Pending => { /* check again next tick */ }
150///     SdoResult::Ok(data) => { /* use data */ }
151///     SdoResult::Err(msg) => { log::error!("{}", msg); }
152///     SdoResult::Timeout => { log::error!("timed out"); }
153/// }
154/// ```
155#[derive(Debug, Clone)]
156pub enum SdoResult {
157    /// No response yet; check again next tick.
158    Pending,
159    /// Operation succeeded. Contains the response `data` field — the read
160    /// value for reads, or an empty/null value for writes.
161    Ok(Value),
162    /// The server (or EtherCAT master) reported an error. The string contains
163    /// the `error_message` from the response (e.g. `"SDO abort: 0x06090011"`).
164    Err(String),
165    /// No response arrived within the caller-specified deadline.
166    Timeout,
167}
168
169/// Non-blocking SDO client scoped to a single EtherCAT device.
170///
171/// Create one `SdoClient` per device in your control program struct. It holds
172/// a map of outstanding requests keyed by `transaction_id` (returned by
173/// [`CommandClient::send`]). Keep the returned handle and poll
174/// [`result()`](Self::result) each tick until it resolves.
175///
176/// # Example
177///
178/// ```ignore
179/// use autocore_std::ethercat::{SdoClient, SdoResult};
180/// use serde_json::json;
181/// use std::time::Duration;
182///
183/// let mut sdo = SdoClient::new("ClearPath_0");
184///
185/// // Issue an SDO write (from process_tick):
186/// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
187///
188/// // Check result on subsequent ticks:
189/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
190///     SdoResult::Pending => {}
191///     SdoResult::Ok(_) => { /* success */ }
192///     SdoResult::Err(e) => { log::error!("SDO error: {}", e); }
193///     SdoResult::Timeout => { log::error!("SDO timed out"); }
194/// }
195/// ```
196pub struct SdoClient {
197    device: String,
198    requests: HashMap<u32, SdoRequest>,
199}
200
201impl SdoClient {
202    /// Create a new client for the given device name.
203    ///
204    /// The `device` string must match the `name` field in the slave's
205    /// `project.json` configuration (e.g. `"ClearPath_0"`).
206    ///
207    /// ```ignore
208    /// let sdo = SdoClient::new("ClearPath_0");
209    /// ```
210    pub fn new(device: &str) -> Self {
211        Self {
212            device: device.to_string(),
213            requests: HashMap::new(),
214        }
215    }
216
217    /// Issue an SDO write (CoE download).
218    ///
219    /// Sends a command to topic `ethercat.{device}.sdo_write` with payload:
220    /// ```json
221    /// {"index": "0x6060", "sub": 0, "value": 1}
222    /// ```
223    ///
224    /// Returns a transaction handle for use with [`result()`](Self::result).
225    ///
226    /// # Arguments
227    ///
228    /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
229    /// * `index` — CoE object dictionary index (e.g. `0x6060`)
230    /// * `sub_index` — CoE sub-index (usually `0`)
231    /// * `value` — the value to write, as a [`serde_json::Value`]
232    ///
233    /// # Example
234    ///
235    /// ```ignore
236    /// // Set modes_of_operation to Profile Position (1)
237    /// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
238    /// ```
239    pub fn write(
240        &mut self,
241        client: &mut CommandClient,
242        index: u16,
243        sub_index: u8,
244        value: Value,
245    ) -> u32 {
246        let topic = "ethercat.write_sdo".to_string();
247        let payload = json!({
248            "device": self.device,
249            "index": format!("0x{:04X}", index),
250            "sub": sub_index,
251            "value": value,
252        });
253        let tid = client.send(&topic, payload);
254
255        self.requests.insert(tid, SdoRequest {
256            index,
257            sub_index,
258            kind: SdoRequestKind::Write,
259            sent_at: Instant::now(),
260        });
261
262        tid
263    }
264
265    /// Issue an SDO read (CoE upload).
266    ///
267    /// Sends a command to topic `ethercat.{device}.sdo_read` with payload:
268    /// ```json
269    /// {"index": "0x6064", "sub": 0}
270    /// ```
271    ///
272    /// Returns a transaction handle for use with [`result()`](Self::result).
273    ///
274    /// # Arguments
275    ///
276    /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
277    /// * `index` — CoE object dictionary index (e.g. `0x6064`)
278    /// * `sub_index` — CoE sub-index (usually `0`)
279    ///
280    /// # Example
281    ///
282    /// ```ignore
283    /// // Read the actual position value
284    /// let tid = sdo.read(ctx.client, 0x6064, 0);
285    /// ```
286    pub fn read(
287        &mut self,
288        client: &mut CommandClient,
289        index: u16,
290        sub_index: u8,
291    ) -> u32 {
292        let topic = "ethercat.read_sdo".to_string();
293        let payload = json!({
294            "device": self.device,
295            "index": format!("0x{:04X}", index),
296            "sub": sub_index,
297        });
298        let tid = client.send(&topic, payload);
299
300        self.requests.insert(tid, SdoRequest {
301            index,
302            sub_index,
303            kind: SdoRequestKind::Read,
304            sent_at: Instant::now(),
305        });
306
307        tid
308    }
309
310    /// Check the result of a previous SDO request.
311    ///
312    /// Call this each tick with the handle returned by [`write()`](Self::write)
313    /// or [`read()`](Self::read). The result is consumed (removed from the
314    /// internal map) once it resolves to [`Ok`](SdoResult::Ok),
315    /// [`Err`](SdoResult::Err), or [`Timeout`](SdoResult::Timeout).
316    ///
317    /// # Arguments
318    ///
319    /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
320    /// * `tid` — transaction handle returned by `write()` or `read()`
321    /// * `timeout` — maximum time to wait before returning [`SdoResult::Timeout`]
322    ///
323    /// # Example
324    ///
325    /// ```ignore
326    /// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
327    ///     SdoResult::Pending => { /* keep waiting */ }
328    ///     SdoResult::Ok(data) => {
329    ///         log::info!("SDO response: {}", data);
330    ///         sm.index = next_state;
331    ///     }
332    ///     SdoResult::Err(e) => {
333    ///         log::error!("SDO failed: {}", e);
334    ///         sm.set_error(1);
335    ///     }
336    ///     SdoResult::Timeout => {
337    ///         log::error!("SDO timed out");
338    ///         sm.set_error(2);
339    ///     }
340    /// }
341    /// ```
342    pub fn result(
343        &mut self,
344        client: &mut CommandClient,
345        tid: u32,
346        timeout: Duration,
347    ) -> SdoResult {
348        let req = match self.requests.get(&tid) {
349            Some(r) => r,
350            None => return SdoResult::Err("unknown transaction id".into()),
351        };
352
353        // Check for response from CommandClient
354        if let Some(resp) = client.take_response(tid) {
355            self.requests.remove(&tid);
356            if resp.success {
357                return SdoResult::Ok(resp.data);
358            } else {
359                return SdoResult::Err(resp.error_message);
360            }
361        }
362
363        // Check timeout
364        if req.sent_at.elapsed() > timeout {
365            self.requests.remove(&tid);
366            return SdoResult::Timeout;
367        }
368
369        SdoResult::Pending
370    }
371
372    /// Remove all requests that have been pending longer than `timeout`.
373    ///
374    /// Call periodically (e.g. once per second) to prevent the internal map
375    /// from growing unboundedly if callers forget to check results.
376    ///
377    /// # Example
378    ///
379    /// ```ignore
380    /// // At the end of process_tick, clean up anything older than 10s
381    /// self.sdo.drain_stale(ctx.client, Duration::from_secs(10));
382    /// ```
383    pub fn drain_stale(&mut self, client: &mut CommandClient, timeout: Duration) {
384        let stale_tids: Vec<u32> = self
385            .requests
386            .iter()
387            .filter(|(_, req)| req.sent_at.elapsed() > timeout)
388            .map(|(&tid, _)| tid)
389            .collect();
390
391        for tid in stale_tids {
392            self.requests.remove(&tid);
393            // Also consume the response from CommandClient if one arrived late
394            let _ = client.take_response(tid);
395        }
396    }
397
398    /// Number of in-flight SDO requests.
399    pub fn pending_count(&self) -> usize {
400        self.requests.len()
401    }
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use mechutil::ipc::CommandMessage;
    use tokio::sync::mpsc;

    /// Device name used by every test client.
    const DEVICE: &str = "ClearPath_0";

    /// Helper: create a CommandClient backed by test channels, returning
    /// (client, response_sender, write_receiver).
    fn test_client() -> (
        CommandClient,
        mpsc::UnboundedSender<CommandMessage>,
        mpsc::UnboundedReceiver<String>,
    ) {
        let (write_tx, write_rx) = mpsc::unbounded_channel();
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let client = CommandClient::new(write_tx, response_rx);
        (client, response_tx, write_rx)
    }

    /// Helper: pop the next outgoing message and parse it.
    fn next_sent(write_rx: &mut mpsc::UnboundedReceiver<String>) -> CommandMessage {
        let raw = write_rx.try_recv().expect("should have sent a message");
        serde_json::from_str(&raw).expect("sent message should parse as CommandMessage")
    }

    /// Helper: let a measurable amount of time pass so that a zero timeout is
    /// exceeded regardless of the platform's clock resolution.
    fn let_time_pass() {
        std::thread::sleep(Duration::from_millis(1));
    }

    #[test]
    fn write_sends_correct_topic_and_payload() {
        let (mut client, _resp_tx, mut write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
        let msg = next_sent(&mut write_rx);

        assert_eq!(msg.transaction_id, tid);
        assert_eq!(msg.topic, "ethercat.write_sdo");
        assert_eq!(msg.data["device"], "ClearPath_0");
        assert_eq!(msg.data["index"], "0x6060");
        assert_eq!(msg.data["sub"], 0);
        assert_eq!(msg.data["value"], 1);
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn read_sends_correct_topic_and_payload() {
        let (mut client, _resp_tx, mut write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.read(&mut client, 0x6064, 0);
        let msg = next_sent(&mut write_rx);

        assert_eq!(msg.transaction_id, tid);
        assert_eq!(msg.topic, "ethercat.read_sdo");
        assert_eq!(msg.data["device"], "ClearPath_0");
        assert_eq!(msg.data["index"], "0x6064");
        assert_eq!(msg.data["sub"], 0);
        assert!(msg.data.get("value").is_none());
        // Reads are tracked exactly like writes.
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn result_returns_ok_on_success() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        // Simulate successful response
        resp_tx
            .send(CommandMessage::response(tid, json!(null)))
            .unwrap();
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Ok(data) => assert_eq!(data, json!(null)),
            other => panic!("expected Ok, got {:?}", other),
        }

        // Consumed — no longer tracked
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn result_returns_err_on_failure() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        // Simulate error response
        let mut err_resp = CommandMessage::response(tid, json!(null));
        err_resp.success = false;
        err_resp.error_message = "SDO abort: 0x06090011".into();
        resp_tx.send(err_resp).unwrap();
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert_eq!(msg, "SDO abort: 0x06090011"),
            other => panic!("expected Err, got {:?}", other),
        }

        // A failed request is consumed just like a successful one.
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn result_returns_pending_while_waiting() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(30)) {
            SdoResult::Pending => {}
            other => panic!("expected Pending, got {:?}", other),
        }

        // Still tracked
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn result_returns_timeout_when_deadline_exceeded() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
        client.poll();
        let_time_pass();

        // Zero timeout -> expired
        match sdo.result(&mut client, tid, Duration::ZERO) {
            SdoResult::Timeout => {}
            other => panic!("expected Timeout, got {:?}", other),
        }

        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn resolved_handle_becomes_unknown() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.read(&mut client, 0x6064, 0);
        resp_tx
            .send(CommandMessage::response(tid, json!(7)))
            .unwrap();
        client.poll();

        assert!(matches!(
            sdo.result(&mut client, tid, Duration::from_secs(3)),
            SdoResult::Ok(_)
        ));

        // The first resolution consumed the handle; asking again is an error.
        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert!(msg.contains("unknown")),
            other => panic!("expected Err for consumed tid, got {:?}", other),
        }
    }

    #[test]
    fn drain_stale_removes_old_requests() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        sdo.write(&mut client, 0x6060, 0, json!(1));
        sdo.read(&mut client, 0x6064, 0);
        assert_eq!(sdo.pending_count(), 2);
        let_time_pass();

        // Zero timeout -> everything is stale
        sdo.drain_stale(&mut client, Duration::ZERO);
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn drain_stale_keeps_fresh_requests() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        sdo.write(&mut client, 0x6060, 0, json!(1));

        // Generous timeout -> nothing is stale yet
        sdo.drain_stale(&mut client, Duration::from_secs(30));
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn multiple_concurrent_requests() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid1 = sdo.write(&mut client, 0x6060, 0, json!(1));
        let tid2 = sdo.read(&mut client, 0x6064, 0);
        assert_eq!(sdo.pending_count(), 2);

        // Only respond to the read
        resp_tx
            .send(CommandMessage::response(tid2, json!(12345)))
            .unwrap();
        client.poll();

        // Read resolves, write still pending
        match sdo.result(&mut client, tid2, Duration::from_secs(3)) {
            SdoResult::Ok(v) => assert_eq!(v, json!(12345)),
            other => panic!("expected Ok, got {:?}", other),
        }
        match sdo.result(&mut client, tid1, Duration::from_secs(30)) {
            SdoResult::Pending => {}
            other => panic!("expected Pending, got {:?}", other),
        }
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn unknown_tid_returns_err() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        match sdo.result(&mut client, 99999, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert!(msg.contains("unknown")),
            other => panic!("expected Err for unknown tid, got {:?}", other),
        }
    }
}