cat_dev_serial/
lib.rs

1//! Tools for interacting with the serial port of a cat-dev.
2//!
3//! This interface was originally a fork of:
4//! <https://github.com/de-vri-es/serial2-tokio-rs> at commit:
5//! `65ff229f65c27c57e261f94dc6cc9a761cce9b21`.
6//! And
7//! <https://github.com/de-vri-es/serial2-rs/> at commit:
8//! `dc1333ce8f205e77cb2a89d2ed52463ff56cdc04`
9//!
10//! You can see the dual apache/bsd licenses for them at:
11//! <https://raw.githubusercontent.com/de-vri-es/serial2-tokio-rs/65ff229f65c27c57e261f94dc6cc9a761cce9b21/LICENSE-APACHE>
12//! <https://raw.githubusercontent.com/de-vri-es/serial2-tokio-rs/65ff229f65c27c57e261f94dc6cc9a761cce9b21/LICENSE-BSD>
13//!
14//! This fork has had minor changes to it, mostly updating to dependencies
15//! like `windows`, over `winapi` and other logging integrations that we want
16//! the overarching library to have, etc.
17//!
18//! We also contain a slightly-modified copy of the [`tokio::io::Lines`]
19//! structure from [`tokio`], but one that reads the newlines of the
20//! serial port `\r`, as opposed to the normal `.lines()` method
21//! which uses `\n`.
22
23mod async_sys;
24mod underlying;
25
26pub use async_sys::*;
27pub use underlying::*;
28
29use pin_project_lite::pin_project;
30use std::{
31	fmt::{Debug, Formatter, Result as FmtResult},
32	future::Future,
33	pin::Pin,
34	string::FromUtf8Error,
35	task::{Context, Poll},
36};
37use tokio::io::{AsyncBufRead, Error as IoError, ErrorKind as IoErrorKind, Result as IoResult};
38
39macro_rules! ready {
40	($e:expr $(,)?) => {
41		match $e {
42			std::task::Poll::Ready(t) => t,
43			std::task::Poll::Pending => return std::task::Poll::Pending,
44		}
45	};
46}
47
48pin_project! {
49	/// Reads serial lines from an [`AsyncBufRead`].
50	///
51	/// This tries to act a lot like the structure returned from `.lines()` on a
52	/// normal async buf reader. Except instead of splitting on `\n`, we use
53	/// `\r`.
54	///
55	/// [`AsyncBufRead`]: tokio::io::AsyncBufRead
56	/// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html
57	#[derive(Debug)]
58	#[must_use = "streams do nothing unless polled"]
59	pub struct SerialLines<ReaderTy> {
60		#[pin]
61		reader: ReaderTy,
62		buf: String,
63		bytes: Vec<u8>,
64		read: usize,
65	}
66}
67
68impl<ReaderTy> SerialLines<ReaderTy>
69where
70	ReaderTy: AsyncBufRead + Unpin,
71{
72	/// Grab the next line from the serial line reader.
73	///
74	/// ## Errors
75	///
76	/// If we get an [`IoError`] back from the underlying reader we're reading
77	/// from.
78	pub async fn next_line(&mut self) -> IoResult<Option<String>> {
79		poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
80	}
81}
82
83impl<ReaderTy> SerialLines<ReaderTy>
84where
85	ReaderTy: AsyncBufRead,
86{
87	/// Create a new Serial Lines reader.
88	pub fn new(reader: ReaderTy) -> Self {
89		SerialLines {
90			reader,
91			buf: String::new(),
92			bytes: Vec::new(),
93			read: 0,
94		}
95	}
96
97	/// Get a constant reference to the current reader type.
98	#[must_use]
99	pub const fn get_ref(&self) -> &ReaderTy {
100		&self.reader
101	}
102
103	/// Get a mutable reference to the current reader type.
104	#[must_use]
105	pub const fn get_mut(&mut self) -> &mut ReaderTy {
106		&mut self.reader
107	}
108
109	/// Polls for the next line in the stream.
110	///
111	/// This method returns:
112	///
113	///  * `Poll::Pending` if the next line is not yet available.
114	///  * `Poll::Ready(Ok(Some(line)))` if the next line is available.
115	///  * `Poll::Ready(Ok(None))` if there are no more lines in this stream.
116	///  * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line.
117	///
118	/// When the method returns `Poll::Pending`, the `Waker` in the provided
119	/// `Context` is scheduled to receive a wakeup when more bytes become
120	/// available on the underlying IO resource.  Note that on multiple calls to
121	/// `poll_next_line`, only the `Waker` from the `Context` passed to the most
122	/// recent call is scheduled to receive a wakeup.
123	pub fn poll_next_line(
124		self: Pin<&mut Self>,
125		cx: &mut Context<'_>,
126	) -> Poll<IoResult<Option<String>>> {
127		let me = self.project();
128
129		let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
130		debug_assert_eq!(*me.read, 0);
131
132		if n == 0 && me.buf.is_empty() {
133			return Poll::Ready(Ok(None));
134		}
135
136		if me.buf.ends_with('\r') {
137			me.buf.pop();
138		}
139
140		Poll::Ready(Ok(Some(std::mem::take(me.buf))))
141	}
142}
143
144fn read_line_internal<ReaderTy: AsyncBufRead + ?Sized>(
145	reader: Pin<&mut ReaderTy>,
146	cx: &mut Context<'_>,
147	output: &mut String,
148	buf: &mut Vec<u8>,
149	read: &mut usize,
150) -> Poll<IoResult<usize>> {
151	let io_res = ready!(read_until_internal(reader, cx, b'\r', buf, read));
152	let utf8_res = String::from_utf8(std::mem::take(buf));
153	// At this point both buf and output are empty. The allocation is in utf8_res.
154	debug_assert!(buf.is_empty());
155	debug_assert!(output.is_empty());
156	finish_string_read(io_res, utf8_res, *read, output, false)
157}
158
159fn read_until_internal<ReaderTy: AsyncBufRead + ?Sized>(
160	mut reader: Pin<&mut ReaderTy>,
161	cx: &mut Context<'_>,
162	delimiter: u8,
163	buf: &mut Vec<u8>,
164	read: &mut usize,
165) -> Poll<IoResult<usize>> {
166	loop {
167		let (done, used) = {
168			let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
169			if let Some(i) = memchr(delimiter, available) {
170				buf.extend_from_slice(&available[..=i]);
171				(true, i + 1)
172			} else {
173				buf.extend_from_slice(available);
174				(false, available.len())
175			}
176		};
177		reader.as_mut().consume(used);
178		*read += used;
179		if done || used == 0 {
180			return Poll::Ready(Ok(std::mem::replace(read, 0)));
181		}
182	}
183}
184
185#[cfg(not(unix))]
186fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
187	haystack.iter().position(|val| needle == *val)
188}
189
190#[cfg(unix)]
191#[allow(clippy::cast_lossless)]
192fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
193	let start = haystack.as_ptr();
194
195	// SAFETY: `start` is valid for `haystack.len()` bytes.
196	let ptr = unsafe { libc::memchr(start.cast(), needle as _, haystack.len()) };
197
198	if ptr.is_null() {
199		None
200	} else {
201		Some(ptr as usize - start as usize)
202	}
203}
204
205// This struct is intentionally `!Unpin` when `F` is `!Unpin`. This is to
206// mitigate the issue where rust puts noalias on mutable references to the
207// `PollFn` type if it is `Unpin`. If the closure has ownership of a future,
208// then this "leaks" and the future is affected by noalias too, which we don't
209// want.
210//
211// See this thread for more information:
212// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
213
214/// Future for the [`poll_fn`] function.
215struct PollFn<F> {
216	f: F,
217}
218
219/// Creates a new future wrapping around a function returning [`Poll`].
220fn poll_fn<T, F>(f: F) -> PollFn<F>
221where
222	F: FnMut(&mut Context<'_>) -> Poll<T>,
223{
224	PollFn { f }
225}
226
227impl<F> Debug for PollFn<F> {
228	fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
229		f.debug_struct("PollFn").finish()
230	}
231}
232
233impl<T, F> Future for PollFn<F>
234where
235	F: FnMut(&mut Context<'_>) -> Poll<T>,
236{
237	type Output = T;
238
239	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
240		// Safety: We never construct a `Pin<&mut F>` anywhere, so accessing `f`
241		// mutably in an unpinned way is sound.
242		//
243		// This use of unsafe cannot be replaced with the pin-project macro
244		// because:
245		//  * If we put `#[pin]` on the field, then it gives us a `Pin<&mut F>`,
246		//    which we can't use to call the closure.
247		//  * If we don't put `#[pin]` on the field, then it makes `PollFn` be
248		//    unconditionally `Unpin`, which we also don't want.
249		let me = unsafe { Pin::into_inner_unchecked(self) };
250		(me.f)(cx)
251	}
252}
253
254fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_read: usize) {
255	let original_len = vector.len() - num_bytes_read;
256	vector.truncate(original_len);
257	*output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
258}
259
260/// This handles the various failure cases and puts the string back into `output`.
261///
262/// The `truncate_on_io_error` `bool` is necessary because `read_to_string` and `read_line`
263/// disagree on what should happen when an IO error occurs.
264fn finish_string_read(
265	io_res: IoResult<usize>,
266	utf8_res: Result<String, FromUtf8Error>,
267	read: usize,
268	output: &mut String,
269	truncate_on_io_error: bool,
270) -> Poll<IoResult<usize>> {
271	match (io_res, utf8_res) {
272		(Ok(num_bytes), Ok(string)) => {
273			debug_assert_eq!(read, 0);
274			*output = string;
275			Poll::Ready(Ok(num_bytes))
276		}
277		(Err(io_err), Ok(string)) => {
278			*output = string;
279			if truncate_on_io_error {
280				let original_len = output.len() - read;
281				output.truncate(original_len);
282			}
283			Poll::Ready(Err(io_err))
284		}
285		(Ok(num_bytes), Err(utf8_err)) => {
286			debug_assert_eq!(read, 0);
287			put_back_original_data(output, utf8_err.into_bytes(), num_bytes);
288			Poll::Ready(Err(IoError::new(
289				IoErrorKind::InvalidData,
290				"stream did not contain valid UTF-8",
291			)))
292		}
293		(Err(io_err), Err(utf8_err)) => {
294			put_back_original_data(output, utf8_err.into_bytes(), read);
295			Poll::Ready(Err(io_err))
296		}
297	}
298}
299
300#[cfg(test)]
301mod unit_tests {
302	use super::*;
303
304	#[test]
305	fn memchr_test() {
306		let haystack = b"123abc456\0\xffabc\n";
307		assert_eq!(memchr(b'1', haystack), Some(0));
308		assert_eq!(memchr(b'2', haystack), Some(1));
309		assert_eq!(memchr(b'3', haystack), Some(2));
310		assert_eq!(memchr(b'4', haystack), Some(6));
311		assert_eq!(memchr(b'5', haystack), Some(7));
312		assert_eq!(memchr(b'6', haystack), Some(8));
313		assert_eq!(memchr(b'7', haystack), None);
314		assert_eq!(memchr(b'a', haystack), Some(3));
315		assert_eq!(memchr(b'b', haystack), Some(4));
316		assert_eq!(memchr(b'c', haystack), Some(5));
317		assert_eq!(memchr(b'd', haystack), None);
318		assert_eq!(memchr(b'A', haystack), None);
319		assert_eq!(memchr(0, haystack), Some(9));
320		assert_eq!(memchr(0xff, haystack), Some(10));
321		assert_eq!(memchr(0xfe, haystack), None);
322		assert_eq!(memchr(1, haystack), None);
323		assert_eq!(memchr(b'\n', haystack), Some(14));
324		assert_eq!(memchr(b'\r', haystack), None);
325	}
326
327	#[test]
328	fn memchr_all() {
329		let mut arr = Vec::new();
330		for b in 0..=255 {
331			arr.push(b);
332		}
333		for b in 0..=255 {
334			assert_eq!(memchr(b, &arr), Some(b as usize));
335		}
336		arr.reverse();
337		for b in 0..=255 {
338			assert_eq!(memchr(b, &arr), Some(255 - b as usize));
339		}
340	}
341
342	#[test]
343	fn memchr_empty() {
344		for b in 0..=255 {
345			assert_eq!(memchr(b, b""), None);
346		}
347	}
348}