tokio_socketcan/
lib.rs

1//! # tokio-socketcan
2//!
3//! Connective plumbing between the socketcan crate
4//! and the tokio asynchronous I/O system
5//!
6//! # Usage
7//!
8//! The [socketcan](https://docs.rs/socketcan/1.7.0/socketcan/)
9//! crate's documentation is valuable as the api used by
10//! tokio-socketcan is largely identical to the socketcan one.
11//!
12//! An example echo server:
13//!
14//! ```no_run
15//! use futures_util::stream::StreamExt;
16//! use tokio_socketcan::{CANSocket, Error};
17//!
18//! #[tokio::main]
19//! async fn main() -> Result<(), Error> {
20//!     let mut socket_rx = CANSocket::open("vcan0")?;
21//!     let socket_tx = CANSocket::open("vcan0")?;
22//!
23//!     while let Some(Ok(frame)) = socket_rx.next().await {
24//!         socket_tx.write_frame(frame)?.await;
25//!     }
26//!     Ok(())
27//! }
28//! ```
29use std::io;
30use std::os::raw::c_uint;
31use std::os::unix::io::{AsRawFd, FromRawFd};
32use std::pin::Pin;
33use std::task::Poll;
34use std::{future::Future, os::unix::prelude::RawFd};
35
36use futures::prelude::*;
37use futures::ready;
38use futures::task::Context;
39
40use mio::{event, unix::SourceFd, Interest, Registry, Token};
41
42use thiserror::Error as ThisError;
43
44pub use socketcan::{CANFilter, CANFrame, CANSocketOpenError};
45use tokio::io::unix::AsyncFd;
46
47#[derive(Debug, ThisError)]
48pub enum Error {
49    #[error("Failed to open CAN Socket")]
50    CANSocketOpen(#[from] socketcan::CANSocketOpenError),
51    #[error("IO error")]
52    IO(#[from] io::Error),
53}
54
55/// A Future representing the eventual
56/// writing of a CANFrame to the socket
57///
58/// Created by the CANSocket.write_frame() method
59pub struct CANWriteFuture {
60    socket: CANSocket,
61    frame: CANFrame,
62}
63
64impl Future for CANWriteFuture {
65    type Output = io::Result<()>;
66
67    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68        let _ = ready!(self.socket.0.poll_write_ready(cx))?;
69        match self.socket.0.get_ref().0.write_frame_insist(&self.frame) {
70            Ok(_) => Poll::Ready(Ok(())),
71            Err(err) => Poll::Ready(Err(err)),
72        }
73    }
74}
75
76/// A socketcan::CANSocket wrapped for mio eventing
77/// to allow it be integrated in turn into tokio
78#[derive(Debug)]
79pub struct EventedCANSocket(socketcan::CANSocket);
80
81impl EventedCANSocket {
82    fn get_ref(&self) -> &socketcan::CANSocket {
83        &self.0
84    }
85}
86
87impl AsRawFd for EventedCANSocket {
88    fn as_raw_fd(&self) -> RawFd {
89        self.0.as_raw_fd()
90    }
91}
92
93impl event::Source for EventedCANSocket {
94    fn register(
95        &mut self,
96        registry: &Registry,
97        token: Token,
98        interests: Interest,
99    ) -> io::Result<()> {
100        SourceFd(&self.0.as_raw_fd()).register(registry, token, interests)
101    }
102
103    fn reregister(
104        &mut self,
105        registry: &Registry,
106        token: Token,
107        interests: Interest,
108    ) -> io::Result<()> {
109        SourceFd(&self.0.as_raw_fd()).reregister(registry, token, interests)
110    }
111
112    fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
113        SourceFd(&self.0.as_raw_fd()).deregister(registry)
114    }
115}
116
117/// An asynchronous I/O wrapped socketcan::CANSocket
118#[derive(Debug)]
119pub struct CANSocket(AsyncFd<EventedCANSocket>);
120
121impl CANSocket {
122    /// Open a named CAN device such as "vcan0"
123    pub fn open(ifname: &str) -> Result<CANSocket, Error> {
124        let sock = socketcan::CANSocket::open(ifname)?;
125        sock.set_nonblocking(true)?;
126        Ok(CANSocket(AsyncFd::new(EventedCANSocket(sock))?))
127    }
128
129    /// Open CAN device by kernel interface number
130    pub fn open_if(if_index: c_uint) -> Result<CANSocket, Error> {
131        let sock = socketcan::CANSocket::open_if(if_index)?;
132        sock.set_nonblocking(true)?;
133        Ok(CANSocket(AsyncFd::new(EventedCANSocket(sock))?))
134    }
135
136    /// Sets the filter mask on the socket
137    pub fn set_filter(&self, filters: &[CANFilter]) -> io::Result<()> {
138        self.0.get_ref().0.set_filter(filters)
139    }
140
141    /// Disable reception of CAN frames by setting an empty filter
142    pub fn filter_drop_all(&self) -> io::Result<()> {
143        self.0.get_ref().0.filter_drop_all()
144    }
145
146    /// Accept all frames, disabling any kind of filtering.
147    pub fn filter_accept_all(&self) -> io::Result<()> {
148        self.0.get_ref().0.filter_accept_all()
149    }
150
151    pub fn set_error_filter(&self, mask: u32) -> io::Result<()> {
152        self.0.get_ref().0.set_error_filter(mask)
153    }
154
155    pub fn error_filter_drop_all(&self) -> io::Result<()> {
156        self.0.get_ref().0.error_filter_drop_all()
157    }
158
159    pub fn error_filter_accept_all(&self) -> io::Result<()> {
160        self.0.get_ref().0.error_filter_accept_all()
161    }
162
163    /// Write a CANFrame to the socket asynchronously
164    ///
165    /// This uses the semantics of socketcan's `write_frame_insist`,
166    /// IE: it will automatically retry when it fails on an EINTR
167    pub fn write_frame(&self, frame: CANFrame) -> Result<CANWriteFuture, Error> {
168        Ok(CANWriteFuture {
169            socket: self.try_clone()?,
170            frame,
171        })
172    }
173
174    /// Clone the CANSocket by using the `dup` syscall to get another
175    /// file descriptor. This method makes clones fairly cheap and
176    /// avoids complexity around ownership
177    fn try_clone(&self) -> Result<Self, Error> {
178        let fd = self.0.get_ref().0.as_raw_fd();
179        unsafe {
180            // essentially we're cheating and making it cheaper/easier
181            // to manage multiple references to the socket by relying
182            // on the posix behaviour of `dup()` which essentially lets
183            // the kernel worry about keeping track of references;
184            // as long as one of the duplicated file descriptors is open
185            // the socket as a whole isn't going to be closed.
186            let new_fd = libc::dup(fd);
187            let new = socketcan::CANSocket::from_raw_fd(new_fd);
188            Ok(CANSocket(AsyncFd::new(EventedCANSocket(new))?))
189        }
190    }
191}
192
193impl Stream for CANSocket {
194    type Item = io::Result<CANFrame>;
195
196    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
197        loop {
198            let mut ready_guard = ready!(self.0.poll_read_ready(cx))?;
199            match ready_guard.try_io(|inner| inner.get_ref().get_ref().read_frame()) {
200                Ok(result) => return Poll::Ready(Some(result)),
201                Err(_would_block) => continue,
202            }
203        }
204    }
205}
206
207impl Sink<CANFrame> for CANSocket {
208    type Error = io::Error;
209
210    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211        let _ = ready!(self.0.poll_write_ready(cx))?;
212        Poll::Ready(Ok(()))
213    }
214
215    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216        Poll::Ready(Ok(()))
217    }
218
219    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220        let mut ready_guard = ready!(self.0.poll_write_ready(cx))?;
221        ready_guard.clear_ready();
222        Poll::Ready(Ok(()))
223    }
224
225    fn start_send(self: Pin<&mut Self>, item: CANFrame) -> Result<(), Self::Error> {
226        self.0.get_ref().0.write_frame_insist(&item)?;
227        Ok(())
228    }
229}
230
231#[cfg(test)]
232mod tests {
233    use super::*;
234    use futures::{select, try_join};
235    use futures_timer::Delay;
236
237    use std::io;
238    use std::time::Duration;
239
240    /// Receive a frame from the CANSocket
241    async fn recv_frame(mut socket: CANSocket) -> io::Result<CANSocket> {
242        // let mut frame_stream = socket;
243
244        select!(
245            frame = socket.next().fuse() => if let Some(_frame) = frame { Ok(socket) } else { panic!("unexpected") },
246            _timeout = Delay::new(Duration::from_millis(100)).fuse() => Err(io::Error::from(io::ErrorKind::TimedOut)),
247        )
248    }
249
250    /// Write a test frame to the CANSocket
251    async fn write_frame(socket: &CANSocket) -> Result<(), Error> {
252        let test_frame = socketcan::CANFrame::new(0x1, &[0], false, false).unwrap();
253        socket.write_frame(test_frame)?.await?;
254        Ok(())
255    }
256
257    /// Attempt delivery of two messages, using a oneshot channel
258    /// to prompt the second message in order to demonstrate that
259    /// waiting for CAN reads is not blocking.
260    #[tokio::test]
261    async fn test_receive() -> Result<(), Error> {
262        let socket1 = CANSocket::open("vcan0").unwrap();
263        let socket2 = CANSocket::open("vcan0").unwrap();
264
265        let send_frames = future::try_join(write_frame(&socket1), write_frame(&socket1));
266
267        let recv_frames = async {
268            let socket2 = recv_frame(socket2).await?;
269            let _socket2 = recv_frame(socket2).await;
270            Ok(())
271        };
272
273        try_join!(recv_frames, send_frames)?;
274
275        Ok(())
276    }
277
278    #[tokio::test]
279    async fn test_sink_stream() -> io::Result<()> {
280        let socket1 = CANSocket::open("vcan0").unwrap();
281        let socket2 = CANSocket::open("vcan0").unwrap();
282
283        let frame_id_1 = CANFrame::new(1, &[0u8], false, false).unwrap();
284        let frame_id_2 = CANFrame::new(2, &[0u8], false, false).unwrap();
285        let frame_id_3 = CANFrame::new(3, &[0u8], false, false).unwrap();
286
287        let (mut sink, _stream) = socket1.split();
288        let (_sink, stream) = socket2.split();
289
290        let count_ids_less_than_3 = stream
291            .map(|x| x.unwrap())
292            .take_while(|frame| future::ready(frame.id() < 3))
293            .fold(0u8, |acc, _frame| async move { acc + 1 });
294
295        let send_frames = async {
296            let _frame_1 = sink.send(frame_id_1).await?;
297            let _frame_2 = sink.send(frame_id_2).await?;
298            let _frame_3 = sink.send(frame_id_3).await?;
299            println!("Sent 3 frames");
300            Ok::<(), io::Error>(())
301        };
302
303        let (x, frame_send_r) = futures::future::join(count_ids_less_than_3, send_frames).await;
304        frame_send_r?;
305
306        assert_eq!(x, 2);
307
308        Ok(())
309    }
310}