use crate::error::{Error, Result};
use crate::transport::async_transport::BoxFuture;
use crate::transport::device::{AsyncTransportDevice, TransportDeviceInfo};
use crate::transport::fdcanusb::FdcanusbProtocol;
use crate::transport::transaction::{dispatch_frame, Request};
use moteus_protocol::CanFdFrame;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, ReadHalf, WriteHalf};
use tokio_serial::SerialStream;
/// Async transport device that talks to an fdcanusb adapter over a serial
/// port using the line-based text protocol implemented by `FdcanusbProtocol`.
pub struct AsyncFdcanusbDevice {
/// Buffered read half of the split serial stream; responses are read from it
/// one line at a time.
reader: BufReader<ReadHalf<SerialStream>>,
/// Write half of the split serial stream; encoded commands are written here.
writer: WriteHalf<SerialStream>,
/// Time limit applied when waiting for acknowledgements and reply frames.
timeout: std::time::Duration,
/// Flag forwarded to `FdcanusbProtocol::encode_frame` for every outgoing frame
/// (presumably suppresses CAN-FD bit-rate switching; confirm against the encoder).
disable_brs: bool,
/// Scratch buffer reused for every `read_line` call.
line_buffer: String,
/// Frames that were parsed while waiting for an acknowledgement and have not
/// yet been handed to a caller.
pending_frames: Vec<CanFdFrame>,
/// Device metadata returned by `info()`.
pub(crate) info: TransportDeviceInfo,
/// Set when a transaction cycle starts and cleared when it completes or when
/// `recover()` has run; a `true` value means the last cycle was interrupted.
needs_recovery: bool,
}
impl std::fmt::Debug for AsyncFdcanusbDevice {
    /// Formats the device for diagnostics.
    ///
    /// The serial halves and the scratch line buffer are left out, and the
    /// pending-frame queue is reported only by its length.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            info,
            timeout,
            disable_brs,
            pending_frames,
            needs_recovery,
            ..
        } = self;
        let queued = pending_frames.len();

        let mut out = f.debug_struct("AsyncFdcanusbDevice");
        out.field("info", info);
        out.field("timeout", timeout);
        out.field("disable_brs", disable_brs);
        out.field("pending_frames", &queued);
        out.field("needs_recovery", needs_recovery);
        out.finish()
    }
}
impl AsyncFdcanusbDevice {
    /// Opens the serial device at `path` using the crate's default timeout and
    /// `disable_brs` set to `false`.
    ///
    /// # Errors
    /// Returns [`Error::Io`] if the serial port cannot be opened.
    pub async fn open(path: &str) -> Result<Self> {
        Self::open_with_options(path, crate::transport::factory::DEFAULT_TIMEOUT, false).await
    }

    /// Opens the serial device at `path` using the crate's default timeout and
    /// the given `disable_brs` setting.
    ///
    /// # Errors
    /// Returns [`Error::Io`] if the serial port cannot be opened.
    pub async fn open_with_brs(path: &str, disable_brs: bool) -> Result<Self> {
        Self::open_with_options(
            path,
            crate::transport::factory::DEFAULT_TIMEOUT,
            disable_brs,
        )
        .await
    }

    /// Opens the serial device at `path` with an explicit response `timeout`
    /// and `disable_brs` setting.
    ///
    /// The port is split into a buffered reader and a writer so that commands
    /// and line-oriented responses can be handled independently.
    ///
    /// # Errors
    /// Returns [`Error::Io`] if the serial port cannot be opened.
    pub async fn open_with_options(
        path: &str,
        timeout: std::time::Duration,
        disable_brs: bool,
    ) -> Result<Self> {
        let builder = tokio_serial::new(path, 9600);
        let port = SerialStream::open(&builder).map_err(|e| Error::Io(e.into()))?;
        let (reader, writer) = tokio::io::split(port);
        Ok(Self {
            reader: BufReader::new(reader),
            writer,
            timeout,
            disable_brs,
            line_buffer: String::new(),
            pending_frames: Vec::new(),
            info: TransportDeviceInfo::new(0, "AsyncFdcanusb"),
            needs_recovery: false,
        })
    }

    /// Changes the `disable_brs` flag used when encoding subsequent frames.
    pub fn set_disable_brs(&mut self, disable: bool) {
        self.disable_brs = disable;
    }

    /// Time left until `deadline`, or `fallback` when no deadline could be
    /// computed (the configured timeout was too large to add to "now").
    fn time_left(
        deadline: Option<tokio::time::Instant>,
        fallback: std::time::Duration,
    ) -> std::time::Duration {
        match deadline {
            Some(at) => at.saturating_duration_since(tokio::time::Instant::now()),
            None => fallback,
        }
    }

    /// Encodes `frame` and writes the command to the serial port without
    /// flushing and without waiting for the acknowledgement.
    async fn write_frame(&mut self, frame: &CanFdFrame) -> Result<()> {
        let cmd = FdcanusbProtocol::encode_frame(frame, self.disable_brs);
        self.writer.write_all(cmd.as_bytes()).await?;
        Ok(())
    }

    /// Writes a single frame, flushes the port and waits for the adapter to
    /// acknowledge it.
    async fn send_frame(&mut self, frame: &CanFdFrame) -> Result<()> {
        self.write_frame(frame).await?;
        self.writer.flush().await?;
        self.wait_for_ok().await
    }

    /// Reads lines until the adapter acknowledges a command.
    ///
    /// Lines that parse as frames are queued in `pending_frames`; any other
    /// unrecognised line is ignored.
    ///
    /// The configured timeout is a budget for the whole wait rather than for
    /// each individual line, so steady unrelated traffic cannot postpone the
    /// timeout indefinitely. Once the budget is spent a read only succeeds if
    /// a line is immediately available.
    ///
    /// # Errors
    /// * [`Error::Io`] with `UnexpectedEof` if the port reports end of stream,
    ///   or with the underlying error if a read fails.
    /// * [`Error::Protocol`] if the adapter sends an error response.
    /// * [`Error::Timeout`] if no acknowledgement arrives in time.
    async fn wait_for_ok(&mut self) -> Result<()> {
        let timeout = self.timeout;
        // `checked_add` avoids a panic for timeouts too large to represent.
        let deadline = tokio::time::Instant::now().checked_add(timeout);
        loop {
            let remaining = Self::time_left(deadline, timeout);
            self.line_buffer.clear();
            let read_result =
                tokio::time::timeout(remaining, self.reader.read_line(&mut self.line_buffer))
                    .await;
            match read_result {
                Ok(Ok(0)) => {
                    return Err(Error::Io(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "Serial port closed",
                    )));
                }
                Ok(Ok(_)) => {
                    let line = self.line_buffer.trim();
                    if FdcanusbProtocol::is_ok_response(line) {
                        return Ok(());
                    }
                    if FdcanusbProtocol::is_error_response(line) {
                        return Err(Error::Protocol(format!("fdcanusb error: {}", line)));
                    }
                    // Frames may arrive before the acknowledgement; keep them
                    // for the caller instead of dropping them.
                    if let Some(frame) = FdcanusbProtocol::parse_frame(&self.line_buffer) {
                        self.pending_frames.push(frame);
                    }
                }
                Ok(Err(e)) => return Err(Error::Io(e)),
                Err(_) => return Err(Error::Timeout),
            }
        }
    }

    /// Collects up to `expected_count` frames, starting with any frames that
    /// were queued while waiting for acknowledgements.
    ///
    /// Collection is best effort: it stops early on timeout, end of stream or
    /// a read error, and returns whatever was gathered so far. If the queue
    /// already holds at least `expected_count` frames, all queued frames are
    /// returned without reading from the port.
    async fn receive_frames(&mut self, expected_count: usize) -> Result<Vec<CanFdFrame>> {
        let mut frames: Vec<CanFdFrame> = std::mem::take(&mut self.pending_frames);
        if frames.len() >= expected_count {
            return Ok(frames);
        }

        let timeout = self.timeout;
        // `checked_add` avoids a panic for timeouts too large to represent.
        let deadline = tokio::time::Instant::now().checked_add(timeout);
        while frames.len() < expected_count {
            let remaining = Self::time_left(deadline, timeout);
            if remaining.is_zero() {
                break;
            }
            self.line_buffer.clear();
            let read_result =
                tokio::time::timeout(remaining, self.reader.read_line(&mut self.line_buffer))
                    .await;
            match read_result {
                Ok(Ok(0)) => break,
                Ok(Ok(_)) => {
                    if let Some(frame) = FdcanusbProtocol::parse_frame(&self.line_buffer) {
                        frames.push(frame);
                    }
                }
                Ok(Err(_)) | Err(_) => break,
            }
        }
        Ok(frames)
    }

    /// Runs one full transaction cycle: writes every request frame, waits for
    /// one acknowledgement per frame written, then collects the expected
    /// replies and dispatches them to `requests`.
    ///
    /// `needs_recovery` is raised for the duration of the cycle and is only
    /// cleared on success, so an early error return leaves the device marked
    /// for recovery.
    ///
    /// # Errors
    /// Propagates any error from writing, flushing or waiting for
    /// acknowledgements.
    async fn execute_cycle(&mut self, requests: &mut [Request]) -> Result<()> {
        debug_assert!(
            requests.iter().all(|r| r.child_device.is_none()),
            "AsyncFdcanusbDevice does not support child devices"
        );
        self.needs_recovery = true;

        // Queue all commands first so they go out in a single flush.
        let mut frames_sent = 0usize;
        for req in requests.iter() {
            if let Some(frame) = &req.frame {
                self.write_frame(frame).await?;
                frames_sent += 1;
            }
        }
        if frames_sent > 0 {
            self.writer.flush().await?;
        }
        for _ in 0..frames_sent {
            self.wait_for_ok().await?;
        }

        let expected: usize = Request::total_expected_replies(requests);
        if expected > 0 {
            let responses = self.receive_frames(expected).await?;
            for frame in responses {
                dispatch_frame(&frame, requests);
            }
        }

        self.needs_recovery = false;
        Ok(())
    }
}
impl AsyncFdcanusbDevice {
    /// Reads and discards incoming lines until `window` has elapsed, the port
    /// reports end of stream, or a read fails.
    async fn discard_input_for(&mut self, window: std::time::Duration) {
        let deadline = tokio::time::Instant::now() + window;
        loop {
            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
            if remaining.is_zero() {
                break;
            }
            self.line_buffer.clear();
            let read_result =
                tokio::time::timeout(remaining, self.reader.read_line(&mut self.line_buffer))
                    .await;
            match read_result {
                Ok(Ok(0)) | Ok(Err(_)) | Err(_) => break,
                Ok(Ok(_)) => {}
            }
        }
    }
}

impl AsyncTransportDevice for AsyncFdcanusbDevice {
    /// Resynchronises with the adapter after an interrupted transaction.
    ///
    /// Does nothing unless the last cycle was left unfinished. Otherwise it
    /// drops queued frames, sends a bare newline, and discards whatever the
    /// adapter sends during a 20 ms window. Write failures are ignored on
    /// purpose: recovery is best effort and always returns `Ok(())`.
    fn recover(&mut self) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            if !self.needs_recovery {
                return Ok(());
            }
            self.pending_frames.clear();
            let _ = self.writer.write_all(b"\n").await;
            let _ = self.writer.flush().await;
            self.discard_input_for(std::time::Duration::from_millis(20))
                .await;
            self.needs_recovery = false;
            Ok(())
        })
    }

    /// Runs one request/response cycle for `requests`.
    fn transaction<'a>(&'a mut self, requests: &'a mut [Request]) -> BoxFuture<'a, Result<()>> {
        Box::pin(self.execute_cycle(requests))
    }

    /// Sends a single frame and waits for the adapter's acknowledgement.
    fn write<'a>(&'a mut self, frame: &'a CanFdFrame) -> BoxFuture<'a, Result<()>> {
        Box::pin(self.send_frame(frame))
    }

    /// Returns the next received frame.
    ///
    /// Frames queued while waiting for acknowledgements are returned first,
    /// oldest first, so callers see frames in arrival order. If none are
    /// queued, lines are read from the port (with no timeout) until one
    /// parses as a frame.
    ///
    /// # Errors
    /// Returns an I/O error if a read fails, or `UnexpectedEof` if the port
    /// reports end of stream.
    fn read(&mut self) -> BoxFuture<'_, Result<Option<CanFdFrame>>> {
        Box::pin(async move {
            if !self.pending_frames.is_empty() {
                // Take from the front: `pop()` would hand back the newest
                // frame and reverse the arrival order. The queue is short, so
                // the O(n) shift is negligible.
                return Ok(Some(self.pending_frames.remove(0)));
            }
            loop {
                self.line_buffer.clear();
                let n = self.reader.read_line(&mut self.line_buffer).await?;
                if n == 0 {
                    return Err(Error::Io(std::io::Error::new(
                        std::io::ErrorKind::UnexpectedEof,
                        "Serial port closed",
                    )));
                }
                if let Some(frame) = FdcanusbProtocol::parse_frame(&self.line_buffer) {
                    return Ok(Some(frame));
                }
            }
        })
    }

    /// Discards queued frames and any input that arrives within a 50 ms
    /// window. Always returns `Ok(())`.
    fn flush(&mut self) -> BoxFuture<'_, Result<()>> {
        Box::pin(async move {
            self.pending_frames.clear();
            self.discard_input_for(std::time::Duration::from_millis(50))
                .await;
            Ok(())
        })
    }

    /// Returns the device metadata.
    fn info(&self) -> &TransportDeviceInfo {
        &self.info
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The shared `FdcanusbProtocol` encoder must turn a frame into a command
    /// that begins with `can send` followed by the arbitration id in hex.
    #[test]
    fn test_protocol_reuse() {
        let mut sample = CanFdFrame::new();
        sample.arbitration_id = 0x8001;
        sample.data[0..3].copy_from_slice(&[0x01, 0x00, 0x0A]);
        sample.size = 3;
        sample.set_brs(true);
        sample.set_fdcan(true);

        let command = FdcanusbProtocol::encode_frame(&sample, false);

        assert!(command.starts_with("can send 8001"));
    }
}