use crate::command_types::Command;
use crate::error::{Error, Result};
use crate::transport::transaction::{FrameFilter, Request};
use moteus_protocol::{CanFdFrame, CLIENT_POLL_SERVER, CLIENT_TO_SERVER, SERVER_TO_CLIENT};
/// Diagnostic channel used by the stream constructors that do not take an
/// explicit channel argument.
pub const DEFAULT_CHANNEL: u8 = 1;
/// Largest payload, in bytes, accepted by `make_diagnostic_write_frame` for a
/// single frame; longer writes are split into chunks of this size.
// NOTE(review): presumably 61 so that the 3-byte header plus payload fits a
// 64-byte CAN-FD frame — confirm against the protocol reference.
pub const MAX_DIAGNOSTIC_WRITE: usize = 61;
/// Upper bound on the number of bytes requested from the device in a single
/// poll; larger read requests are clamped to this value.
pub const MAX_DIAGNOSTIC_READ: usize = 61;
/// Payload extracted from a single diagnostic reply frame.
#[derive(Debug, Clone)]
pub struct DiagnosticResponse {
/// Seven-bit value taken from bits 8..=14 of the reply's arbitration id.
pub id: u8,
/// Payload bytes that followed the 3-byte diagnostic header.
pub data: Vec<u8>,
}
/// Builds a frame that writes `data` to a device's diagnostic channel.
///
/// The frame body is a 3-byte header (`CLIENT_TO_SERVER`, `channel`, payload
/// length) followed by the payload. The arbitration id is computed from
/// `source_id` and `dest_id` with `false` as the final argument
/// (presumably "no reply requested" — confirm against `moteus_protocol`).
///
/// # Panics
///
/// Panics if `data` is longer than [`MAX_DIAGNOSTIC_WRITE`] bytes.
pub fn make_diagnostic_write_frame(
    dest_id: u8,
    source_id: u8,
    channel: u8,
    data: &[u8],
) -> CanFdFrame {
    const HEADER_LEN: usize = 3;

    assert!(data.len() <= MAX_DIAGNOSTIC_WRITE);

    let total_len = HEADER_LEN + data.len();
    let header = [CLIENT_TO_SERVER, channel, data.len() as u8];

    let mut frame = CanFdFrame::new();
    frame.data[..HEADER_LEN].copy_from_slice(&header);
    frame.data[HEADER_LEN..total_len].copy_from_slice(data);
    frame.size = total_len as u8;
    frame.arbitration_id =
        moteus_protocol::calculate_arbitration_id(source_id as i8, dest_id as i8, 0, false);
    frame
}
/// Builds a frame that polls a device's diagnostic channel for data.
///
/// The frame body is exactly three bytes: `CLIENT_POLL_SERVER`, `channel`,
/// and `max_length` (the most bytes the device should return). The
/// arbitration id is computed from `source_id` and `dest_id` with `true` as
/// the final argument (presumably "reply requested" — confirm against
/// `moteus_protocol`).
pub fn make_diagnostic_read_frame(
    dest_id: u8,
    source_id: u8,
    channel: u8,
    max_length: u8,
) -> CanFdFrame {
    let header = [CLIENT_POLL_SERVER, channel, max_length];

    let mut frame = CanFdFrame::new();
    frame.data[..header.len()].copy_from_slice(&header);
    frame.size = header.len() as u8;
    frame.arbitration_id =
        moteus_protocol::calculate_arbitration_id(source_id as i8, dest_id as i8, 0, true);
    frame
}
/// Extracts the diagnostic payload from a reply frame.
///
/// Returns `None` when the frame is not a well-formed diagnostic reply for
/// `channel`, i.e. when any of the following holds:
///
/// * `frame.size` exceeds the frame's data buffer,
/// * fewer than three bytes are present,
/// * the first byte is not `SERVER_TO_CLIENT`,
/// * the second byte is not `channel`,
/// * the declared payload length (third byte) runs past the end of the frame.
///
/// On success the returned [`DiagnosticResponse`] carries bits 8..=14 of the
/// arbitration id as `id` and a copy of the payload bytes as `data`.
pub fn parse_diagnostic_response(frame: &CanFdFrame, channel: u8) -> Option<DiagnosticResponse> {
    const HEADER_LEN: usize = 3;

    // Checked slicing: a bogus `size` larger than the buffer must be rejected
    // as a malformed frame rather than panic inside the parser.
    let bytes = frame.data.get(..frame.size as usize)?;

    // `||` short-circuits, so the indexed reads only run once the length
    // check has passed.
    if bytes.len() < HEADER_LEN || bytes[0] != SERVER_TO_CLIENT || bytes[1] != channel {
        return None;
    }

    // The declared length must fit in what was actually received.
    let declared_len = bytes[2] as usize;
    let payload = bytes.get(HEADER_LEN..HEADER_LEN + declared_len)?;

    let id = ((frame.arbitration_id >> 8) & 0x7F) as u8;
    Some(DiagnosticResponse {
        id,
        data: payload.to_vec(),
    })
}
use crate::blocking_controller::BlockingController;
use crate::transport::Transport;
/// Blocking, line-oriented access to a controller's diagnostic channel.
///
/// The stream mutably borrows a `BlockingController` for its whole lifetime
/// and sends its frames through that controller's transport.
pub struct DiagnosticStream<
'a,
T: Transport = std::sync::Arc<std::sync::Mutex<crate::transport::Router>>,
> {
// Controller whose device id, source id and transport are used for every frame.
controller: &'a mut BlockingController<T>,
// Diagnostic channel number placed in each frame and matched on replies.
channel: u8,
// Bytes received from the device that `readline` has not yet returned.
read_buffer: Vec<u8>,
}
impl<'a, T: Transport> std::fmt::Debug for DiagnosticStream<'a, T> {
    /// Formats the stream as its device id, channel, and the number of
    /// buffered (not yet consumed) bytes; the transport itself is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let device_id = self.controller.controller.id;
        let buffered = self.read_buffer.len();

        let mut out = f.debug_struct("DiagnosticStream");
        out.field("device_id", &device_id);
        out.field("channel", &self.channel);
        out.field("read_buffer_len", &buffered);
        out.finish()
    }
}
impl<'a, T: Transport> DiagnosticStream<'a, T> {
pub fn new(controller: &'a mut BlockingController<T>) -> Self {
Self::with_channel(controller, DEFAULT_CHANNEL)
}
pub fn with_channel(controller: &'a mut BlockingController<T>, channel: u8) -> Self {
Self {
controller,
channel,
read_buffer: Vec::new(),
}
}
fn id(&self) -> u8 {
self.controller.controller.id
}
fn source_id(&self) -> u8 {
self.controller.controller.source_id
}
pub fn write(&mut self, data: &[u8]) -> Result<()> {
for chunk in data.chunks(MAX_DIAGNOSTIC_WRITE) {
let frame =
make_diagnostic_write_frame(self.id(), self.source_id(), self.channel, chunk);
let mut requests = [Request::new(frame).with_expected_replies(0)];
self.controller.transport.cycle(&mut requests)?;
}
Ok(())
}
pub fn write_message(&mut self, data: &[u8]) -> Result<()> {
let mut msg = data.to_vec();
msg.push(b'\n');
self.write(&msg)
}
pub fn read(&mut self, max_bytes: usize) -> Result<Vec<u8>> {
let read_size = std::cmp::min(max_bytes, MAX_DIAGNOSTIC_READ) as u8;
let frame =
make_diagnostic_read_frame(self.id(), self.source_id(), self.channel, read_size);
let id = self.id();
let mut requests = [Request::new(frame)
.with_filter(FrameFilter::custom(move |f| {
let frame_source = ((f.arbitration_id >> 8) & 0x7F) as u8;
if frame_source != id {
return false;
}
Command::diagnostic_reply_filter().matches(f)
}))
.with_expected_replies(1)];
self.controller.transport.cycle(&mut requests)?;
let mut result = Vec::new();
for response in requests[0].responses.take() {
if let Some(diag) = parse_diagnostic_response(&response, self.channel) {
result.extend(diag.data);
}
}
Ok(result)
}
pub fn flush_read(&mut self) -> Result<()> {
self.read_buffer.clear();
let start = std::time::Instant::now();
let timeout = std::time::Duration::from_millis(200);
while start.elapsed() < timeout {
let data = self.read(MAX_DIAGNOSTIC_READ)?;
if data.is_empty() {
std::thread::sleep(std::time::Duration::from_millis(10));
}
}
self.read_buffer.clear();
Ok(())
}
pub fn readline(&mut self) -> Result<Vec<u8>> {
loop {
if let Some(pos) = self
.read_buffer
.iter()
.position(|&b| b == b'\n' || b == b'\r')
{
let line: Vec<u8> = self.read_buffer.drain(..=pos).collect();
let line: Vec<u8> = line
.into_iter()
.filter(|&b| b != b'\n' && b != b'\r')
.collect();
if !line.is_empty() {
return Ok(line);
}
continue;
}
let data = self.read(MAX_DIAGNOSTIC_READ)?;
if data.is_empty() {
std::thread::sleep(std::time::Duration::from_millis(10));
}
self.read_buffer.extend(data);
}
}
fn read_until_ok(&mut self) -> Result<Vec<u8>> {
let mut result = Vec::new();
loop {
let line = self.readline()?;
if line.starts_with(b"OK") {
return Ok(result);
}
if line.starts_with(b"ERR") {
return Err(Error::Protocol(String::from_utf8_lossy(&line).to_string()));
}
result.extend(&line);
result.push(b'\n');
}
}
pub fn command(&mut self, data: &[u8]) -> Result<Vec<u8>> {
self.write_message(data)?;
self.read_until_ok()
}
pub fn command_oneline(&mut self, data: &[u8]) -> Result<Vec<u8>> {
self.write_message(data)?;
self.readline()
}
}
#[cfg(feature = "tokio")]
use crate::async_controller::AsyncController;
#[cfg(feature = "tokio")]
use crate::transport::async_transport::AsyncTransport;
/// Async, line-oriented access to a controller's diagnostic channel.
///
/// Only compiled when the `tokio` feature is enabled. The stream mutably
/// borrows an `AsyncController` for its whole lifetime and sends its frames
/// through that controller's transport.
#[cfg(feature = "tokio")]
pub struct AsyncDiagnosticStream<
'a,
T: AsyncTransport = std::sync::Arc<
tokio::sync::Mutex<crate::transport::async_transport::AsyncRouter>,
>,
> {
// Controller whose device id, source id and transport are used for every frame.
controller: &'a mut AsyncController<T>,
// Diagnostic channel number placed in each frame and matched on replies.
channel: u8,
// Bytes received from the device that `readline` has not yet returned.
read_buffer: Vec<u8>,
}
#[cfg(feature = "tokio")]
impl<'a, T: AsyncTransport> std::fmt::Debug for AsyncDiagnosticStream<'a, T> {
    /// Formats the stream as its device id, channel, and the number of
    /// buffered (not yet consumed) bytes; the transport itself is not shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let device_id = self.controller.controller.id;
        let buffered = self.read_buffer.len();

        let mut out = f.debug_struct("AsyncDiagnosticStream");
        out.field("device_id", &device_id);
        out.field("channel", &self.channel);
        out.field("read_buffer_len", &buffered);
        out.finish()
    }
}
#[cfg(feature = "tokio")]
impl<'a, T: AsyncTransport> AsyncDiagnosticStream<'a, T> {
    /// Creates a stream on [`DEFAULT_CHANNEL`].
    pub fn new(controller: &'a mut AsyncController<T>) -> Self {
        Self::with_channel(controller, DEFAULT_CHANNEL)
    }

    /// Creates a stream on the given diagnostic `channel` with an empty
    /// read buffer.
    pub fn with_channel(controller: &'a mut AsyncController<T>, channel: u8) -> Self {
        let read_buffer = Vec::new();
        Self {
            controller,
            channel,
            read_buffer,
        }
    }

    /// Id of the device this stream talks to.
    fn id(&self) -> u8 {
        self.controller.controller.id
    }

    /// Id used as the source of outgoing frames.
    fn source_id(&self) -> u8 {
        self.controller.controller.source_id
    }

    /// Sends `data` to the device, split into frames of at most
    /// [`MAX_DIAGNOSTIC_WRITE`] bytes. No reply is expected for any frame.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by the transport; later chunks are
    /// not sent.
    pub async fn write(&mut self, data: &[u8]) -> Result<()> {
        let dest = self.id();
        let source = self.source_id();
        let channel = self.channel;

        for piece in data.chunks(MAX_DIAGNOSTIC_WRITE) {
            let frame = make_diagnostic_write_frame(dest, source, channel, piece);
            let mut requests = [Request::new(frame).with_expected_replies(0)];
            self.controller.transport.cycle(&mut requests).await?;
        }
        Ok(())
    }

    /// Sends `data` followed by a single `\n`.
    pub async fn write_message(&mut self, data: &[u8]) -> Result<()> {
        let mut msg = Vec::with_capacity(data.len() + 1);
        msg.extend_from_slice(data);
        msg.push(b'\n');
        self.write(&msg).await
    }

    /// Polls the device once and returns whatever diagnostic bytes came back.
    ///
    /// At most `min(max_bytes, MAX_DIAGNOSTIC_READ)` bytes are requested.
    /// Only replies whose arbitration id carries this device's id and that
    /// pass `Command::diagnostic_reply_filter()` are accepted. The result is
    /// empty when no accepted reply parses as a diagnostic response for this
    /// stream's channel.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the transport.
    pub async fn read(&mut self, max_bytes: usize) -> Result<Vec<u8>> {
        let device_id = self.id();
        let channel = self.channel;
        let request_len = max_bytes.min(MAX_DIAGNOSTIC_READ) as u8;
        let poll = make_diagnostic_read_frame(device_id, self.source_id(), channel, request_len);

        let mut requests = [Request::new(poll)
            .with_filter(FrameFilter::custom(move |f| {
                let frame_source = ((f.arbitration_id >> 8) & 0x7F) as u8;
                frame_source == device_id && Command::diagnostic_reply_filter().matches(f)
            }))
            .with_expected_replies(1)];
        self.controller.transport.cycle(&mut requests).await?;

        let mut received = Vec::new();
        for reply in requests[0].responses.take() {
            if let Some(parsed) = parse_diagnostic_response(&reply, channel) {
                received.extend(parsed.data);
            }
        }
        Ok(received)
    }

    /// Discards buffered data and keeps polling (and discarding) for 200 ms.
    ///
    /// Yields for 10 ms via `tokio::time::sleep` after each poll that returns
    /// nothing. The read buffer is empty when this returns successfully.
    ///
    /// # Errors
    ///
    /// Returns the first error from [`read`](Self::read).
    pub async fn flush_read(&mut self) -> Result<()> {
        self.read_buffer.clear();

        let began = std::time::Instant::now();
        let window = std::time::Duration::from_millis(200);
        loop {
            if began.elapsed() >= window {
                break;
            }
            if self.read(MAX_DIAGNOSTIC_READ).await?.is_empty() {
                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            }
        }

        self.read_buffer.clear();
        Ok(())
    }

    /// Returns the next non-empty line, without its terminator.
    ///
    /// Both `\n` and `\r` end a line; empty lines are skipped. The call
    /// keeps polling the device (yielding 10 ms after an empty poll) until a
    /// line is available — there is no timeout.
    ///
    /// # Errors
    ///
    /// Returns the first error from [`read`](Self::read).
    pub async fn readline(&mut self) -> Result<Vec<u8>> {
        loop {
            let terminator_at = self
                .read_buffer
                .iter()
                .position(|&b| matches!(b, b'\n' | b'\r'));

            match terminator_at {
                Some(end) => {
                    let mut line: Vec<u8> = self.read_buffer.drain(..=end).collect();
                    // `end` is the first terminator, so the only terminator
                    // byte in `line` is its last one.
                    line.pop();
                    if !line.is_empty() {
                        return Ok(line);
                    }
                }
                None => {
                    let chunk = self.read(MAX_DIAGNOSTIC_READ).await?;
                    if chunk.is_empty() {
                        tokio::time::sleep(std::time::Duration::from_millis(10)).await;
                    }
                    self.read_buffer.extend(chunk);
                }
            }
        }
    }

    /// Collects lines until one starting with `OK` arrives.
    ///
    /// Every preceding line is appended to the result followed by `\n`; the
    /// `OK` line itself is not included.
    ///
    /// # Errors
    ///
    /// A line starting with `ERR` yields `Error::Protocol` holding that line
    /// (lossily decoded as UTF-8). Errors from `readline` are propagated.
    async fn read_until_ok(&mut self) -> Result<Vec<u8>> {
        let mut collected = Vec::new();
        loop {
            let line = self.readline().await?;
            if line.starts_with(b"OK") {
                return Ok(collected);
            }
            if line.starts_with(b"ERR") {
                let text = String::from_utf8_lossy(&line).into_owned();
                return Err(Error::Protocol(text));
            }
            collected.extend_from_slice(&line);
            collected.push(b'\n');
        }
    }

    /// Sends `data` as a message and returns everything received before the
    /// terminating `OK` line.
    pub async fn command(&mut self, data: &[u8]) -> Result<Vec<u8>> {
        self.write_message(data).await?;
        self.read_until_ok().await
    }

    /// Sends `data` as a message and returns the first non-empty reply line.
    pub async fn command_oneline(&mut self, data: &[u8]) -> Result<Vec<u8>> {
        self.write_message(data).await?;
        self.readline().await
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A write frame carries the 3-byte header followed by the payload.
    #[test]
    fn test_make_diagnostic_write_frame() {
        let payload = b"hello";
        let frame = make_diagnostic_write_frame(1, 0, 1, payload);

        let expected_id = moteus_protocol::calculate_arbitration_id(0, 1, 0, false);
        assert_eq!(frame.arbitration_id, expected_id);

        assert_eq!(frame.data[0], CLIENT_TO_SERVER);
        assert_eq!(frame.data[1], 1);
        assert_eq!(frame.data[2], 5);
        assert_eq!(&frame.data[3..8], payload);
        assert_eq!(frame.size, 8);
    }

    /// A read (poll) frame is exactly the 3-byte header.
    #[test]
    fn test_make_diagnostic_read_frame() {
        let frame = make_diagnostic_read_frame(1, 0, 1, 48);

        let expected_id = moteus_protocol::calculate_arbitration_id(0, 1, 0, true);
        assert_eq!(frame.arbitration_id, expected_id);

        assert_eq!(frame.data[0], CLIENT_POLL_SERVER);
        assert_eq!(frame.data[1], 1);
        assert_eq!(frame.data[2], 48);
        assert_eq!(frame.size, 3);
    }

    /// A well-formed reply on the expected channel yields its id and payload.
    #[test]
    fn test_parse_diagnostic_response() {
        let mut frame = CanFdFrame::new();
        frame.arbitration_id = 0x8100;
        frame.data[0] = SERVER_TO_CLIENT;
        frame.data[1] = 1;
        frame.data[2] = 5;
        frame.data[3..8].copy_from_slice(b"hello");
        frame.size = 8;

        let parsed = parse_diagnostic_response(&frame, 1).unwrap();
        assert_eq!(parsed.id, 1);
        assert_eq!(parsed.data, b"hello");
    }

    /// A reply addressed to a different channel is rejected.
    #[test]
    fn test_parse_diagnostic_response_wrong_channel() {
        let mut frame = CanFdFrame::new();
        frame.data[0] = SERVER_TO_CLIENT;
        frame.data[1] = 2;
        frame.data[2] = 5;
        frame.size = 8;

        let parsed = parse_diagnostic_response(&frame, 1);
        assert!(parsed.is_none());
    }
}