#[cfg(target_os = "linux")]
mod linux {
use crate::error::{Error, Result};
use crate::transport::async_transport::BoxFuture;
use crate::transport::device::{AsyncTransportDevice, TransportDeviceInfo};
use crate::transport::socketcan_common::linux::*;
use crate::transport::transaction::{dispatch_frame, Request};
use moteus_protocol::CanFdFrame;
use std::io;
use std::os::unix::io::{AsRawFd, RawFd};
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
pub(crate) struct SocketCanRaw {
fd: RawFd,
disable_brs: bool,
}
impl SocketCanRaw {
fn new(interface: &str, disable_brs: bool) -> Result<Self> {
let fd = unsafe { socket(PF_CAN, SOCK_RAW, CAN_RAW) };
if fd < 0 {
return Err(Error::Io(io::Error::last_os_error()));
}
let ifindex = match get_ifindex(interface) {
Ok(idx) => idx,
Err(e) => {
unsafe { close(fd) };
return Err(e);
}
};
let addr = SockAddrCan {
can_family: AF_CAN as u16,
can_ifindex: ifindex,
rx_id: 0,
tx_id: 0,
};
let ret = unsafe {
bind(
fd,
&addr as *const SockAddrCan as *const std::ffi::c_void,
std::mem::size_of::<SockAddrCan>() as u32,
)
};
if ret < 0 {
unsafe { close(fd) };
return Err(Error::Io(io::Error::last_os_error()));
}
let enable: i32 = 1;
let ret = unsafe {
setsockopt(
fd,
SOL_CAN_RAW,
CAN_RAW_FD_FRAMES,
&enable as *const i32 as *const std::ffi::c_void,
std::mem::size_of::<i32>() as u32,
)
};
if ret < 0 {
unsafe { close(fd) };
return Err(Error::Io(io::Error::last_os_error()));
}
let flags = unsafe { fcntl(fd, F_GETFL) };
unsafe { fcntl(fd, F_SETFL, flags | O_NONBLOCK) };
Ok(SocketCanRaw { fd, disable_brs })
}
fn try_send(&self, frame: &CanFdFrame) -> io::Result<()> {
let raw = frame_to_raw(frame, self.disable_brs);
let ret = unsafe {
write(
self.fd,
&raw as *const CanFdFrameRaw as *const std::ffi::c_void,
CANFD_MTU,
)
};
if ret < 0 {
Err(io::Error::last_os_error())
} else {
Ok(())
}
}
fn try_recv(&self) -> io::Result<CanFdFrame> {
let mut raw = CanFdFrameRaw::default();
let ret = unsafe {
read(
self.fd,
&mut raw as *mut CanFdFrameRaw as *mut std::ffi::c_void,
CANFD_MTU,
)
};
if ret < 0 {
return Err(io::Error::last_os_error());
}
Ok(frame_from_raw(&raw))
}
}
impl Drop for SocketCanRaw {
fn drop(&mut self) {
unsafe { close(self.fd) };
}
}
impl AsRawFd for SocketCanRaw {
fn as_raw_fd(&self) -> RawFd {
self.fd
}
}
pub struct AsyncSocketCanDevice {
async_fd: AsyncFd<SocketCanRaw>,
timeout: std::time::Duration,
pub(crate) info: TransportDeviceInfo,
needs_recovery: bool,
}
impl std::fmt::Debug for AsyncSocketCanDevice {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("AsyncSocketCanDevice")
.field("info", &self.info)
.field("timeout", &self.timeout)
.field("needs_recovery", &self.needs_recovery)
.finish()
}
}
impl AsyncSocketCanDevice {
pub async fn new(interface: &str) -> Result<Self> {
Self::with_options(interface, crate::transport::factory::DEFAULT_TIMEOUT, false).await
}
pub async fn with_options(
interface: &str,
timeout: std::time::Duration,
disable_brs: bool,
) -> Result<Self> {
let raw = SocketCanRaw::new(interface, disable_brs)?;
let async_fd = AsyncFd::new(raw).map_err(Error::Io)?;
Ok(AsyncSocketCanDevice {
async_fd,
timeout,
info: TransportDeviceInfo::new(0, "AsyncSocketCan")
.with_serial(interface.to_string())
.with_detail(format!("'{}'", interface)),
needs_recovery: false,
})
}
async fn send_frame(&self, frame: &CanFdFrame) -> Result<()> {
loop {
let mut guard = self
.async_fd
.ready(Interest::WRITABLE)
.await
.map_err(Error::Io)?;
match guard.try_io(|inner| inner.get_ref().try_send(frame)) {
Ok(result) => return result.map_err(Error::Io),
Err(_would_block) => continue,
}
}
}
async fn receive_frames(&self, expected_count: usize) -> Result<Vec<CanFdFrame>> {
let mut frames = Vec::new();
let timeout = self.timeout;
let deadline = tokio::time::Instant::now() + timeout;
while frames.len() < expected_count {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
let recv_result = tokio::time::timeout(remaining, async {
loop {
let mut guard = self.async_fd.ready(Interest::READABLE).await?;
match guard.try_io(|inner| inner.get_ref().try_recv()) {
Ok(Ok(frame)) => return Ok::<_, io::Error>(frame),
Ok(Err(e)) => return Err(e),
Err(_would_block) => continue,
}
}
})
.await;
match recv_result {
Ok(Ok(frame)) => frames.push(frame),
Ok(Err(_)) | Err(_) => break,
}
}
Ok(frames)
}
async fn execute_cycle(&mut self, requests: &mut [Request]) -> Result<()> {
debug_assert!(
requests.iter().all(|r| r.child_device.is_none()),
"AsyncSocketCanDevice does not support child devices"
);
self.needs_recovery = true;
for req in requests.iter() {
if let Some(frame) = &req.frame {
self.send_frame(frame).await?;
}
}
let expected: usize = Request::total_expected_replies(requests);
if expected > 0 {
let responses = self.receive_frames(expected).await?;
for frame in responses {
dispatch_frame(&frame, requests);
}
}
self.needs_recovery = false;
Ok(())
}
}
impl AsyncTransportDevice for AsyncSocketCanDevice {
fn recover(&mut self) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
if !self.needs_recovery {
return Ok(());
}
while self.async_fd.get_ref().try_recv().is_ok() {}
self.needs_recovery = false;
Ok(())
})
}
fn transaction<'a>(&'a mut self, requests: &'a mut [Request]) -> BoxFuture<'a, Result<()>> {
Box::pin(self.execute_cycle(requests))
}
fn write<'a>(&'a mut self, frame: &'a CanFdFrame) -> BoxFuture<'a, Result<()>> {
Box::pin(self.send_frame(frame))
}
fn read(&mut self) -> BoxFuture<'_, Result<Option<CanFdFrame>>> {
Box::pin(async move {
loop {
let mut guard = self
.async_fd
.ready(Interest::READABLE)
.await
.map_err(Error::Io)?;
match guard.try_io(|inner| inner.get_ref().try_recv()) {
Ok(Ok(frame)) => return Ok(Some(frame)),
Ok(Err(e)) => return Err(Error::Io(e)),
Err(_would_block) => continue,
}
}
})
}
fn flush(&mut self) -> BoxFuture<'_, Result<()>> {
Box::pin(async move {
let flush_timeout = std::time::Duration::from_millis(50);
let deadline = tokio::time::Instant::now() + flush_timeout;
while tokio::time::Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
if remaining.is_zero() {
break;
}
let recv_result = tokio::time::timeout(remaining, async {
let mut guard = self.async_fd.ready(Interest::READABLE).await?;
match guard.try_io(|inner| inner.get_ref().try_recv()) {
Ok(Ok(_)) => Ok::<_, io::Error>(true), Ok(Err(e)) => Err(e),
Err(_would_block) => Ok(false),
}
})
.await;
match recv_result {
Ok(Ok(true)) => continue, _ => break,
}
}
Ok(())
})
}
fn info(&self) -> &TransportDeviceInfo {
&self.info
}
}
}
#[cfg(target_os = "linux")]
pub use linux::AsyncSocketCanDevice;
#[cfg(test)]
mod tests {
#[cfg(target_os = "linux")]
#[tokio::test]
async fn test_async_socketcan_interface_not_found() {
let result = super::AsyncSocketCanDevice::new("nonexistent_can_interface_12345").await;
assert!(result.is_err());
}
}