can_socket/tokio/
socket.rs

1use tokio::io::unix::AsyncFd;
2
3use crate::sys;
4use crate::CanFilter;
5use crate::CanFrame;
6use crate::CanInterface;
7use crate::Deadline;
8
9/// An asynchronous CAN socket for `tokio`.
10pub struct CanSocket {
11	io: AsyncFd<sys::Socket>,
12}
13
14impl std::fmt::Debug for CanSocket {
15	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16		let mut debug = f.debug_struct("CanSocket");
17		#[cfg(unix)]
18		{
19			use std::os::unix::io::AsRawFd;
20			debug.field("fd", &self.as_raw_fd());
21			debug.finish()
22		}
23
24		#[cfg(not(unix))]
25		debug.finish_non_exhaustive()
26	}
27}
28
29impl CanSocket {
30	/// Create a new socket bound to a named CAN interface.
31	///
32	/// This function is not async as it will either succeed or fail immediately.
33	pub fn bind(interface: impl AsRef<str>) -> std::io::Result<Self> {
34		let inner = sys::Socket::new(true)?;
35		let interface = inner.get_interface_by_name(interface.as_ref())?;
36		inner.bind(&interface)?;
37		let io = AsyncFd::new(inner)?;
38		Ok(Self { io })
39	}
40
41	/// Create a new socket bound to a interface by index.
42	///
43	/// This function is not async as it will either succeed or fail immediately.
44	pub fn bind_interface_index(index: u32) -> std::io::Result<Self> {
45		let inner = sys::Socket::new(true)?;
46		inner.bind(&crate::sys::CanInterface::from_index(index))?;
47		let io = AsyncFd::new(inner)?;
48		Ok(Self { io })
49	}
50
51	/// Create a new socket bound to all CAN interfaces on the system.
52	///
53	/// You can use [`Self::recv_from()`] if you need to know on which interface a frame was received,
54	/// and [`Self::send_to()`] to send a frame on a particular interface.
55	///
56	/// This function is not async as it will either succeed or fail immediately.
57	pub fn bind_all() -> std::io::Result<Self> {
58		Self::bind_interface_index(0)
59	}
60
61	/// Get the interface this socket is bound to.
62	///
63	/// If the socket is bound to all interfaces, the returned `CanInterface` will report index 0.
64	pub fn local_addr(&self) -> std::io::Result<CanInterface> {
65		Ok(CanInterface {
66			inner: self.io.get_ref().local_addr()?,
67		})
68	}
69
70	/// Send a frame over the socket.
71	///
72	/// Note that if this function success, it only means that the kernel accepted the frame for transmission.
73	/// It does not mean the frame has been sucessfully transmitted over the CAN bus.
74	pub async fn send(&self, frame: &CanFrame) -> std::io::Result<()> {
75		self.io.async_io(tokio::io::Interest::WRITABLE, |inner| {
76			inner.send(&frame.inner)
77		}).await
78	}
79
80	/// Send a frame over the socket with a timeout.
81	///
82	/// Note that if this function success, it only means that the kernel accepted the frame for transmission.
83	/// It does not mean the frame has been sucessfully transmitted over the CAN bus.
84	///
85	/// The timeout can be a [`std::time::Duration`], [`std::time::Instant`], [`tokio::time::Instant`] or any other implementator of the [`Deadline`] trait.
86	pub async fn send_timeout(&self, frame: &CanFrame, timeout: impl Deadline) -> std::io::Result<()> {
87		let deadline = timeout.deadline().into();
88		tokio::time::timeout_at(deadline, self.send(frame)).await?
89	}
90
91	/// Try to send a frame over the socket without waiting for the socket to become writable.
92	///
93	/// Note that if this function success, it only means that the kernel accepted the frame for transmission.
94	/// It does not mean the frame has been sucessfully transmitted over the CAN bus.
95	pub fn try_send(&self, frame: &CanFrame) -> std::io::Result<()> {
96		self.io.try_io(tokio::io::Interest::WRITABLE, |inner| {
97			inner.send(&frame.inner)
98		})
99	}
100
101	/// Send a frame over a particular interface.
102	///
103	/// The interface must match the interface the socket was bound to,
104	/// or the socket must have been bound to all interfaces.
105	pub async fn send_to(&self, frame: &CanFrame, interface: &CanInterface) -> std::io::Result<()> {
106		self.io.async_io(tokio::io::Interest::WRITABLE, |inner| {
107			inner.send_to(&frame.inner, &interface.inner)
108		}).await
109	}
110
111	/// Send a frame over a particular interface.
112	///
113	/// The interface must match the interface the socket was bound to,
114	/// or the socket must have been bound to all interfaces.
115	///
116	/// Note that if this function success, it only means that the kernel accepted the frame for transmission.
117	/// It does not mean the frame has been sucessfully transmitted over the CAN bus.
118	///
119	/// The timeout can be a [`std::time::Duration`], [`std::time::Instant`], [`tokio::time::Instant`] or any other implementator of the [`Deadline`] trait.
120	pub async fn send_to_timeout(&self, frame: &CanFrame, interface: &CanInterface, timeout: impl Deadline) -> std::io::Result<()> {
121		let deadline = timeout.deadline().into();
122		tokio::time::timeout_at(deadline, self.send_to(frame, interface)).await?
123	}
124
125	/// Try to send a frame over the socket without waiting for the socket to become writable.
126	///
127	/// Note that if this function success, it only means that the kernel accepted the frame for transmission.
128	/// It does not mean the frame has been sucessfully transmitted over the CAN bus.
129	pub fn try_send_to(&self, frame: &CanFrame, interface: &CanInterface) -> std::io::Result<()> {
130		self.io.try_io(tokio::io::Interest::WRITABLE, |inner| {
131			inner.send_to(&frame.inner, &interface.inner)
132		})
133	}
134
135	/// Receive a frame from the socket.
136	pub async fn recv(&self) -> std::io::Result<CanFrame> {
137		self.io.async_io(tokio::io::Interest::READABLE, |inner| {
138			Ok(CanFrame {
139				inner: inner.recv()?,
140			})
141		}).await
142	}
143
144	/// Receive a frame from the socket with a timeout.
145	///
146	/// The timeout can be a [`std::time::Duration`], [`std::time::Instant`], [`tokio::time::Instant`] or any other implementator of the [`Deadline`] trait.
147	pub async fn recv_timeout(&self, timeout: impl Deadline) -> std::io::Result<CanFrame> {
148		let deadline = timeout.deadline().into();
149		tokio::time::timeout_at(deadline, self.recv()).await?
150	}
151
152	/// Receive a frame from the socket, without waiting for one to become available.
153	pub fn try_recv(&self) -> std::io::Result<CanFrame> {
154		self.io.try_io(tokio::io::Interest::READABLE, |socket| {
155			Ok(CanFrame {
156				inner: socket.recv()?,
157			})
158		})
159	}
160
161	/// Receive a frame from the socket, including information about which interface the frame was received on.
162	pub async fn recv_from(&self) -> std::io::Result<(CanFrame, CanInterface)> {
163		self.io.async_io(tokio::io::Interest::READABLE, |inner| {
164			let (frame, interface) = inner.recv_from()?;
165			let frame = CanFrame { inner: frame };
166			let interface = CanInterface { inner: interface };
167			Ok((frame, interface))
168		}).await
169	}
170
171	/// Receive a frame from the socket with a timeout, including information about which interface the frame was received on.
172	///
173	/// The timeout can be a [`std::time::Duration`], [`std::time::Instant`], [`tokio::time::Instant`] or any other implementator of the [`Deadline`] trait.
174	pub async fn recv_from_timeout(&self, timeout: impl Deadline) -> std::io::Result<(CanFrame, CanInterface)> {
175		let deadline = timeout.deadline().into();
176		tokio::time::timeout_at(deadline, self.recv_from()).await?
177	}
178
179	/// Receive a frame from the socket, without waiting for one to become available.
180	pub fn try_recv_from(&self) -> std::io::Result<(CanFrame, CanInterface)> {
181		self.io.try_io(tokio::io::Interest::READABLE, |socket| {
182			let (frame, interface) = socket.recv_from()?;
183			let frame = CanFrame { inner: frame };
184			let interface = CanInterface { inner: interface };
185			Ok((frame, interface))
186		})
187	}
188
189	/// Set the list of filters on the socket.
190	///
191	/// When a socket is created, it will receive all frames from the CAN interface.
192	/// You can restrict this by setting the filters with this function.
193	///
194	/// A frame has to match only one of the filters in the list to be received by the socket.
195	pub fn set_filters(&self, filters: &[CanFilter]) -> std::io::Result<()> {
196		self.io.get_ref().set_filters(filters)
197	}
198
199	/// Check if the loopback option of the socket is enabled.
200	///
201	/// When enabled (the default for new sockets),
202	/// frames sent on the same interface by other sockets are also received by this socket.
203	pub fn get_loopback(&self) -> std::io::Result<bool> {
204		self.io.get_ref().get_loopback()
205	}
206
207	/// Enable or disabling the loopback option of the socket.
208	///
209	/// When enabled (the default for new sockets),
210	/// frames sent on the same interface by other sockets are also received by this socket.
211	///
212	/// See `Self::set_receive_own_messages()` if you also want to receive messages sens on *this* socket.
213	pub fn set_loopback(&self, enable: bool) -> std::io::Result<()> {
214		self.io.get_ref().set_loopback(enable)
215	}
216
217	/// Check if the receive own messages option of the socket is enabled.
218	///
219	/// When this option is enabled, frames sent on this socket are also delivered to this socket.
220	///
221	/// Note that frames sent on this socket are subject to all the same filtering mechanisms as other frames.
222	/// To receive frames send on this socket, you must also to ensure that the loopback option is enabled ([`Self::get_loopback()`]),
223	/// and that the frame is not discarded by the filters ([`Self::set_filters()`]).
224	pub fn get_receive_own_messages(&self) -> std::io::Result<bool> {
225		self.io.get_ref().get_receive_own_messages()
226	}
227
228	/// Enable or disable the receive own messages option of the socket.
229	///
230	/// When this option is enabled, frames sent on this socket are also delivered to this socket.
231	///
232	/// Note that frames sent on this socket are subject to all the same filtering mechanisms as other frames.
233	/// To receive frames send on this socket, you must also to ensure that the loopback option is enabled ([`Self::set_loopback()`]),
234	/// and that the frame is not discarded by the filters ([`Self::set_filters()`]).
235	pub fn set_receive_own_messages(&self, enable: bool) -> std::io::Result<()> {
236		self.io.get_ref().set_receive_own_messages(enable)
237	}
238}
239
240impl std::os::fd::AsFd for CanSocket {
241	fn as_fd(&self) -> std::os::fd::BorrowedFd<'_> {
242		self.io.as_fd()
243	}
244}
245
246impl From<CanSocket> for std::os::fd::OwnedFd {
247	fn from(value: CanSocket) -> Self {
248		value.io.into_inner().into()
249	}
250}
251
252impl TryFrom<std::os::fd::OwnedFd> for CanSocket {
253	type Error = std::io::Error;
254
255	fn try_from(value: std::os::fd::OwnedFd) -> std::io::Result<Self> {
256		let io = AsyncFd::new(sys::Socket::from(value))?;
257		Ok(Self { io })
258	}
259}
260
261impl std::os::fd::AsRawFd for CanSocket {
262	fn as_raw_fd(&self) -> std::os::fd::RawFd {
263		self.io.as_raw_fd()
264	}
265}
266
267impl std::os::fd::IntoRawFd for CanSocket {
268	fn into_raw_fd(self) -> std::os::fd::RawFd {
269		self.io.into_inner().into_raw_fd()
270	}
271}