1use std::pin::Pin;
8use std::sync::{Arc, Mutex};
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12mod platform;
13
14use crate::platform::PlatformStream;
15use futures::task::AtomicWaker;
16
17pub use futures::stream::{Stream, TryStreamExt};
18pub use serialport;
19use serialport::{DataBits, FlowControl, Parity, StopBits};
20
/// State shared through an `Arc` between a `SerialPortStream` and its platform stream.
///
/// `SerialPortStreamBuilder::open` creates one instance, hands a clone of the `Arc` to
/// `PlatformStream::new` and keeps the other in the returned stream.
// NOTE(review): the writer side (who fills `in_buffer` / `stream_error` and wakes `waker`) lives
// in the `platform` module and is not visible here — presumably the reader thread; confirm.
#[derive(Debug)]
pub(crate) struct EventsInner {
    /// Bytes waiting to be handed out by `SerialPortStream::try_poll_next`.
    pub(crate) in_buffer: Mutex<Vec<u8>>,
    /// Error to report from `SerialPortStream::try_poll_next`; `None` while no error is stored.
    pub(crate) stream_error: Mutex<Option<std::io::Error>>,
    /// Waker of the task that most recently polled the stream.
    pub(crate) waker: AtomicWaker,
}
27
28impl EventsInner {
29 pub(crate) fn new() -> Self {
30 Self {
31 in_buffer: Mutex::new(Vec::new()),
32 stream_error: Mutex::new(None),
33 waker: AtomicWaker::new(),
34 }
35 }
36}
37
/// Settings collected before opening a `SerialPortStream`.
///
/// Each field has a chainable setter of the same name on the builder; `open` consumes the
/// builder and passes it as a whole to `PlatformStream::new`.
// NOTE(review): how the platform layer interprets each setting is not visible in this file.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SerialPortStreamBuilder {
    /// Port path, stored as an owned string.
    pub(crate) path: String,
    /// Requested baud rate.
    pub(crate) baud_rate: u32,
    /// Requested number of data bits (`serialport::DataBits`).
    pub(crate) data_bits: DataBits,
    /// Requested flow-control mode (`serialport::FlowControl`).
    pub(crate) flow_control: FlowControl,
    /// Requested parity mode (`serialport::Parity`).
    pub(crate) parity: Parity,
    /// Requested number of stop bits (`serialport::StopBits`).
    pub(crate) stop_bits: StopBits,
    /// Timeout handed to the platform layer; presumably the I/O timeout — TODO confirm.
    pub(crate) timeout: Duration,
    /// `Some(state)` after `dtr_on_open(state)`, `None` after `preserve_dtr_on_open()`.
    /// Presumably `None` means the DTR line is left untouched when opening — TODO confirm.
    pub(crate) dtr_on_open: Option<bool>,
}
72
73impl SerialPortStreamBuilder {
74 #[allow(clippy::assigning_clones)]
80 #[must_use]
81 pub fn path<'a>(mut self, path: impl Into<std::borrow::Cow<'a, str>>) -> Self {
82 self.path = path.into().as_ref().to_owned();
83 self
84 }
85
86 #[must_use]
90 pub fn baud_rate(mut self, baud_rate: u32) -> Self {
91 self.baud_rate = baud_rate;
92 self
93 }
94
95 #[must_use]
99 pub fn data_bits(mut self, data_bits: DataBits) -> Self {
100 self.data_bits = data_bits;
101 self
102 }
103
104 #[must_use]
108 pub fn flow_control(mut self, flow_control: FlowControl) -> Self {
109 self.flow_control = flow_control;
110 self
111 }
112
113 #[must_use]
117 pub fn parity(mut self, parity: Parity) -> Self {
118 self.parity = parity;
119 self
120 }
121
122 #[must_use]
126 pub fn stop_bits(mut self, stop_bits: StopBits) -> Self {
127 self.stop_bits = stop_bits;
128 self
129 }
130
131 #[must_use]
135 pub fn timeout(mut self, timeout: Duration) -> Self {
136 self.timeout = timeout;
137 self
138 }
139
140 #[must_use]
144 pub fn dtr_on_open(mut self, state: bool) -> Self {
145 self.dtr_on_open = Some(state);
146 self
147 }
148
149 #[must_use]
153 pub fn preserve_dtr_on_open(mut self) -> Self {
154 self.dtr_on_open = None;
155 self
156 }
157
158 pub fn open(self) -> std::io::Result<SerialPortStream> {
161 let inner = Arc::new(EventsInner::new());
162 Ok(SerialPortStream {
163 platform: PlatformStream::new(self, inner.clone())?,
164 inner,
165 })
166 }
167}
168
169pub fn new<'a>(
176 path: impl Into<std::borrow::Cow<'a, str>>,
177 baud_rate: u32,
178) -> SerialPortStreamBuilder {
179 SerialPortStreamBuilder {
180 path: path.into().into_owned(),
181 baud_rate,
182 data_bits: DataBits::Eight,
183 flow_control: FlowControl::None,
184 parity: Parity::None,
185 stop_bits: StopBits::One,
186 timeout: Duration::from_millis(0),
187 dtr_on_open: None,
188 }
189}
190
/// An open serial port, created by `SerialPortStreamBuilder::open`.
///
/// Implements `std::io::Read` and `std::io::Write` by delegating to the platform stream, and
/// `Stream<Item = Result<Vec<u8>, std::io::Error>>` through `try_poll_next`.
#[derive(Debug)]
pub struct SerialPortStream {
    /// Platform-specific implementation that all port operations are forwarded to.
    platform: PlatformStream,
    /// Buffer, error slot and waker shared with the platform stream.
    inner: Arc<EventsInner>,
}
202
203impl SerialPortStream {
204 pub fn clear(&mut self, buffer_to_clear: serialport::ClearBuffer) -> std::io::Result<()> {
205 self.platform.clear(buffer_to_clear)
206 }
207
208 pub fn set_break(&mut self) -> std::io::Result<()> {
209 self.platform.set_break()
210 }
211
212 pub fn clear_break(&mut self) -> std::io::Result<()> {
213 self.platform.clear_break()
214 }
215
216 pub fn write_request_to_send(&mut self, level: bool) -> std::io::Result<()> {
217 self.platform.write_request_to_send(level)
218 }
219
220 pub fn write_data_terminal_ready(&mut self, level: bool) -> std::io::Result<()> {
221 self.platform.write_data_terminal_ready(level)
222 }
223
224 pub fn read_clear_to_send(&mut self) -> std::io::Result<bool> {
225 self.platform.read_clear_to_send()
226 }
227
228 pub fn read_data_set_ready(&mut self) -> std::io::Result<bool> {
229 self.platform.read_data_set_ready()
230 }
231
232 pub fn read_ring_indicator(&mut self) -> std::io::Result<bool> {
233 self.platform.read_ring_indicator()
234 }
235
236 pub fn read_carrier_detect(&mut self) -> std::io::Result<bool> {
237 self.platform.read_carrier_detect()
238 }
239
240 pub fn bytes_to_read(&self) -> std::io::Result<u32> {
241 self.platform.bytes_to_read()
242 }
243
244 pub fn try_poll_next(
245 &mut self,
246 cx: &mut Context<'_>,
247 ) -> Poll<Option<Result<Vec<u8>, std::io::Error>>> {
248 self.inner.waker.register(cx.waker());
249
250 if let Some(err) = self.inner.stream_error.lock().unwrap().as_ref() {
251 return Poll::Ready(Some(Err(std::io::Error::new(err.kind(), err.to_string()))));
252 }
253
254 if !self.platform.is_thread_started() {
255 self.platform.start_thread();
256 return Poll::Pending;
257 }
258
259 let mut buffer = self.inner.in_buffer.lock().unwrap();
260 if !buffer.is_empty() {
261 let data = buffer.drain(..).collect();
263 return Poll::Ready(Some(Ok(data)));
264 }
265
266 Poll::Pending
267 }
268}
269
// SAFETY: NOTE(review) — this asserts that a `SerialPortStream` may be moved to another thread.
// The `Arc<EventsInner>` field only contains `Mutex`es and an `AtomicWaker`; whether the claim
// holds therefore depends on `PlatformStream`, defined in the `platform` module. TODO confirm.
unsafe impl Send for SerialPortStream {}

// SAFETY: NOTE(review) — this asserts that `&SerialPortStream` may be shared across threads.
// Of the methods in this file only `bytes_to_read` takes `&self`; its soundness under
// concurrent calls depends on `PlatformStream::bytes_to_read`. TODO confirm.
unsafe impl Sync for SerialPortStream {}
273
/// Blocking-style reads, delegated unchanged to the platform stream.
impl std::io::Read for SerialPortStream {
    /// Reads into `buf` via `PlatformStream::read` and returns its result as-is.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.platform.read(buf)
    }
}
279
/// Writes, delegated unchanged to the platform stream.
impl std::io::Write for SerialPortStream {
    /// Writes `buf` via `PlatformStream::write` and returns its result as-is.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.platform.write(buf)
    }

    /// Flushes via `PlatformStream::flush` and returns its result as-is.
    fn flush(&mut self) -> std::io::Result<()> {
        self.platform.flush()
    }
}
289
290impl Stream for SerialPortStream {
291 type Item = Result<Vec<u8>, std::io::Error>;
292
293 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
294 self.try_poll_next(cx)
295 }
296}