autocore_std/ethercat/sdo_client.rs
1//! Non-blocking SDO read/write client for EtherCAT devices.
2//!
3//! [`SdoClient`] wraps [`CommandClient`](crate::CommandClient) to provide an
4//! ergonomic, handle-based interface for runtime SDO (Service Data Object)
5//! operations over CoE (CANopen over EtherCAT). Create one per device, issue
6//! reads/writes from your control loop, and check results by handle on
7//! subsequent ticks.
8//!
9//! # When to use
10//!
11//! Use `SdoClient` for **runtime** SDO access — reading diagnostic registers,
12//! changing operating parameters on the fly, or any CoE transfer that happens
13//! after the cyclic loop is running. For SDOs that must be applied **before**
14//! the cyclic loop starts (e.g. setting `modes_of_operation`), use the
15//! `startup_sdo` array in `project.json` instead.
16//!
17//! # Topic format
18//!
19//! Requests are sent as IPC commands through the existing WebSocket channel.
20//! Topics are scoped to the device name configured in `project.json`:
21//!
22//! | Operation | Topic | Payload |
23//! |-----------|------------------------------------|--------------------------------------------------|
24//! | Write | `ethercat.{device}.sdo_write` | `{"index": "0x6060", "sub": 0, "value": "0x01"}` |
25//! | Read | `ethercat.{device}.sdo_read` | `{"index": "0x6060", "sub": 0}` |
26//!
27//! # Usage with a state machine
28//!
29//! A typical pattern pairs `SdoClient` with [`StateMachine`](crate::fb::StateMachine)
30//! to fire an SDO write in one state, then advance on success:
31//!
32//! ```ignore
33//! use autocore_std::{ControlProgram, TickContext};
34//! use autocore_std::ethercat::{SdoClient, SdoResult};
35//! use autocore_std::fb::StateMachine;
36//! use serde_json::json;
37//! use std::time::Duration;
38//!
39//! pub struct MyProgram {
40//! sm: StateMachine,
41//! sdo: SdoClient,
42//! write_tid: Option<u32>,
43//! }
44//!
45//! impl MyProgram {
46//! pub fn new() -> Self {
47//! Self {
48//! sm: StateMachine::new(),
49//! sdo: SdoClient::new("ClearPath_0"),
50//! write_tid: None,
51//! }
52//! }
53//! }
54//!
55//! impl ControlProgram for MyProgram {
56//! type Memory = GlobalMemory;
57//!
58//! fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
59//! self.sm.call();
60//! match self.sm.index {
61//! // State 10: Send SDO write for modes_of_operation = PP
62//! 10 => {
63//! self.write_tid = Some(
64//! self.sdo.write(ctx.client, 0x6060, 0, json!(1))
65//! );
66//! self.sm.timeout_preset = Duration::from_secs(3);
67//! self.sm.index = 20;
68//! }
69//! // State 20: Wait for response
70//! 20 => {
71//! let tid = self.write_tid.unwrap();
72//! match self.sdo.result(ctx.client, tid, Duration::from_secs(3)) {
73//! SdoResult::Pending => { /* keep waiting */ }
74//! SdoResult::Ok(_) => {
75//! log::info!("modes_of_operation set to PP");
76//! self.sm.index = 30;
77//! }
78//! SdoResult::Err(e) => {
79//! log::error!("SDO write failed: {}", e);
80//! self.sm.set_error(1);
81//! }
82//! SdoResult::Timeout => {
83//! log::error!("SDO write timed out");
84//! self.sm.set_error(2);
85//! }
86//! }
87//! }
88//! // State 30: Done — continue with normal operation
89//! 30 => { /* ... */ }
90//! _ => {}
91//! }
92//! }
93//! }
94//! ```
95//!
96//! # Reading an SDO
97//!
98//! ```ignore
99//! // Fire a read request
100//! let tid = sdo.read(ctx.client, 0x6064, 0); // Position Actual Value
101//!
102//! // On a later tick, check the result
103//! match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
104//! SdoResult::Ok(data) => {
105//! let position: i32 = serde_json::from_value(data).unwrap();
106//! log::info!("Current position: {}", position);
107//! }
108//! SdoResult::Pending => { /* still waiting */ }
109//! SdoResult::Err(e) => { log::error!("SDO read failed: {}", e); }
110//! SdoResult::Timeout => { log::error!("SDO read timed out"); }
111//! }
112//! ```
113
114use std::collections::HashMap;
115use std::time::{Duration, Instant};
116
117use serde_json::{json, Value};
118
119use crate::command_client::CommandClient;
120
121/// Metadata for an in-flight SDO request.
122pub struct SdoRequest {
123 /// CoE object dictionary index (e.g. 0x6060).
124 pub index: u16,
125 /// CoE sub-index.
126 pub sub_index: u8,
127 /// Whether this is a read or write.
128 pub kind: SdoRequestKind,
129 /// When the request was sent (for timeout detection).
130 pub sent_at: Instant,
131}
132
/// Direction of an SDO transfer.
///
/// Stored in [`SdoRequest::kind`] so an in-flight request remembers whether it
/// was issued as a read or as a write.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SdoRequestKind {
    /// Value is fetched from the device (CoE upload).
    Read,
    /// Value is sent to the device (CoE download).
    Write,
}
141
142/// Result of checking an in-flight SDO request.
143///
144/// Returned by [`SdoClient::result()`]. The caller should match on this each
145/// tick until the request resolves (i.e. is no longer [`Pending`](SdoResult::Pending)).
146///
147/// ```ignore
148/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
149/// SdoResult::Pending => { /* check again next tick */ }
150/// SdoResult::Ok(data) => { /* use data */ }
151/// SdoResult::Err(msg) => { log::error!("{}", msg); }
152/// SdoResult::Timeout => { log::error!("timed out"); }
153/// }
154/// ```
155#[derive(Debug, Clone)]
156pub enum SdoResult {
157 /// No response yet; check again next tick.
158 Pending,
159 /// Operation succeeded. Contains the response `data` field — the read
160 /// value for reads, or an empty/null value for writes.
161 Ok(Value),
162 /// The server (or EtherCAT master) reported an error. The string contains
163 /// the `error_message` from the response (e.g. `"SDO abort: 0x06090011"`).
164 Err(String),
165 /// No response arrived within the caller-specified deadline.
166 Timeout,
167}
168
169/// Non-blocking SDO client scoped to a single EtherCAT device.
170///
171/// Create one `SdoClient` per device in your control program struct. It holds
172/// a map of outstanding requests keyed by `transaction_id` (returned by
173/// [`CommandClient::send`]). Keep the returned handle and poll
174/// [`result()`](Self::result) each tick until it resolves.
175///
176/// # Example
177///
178/// ```ignore
179/// use autocore_std::ethercat::{SdoClient, SdoResult};
180/// use serde_json::json;
181/// use std::time::Duration;
182///
183/// let mut sdo = SdoClient::new("ClearPath_0");
184///
185/// // Issue an SDO write (from process_tick):
186/// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
187///
188/// // Check result on subsequent ticks:
189/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
190/// SdoResult::Pending => {}
191/// SdoResult::Ok(_) => { /* success */ }
192/// SdoResult::Err(e) => { log::error!("SDO error: {}", e); }
193/// SdoResult::Timeout => { log::error!("SDO timed out"); }
194/// }
195/// ```
196pub struct SdoClient {
197 device: String,
198 requests: HashMap<u32, SdoRequest>,
199}
200
201impl SdoClient {
202 /// Create a new client for the given device name.
203 ///
204 /// The `device` string must match the `name` field in the slave's
205 /// `project.json` configuration (e.g. `"ClearPath_0"`).
206 ///
207 /// ```ignore
208 /// let sdo = SdoClient::new("ClearPath_0");
209 /// ```
210 pub fn new(device: &str) -> Self {
211 Self {
212 device: device.to_string(),
213 requests: HashMap::new(),
214 }
215 }
216
217 /// Issue an SDO write (CoE download).
218 ///
219 /// Sends a command to topic `ethercat.{device}.sdo_write` with payload:
220 /// ```json
221 /// {"index": "0x6060", "sub": 0, "value": 1}
222 /// ```
223 ///
224 /// Returns a transaction handle for use with [`result()`](Self::result).
225 ///
226 /// # Arguments
227 ///
228 /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
229 /// * `index` — CoE object dictionary index (e.g. `0x6060`)
230 /// * `sub_index` — CoE sub-index (usually `0`)
231 /// * `value` — the value to write, as a [`serde_json::Value`]
232 ///
233 /// # Example
234 ///
235 /// ```ignore
236 /// // Set modes_of_operation to Profile Position (1)
237 /// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
238 /// ```
239 pub fn write(
240 &mut self,
241 client: &mut CommandClient,
242 index: u16,
243 sub_index: u8,
244 value: Value,
245 ) -> u32 {
246 let topic = format!("ethercat.{}.sdo_write", self.device);
247 let payload = json!({
248 "index": format!("0x{:04X}", index),
249 "sub": sub_index,
250 "value": value,
251 });
252 let tid = client.send(&topic, payload);
253
254 self.requests.insert(tid, SdoRequest {
255 index,
256 sub_index,
257 kind: SdoRequestKind::Write,
258 sent_at: Instant::now(),
259 });
260
261 tid
262 }
263
264 /// Issue an SDO read (CoE upload).
265 ///
266 /// Sends a command to topic `ethercat.{device}.sdo_read` with payload:
267 /// ```json
268 /// {"index": "0x6064", "sub": 0}
269 /// ```
270 ///
271 /// Returns a transaction handle for use with [`result()`](Self::result).
272 ///
273 /// # Arguments
274 ///
275 /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
276 /// * `index` — CoE object dictionary index (e.g. `0x6064`)
277 /// * `sub_index` — CoE sub-index (usually `0`)
278 ///
279 /// # Example
280 ///
281 /// ```ignore
282 /// // Read the actual position value
283 /// let tid = sdo.read(ctx.client, 0x6064, 0);
284 /// ```
285 pub fn read(
286 &mut self,
287 client: &mut CommandClient,
288 index: u16,
289 sub_index: u8,
290 ) -> u32 {
291 let topic = format!("ethercat.{}.sdo_read", self.device);
292 let payload = json!({
293 "index": format!("0x{:04X}", index),
294 "sub": sub_index,
295 });
296 let tid = client.send(&topic, payload);
297
298 self.requests.insert(tid, SdoRequest {
299 index,
300 sub_index,
301 kind: SdoRequestKind::Read,
302 sent_at: Instant::now(),
303 });
304
305 tid
306 }
307
308 /// Check the result of a previous SDO request.
309 ///
310 /// Call this each tick with the handle returned by [`write()`](Self::write)
311 /// or [`read()`](Self::read). The result is consumed (removed from the
312 /// internal map) once it resolves to [`Ok`](SdoResult::Ok),
313 /// [`Err`](SdoResult::Err), or [`Timeout`](SdoResult::Timeout).
314 ///
315 /// # Arguments
316 ///
317 /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
318 /// * `tid` — transaction handle returned by `write()` or `read()`
319 /// * `timeout` — maximum time to wait before returning [`SdoResult::Timeout`]
320 ///
321 /// # Example
322 ///
323 /// ```ignore
324 /// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
325 /// SdoResult::Pending => { /* keep waiting */ }
326 /// SdoResult::Ok(data) => {
327 /// log::info!("SDO response: {}", data);
328 /// sm.index = next_state;
329 /// }
330 /// SdoResult::Err(e) => {
331 /// log::error!("SDO failed: {}", e);
332 /// sm.set_error(1);
333 /// }
334 /// SdoResult::Timeout => {
335 /// log::error!("SDO timed out");
336 /// sm.set_error(2);
337 /// }
338 /// }
339 /// ```
340 pub fn result(
341 &mut self,
342 client: &mut CommandClient,
343 tid: u32,
344 timeout: Duration,
345 ) -> SdoResult {
346 let req = match self.requests.get(&tid) {
347 Some(r) => r,
348 None => return SdoResult::Err("unknown transaction id".into()),
349 };
350
351 // Check for response from CommandClient
352 if let Some(resp) = client.take_response(tid) {
353 self.requests.remove(&tid);
354 if resp.success {
355 return SdoResult::Ok(resp.data);
356 } else {
357 return SdoResult::Err(resp.error_message);
358 }
359 }
360
361 // Check timeout
362 if req.sent_at.elapsed() > timeout {
363 self.requests.remove(&tid);
364 return SdoResult::Timeout;
365 }
366
367 SdoResult::Pending
368 }
369
370 /// Remove all requests that have been pending longer than `timeout`.
371 ///
372 /// Call periodically (e.g. once per second) to prevent the internal map
373 /// from growing unboundedly if callers forget to check results.
374 ///
375 /// # Example
376 ///
377 /// ```ignore
378 /// // At the end of process_tick, clean up anything older than 10s
379 /// self.sdo.drain_stale(ctx.client, Duration::from_secs(10));
380 /// ```
381 pub fn drain_stale(&mut self, client: &mut CommandClient, timeout: Duration) {
382 let stale_tids: Vec<u32> = self
383 .requests
384 .iter()
385 .filter(|(_, req)| req.sent_at.elapsed() > timeout)
386 .map(|(&tid, _)| tid)
387 .collect();
388
389 for tid in stale_tids {
390 self.requests.remove(&tid);
391 // Also consume the response from CommandClient if one arrived late
392 let _ = client.take_response(tid);
393 }
394 }
395
396 /// Number of in-flight SDO requests.
397 pub fn pending_count(&self) -> usize {
398 self.requests.len()
399 }
400}
401
#[cfg(test)]
mod tests {
    use super::*;
    use mechutil::ipc::CommandMessage;
    use tokio::sync::mpsc;

    /// Device name used by every test.
    const DEVICE: &str = "ClearPath_0";

    /// Helper: create a CommandClient backed by test channels, returning
    /// (client, response_sender, write_receiver).
    fn test_client() -> (
        CommandClient,
        mpsc::UnboundedSender<CommandMessage>,
        mpsc::UnboundedReceiver<String>,
    ) {
        let (write_tx, write_rx) = mpsc::unbounded_channel();
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let client = CommandClient::new(write_tx, response_rx);
        (client, response_tx, write_rx)
    }

    /// Helper: spin until the monotonic clock has visibly advanced.
    ///
    /// `Instant` only promises not to go backwards, so two readings taken
    /// back-to-back may be equal. After this returns, every request sent
    /// before the call has a strictly positive elapsed time, which makes the
    /// zero-timeout tests deterministic.
    fn wait_for_clock_advance() {
        let start = Instant::now();
        while start.elapsed() == Duration::ZERO {
            std::hint::spin_loop();
        }
    }

    #[test]
    fn write_sends_correct_topic_and_payload() {
        let (mut client, _resp_tx, mut write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        let msg_json = write_rx.try_recv().expect("should have sent a message");
        let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();

        assert_eq!(msg.transaction_id, tid);
        assert_eq!(msg.topic, "ethercat.ClearPath_0.sdo_write");
        assert_eq!(msg.data["index"], "0x6060");
        assert_eq!(msg.data["sub"], 0);
        assert_eq!(msg.data["value"], 1);
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn read_sends_correct_topic_and_payload() {
        let (mut client, _resp_tx, mut write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.read(&mut client, 0x6064, 0);

        let msg_json = write_rx.try_recv().expect("should have sent a message");
        let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();

        assert_eq!(msg.transaction_id, tid);
        assert_eq!(msg.topic, "ethercat.ClearPath_0.sdo_read");
        assert_eq!(msg.data["index"], "0x6064");
        assert_eq!(msg.data["sub"], 0);
        assert!(msg.data.get("value").is_none());
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn result_returns_ok_on_success() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        // Simulate successful response
        resp_tx
            .send(CommandMessage::response(tid, json!(null)))
            .unwrap();
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Ok(data) => assert_eq!(data, json!(null)),
            other => panic!("expected Ok, got {:?}", other),
        }

        // Consumed — no longer tracked
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn result_returns_err_on_failure() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        // Simulate error response
        let mut err_resp = CommandMessage::response(tid, json!(null));
        err_resp.success = false;
        err_resp.error_message = "SDO abort: 0x06090011".into();
        resp_tx.send(err_resp).unwrap();
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert_eq!(msg, "SDO abort: 0x06090011"),
            other => panic!("expected Err, got {:?}", other),
        }

        // A failed request is consumed just like a successful one
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn result_returns_pending_while_waiting() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(30)) {
            SdoResult::Pending => {}
            other => panic!("expected Pending, got {:?}", other),
        }

        // Still tracked
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn result_returns_timeout_when_deadline_exceeded() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
        client.poll();
        wait_for_clock_advance();

        // Zero timeout -> immediately expired
        match sdo.result(&mut client, tid, Duration::ZERO) {
            SdoResult::Timeout => {}
            other => panic!("expected Timeout, got {:?}", other),
        }

        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn drain_stale_removes_old_requests() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let _ = sdo.write(&mut client, 0x6060, 0, json!(1));
        let _ = sdo.read(&mut client, 0x6064, 0);
        assert_eq!(sdo.pending_count(), 2);
        wait_for_clock_advance();

        // Zero timeout -> everything is stale
        sdo.drain_stale(&mut client, Duration::ZERO);
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn drain_stale_keeps_fresh_requests() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let _ = sdo.write(&mut client, 0x6060, 0, json!(1));
        let _ = sdo.read(&mut client, 0x6064, 0);

        // Generous timeout -> nothing is stale yet
        sdo.drain_stale(&mut client, Duration::from_secs(30));
        assert_eq!(sdo.pending_count(), 2);
    }

    #[test]
    fn multiple_concurrent_requests() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid1 = sdo.write(&mut client, 0x6060, 0, json!(1));
        let tid2 = sdo.read(&mut client, 0x6064, 0);
        assert_eq!(sdo.pending_count(), 2);

        // Only respond to the read
        resp_tx
            .send(CommandMessage::response(tid2, json!(12345)))
            .unwrap();
        client.poll();

        // Read resolves, write still pending
        match sdo.result(&mut client, tid2, Duration::from_secs(3)) {
            SdoResult::Ok(v) => assert_eq!(v, json!(12345)),
            other => panic!("expected Ok, got {:?}", other),
        }
        match sdo.result(&mut client, tid1, Duration::from_secs(30)) {
            SdoResult::Pending => {}
            other => panic!("expected Pending, got {:?}", other),
        }
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn unknown_tid_returns_err() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        match sdo.result(&mut client, 99999, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert!(msg.contains("unknown")),
            other => panic!("expected Err for unknown tid, got {:?}", other),
        }
    }

    #[test]
    fn resolved_handle_is_unknown_afterwards() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new(DEVICE);

        let tid = sdo.read(&mut client, 0x6064, 0);
        resp_tx
            .send(CommandMessage::response(tid, json!(7)))
            .unwrap();
        client.poll();

        // First poll consumes the request...
        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Ok(v) => assert_eq!(v, json!(7)),
            other => panic!("expected Ok, got {:?}", other),
        }

        // ...so polling the same handle again reports it as unknown
        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert!(msg.contains("unknown")),
            other => panic!("expected Err for consumed tid, got {:?}", other),
        }
    }
}