cat_dev/serial/
mod.rs

1//! Tools for interacting with the serial port of a cat-dev.
2//!
3//! This interface was originally a fork of:
4//! <https://github.com/de-vri-es/serial2-tokio-rs> at commit:
5//! `65ff229f65c27c57e261f94dc6cc9a761cce9b21`.
6//! And
7//! <https://github.com/de-vri-es/serial2-rs/> at commit:
8//! `dc1333ce8f205e77cb2a89d2ed52463ff56cdc04`
9//!
10//! You can see the dual apache/bsd licenses for them at:
11//! <https://raw.githubusercontent.com/de-vri-es/serial2-tokio-rs/65ff229f65c27c57e261f94dc6cc9a761cce9b21/LICENSE-APACHE>
12//! <https://raw.githubusercontent.com/de-vri-es/serial2-tokio-rs/65ff229f65c27c57e261f94dc6cc9a761cce9b21/LICENSE-BSD>
13//!
14//! This fork has had minor changes to it, mostly updating to dependencies
15//! like `windows`, over `winapi` and other logging integrations that we want
16//! the overarching library to have, etc.
17//!
18//! We also contain a slightly-modified copy of the [`tokio::io::Lines`]
19//! structure from [`tokio`], but one that reads the newlines of the
20//! serial port `\r`, as opposed to the normal `.lines()` method
21//! which uses `\n`.
22
23mod async_sys;
24mod underlying;
25
26pub use async_sys::*;
27pub use underlying::*;
28
29use pin_project_lite::pin_project;
30use std::{
31	fmt::{Debug, Formatter, Result as FmtResult},
32	future::Future,
33	pin::Pin,
34	string::FromUtf8Error,
35	task::{Context, Poll},
36};
37use tokio::io::{AsyncBufRead, Error as IoError, ErrorKind as IoErrorKind, Result as IoResult};
38
39macro_rules! ready {
40	($e:expr $(,)?) => {
41		match $e {
42			std::task::Poll::Ready(t) => t,
43			std::task::Poll::Pending => return std::task::Poll::Pending,
44		}
45	};
46}
47
48pin_project! {
49	/// Reads serial lines from an [`AsyncBufRead`].
50	///
51	/// This tries to act a lot like the structure returned from `.lines()` on a
52	/// normal async buf reader. Except instead of splitting on `\n`, we use
53	/// `\r`.
54	///
55	/// [`AsyncBufRead`]: tokio::io::AsyncBufRead
56	/// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html
57	#[derive(Debug)]
58	#[must_use = "streams do nothing unless polled"]
59	pub struct SerialLines<ReaderTy> {
60		#[pin]
61		reader: ReaderTy,
62		buf: String,
63		bytes: Vec<u8>,
64		read: usize,
65	}
66}
67
68impl<ReaderTy> SerialLines<ReaderTy>
69where
70	ReaderTy: AsyncBufRead + Unpin,
71{
72	/// Grab the next line from the serial line reader.
73	///
74	/// ## Errors
75	///
76	/// If we get an [`IoError`] back from the underlying reader we're reading
77	/// from.
78	pub async fn next_line(&mut self) -> IoResult<Option<String>> {
79		poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
80	}
81}
82
83impl<ReaderTy> SerialLines<ReaderTy>
84where
85	ReaderTy: AsyncBufRead,
86{
87	/// Create a new Serial Lines reader.
88	pub fn new(reader: ReaderTy) -> Self {
89		SerialLines {
90			reader,
91			buf: String::new(),
92			bytes: Vec::new(),
93			read: 0,
94		}
95	}
96
97	/// Polls for the next line in the stream.
98	///
99	/// This method returns:
100	///
101	///  * `Poll::Pending` if the next line is not yet available.
102	///  * `Poll::Ready(Ok(Some(line)))` if the next line is available.
103	///  * `Poll::Ready(Ok(None))` if there are no more lines in this stream.
104	///  * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line.
105	///
106	/// When the method returns `Poll::Pending`, the `Waker` in the provided
107	/// `Context` is scheduled to receive a wakeup when more bytes become
108	/// available on the underlying IO resource.  Note that on multiple calls to
109	/// `poll_next_line`, only the `Waker` from the `Context` passed to the most
110	/// recent call is scheduled to receive a wakeup.
111	pub fn poll_next_line(
112		self: Pin<&mut Self>,
113		cx: &mut Context<'_>,
114	) -> Poll<IoResult<Option<String>>> {
115		let me = self.project();
116
117		let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
118		debug_assert_eq!(*me.read, 0);
119
120		if n == 0 && me.buf.is_empty() {
121			return Poll::Ready(Ok(None));
122		}
123
124		if me.buf.ends_with('\r') {
125			me.buf.pop();
126		}
127
128		Poll::Ready(Ok(Some(std::mem::take(me.buf))))
129	}
130}
131
132fn read_line_internal<ReaderTy: AsyncBufRead + ?Sized>(
133	reader: Pin<&mut ReaderTy>,
134	cx: &mut Context<'_>,
135	output: &mut String,
136	buf: &mut Vec<u8>,
137	read: &mut usize,
138) -> Poll<IoResult<usize>> {
139	let io_res = ready!(read_until_internal(reader, cx, b'\r', buf, read));
140	let utf8_res = String::from_utf8(std::mem::take(buf));
141	// At this point both buf and output are empty. The allocation is in utf8_res.
142	debug_assert!(buf.is_empty());
143	debug_assert!(output.is_empty());
144	finish_string_read(io_res, utf8_res, *read, output, false)
145}
146
147fn read_until_internal<ReaderTy: AsyncBufRead + ?Sized>(
148	mut reader: Pin<&mut ReaderTy>,
149	cx: &mut Context<'_>,
150	delimiter: u8,
151	buf: &mut Vec<u8>,
152	read: &mut usize,
153) -> Poll<IoResult<usize>> {
154	loop {
155		let (done, used) = {
156			let available = ready!(reader.as_mut().poll_fill_buf(cx))?;
157			if let Some(i) = memchr(delimiter, available) {
158				buf.extend_from_slice(&available[..=i]);
159				(true, i + 1)
160			} else {
161				buf.extend_from_slice(available);
162				(false, available.len())
163			}
164		};
165		reader.as_mut().consume(used);
166		*read += used;
167		if done || used == 0 {
168			return Poll::Ready(Ok(std::mem::replace(read, 0)));
169		}
170	}
171}
172
173#[cfg(not(unix))]
174fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
175	haystack.iter().position(|val| needle == *val)
176}
177
178#[cfg(unix)]
179#[allow(clippy::cast_lossless)]
180fn memchr(needle: u8, haystack: &[u8]) -> Option<usize> {
181	let start = haystack.as_ptr();
182
183	// SAFETY: `start` is valid for `haystack.len()` bytes.
184	let ptr = unsafe { libc::memchr(start.cast(), needle as _, haystack.len()) };
185
186	if ptr.is_null() {
187		None
188	} else {
189		Some(ptr as usize - start as usize)
190	}
191}
192
193// This struct is intentionally `!Unpin` when `F` is `!Unpin`. This is to
194// mitigate the issue where rust puts noalias on mutable references to the
195// `PollFn` type if it is `Unpin`. If the closure has ownership of a future,
196// then this "leaks" and the future is affected by noalias too, which we don't
197// want.
198//
199// See this thread for more information:
200// <https://internals.rust-lang.org/t/surprising-soundness-trouble-around-pollfn/17484>
201
202/// Future for the [`poll_fn`] function.
203struct PollFn<F> {
204	f: F,
205}
206
207/// Creates a new future wrapping around a function returning [`Poll`].
208fn poll_fn<T, F>(f: F) -> PollFn<F>
209where
210	F: FnMut(&mut Context<'_>) -> Poll<T>,
211{
212	PollFn { f }
213}
214
215impl<F> Debug for PollFn<F> {
216	fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
217		f.debug_struct("PollFn").finish()
218	}
219}
220
221impl<T, F> Future for PollFn<F>
222where
223	F: FnMut(&mut Context<'_>) -> Poll<T>,
224{
225	type Output = T;
226
227	fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
228		// Safety: We never construct a `Pin<&mut F>` anywhere, so accessing `f`
229		// mutably in an unpinned way is sound.
230		//
231		// This use of unsafe cannot be replaced with the pin-project macro
232		// because:
233		//  * If we put `#[pin]` on the field, then it gives us a `Pin<&mut F>`,
234		//    which we can't use to call the closure.
235		//  * If we don't put `#[pin]` on the field, then it makes `PollFn` be
236		//    unconditionally `Unpin`, which we also don't want.
237		let me = unsafe { Pin::into_inner_unchecked(self) };
238		(me.f)(cx)
239	}
240}
241
242fn put_back_original_data(output: &mut String, mut vector: Vec<u8>, num_bytes_read: usize) {
243	let original_len = vector.len() - num_bytes_read;
244	vector.truncate(original_len);
245	*output = String::from_utf8(vector).expect("The original data must be valid utf-8.");
246}
247
248/// This handles the various failure cases and puts the string back into `output`.
249///
250/// The `truncate_on_io_error` `bool` is necessary because `read_to_string` and `read_line`
251/// disagree on what should happen when an IO error occurs.
252fn finish_string_read(
253	io_res: IoResult<usize>,
254	utf8_res: Result<String, FromUtf8Error>,
255	read: usize,
256	output: &mut String,
257	truncate_on_io_error: bool,
258) -> Poll<IoResult<usize>> {
259	match (io_res, utf8_res) {
260		(Ok(num_bytes), Ok(string)) => {
261			debug_assert_eq!(read, 0);
262			*output = string;
263			Poll::Ready(Ok(num_bytes))
264		}
265		(Err(io_err), Ok(string)) => {
266			*output = string;
267			if truncate_on_io_error {
268				let original_len = output.len() - read;
269				output.truncate(original_len);
270			}
271			Poll::Ready(Err(io_err))
272		}
273		(Ok(num_bytes), Err(utf8_err)) => {
274			debug_assert_eq!(read, 0);
275			put_back_original_data(output, utf8_err.into_bytes(), num_bytes);
276			Poll::Ready(Err(IoError::new(
277				IoErrorKind::InvalidData,
278				"stream did not contain valid UTF-8",
279			)))
280		}
281		(Err(io_err), Err(utf8_err)) => {
282			put_back_original_data(output, utf8_err.into_bytes(), read);
283			Poll::Ready(Err(io_err))
284		}
285	}
286}
287
288#[cfg(test)]
289mod unit_tests {
290	use super::*;
291
292	#[test]
293	fn memchr_test() {
294		let haystack = b"123abc456\0\xffabc\n";
295		assert_eq!(memchr(b'1', haystack), Some(0));
296		assert_eq!(memchr(b'2', haystack), Some(1));
297		assert_eq!(memchr(b'3', haystack), Some(2));
298		assert_eq!(memchr(b'4', haystack), Some(6));
299		assert_eq!(memchr(b'5', haystack), Some(7));
300		assert_eq!(memchr(b'6', haystack), Some(8));
301		assert_eq!(memchr(b'7', haystack), None);
302		assert_eq!(memchr(b'a', haystack), Some(3));
303		assert_eq!(memchr(b'b', haystack), Some(4));
304		assert_eq!(memchr(b'c', haystack), Some(5));
305		assert_eq!(memchr(b'd', haystack), None);
306		assert_eq!(memchr(b'A', haystack), None);
307		assert_eq!(memchr(0, haystack), Some(9));
308		assert_eq!(memchr(0xff, haystack), Some(10));
309		assert_eq!(memchr(0xfe, haystack), None);
310		assert_eq!(memchr(1, haystack), None);
311		assert_eq!(memchr(b'\n', haystack), Some(14));
312		assert_eq!(memchr(b'\r', haystack), None);
313	}
314
315	#[test]
316	fn memchr_all() {
317		let mut arr = Vec::new();
318		for b in 0..=255 {
319			arr.push(b);
320		}
321		for b in 0..=255 {
322			assert_eq!(memchr(b, &arr), Some(b as usize));
323		}
324		arr.reverse();
325		for b in 0..=255 {
326			assert_eq!(memchr(b, &arr), Some(255 - b as usize));
327		}
328	}
329
330	#[test]
331	fn memchr_empty() {
332		for b in 0..=255 {
333			assert_eq!(memchr(b, b""), None);
334		}
335	}
336}