zencan_common/
socketcan.rs

1use std::sync::Arc;
2
3use crate::{
4    messages::{CanError, CanId, CanMessage},
5    traits::{AsyncCanReceiver, AsyncCanSender},
6};
7use snafu::{ResultExt, Snafu};
8use socketcan::{CanFrame, CanSocket, EmbeddedFrame, Frame, ShouldRetry, Socket};
9use tokio::io::{unix::AsyncFd, Interest};
10
11fn socketcan_id_to_zencan_id(id: socketcan::CanId) -> CanId {
12    match id {
13        socketcan::CanId::Standard(id) => CanId::std(id.as_raw()),
14        socketcan::CanId::Extended(id) => CanId::extended(id.as_raw()),
15    }
16}
17
18fn zencan_id_to_socketcan_id(id: CanId) -> socketcan::CanId {
19    match id {
20        CanId::Extended(id) => socketcan::ExtendedId::new(id).unwrap().into(),
21        CanId::Std(id) => socketcan::StandardId::new(id).unwrap().into(),
22    }
23}
24
25fn socketcan_frame_to_zencan_message(frame: socketcan::CanFrame) -> Result<CanMessage, CanError> {
26    let id = socketcan_id_to_zencan_id(frame.can_id());
27
28    match frame {
29        CanFrame::Data(frame) => Ok(CanMessage::new(id, frame.data())),
30        CanFrame::Remote(_) => Ok(CanMessage::new_rtr(id)),
31        CanFrame::Error(frame) => Err(CanError::from_raw(frame.error_bits() as u8)),
32    }
33}
34
35fn zencan_message_to_socket_frame(frame: CanMessage) -> socketcan::CanFrame {
36    let id = zencan_id_to_socketcan_id(frame.id());
37
38    if frame.is_rtr() {
39        socketcan::CanFrame::new_remote(id, 0).unwrap()
40    } else {
41        socketcan::CanFrame::new(id, frame.data()).unwrap()
42    }
43}
44
45#[derive(Debug, Clone)]
46pub struct SocketCanReceiver {
47    socket: Arc<AsyncCanSocket>,
48}
49
50#[derive(Debug, Snafu)]
51pub enum ReceiveError {
52    Io { source: socketcan::IoError },
53    Can { source: CanError },
54}
55
56/// Create an Async socket around a socketcan CanSocket. This is just a reimplemenation of the tokio
57/// socket in the `socketcan` crate, but with support for `try_read_frame` and `try_write_frame`
58/// added.
59#[derive(Debug)]
60struct AsyncCanSocket(AsyncFd<CanSocket>);
61
62#[allow(dead_code)]
63impl AsyncCanSocket {
64    pub fn new(inner: CanSocket) -> Result<Self, std::io::Error> {
65        inner.set_nonblocking(true)?;
66        Ok(Self(AsyncFd::new(inner)?))
67    }
68
69    pub fn open(ifname: &str) -> Result<Self, std::io::Error> {
70        let socket = CanSocket::open(ifname)?;
71        socket.set_nonblocking(true)?;
72        Ok(Self(AsyncFd::new(socket)?))
73    }
74
75    /// Attempt to read a CAN frame from the socket without blocking
76    ///
77    /// If no message is immediately available, a WouldBlock error is returned.
78    pub fn try_read_frame(&self) -> Result<CanFrame, std::io::Error> {
79        self.0.get_ref().read_frame()
80    }
81
82    /// Read a CAN frame from the socket asynchronously
83    pub async fn read_frame(&self) -> Result<CanFrame, std::io::Error> {
84        self.0
85            .async_io(Interest::READABLE, |inner| inner.read_frame())
86            .await
87    }
88
89    pub async fn write_frame(&self, frame: &CanFrame) -> Result<(), std::io::Error> {
90        self.0
91            .async_io(Interest::WRITABLE, |inner| inner.write_frame(frame))
92            .await
93    }
94
95    /// Attempt to write a CAN frame to the socket without blocking
96    pub fn try_write_frame(&self, frame: CanFrame) -> Result<(), std::io::Error> {
97        self.0.get_ref().write_frame(&frame)
98    }
99}
100
101impl AsyncCanReceiver for SocketCanReceiver {
102    type Error = ReceiveError;
103
104    fn try_recv(&mut self) -> Option<CanMessage> {
105        match self.socket.try_read_frame() {
106            Ok(frame) => Some(socketcan_frame_to_zencan_message(frame).unwrap()),
107            _ => None,
108        }
109    }
110
111    async fn recv(&mut self) -> Result<CanMessage, ReceiveError> {
112        loop {
113            match self.socket.read_frame().await {
114                Ok(frame) => return socketcan_frame_to_zencan_message(frame).context(CanSnafu),
115                Err(e) => {
116                    if !e.should_retry() {
117                        return Err(ReceiveError::Io { source: e });
118                    }
119                }
120            }
121        }
122    }
123}
124
125#[derive(Debug, Clone)]
126pub struct SocketCanSender {
127    socket: Arc<AsyncCanSocket>,
128}
129
130impl AsyncCanSender for SocketCanSender {
131    async fn send(&mut self, msg: CanMessage) -> Result<(), CanMessage> {
132        let socketcan_frame = zencan_message_to_socket_frame(msg);
133
134        let result = self.socket.write_frame(&socketcan_frame).await;
135        if result.is_err() {
136            Err(msg)
137        } else {
138            Ok(())
139        }
140    }
141}
142
143/// Open a socketcan device and split it into a sender and receiver object for use with zencan
144/// library
145///
146/// # Arguments
147/// * `device` - The name of the socketcan device to open, e.g. "vcan0", or "can0"
148///
149/// A key benefit of this is that by creating both sender and receiver objects from a shared socket,
150/// the receiver will not receive messages sent by the sender.
151#[cfg_attr(docsrs, doc(cfg(feature = "socketcan")))]
152pub fn open_socketcan<S: AsRef<str>>(
153    device: S,
154) -> Result<(SocketCanSender, SocketCanReceiver), socketcan::IoError> {
155    let device: &str = device.as_ref();
156    let socket = Arc::new(AsyncCanSocket::open(device)?);
157    let receiver = SocketCanReceiver {
158        socket: socket.clone(),
159    };
160    let sender = SocketCanSender { socket };
161    Ok((sender, receiver))
162}