use crate::{
frame::AsPtr, CanAddr, CanAnyFrame, CanFrame, Error, IoResult, Result, Socket, SocketOptions,
};
use futures::{prelude::*, ready, task::Context};
use std::{
io::{Read, Write},
os::unix::{
io::{AsRawFd, OwnedFd},
prelude::RawFd,
},
pin::Pin,
task::Poll,
};
use tokio::io::unix::AsyncFd;
use tokio::io::Interest;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
#[derive(Debug)]
pub struct AsyncCanSocket<T: Socket>(AsyncFd<T>);
impl<T: Socket + From<OwnedFd>> AsyncCanSocket<T> {
pub fn open(ifname: &str) -> IoResult<Self> {
let sock = T::open(ifname)?;
sock.set_nonblocking(true)?;
Ok(Self(AsyncFd::new(sock)?))
}
pub fn open_if(ifindex: u32) -> IoResult<Self> {
let sock = T::open_iface(ifindex)?;
sock.set_nonblocking(true)?;
Ok(Self(AsyncFd::new(sock)?))
}
pub fn open_addr(addr: &CanAddr) -> IoResult<Self> {
let sock = T::open_addr(addr)?;
sock.set_nonblocking(true)?;
Ok(Self(AsyncFd::new(sock)?))
}
}
impl<T: Socket> SocketOptions for AsyncCanSocket<T> {}
impl<T: Socket> AsRawFd for AsyncCanSocket<T> {
fn as_raw_fd(&self) -> RawFd {
self.0.as_raw_fd()
}
}
pub type CanSocket = AsyncCanSocket<crate::CanSocket>;
impl CanSocket {
pub async fn write_frame(&self, frame: CanFrame) -> IoResult<()> {
self.0
.async_io(Interest::WRITABLE, |inner| inner.write_frame(&frame))
.await
}
pub async fn read_frame(&self) -> IoResult<CanFrame> {
self.0
.async_io(Interest::READABLE, |inner| inner.read_frame())
.await
}
}
impl Stream for CanSocket {
type Item = Result<CanFrame>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
let mut ready_guard = ready!(self.0.poll_read_ready(cx))?;
match ready_guard.try_io(|inner| inner.get_ref().read_frame()) {
Ok(result) => return Poll::Ready(Some(result.map_err(|e| e.into()))),
Err(_would_block) => continue,
}
}
}
}
impl Sink<CanFrame> for CanSocket {
type Error = Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let _ = ready!(self.0.poll_write_ready(cx))?;
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut ready_guard = ready!(self.0.poll_write_ready(cx))?;
ready_guard.clear_ready();
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: CanFrame) -> Result<()> {
self.0.get_ref().write_frame_insist(&item)?;
Ok(())
}
}
impl AsyncRead for CanSocket {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
loop {
let mut guard = ready!(self.0.poll_read_ready_mut(cx))?;
let unfilled = buf.initialize_unfilled();
match guard.try_io(|inner| inner.get_mut().read(unfilled)) {
Ok(Ok(len)) => {
buf.advance(len);
return Poll::Ready(Ok(()));
}
Ok(Err(err)) => return Poll::Ready(Err(err)),
Err(_would_block) => continue,
}
}
}
}
impl AsyncWrite for CanSocket {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<IoResult<usize>> {
loop {
let mut guard = ready!(self.0.poll_write_ready_mut(cx))?;
match guard.try_io(|inner| inner.get_mut().write(buf)) {
Ok(result) => return Poll::Ready(result),
Err(_would_block) => continue,
}
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
Poll::Ready(Ok(()))
}
}
pub type CanFdSocket = AsyncCanSocket<crate::CanFdSocket>;
impl CanFdSocket {
pub async fn write_frame<F>(&self, frame: &F) -> IoResult<()>
where
F: Into<CanAnyFrame> + AsPtr,
{
self.0
.async_io(Interest::WRITABLE, |inner| inner.write_frame(frame))
.await
}
pub async fn read_frame(&self) -> IoResult<CanAnyFrame> {
self.0
.async_io(Interest::READABLE, |inner| inner.read_frame())
.await
}
}
impl Stream for CanFdSocket {
type Item = Result<CanAnyFrame>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
loop {
let mut ready_guard = ready!(self.0.poll_read_ready(cx))?;
match ready_guard.try_io(|inner| inner.get_ref().read_frame()) {
Ok(result) => return Poll::Ready(Some(result.map_err(|e| e.into()))),
Err(_would_block) => continue,
}
}
}
}
impl Sink<CanAnyFrame> for CanFdSocket {
type Error = Error;
fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let _ = ready!(self.0.poll_write_ready(cx))?;
Poll::Ready(Ok(()))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut ready_guard = ready!(self.0.poll_write_ready(cx))?;
ready_guard.clear_ready();
Poll::Ready(Ok(()))
}
fn start_send(self: Pin<&mut Self>, item: CanAnyFrame) -> Result<()> {
self.0.get_ref().write_frame_insist(&item)?;
Ok(())
}
}
impl AsyncRead for CanFdSocket {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<IoResult<()>> {
loop {
let mut guard = ready!(self.0.poll_read_ready_mut(cx))?;
let unfilled = buf.initialize_unfilled();
match guard.try_io(|inner| inner.get_mut().read(unfilled)) {
Ok(Ok(len)) => {
buf.advance(len);
return Poll::Ready(Ok(()));
}
Ok(Err(err)) => return Poll::Ready(Err(err)),
Err(_would_block) => continue,
}
}
}
}
impl AsyncWrite for CanFdSocket {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<IoResult<usize>> {
loop {
let mut guard = ready!(self.0.poll_write_ready_mut(cx))?;
match guard.try_io(|inner| inner.get_mut().write(buf)) {
Ok(result) => return Poll::Ready(result),
Err(_would_block) => continue,
}
}
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<IoResult<()>> {
Poll::Ready(Ok(()))
}
}
#[cfg(feature = "vcan_tests")]
#[cfg(test)]
mod tests {
use super::*;
use crate::{
frame::{can_frame_default, AsPtr},
CanFdFrame, CanFrame, Frame, IoErrorKind, StandardId,
};
use embedded_can::Frame as EmbeddedFrame;
use futures::{select, try_join};
use futures_timer::Delay;
use serial_test::serial;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
const TIMEOUT: Duration = Duration::from_millis(100);
async fn recv_frame(socket: CanSocket) -> Result<CanSocket> {
select!(
frame = socket.read_frame().fuse() => if let Ok(_frame) = frame { Ok(socket) } else { panic!("unexpected") },
_timeout = Delay::new(TIMEOUT).fuse() => Err(IoErrorKind::TimedOut.into()),
)
}
async fn recv_frame_with_stream(mut socket: CanSocket) -> Result<CanSocket> {
select!(
frame = socket.next().fuse() => if let Some(_frame) = frame { Ok(socket) } else { panic!("unexpected") },
_timeout = Delay::new(TIMEOUT).fuse() => Err(IoErrorKind::TimedOut.into()),
)
}
async fn recv_frame_with_async_read(mut socket: CanSocket) -> Result<CanSocket> {
let mut frame = can_frame_default();
select!(
frame = socket.read_exact(crate::as_bytes_mut(&mut frame)).fuse() => if let Ok(_bytes_read) = frame { Ok(socket) } else { panic!("unexpected") },
_timeout = Delay::new(TIMEOUT).fuse() => Err(IoErrorKind::TimedOut.into()),
)
}
async fn write_frame(socket: &CanSocket) -> Result<()> {
let test_frame = CanFrame::new(StandardId::new(0x1).unwrap(), &[0]).unwrap();
socket.write_frame(test_frame).await?;
Ok(())
}
async fn write_frame_with_async_write(socket: &mut CanSocket) -> Result<()> {
let test_frame = CanFrame::new(StandardId::new(0x1).unwrap(), &[0]).unwrap();
socket.write(test_frame.as_bytes()).await?;
Ok(())
}
async fn recv_frame_fd(socket: CanFdSocket) -> Result<CanFdSocket> {
select!(
frame = socket.read_frame().fuse() => if let Ok(_frame) = frame { Ok(socket) } else { panic!("unexpected") },
_timeout = Delay::new(TIMEOUT).fuse() => Err(IoErrorKind::TimedOut.into()),
)
}
async fn recv_frame_fd_with_stream(mut socket: CanFdSocket) -> Result<CanFdSocket> {
select!(
frame = socket.next().fuse() => if let Some(_frame) = frame { Ok(socket) } else { panic!("unexpected") },
_timeout = Delay::new(TIMEOUT).fuse() => Err(IoErrorKind::TimedOut.into()),
)
}
async fn recv_frame_fd_with_async_read(mut socket: CanFdSocket) -> Result<CanFdSocket> {
let mut frame = can_frame_default();
select!(
frame = socket.read_exact(crate::as_bytes_mut(&mut frame)).fuse() => if let Ok(_bytes_read) = frame { Ok(socket) } else { panic!("unexpected") },
_timeout = Delay::new(TIMEOUT).fuse() => Err(IoErrorKind::TimedOut.into()),
)
}
async fn write_frame_fd_canfd(socket: &CanFdSocket) -> Result<()> {
let test_frame =
CanFdFrame::new(StandardId::new(0x1).unwrap(), &[0, 0, 0, 0, 0, 0, 0, 0, 0]).unwrap();
socket.write_frame(&test_frame).await?;
Ok(())
}
async fn write_frame_fd_can(socket: &CanFdSocket) -> Result<()> {
let test_frame = CanFrame::new(StandardId::new(0x1).unwrap(), &[0]).unwrap();
socket.write_frame(&test_frame).await?;
Ok(())
}
async fn write_frame_fd_with_async_write(socket: &mut CanFdSocket) -> Result<()> {
let test_frame = CanFdFrame::new(StandardId::new(0x1).unwrap(), &[0]).unwrap();
socket.write(test_frame.as_bytes()).await?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_receive() -> Result<()> {
let socket1 = CanSocket::open("vcan0").unwrap();
let socket2 = CanSocket::open("vcan0").unwrap();
let send_frames = future::try_join(write_frame(&socket1), write_frame(&socket1));
let recv_frames = async {
let socket2 = recv_frame(socket2).await?;
let _socket2 = recv_frame(socket2).await;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_receive_with_stream() -> Result<()> {
let socket1 = CanSocket::open("vcan0").unwrap();
let socket2 = CanSocket::open("vcan0").unwrap();
let send_frames = future::try_join(write_frame(&socket1), write_frame(&socket1));
let recv_frames = async {
let socket2 = recv_frame_with_stream(socket2).await?;
let _socket2 = recv_frame_with_stream(socket2).await;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_asyncread_and_asyncwrite() -> Result<()> {
let mut socket1 = CanSocket::open("vcan0").unwrap();
let socket2 = CanSocket::open("vcan0").unwrap();
let send_frames = write_frame_with_async_write(&mut socket1);
let recv_frames = async {
let _socket2 = recv_frame_with_async_read(socket2).await?;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_receive_can_fd_canfd() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();
let send_frames = future::try_join(
write_frame_fd_canfd(&socket1),
write_frame_fd_canfd(&socket1),
);
let recv_frames = async {
let socket2 = recv_frame_fd(socket2).await?;
let _socket2 = recv_frame_fd(socket2).await;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_receive_can_fd_can() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();
let send_frames =
future::try_join(write_frame_fd_can(&socket1), write_frame_fd_can(&socket1));
let recv_frames = async {
let socket2 = recv_frame_fd(socket2).await?;
let _socket2 = recv_frame_fd(socket2).await;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_receive_can_fd_canfd_with_stream() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();
let send_frames = future::try_join(
write_frame_fd_canfd(&socket1),
write_frame_fd_canfd(&socket1),
);
let recv_frames = async {
let socket2 = recv_frame_fd_with_stream(socket2).await?;
let _socket2 = recv_frame_fd_with_stream(socket2).await;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_receive_can_fd_can_with_stream() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();
let send_frames =
future::try_join(write_frame_fd_can(&socket1), write_frame_fd_can(&socket1));
let recv_frames = async {
let socket2 = recv_frame_fd_with_stream(socket2).await?;
let _socket2 = recv_frame_fd_with_stream(socket2).await;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_asyncread_and_asyncwrite_fd() -> Result<()> {
let mut socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();
let send_frames = write_frame_fd_with_async_write(&mut socket1);
let recv_frames = async {
let _socket2 = recv_frame_fd_with_async_read(socket2).await?;
Ok(())
};
try_join!(recv_frames, send_frames)?;
Ok(())
}
#[serial]
#[tokio::test]
async fn test_sink_stream() -> Result<()> {
let socket1 = CanSocket::open("vcan0").unwrap();
let socket2 = CanSocket::open("vcan0").unwrap();
let frame_id_1 = CanFrame::from_raw_id(0x01, &[0u8]).unwrap();
let frame_id_2 = CanFrame::from_raw_id(0x02, &[0u8]).unwrap();
let frame_id_3 = CanFrame::from_raw_id(0x03, &[0u8]).unwrap();
let (mut sink, _stream) = socket1.split();
let (_sink, stream) = socket2.split();
let count_ids_less_than_3 = stream
.map(|x| x.unwrap())
.take_while(|frame| future::ready(frame.raw_id() < 3))
.fold(0u8, |acc, _frame| async move { acc + 1 });
let send_frames = async {
let _frame_1 = sink.send(frame_id_1).await?;
let _frame_2 = sink.send(frame_id_2).await?;
let _frame_3 = sink.send(frame_id_3).await?;
println!("Sent 3 frames");
Ok::<(), Error>(())
};
let (x, frame_send_r) = future::join(count_ids_less_than_3, send_frames).await;
frame_send_r?;
assert_eq!(x, 2);
Ok(())
}
#[serial]
#[tokio::test]
async fn test_sink_stream_fd_canfd() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();
let frame_id_1 = CanFdFrame::from_raw_id(0x01, &[0u8]).unwrap();
let frame_id_2 = CanFdFrame::from_raw_id(0x02, &[0u8]).unwrap();
let frame_id_3 = CanFdFrame::from_raw_id(0x03, &[0u8]).unwrap();
let (mut sink, _stream) = socket1.split();
let (_sink, stream) = socket2.split();
let count_ids_less_than_3 = stream
.map(|x| x.unwrap())
.take_while(|frame| {
if let CanAnyFrame::Fd(frame) = frame {
future::ready(frame.raw_id() < 3)
} else {
future::ready(false)
}
})
.fold(0u8, |acc, _frame| async move { acc + 1 });
let send_frames = async {
let _frame_1 = sink.send(frame_id_1.into()).await?;
let _frame_2 = sink.send(frame_id_2.into()).await?;
let _frame_3 = sink.send(frame_id_3.into()).await?;
println!("Sent 3 frames");
Ok::<(), Error>(())
};
let (x, frame_send_r) = future::join(count_ids_less_than_3, send_frames).await;
frame_send_r?;
assert_eq!(x, 2);
Ok(())
}
#[serial]
#[tokio::test]
async fn test_sink_stream_fd_can() -> Result<()> {
let socket1 = CanFdSocket::open("vcan0").unwrap();
let socket2 = CanFdSocket::open("vcan0").unwrap();
let frame_id_1 = CanFrame::from_raw_id(0x01, &[0u8]).unwrap();
let frame_id_2 = CanFrame::from_raw_id(0x02, &[0u8]).unwrap();
let frame_id_3 = CanFrame::from_raw_id(0x03, &[0u8]).unwrap();
let (mut sink, _stream) = socket1.split();
let (_sink, stream) = socket2.split();
let count_ids_less_than_3 = stream
.map(|x| x.unwrap())
.take_while(|frame| {
if let CanAnyFrame::Normal(frame) = frame {
future::ready(frame.raw_id() < 3)
} else {
future::ready(false)
}
})
.fold(0u8, |acc, _frame| async move { acc + 1 });
let send_frames = async {
let _frame_1 = sink.send(frame_id_1.into()).await?;
let _frame_2 = sink.send(frame_id_2.into()).await?;
let _frame_3 = sink.send(frame_id_3.into()).await?;
println!("Sent 3 frames");
Ok::<(), Error>(())
};
let (x, frame_send_r) = future::join(count_ids_less_than_3, send_frames).await;
frame_send_r?;
assert_eq!(x, 2);
Ok(())
}
}