use std::collections::HashMap;
use std::time::{Duration, Instant};
use serde_json::{json, Value};
use crate::command_client::CommandClient;
pub struct SdoRequest {
pub index: u16,
pub sub_index: u8,
pub kind: SdoRequestKind,
pub sent_at: Instant,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SdoRequestKind {
Read,
Write,
}
#[derive(Debug, Clone)]
pub enum SdoResult {
Pending,
Ok(Value),
Err(String),
Timeout,
}
pub struct SdoClient {
device: String,
requests: HashMap<u32, SdoRequest>,
}
impl SdoClient {
pub fn new(device: &str) -> Self {
Self {
device: device.to_string(),
requests: HashMap::new(),
}
}
pub fn write(
&mut self,
client: &mut CommandClient,
index: u16,
sub_index: u8,
value: Value,
) -> u32 {
let topic = "ethercat.write_sdo".to_string();
let payload = json!({
"device": self.device,
"index": format!("0x{:04X}", index),
"sub": sub_index,
"value": value,
});
let tid = client.send(&topic, payload);
self.requests.insert(tid, SdoRequest {
index,
sub_index,
kind: SdoRequestKind::Write,
sent_at: Instant::now(),
});
tid
}
pub fn read(
&mut self,
client: &mut CommandClient,
index: u16,
sub_index: u8,
) -> u32 {
let topic = "ethercat.read_sdo".to_string();
let payload = json!({
"device": self.device,
"index": format!("0x{:04X}", index),
"sub": sub_index,
});
let tid = client.send(&topic, payload);
self.requests.insert(tid, SdoRequest {
index,
sub_index,
kind: SdoRequestKind::Read,
sent_at: Instant::now(),
});
tid
}
pub fn result(
&mut self,
client: &mut CommandClient,
tid: u32,
timeout: Duration,
) -> SdoResult {
let req = match self.requests.get(&tid) {
Some(r) => r,
None => return SdoResult::Err("unknown transaction id".into()),
};
if let Some(resp) = client.take_response(tid) {
self.requests.remove(&tid);
if resp.success {
return SdoResult::Ok(resp.data);
} else {
return SdoResult::Err(resp.error_message);
}
}
if req.sent_at.elapsed() > timeout {
self.requests.remove(&tid);
return SdoResult::Timeout;
}
SdoResult::Pending
}
pub fn drain_stale(&mut self, client: &mut CommandClient, timeout: Duration) {
let stale_tids: Vec<u32> = self
.requests
.iter()
.filter(|(_, req)| req.sent_at.elapsed() > timeout)
.map(|(&tid, _)| tid)
.collect();
for tid in stale_tids {
self.requests.remove(&tid);
let _ = client.take_response(tid);
}
}
pub fn pending_count(&self) -> usize {
self.requests.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use mechutil::ipc::CommandMessage;
use tokio::sync::mpsc;
fn test_client() -> (
CommandClient,
mpsc::UnboundedSender<CommandMessage>,
mpsc::UnboundedReceiver<String>,
) {
let (write_tx, write_rx) = mpsc::unbounded_channel();
let (response_tx, response_rx) = mpsc::unbounded_channel();
let client = CommandClient::new(write_tx, response_rx);
(client, response_tx, write_rx)
}
#[test]
fn write_sends_correct_topic_and_payload() {
let (mut client, _resp_tx, mut write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
let msg_json = write_rx.try_recv().expect("should have sent a message");
let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();
assert_eq!(msg.transaction_id, tid);
assert_eq!(msg.topic, "ethercat.write_sdo");
assert_eq!(msg.data["device"], "ClearPath_0");
assert_eq!(msg.data["index"], "0x6060");
assert_eq!(msg.data["sub"], 0);
assert_eq!(msg.data["value"], 1);
assert_eq!(sdo.pending_count(), 1);
}
#[test]
fn read_sends_correct_topic_and_payload() {
let (mut client, _resp_tx, mut write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
let tid = sdo.read(&mut client, 0x6064, 0);
let msg_json = write_rx.try_recv().expect("should have sent a message");
let msg: CommandMessage = serde_json::from_str(&msg_json).unwrap();
assert_eq!(msg.transaction_id, tid);
assert_eq!(msg.topic, "ethercat.read_sdo");
assert_eq!(msg.data["device"], "ClearPath_0");
assert_eq!(msg.data["index"], "0x6064");
assert_eq!(msg.data["sub"], 0);
assert!(msg.data.get("value").is_none());
}
#[test]
fn result_returns_ok_on_success() {
let (mut client, resp_tx, _write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
resp_tx
.send(CommandMessage::response(tid, json!(null)))
.unwrap();
client.poll();
match sdo.result(&mut client, tid, Duration::from_secs(3)) {
SdoResult::Ok(data) => assert_eq!(data, json!(null)),
other => panic!("expected Ok, got {:?}", other),
}
assert_eq!(sdo.pending_count(), 0);
}
#[test]
fn result_returns_err_on_failure() {
let (mut client, resp_tx, _write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
let mut err_resp = CommandMessage::response(tid, json!(null));
err_resp.success = false;
err_resp.error_message = "SDO abort: 0x06090011".into();
resp_tx.send(err_resp).unwrap();
client.poll();
match sdo.result(&mut client, tid, Duration::from_secs(3)) {
SdoResult::Err(msg) => assert_eq!(msg, "SDO abort: 0x06090011"),
other => panic!("expected Err, got {:?}", other),
}
}
#[test]
fn result_returns_pending_while_waiting() {
let (mut client, _resp_tx, _write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
client.poll();
match sdo.result(&mut client, tid, Duration::from_secs(30)) {
SdoResult::Pending => {}
other => panic!("expected Pending, got {:?}", other),
}
assert_eq!(sdo.pending_count(), 1);
}
#[test]
fn result_returns_timeout_when_deadline_exceeded() {
let (mut client, _resp_tx, _write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
let tid = sdo.write(&mut client, 0x6060, 0, json!(1));
client.poll();
match sdo.result(&mut client, tid, Duration::ZERO) {
SdoResult::Timeout => {}
other => panic!("expected Timeout, got {:?}", other),
}
assert_eq!(sdo.pending_count(), 0);
}
#[test]
fn drain_stale_removes_old_requests() {
let (mut client, _resp_tx, _write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
sdo.write(&mut client, 0x6060, 0, json!(1));
sdo.read(&mut client, 0x6064, 0);
assert_eq!(sdo.pending_count(), 2);
sdo.drain_stale(&mut client, Duration::ZERO);
assert_eq!(sdo.pending_count(), 0);
}
#[test]
fn multiple_concurrent_requests() {
let (mut client, resp_tx, _write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
let tid1 = sdo.write(&mut client, 0x6060, 0, json!(1));
let tid2 = sdo.read(&mut client, 0x6064, 0);
assert_eq!(sdo.pending_count(), 2);
resp_tx
.send(CommandMessage::response(tid2, json!(12345)))
.unwrap();
client.poll();
match sdo.result(&mut client, tid2, Duration::from_secs(3)) {
SdoResult::Ok(v) => assert_eq!(v, json!(12345)),
other => panic!("expected Ok, got {:?}", other),
}
match sdo.result(&mut client, tid1, Duration::from_secs(30)) {
SdoResult::Pending => {}
other => panic!("expected Pending, got {:?}", other),
}
assert_eq!(sdo.pending_count(), 1);
}
#[test]
fn unknown_tid_returns_err() {
let (mut client, _resp_tx, _write_rx) = test_client();
let mut sdo = SdoClient::new("ClearPath_0");
match sdo.result(&mut client, 99999, Duration::from_secs(3)) {
SdoResult::Err(msg) => assert!(msg.contains("unknown")),
other => panic!("expected Err for unknown tid, got {:?}", other),
}
}
}