canopen_tokio/
lib.rs

1//! CANopen implementation for [`tokio`].
2
3#![warn(missing_docs)]
4#![warn(missing_debug_implementations)]
5
6use can_socket::tokio::CanSocket;
7use can_socket::{CanFrame, CanBaseId};
8use std::num::NonZeroU8;
9use std::time::{Duration, Instant};
10
11mod id;
12mod sync;
13pub use id::CanBaseIdExt;
14
15pub mod nmt;
16pub mod pdo;
17pub mod sdo;
18
19/// A CANopen socket.
20///
21/// Wrapper around a [`CanSocket`] that implements the `CANopen` protocol.
22#[allow(missing_debug_implementations)]
23pub struct CanOpenSocket {
24	socket: CanSocket,
25	// TODO: Save messages for later delivery?
26	// read_queue: Vec<CanFrame>,
27}
28
29/// An index in the object dictionary.
30#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
31pub struct ObjectIndex {
32	/// The main index of the object.
33	pub index: u16,
34
35	/// The subindex of the object.
36	pub subindex: u8,
37}
38
39impl CanOpenSocket {
40	/// Create a new CANopen socket from a [`CanSocket`].
41	pub fn new(can_socket: CanSocket) -> Self {
42		Self {
43			socket: can_socket,
44		}
45	}
46
47	/// Receive a raw CAN frame with a deadline.
48	///
49	/// Returns [`None`] if the deadline expires before a frame arrives.
50	/// Returns `Some(Err(...))` if the underlying CAN socket gives an error.
51	pub async fn recv_frame_deadline(
52		&mut self,
53		deadline: Instant,
54	) -> Option<std::io::Result<can_socket::CanFrame>> {
55		if Instant::now() >= deadline {
56			return None;
57		}
58		tokio::time::timeout_at(deadline.into(), self.socket.recv()).await.ok()
59	}
60
61	/// Send a raw CAN frame.
62	pub async fn send_frame(
63		&mut self,
64		frame: &CanFrame,
65	) -> std::io::Result<()> {
66		self.socket.send(frame).await
67	}
68
69	/// Send an NMT command and wait for the device to go into the specified state.
70	pub async fn send_nmt_command(
71		&mut self,
72		node_id: u8,
73		command: nmt::NmtCommand,
74		timeout: Duration,
75	) -> Result<(), nmt::NmtError> {
76		nmt::send_nmt_command(self, node_id, command, timeout).await
77	}
78
79	/// Read an object dictionary value by performing an upload from a SDO server.
80	///
81	/// Note that upload means "upload to server".
82	/// Most people outside of [CiA](https://can-cia.org/) would call this a download.
83	pub async fn sdo_upload_raw(
84		&mut self,
85		node_id: u8,
86		sdo: sdo::SdoAddress,
87		object: ObjectIndex,
88		buffer: &mut [u8],
89		timeout: Duration,
90	) -> Result<usize, sdo::SdoError> {
91		let mut buffer = buffer;
92		sdo::sdo_upload(self, node_id, sdo, object, &mut buffer, timeout).await
93	}
94
95	/// Read an object dictionary value by performing an upload from a SDO server.
96	///
97	/// Note that upload means "upload to server".
98	/// Most people outside of [CiA](https://can-cia.org/) would call this a download.
99	pub async fn sdo_upload<T: sdo::UploadObject>(
100		&mut self,
101		node_id: u8,
102		sdo: sdo::SdoAddress,
103		object: ObjectIndex,
104		timeout: Duration,
105	) -> Result<T, sdo::UploadError<T::Error>> {
106		let mut buffer = <T as sdo::UploadObject>::Buffer::default();
107		sdo::sdo_upload(self, node_id, sdo, object, &mut buffer, timeout).await
108			.map_err(sdo::UploadError::UploadFailed)?;
109		T::parse_buffer(buffer)
110			.map_err(sdo::UploadError::ParseFailed)
111	}
112
113	/// Write an object dictionary value by performing a download to a SDO server.
114	///
115	/// Note that download means "download to server".
116	/// Most people outside of [CiA](https://can-cia.org/) would call this an upload.
117	pub async fn sdo_download<T: sdo::DownloadObject>(
118		&mut self,
119		node_id: u8,
120		sdo: sdo::SdoAddress,
121		object: ObjectIndex,
122		data: T,
123		timeout: Duration,
124	) -> Result<(), sdo::SdoError> {
125		use std::borrow::Borrow;
126		let buffer = data.to_buffer();
127		sdo::sdo_download(self, node_id, sdo, object, buffer.borrow(), timeout).await
128	}
129
130	/// Get the full PDO configuration of an RPDO of a remote node.
131	pub async fn read_rpdo_configuration(
132		&mut self,
133		node_id: u8,
134		sdo: sdo::SdoAddress,
135		pdo: u16,
136		timeout: Duration,
137	) -> Result<pdo::RpdoConfiguration, pdo::PdoConfigError>
138	{
139		pdo::read_rpdo_configuration(self, node_id, sdo, pdo, timeout).await
140	}
141
142	/// Get the full configuration of a TPDO of a remote node.
143	pub async fn read_tpdo_configuration(
144		&mut self,
145		node_id: u8,
146		sdo: sdo::SdoAddress,
147		pdo: u16,
148		timeout: Duration,
149	) -> Result<pdo::TpdoConfiguration, pdo::PdoConfigError>
150	{
151		pdo::read_tpdo_configuration(self, node_id, sdo, pdo, timeout).await
152	}
153
154	/// Configure an RPDO of a remote node.
155	pub async fn configure_rpdo(
156		&mut self,
157		node_id: u8,
158		sdo: sdo::SdoAddress,
159		pdo: u16,
160		config: &pdo::RpdoConfiguration,
161		timeout: Duration,
162	) -> Result<(), pdo::PdoConfigError>
163	{
164		pdo::configure_rpdo(self, node_id, sdo, pdo, config, timeout).await
165	}
166
167	/// Configure a TPDO of a remote node.
168	pub async fn configure_tpdo(
169		&mut self,
170		node_id: u8,
171		sdo: sdo::SdoAddress,
172		pdo: u16,
173		config: &pdo::TpdoConfiguration,
174		timeout: Duration,
175	) -> Result<(), pdo::PdoConfigError>
176	{
177		pdo::configure_tpdo(self, node_id, sdo, pdo, config, timeout).await
178	}
179
180	/// Enable or disable an RPDO of a remote node.
181	pub async fn enable_rpdo(
182		&mut self,
183		node_id: u8,
184		sdo: sdo::SdoAddress,
185		pdo: u16,
186		enable: bool,
187		timeout: Duration,
188	) -> Result<(), pdo::PdoConfigError>
189	{
190		pdo::enable_rpdo(self, node_id, sdo, pdo, enable, timeout).await
191	}
192
193	/// Enable or disable a TPDO of a remote node.
194	pub async fn enable_tpdo(
195		&mut self,
196		node_id: u8,
197		sdo: sdo::SdoAddress,
198		pdo: u16,
199		enable: bool,
200		timeout: Duration,
201	) -> Result<(), pdo::PdoConfigError>
202	{
203		pdo::enable_tpdo(self, node_id, sdo, pdo, enable, timeout).await
204	}
205
206	/// Send a SYNC command to the CAN network.
207	pub async fn send_sync(
208		&mut self,
209		counter: Option<NonZeroU8>,
210	) -> Result<(), std::io::Error> {
211		sync::send_sync(self, counter).await
212	}
213
214	/// Receive a new message from the CAN bus that that matches the given predicate.
215	///
216	/// Messages already in the read queue are not returned.
217	/// If a message does not match the filter, it is added to the read queue.
218	async fn recv_new_filtered<F>(
219		&mut self,
220		predicate: F,
221		timeout: Duration,
222	) -> std::io::Result<Option<CanFrame>>
223	where
224		F: FnMut(&CanFrame) -> bool,
225	{
226		let receive_loop = async move {
227			let mut predicate = predicate;
228			loop {
229				let frame = self.socket.recv().await?;
230				if predicate(&frame) {
231					return Ok(frame);
232				} else {
233					// TODO: Save messages for later delivery?
234					// self.read_queue.push(frame)
235				}
236			}
237		};
238
239		tokio::time::timeout(timeout, receive_loop)
240			.await
241			.ok()
242			.transpose()
243	}
244
245	/// Receive a new message from the CAN bus that that matches the given function code and node ID.
246	///
247	/// RTR (request-to-read) messages are filtered out (not returned).
248	///
249	/// Messages already in the read queue are not returned.
250	/// If a message does not match the filter, it is added to the read queue.
251	async fn recv_new_by_can_id(&mut self, can_id: CanBaseId, timeout: Duration) -> std::io::Result<Option<CanFrame>> {
252		self.recv_new_filtered(|frame| !frame.is_rtr() && frame.id().to_base().ok() == Some(can_id), timeout).await
253	}
254}
255
256impl ObjectIndex {
257	/// Create a new object index from a main index and a subindex.
258	pub fn new(index: u16, subindex: u8) -> Self {
259		Self { index, subindex }
260	}
261}
262
263impl std::fmt::Debug for ObjectIndex {
264	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265		f.debug_struct("ObjectIndex")
266			.field("index", &format_args!("0x{:04X}", self.index))
267			.field("subindex", &format_args!("0x{:02X}", self.subindex))
268			.finish()
269	}
270}