1use std::io;
30use std::os::raw::c_uint;
31use std::os::unix::io::{AsRawFd, FromRawFd};
32use std::pin::Pin;
33use std::task::Poll;
34use std::{future::Future, os::unix::prelude::RawFd};
35
36use futures::prelude::*;
37use futures::ready;
38use futures::task::Context;
39
40use mio::{event, unix::SourceFd, Interest, Registry, Token};
41
42use thiserror::Error as ThisError;
43
44pub use socketcan::{CANFilter, CANFrame, CANSocketOpenError};
45use tokio::io::unix::AsyncFd;
46
47#[derive(Debug, ThisError)]
48pub enum Error {
49 #[error("Failed to open CAN Socket")]
50 CANSocketOpen(#[from] socketcan::CANSocketOpenError),
51 #[error("IO error")]
52 IO(#[from] io::Error),
53}
54
55pub struct CANWriteFuture {
60 socket: CANSocket,
61 frame: CANFrame,
62}
63
64impl Future for CANWriteFuture {
65 type Output = io::Result<()>;
66
67 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
68 let _ = ready!(self.socket.0.poll_write_ready(cx))?;
69 match self.socket.0.get_ref().0.write_frame_insist(&self.frame) {
70 Ok(_) => Poll::Ready(Ok(())),
71 Err(err) => Poll::Ready(Err(err)),
72 }
73 }
74}
75
76#[derive(Debug)]
79pub struct EventedCANSocket(socketcan::CANSocket);
80
81impl EventedCANSocket {
82 fn get_ref(&self) -> &socketcan::CANSocket {
83 &self.0
84 }
85}
86
87impl AsRawFd for EventedCANSocket {
88 fn as_raw_fd(&self) -> RawFd {
89 self.0.as_raw_fd()
90 }
91}
92
93impl event::Source for EventedCANSocket {
94 fn register(
95 &mut self,
96 registry: &Registry,
97 token: Token,
98 interests: Interest,
99 ) -> io::Result<()> {
100 SourceFd(&self.0.as_raw_fd()).register(registry, token, interests)
101 }
102
103 fn reregister(
104 &mut self,
105 registry: &Registry,
106 token: Token,
107 interests: Interest,
108 ) -> io::Result<()> {
109 SourceFd(&self.0.as_raw_fd()).reregister(registry, token, interests)
110 }
111
112 fn deregister(&mut self, registry: &Registry) -> io::Result<()> {
113 SourceFd(&self.0.as_raw_fd()).deregister(registry)
114 }
115}
116
117#[derive(Debug)]
119pub struct CANSocket(AsyncFd<EventedCANSocket>);
120
121impl CANSocket {
122 pub fn open(ifname: &str) -> Result<CANSocket, Error> {
124 let sock = socketcan::CANSocket::open(ifname)?;
125 sock.set_nonblocking(true)?;
126 Ok(CANSocket(AsyncFd::new(EventedCANSocket(sock))?))
127 }
128
129 pub fn open_if(if_index: c_uint) -> Result<CANSocket, Error> {
131 let sock = socketcan::CANSocket::open_if(if_index)?;
132 sock.set_nonblocking(true)?;
133 Ok(CANSocket(AsyncFd::new(EventedCANSocket(sock))?))
134 }
135
136 pub fn set_filter(&self, filters: &[CANFilter]) -> io::Result<()> {
138 self.0.get_ref().0.set_filter(filters)
139 }
140
141 pub fn filter_drop_all(&self) -> io::Result<()> {
143 self.0.get_ref().0.filter_drop_all()
144 }
145
146 pub fn filter_accept_all(&self) -> io::Result<()> {
148 self.0.get_ref().0.filter_accept_all()
149 }
150
151 pub fn set_error_filter(&self, mask: u32) -> io::Result<()> {
152 self.0.get_ref().0.set_error_filter(mask)
153 }
154
155 pub fn error_filter_drop_all(&self) -> io::Result<()> {
156 self.0.get_ref().0.error_filter_drop_all()
157 }
158
159 pub fn error_filter_accept_all(&self) -> io::Result<()> {
160 self.0.get_ref().0.error_filter_accept_all()
161 }
162
163 pub fn write_frame(&self, frame: CANFrame) -> Result<CANWriteFuture, Error> {
168 Ok(CANWriteFuture {
169 socket: self.try_clone()?,
170 frame,
171 })
172 }
173
174 fn try_clone(&self) -> Result<Self, Error> {
178 let fd = self.0.get_ref().0.as_raw_fd();
179 unsafe {
180 let new_fd = libc::dup(fd);
187 let new = socketcan::CANSocket::from_raw_fd(new_fd);
188 Ok(CANSocket(AsyncFd::new(EventedCANSocket(new))?))
189 }
190 }
191}
192
193impl Stream for CANSocket {
194 type Item = io::Result<CANFrame>;
195
196 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
197 loop {
198 let mut ready_guard = ready!(self.0.poll_read_ready(cx))?;
199 match ready_guard.try_io(|inner| inner.get_ref().get_ref().read_frame()) {
200 Ok(result) => return Poll::Ready(Some(result)),
201 Err(_would_block) => continue,
202 }
203 }
204 }
205}
206
207impl Sink<CANFrame> for CANSocket {
208 type Error = io::Error;
209
210 fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
211 let _ = ready!(self.0.poll_write_ready(cx))?;
212 Poll::Ready(Ok(()))
213 }
214
215 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
216 Poll::Ready(Ok(()))
217 }
218
219 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
220 let mut ready_guard = ready!(self.0.poll_write_ready(cx))?;
221 ready_guard.clear_ready();
222 Poll::Ready(Ok(()))
223 }
224
225 fn start_send(self: Pin<&mut Self>, item: CANFrame) -> Result<(), Self::Error> {
226 self.0.get_ref().0.write_frame_insist(&item)?;
227 Ok(())
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use futures::{select, try_join};
235 use futures_timer::Delay;
236
237 use std::io;
238 use std::time::Duration;
239
240 async fn recv_frame(mut socket: CANSocket) -> io::Result<CANSocket> {
242 select!(
245 frame = socket.next().fuse() => if let Some(_frame) = frame { Ok(socket) } else { panic!("unexpected") },
246 _timeout = Delay::new(Duration::from_millis(100)).fuse() => Err(io::Error::from(io::ErrorKind::TimedOut)),
247 )
248 }
249
250 async fn write_frame(socket: &CANSocket) -> Result<(), Error> {
252 let test_frame = socketcan::CANFrame::new(0x1, &[0], false, false).unwrap();
253 socket.write_frame(test_frame)?.await?;
254 Ok(())
255 }
256
257 #[tokio::test]
261 async fn test_receive() -> Result<(), Error> {
262 let socket1 = CANSocket::open("vcan0").unwrap();
263 let socket2 = CANSocket::open("vcan0").unwrap();
264
265 let send_frames = future::try_join(write_frame(&socket1), write_frame(&socket1));
266
267 let recv_frames = async {
268 let socket2 = recv_frame(socket2).await?;
269 let _socket2 = recv_frame(socket2).await;
270 Ok(())
271 };
272
273 try_join!(recv_frames, send_frames)?;
274
275 Ok(())
276 }
277
278 #[tokio::test]
279 async fn test_sink_stream() -> io::Result<()> {
280 let socket1 = CANSocket::open("vcan0").unwrap();
281 let socket2 = CANSocket::open("vcan0").unwrap();
282
283 let frame_id_1 = CANFrame::new(1, &[0u8], false, false).unwrap();
284 let frame_id_2 = CANFrame::new(2, &[0u8], false, false).unwrap();
285 let frame_id_3 = CANFrame::new(3, &[0u8], false, false).unwrap();
286
287 let (mut sink, _stream) = socket1.split();
288 let (_sink, stream) = socket2.split();
289
290 let count_ids_less_than_3 = stream
291 .map(|x| x.unwrap())
292 .take_while(|frame| future::ready(frame.id() < 3))
293 .fold(0u8, |acc, _frame| async move { acc + 1 });
294
295 let send_frames = async {
296 let _frame_1 = sink.send(frame_id_1).await?;
297 let _frame_2 = sink.send(frame_id_2).await?;
298 let _frame_3 = sink.send(frame_id_3).await?;
299 println!("Sent 3 frames");
300 Ok::<(), io::Error>(())
301 };
302
303 let (x, frame_send_r) = futures::future::join(count_ids_less_than_3, send_frames).await;
304 frame_send_r?;
305
306 assert_eq!(x, 2);
307
308 Ok(())
309 }
310}