autocore_std/ethercat/sdo_client.rs
1//! Non-blocking SDO read/write client for EtherCAT devices.
2//!
3//! [`SdoClient`] wraps [`CommandClient`](crate::CommandClient) to provide an
4//! ergonomic, handle-based interface for runtime SDO (Service Data Object)
5//! operations over CoE (CANopen over EtherCAT). Create one per device, issue
6//! reads/writes from your control loop, and check results by handle on
7//! subsequent ticks.
8//!
9//! # When to use
10//!
11//! Use `SdoClient` for **runtime** SDO access — reading diagnostic registers,
12//! changing operating parameters on the fly, or any CoE transfer that happens
13//! after the cyclic loop is running. For SDOs that must be applied **before**
14//! the cyclic loop starts (e.g. setting `modes_of_operation`), use the
15//! `startup_sdo` array in `project.json` instead.
16//!
17//! # Topic format
18//!
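//! Requests are sent as IPC commands through the existing WebSocket channel.
//! The topic is fixed per operation; the target device is selected by the
//! `device` field of the payload, which carries the name passed to
//! [`SdoClient::new`] (the device name configured in `project.json`):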
21//!
22//! | Operation | Topic | Payload |
23//! |-----------|------------------------------------|--------------------------------------------------|
24//! | Write | `ethercat.write_sdo` | `{"device": "...", "index": "0x6060", "sub": 0, "value": "0x01"}` |
25//! | Read | `ethercat.read_sdo` | `{"device": "...", "index": "0x6060", "sub": 0}` |
26//!
27//! # Usage with a state machine
28//!
29//! A typical pattern pairs `SdoClient` with [`StateMachine`](crate::fb::StateMachine)
30//! to fire an SDO write in one state, then advance on success:
31//!
32//! ```ignore
33//! use autocore_std::{ControlProgram, TickContext};
34//! use autocore_std::ethercat::{SdoClient, SdoResult};
35//! use autocore_std::fb::StateMachine;
36//! use serde_json::json;
37//! use std::time::Duration;
38//!
39//! pub struct MyProgram {
40//! sm: StateMachine,
41//! sdo: SdoClient,
42//! write_tid: Option<u32>,
43//! }
44//!
45//! impl MyProgram {
46//! pub fn new() -> Self {
47//! Self {
48//! sm: StateMachine::new(),
49//! sdo: SdoClient::new("ClearPath_0"),
50//! write_tid: None,
51//! }
52//! }
53//! }
54//!
55//! impl ControlProgram for MyProgram {
56//! type Memory = GlobalMemory;
57//!
58//! fn process_tick(&mut self, ctx: &mut TickContext<Self::Memory>) {
59//! self.sm.call();
60//! match self.sm.index {
61//! // State 10: Send SDO write for modes_of_operation = PP
62//! 10 => {
63//! self.write_tid = Some(
64//! self.sdo.write(ctx.client, 0x6060, 0, json!(1))
65//! );
66//! self.sm.timeout_preset = Duration::from_secs(3);
67//! self.sm.index = 20;
68//! }
69//! // State 20: Wait for response
70//! 20 => {
71//! let tid = self.write_tid.unwrap();
72//! match self.sdo.result(ctx.client, tid, Duration::from_secs(3)) {
73//! SdoResult::Pending => { /* keep waiting */ }
74//! SdoResult::Ok(_) => {
75//! log::info!("modes_of_operation set to PP");
76//! self.sm.index = 30;
77//! }
78//! SdoResult::Err(e) => {
79//! log::error!("SDO write failed: {}", e);
80//! self.sm.set_error(1);
81//! }
82//! SdoResult::Timeout => {
83//! log::error!("SDO write timed out");
84//! self.sm.set_error(2);
85//! }
86//! }
87//! }
88//! // State 30: Done — continue with normal operation
89//! 30 => { /* ... */ }
90//! _ => {}
91//! }
92//! }
93//! }
94//! ```
95//!
96//! # Reading an SDO
97//!
98//! ```ignore
99//! // Fire a read request
100//! let tid = sdo.read(ctx.client, 0x6064, 0); // Position Actual Value
101//!
102//! // On a later tick, check the result
103//! match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
104//! SdoResult::Ok(data) => {
105//! let position: i32 = serde_json::from_value(data).unwrap();
106//! log::info!("Current position: {}", position);
107//! }
108//! SdoResult::Pending => { /* still waiting */ }
109//! SdoResult::Err(e) => { log::error!("SDO read failed: {}", e); }
110//! SdoResult::Timeout => { log::error!("SDO read timed out"); }
111//! }
112//! ```
113
114use std::collections::HashMap;
115use std::time::{Duration, Instant};
116
117use serde_json::{json, Value};
118
119use crate::command_client::CommandClient;
120
121/// Metadata for an in-flight SDO request.
122pub struct SdoRequest {
123 /// CoE object dictionary index (e.g. 0x6060).
124 pub index: u16,
125 /// CoE sub-index.
126 pub sub_index: u8,
127 /// Whether this is a read or write.
128 pub kind: SdoRequestKind,
129 /// When the request was sent (for timeout detection).
130 pub sent_at: Instant,
131}
132
/// Direction of an SDO transfer: reading a value from the device or writing
/// one to it.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum SdoRequestKind {
    /// The request reads an object from the device (CoE upload).
    Read,
    /// The request writes an object to the device (CoE download).
    Write,
}
141
142/// Result of checking an in-flight SDO request.
143///
144/// Returned by [`SdoClient::result()`]. The caller should match on this each
145/// tick until the request resolves (i.e. is no longer [`Pending`](SdoResult::Pending)).
146///
147/// ```ignore
148/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
149/// SdoResult::Pending => { /* check again next tick */ }
150/// SdoResult::Ok(data) => { /* use data */ }
151/// SdoResult::Err(msg) => { log::error!("{}", msg); }
152/// SdoResult::Timeout => { log::error!("timed out"); }
153/// }
154/// ```
155#[derive(Debug, Clone)]
156pub enum SdoResult {
157 /// No response yet; check again next tick.
158 Pending,
159 /// Operation succeeded. Contains the response `data` field — the read
160 /// value for reads, or an empty/null value for writes.
161 Ok(Value),
162 /// The server (or EtherCAT master) reported an error. The string contains
163 /// the `error_message` from the response (e.g. `"SDO abort: 0x06090011"`).
164 Err(String),
165 /// No response arrived within the caller-specified deadline.
166 Timeout,
167}
168
169/// Non-blocking SDO client scoped to a single EtherCAT device.
170///
171/// Create one `SdoClient` per device in your control program struct. It holds
172/// a map of outstanding requests keyed by `transaction_id` (returned by
173/// [`CommandClient::send`]). Keep the returned handle and poll
174/// [`result()`](Self::result) each tick until it resolves.
175///
176/// # Example
177///
178/// ```ignore
179/// use autocore_std::ethercat::{SdoClient, SdoResult};
180/// use serde_json::json;
181/// use std::time::Duration;
182///
183/// let mut sdo = SdoClient::new("ClearPath_0");
184///
185/// // Issue an SDO write (from process_tick):
186/// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
187///
188/// // Check result on subsequent ticks:
189/// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
190/// SdoResult::Pending => {}
191/// SdoResult::Ok(_) => { /* success */ }
192/// SdoResult::Err(e) => { log::error!("SDO error: {}", e); }
193/// SdoResult::Timeout => { log::error!("SDO timed out"); }
194/// }
195/// ```
196pub struct SdoClient {
197 device: String,
198 requests: HashMap<u32, SdoRequest>,
199}
200
201impl SdoClient {
202 /// Create a new client for the given device name.
203 ///
204 /// The `device` string must match the `name` field in the slave's
205 /// `project.json` configuration (e.g. `"ClearPath_0"`).
206 ///
207 /// ```ignore
208 /// let sdo = SdoClient::new("ClearPath_0");
209 /// ```
210 pub fn new(device: &str) -> Self {
211 Self {
212 device: device.to_string(),
213 requests: HashMap::new(),
214 }
215 }
216
217 /// Issue an SDO write (CoE download).
218 ///
219 /// Sends a command to topic `ethercat.{device}.sdo_write` with payload:
220 /// ```json
221 /// {"index": "0x6060", "sub": 0, "value": 1}
222 /// ```
223 ///
224 /// Returns a transaction handle for use with [`result()`](Self::result).
225 ///
226 /// # Arguments
227 ///
228 /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
229 /// * `index` — CoE object dictionary index (e.g. `0x6060`)
230 /// * `sub_index` — CoE sub-index (usually `0`)
231 /// * `value` — the value to write, as a [`serde_json::Value`]
232 ///
233 /// # Example
234 ///
235 /// ```ignore
236 /// // Set modes_of_operation to Profile Position (1)
237 /// let tid = sdo.write(ctx.client, 0x6060, 0, json!(1));
238 /// ```
239 pub fn write(
240 &mut self,
241 client: &mut CommandClient,
242 index: u16,
243 sub_index: u8,
244 value: Value,
245 ) -> u32 {
246 let topic = "ethercat.write_sdo".to_string();
247 let payload = json!({
248 "device": self.device,
249 "index": format!("0x{:04X}", index),
250 "sub": sub_index,
251 "value": value,
252 });
253 let tid = client.send(&topic, payload);
254
255 self.requests.insert(tid, SdoRequest {
256 index,
257 sub_index,
258 kind: SdoRequestKind::Write,
259 sent_at: Instant::now(),
260 });
261
262 tid
263 }
264
265 /// Issue an SDO read (CoE upload).
266 ///
267 /// Sends a command to topic `ethercat.{device}.sdo_read` with payload:
268 /// ```json
269 /// {"index": "0x6064", "sub": 0}
270 /// ```
271 ///
272 /// Returns a transaction handle for use with [`result()`](Self::result).
273 ///
274 /// # Arguments
275 ///
276 /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
277 /// * `index` — CoE object dictionary index (e.g. `0x6064`)
278 /// * `sub_index` — CoE sub-index (usually `0`)
279 ///
280 /// # Example
281 ///
282 /// ```ignore
283 /// // Read the actual position value
284 /// let tid = sdo.read(ctx.client, 0x6064, 0);
285 /// ```
286 pub fn read(
287 &mut self,
288 client: &mut CommandClient,
289 index: u16,
290 sub_index: u8,
291 ) -> u32 {
292 let topic = "ethercat.read_sdo".to_string();
293 let payload = json!({
294 "device": self.device,
295 "index": format!("0x{:04X}", index),
296 "sub": sub_index,
297 });
298 let tid = client.send(&topic, payload);
299
300 self.requests.insert(tid, SdoRequest {
301 index,
302 sub_index,
303 kind: SdoRequestKind::Read,
304 sent_at: Instant::now(),
305 });
306
307 tid
308 }
309
310 /// Check the result of a previous SDO request.
311 ///
312 /// Call this each tick with the handle returned by [`write()`](Self::write)
313 /// or [`read()`](Self::read). The result is consumed (removed from the
314 /// internal map) once it resolves to [`Ok`](SdoResult::Ok),
315 /// [`Err`](SdoResult::Err), or [`Timeout`](SdoResult::Timeout).
316 ///
317 /// # Arguments
318 ///
319 /// * `client` — the [`CommandClient`] from [`TickContext`](crate::TickContext)
320 /// * `tid` — transaction handle returned by `write()` or `read()`
321 /// * `timeout` — maximum time to wait before returning [`SdoResult::Timeout`]
322 ///
323 /// # Example
324 ///
325 /// ```ignore
326 /// match sdo.result(ctx.client, tid, Duration::from_secs(3)) {
327 /// SdoResult::Pending => { /* keep waiting */ }
328 /// SdoResult::Ok(data) => {
329 /// log::info!("SDO response: {}", data);
330 /// sm.index = next_state;
331 /// }
332 /// SdoResult::Err(e) => {
333 /// log::error!("SDO failed: {}", e);
334 /// sm.set_error(1);
335 /// }
336 /// SdoResult::Timeout => {
337 /// log::error!("SDO timed out");
338 /// sm.set_error(2);
339 /// }
340 /// }
341 /// ```
342 pub fn result(
343 &mut self,
344 client: &mut CommandClient,
345 tid: u32,
346 timeout: Duration,
347 ) -> SdoResult {
348 let req = match self.requests.get(&tid) {
349 Some(r) => r,
350 None => return SdoResult::Err("unknown transaction id".into()),
351 };
352
353 // Check for response from CommandClient
354 if let Some(resp) = client.take_response(tid) {
355 self.requests.remove(&tid);
356 if resp.success {
357 return SdoResult::Ok(resp.data);
358 } else {
359 return SdoResult::Err(resp.error_message);
360 }
361 }
362
363 // Check timeout
364 if req.sent_at.elapsed() > timeout {
365 self.requests.remove(&tid);
366 return SdoResult::Timeout;
367 }
368
369 SdoResult::Pending
370 }
371
372 /// Remove all requests that have been pending longer than `timeout`.
373 ///
374 /// Call periodically (e.g. once per second) to prevent the internal map
375 /// from growing unboundedly if callers forget to check results.
376 ///
377 /// # Example
378 ///
379 /// ```ignore
380 /// // At the end of process_tick, clean up anything older than 10s
381 /// self.sdo.drain_stale(ctx.client, Duration::from_secs(10));
382 /// ```
383 pub fn drain_stale(&mut self, client: &mut CommandClient, timeout: Duration) {
384 let stale_tids: Vec<u32> = self
385 .requests
386 .iter()
387 .filter(|(_, req)| req.sent_at.elapsed() > timeout)
388 .map(|(&tid, _)| tid)
389 .collect();
390
391 for tid in stale_tids {
392 self.requests.remove(&tid);
393 // Also consume the response from CommandClient if one arrived late
394 let _ = client.take_response(tid);
395 }
396 }
397
398 /// Number of in-flight SDO requests.
399 pub fn pending_count(&self) -> usize {
400 self.requests.len()
401 }
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use mechutil::ipc::CommandMessage;
    use tokio::sync::mpsc;

    /// Helper: create a CommandClient backed by test channels, returning
    /// (client, response_sender, write_receiver).
    ///
    /// Messages sent by the client show up on `write_receiver`; responses
    /// pushed into `response_sender` become visible to the client after
    /// `client.poll()`.
    fn test_client() -> (
        CommandClient,
        mpsc::UnboundedSender<CommandMessage>,
        mpsc::UnboundedReceiver<String>,
    ) {
        let (write_tx, write_rx) = mpsc::unbounded_channel();
        let (response_tx, response_rx) = mpsc::unbounded_channel();
        let client = CommandClient::new(write_tx, response_rx);
        (client, response_tx, write_rx)
    }

    #[test]
    fn write_sends_correct_topic_and_payload() {
        let (mut client, _resp_tx, mut write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        let msg_json = write_rx.try_recv().expect("should have sent a message");
        let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();

        assert_eq!(msg.transaction_id, tid);
        assert_eq!(msg.topic, "ethercat.write_sdo");
        assert_eq!(msg.data["device"], "ClearPath_0");
        assert_eq!(msg.data["index"], "0x6060");
        assert_eq!(msg.data["sub"], 0);
        assert_eq!(msg.data["value"], 1);
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn read_sends_correct_topic_and_payload() {
        let (mut client, _resp_tx, mut write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid = sdo.read(&mut client, 0x6064, 0);

        let msg_json = write_rx.try_recv().expect("should have sent a message");
        let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();

        assert_eq!(msg.transaction_id, tid);
        assert_eq!(msg.topic, "ethercat.read_sdo");
        assert_eq!(msg.data["device"], "ClearPath_0");
        assert_eq!(msg.data["index"], "0x6064");
        assert_eq!(msg.data["sub"], 0);
        assert!(msg.data.get("value").is_none());
    }

    #[test]
    fn result_returns_ok_on_success() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        // Simulate successful response
        resp_tx
            .send(CommandMessage::response(tid, json!(null)))
            .unwrap();
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Ok(data) => assert_eq!(data, json!(null)),
            other => panic!("expected Ok, got {:?}", other),
        }

        // Consumed — no longer tracked
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn result_returns_err_on_failure() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));

        // Simulate error response
        let mut err_resp = CommandMessage::response(tid, json!(null));
        err_resp.success = false;
        err_resp.error_message = "SDO abort: 0x06090011".into();
        resp_tx.send(err_resp).unwrap();
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert_eq!(msg, "SDO abort: 0x06090011"),
            other => panic!("expected Err, got {:?}", other),
        }

        // A failed request is consumed just like a successful one
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn result_returns_pending_while_waiting() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(30)) {
            SdoResult::Pending => {}
            other => panic!("expected Pending, got {:?}", other),
        }

        // Still tracked
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn result_returns_timeout_when_deadline_exceeded() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
        client.poll();

        // Zero timeout -> immediately expired
        match sdo.result(&mut client, tid, Duration::ZERO) {
            SdoResult::Timeout => {}
            other => panic!("expected Timeout, got {:?}", other),
        }

        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn resolved_tid_is_unknown_on_second_check() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid = sdo.read(&mut client, 0x6064, 0);
        resp_tx
            .send(CommandMessage::response(tid, json!(42)))
            .unwrap();
        client.poll();

        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Ok(v) => assert_eq!(v, json!(42)),
            other => panic!("expected Ok, got {:?}", other),
        }

        // The handle was consumed by the first resolution
        match sdo.result(&mut client, tid, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert!(msg.contains("unknown")),
            other => panic!("expected Err for consumed tid, got {:?}", other),
        }
    }

    #[test]
    fn drain_stale_removes_old_requests() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        sdo.write(&mut client, 0x6060, 0, json!(1));
        sdo.read(&mut client, 0x6064, 0);
        assert_eq!(sdo.pending_count(), 2);

        // Zero timeout -> everything is stale
        sdo.drain_stale(&mut client, Duration::ZERO);
        assert_eq!(sdo.pending_count(), 0);
    }

    #[test]
    fn drain_stale_keeps_fresh_requests() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        sdo.write(&mut client, 0x6060, 0, json!(1));
        sdo.read(&mut client, 0x6064, 0);
        assert_eq!(sdo.pending_count(), 2);

        // Generous timeout -> nothing is old enough to be dropped
        sdo.drain_stale(&mut client, Duration::from_secs(30));
        assert_eq!(sdo.pending_count(), 2);
    }

    #[test]
    fn multiple_concurrent_requests() {
        let (mut client, resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        let tid1 = sdo.write(&mut client, 0x6060, 0, json!(1));
        let tid2 = sdo.read(&mut client, 0x6064, 0);
        assert_eq!(sdo.pending_count(), 2);

        // Only respond to the read
        resp_tx
            .send(CommandMessage::response(tid2, json!(12345)))
            .unwrap();
        client.poll();

        // Read resolves, write still pending
        match sdo.result(&mut client, tid2, Duration::from_secs(3)) {
            SdoResult::Ok(v) => assert_eq!(v, json!(12345)),
            other => panic!("expected Ok, got {:?}", other),
        }
        match sdo.result(&mut client, tid1, Duration::from_secs(30)) {
            SdoResult::Pending => {}
            other => panic!("expected Pending, got {:?}", other),
        }
        assert_eq!(sdo.pending_count(), 1);
    }

    #[test]
    fn unknown_tid_returns_err() {
        let (mut client, _resp_tx, _write_rx) = test_client();
        let mut sdo = SdoClient::new("ClearPath_0");

        match sdo.result(&mut client, 99999, Duration::from_secs(3)) {
            SdoResult::Err(msg) => assert!(msg.contains("unknown")),
            other => panic!("expected Err for unknown tid, got {:?}", other),
        }
    }
}