Skip to main content

serialport_stream/
lib.rs

//! # serialport-stream
//!
//! A purely event-driven implementation of `futures::Stream` for reading data from a serial port, built on [serialport-rs](https://github.com/serialport/serialport-rs).
//! Each item yields one or more bytes, depending on how often the stream is polled. The first poll starts a background thread that waits indefinitely for a data-in event, an error, or the stream being dropped.
//!
6
7use std::pin::Pin;
8use std::sync::{Arc, Mutex};
9use std::task::{Context, Poll};
10use std::time::Duration;
11
12mod platform;
13
14use crate::platform::PlatformStream;
15use futures::task::AtomicWaker;
16
17pub use futures::stream::{Stream, TryStreamExt};
18pub use serialport;
19use serialport::{DataBits, FlowControl, Parity, StopBits};
20
/// State shared between a [`SerialPortStream`] and its platform backend.
///
/// NOTE(review): presumably the backend's background thread fills
/// `in_buffer` / `stream_error` and wakes `waker`; the producer side lives in
/// the `platform` module and is not visible in this file — confirm there.
#[derive(Debug)]
pub(crate) struct EventsInner {
    /// Received bytes waiting to be handed out by the stream's poll function.
    pub(crate) in_buffer: Mutex<Vec<u8>>,
    /// Error slot inspected on every poll; while it holds `Some`, polling
    /// yields that error instead of data.
    pub(crate) stream_error: Mutex<Option<std::io::Error>>,
    /// Waker of the task that most recently polled the stream.
    pub(crate) waker: AtomicWaker,
}
27
28impl EventsInner {
29    pub(crate) fn new() -> Self {
30        Self {
31            in_buffer: Mutex::new(Vec::new()),
32            stream_error: Mutex::new(None),
33            waker: AtomicWaker::new(),
34        }
35    }
36}
37
/// Builder for configuring and opening a serial port stream.
///
/// Use the [`new()`] function to create a builder, then chain configuration
/// methods before calling [`open()`](SerialPortStreamBuilder::open).
///
/// # Example
///
/// ```no_run
/// use serialport_stream::new;
/// use serialport::{DataBits, Parity, StopBits, FlowControl};
/// use std::time::Duration;
///
/// # fn example() -> std::io::Result<()> {
/// let stream = new("/dev/ttyUSB0", 115200)
///     .data_bits(DataBits::Eight)
///     .parity(Parity::None)
///     .stop_bits(StopBits::One)
///     .flow_control(FlowControl::None)
///     .timeout(Duration::from_millis(100))
///     .open()?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SerialPortStreamBuilder {
    /// Path or name of the serial device (e.g. `/dev/ttyUSB0`, `COM3`).
    pub(crate) path: String,
    /// Baud rate in bits per second.
    pub(crate) baud_rate: u32,
    /// Data bits per character; [`new()`] initializes this to `DataBits::Eight`.
    pub(crate) data_bits: DataBits,
    /// Flow control mode; [`new()`] initializes this to `FlowControl::None`.
    pub(crate) flow_control: FlowControl,
    /// Parity mode; [`new()`] initializes this to `Parity::None`.
    pub(crate) parity: Parity,
    /// Stop bits; [`new()`] initializes this to `StopBits::One`.
    pub(crate) stop_bits: StopBits,
    /// Read/write timeout; [`new()`] initializes this to zero.
    pub(crate) timeout: Duration,
    /// DTR level to apply on open: `Some(level)` to force it, `None` to leave
    /// the existing state untouched.
    pub(crate) dtr_on_open: Option<bool>,
}
72
73impl SerialPortStreamBuilder {
74    /// Sets the path to the serial port device.
75    ///
76    /// # Examples
77    /// - Unix: `"/dev/ttyUSB0"`, `"/dev/ttyACM0"`
78    /// - Windows: `"COM3"`, `"COM10"`
79    #[allow(clippy::assigning_clones)]
80    #[must_use]
81    pub fn path<'a>(mut self, path: impl Into<std::borrow::Cow<'a, str>>) -> Self {
82        self.path = path.into().as_ref().to_owned();
83        self
84    }
85
86    /// Sets the baud rate (bits per second).
87    ///
88    /// Common values: 9600, 19200, 38400, 57600, 115200
89    #[must_use]
90    pub fn baud_rate(mut self, baud_rate: u32) -> Self {
91        self.baud_rate = baud_rate;
92        self
93    }
94
95    /// Sets the number of data bits per character.
96    ///
97    /// Default: `DataBits::Eight`
98    #[must_use]
99    pub fn data_bits(mut self, data_bits: DataBits) -> Self {
100        self.data_bits = data_bits;
101        self
102    }
103
104    /// Sets the flow control mode.
105    ///
106    /// Default: `FlowControl::None`
107    #[must_use]
108    pub fn flow_control(mut self, flow_control: FlowControl) -> Self {
109        self.flow_control = flow_control;
110        self
111    }
112
113    /// Sets the parity checking mode.
114    ///
115    /// Default: `Parity::None`
116    #[must_use]
117    pub fn parity(mut self, parity: Parity) -> Self {
118        self.parity = parity;
119        self
120    }
121
122    /// Sets the number of stop bits.
123    ///
124    /// Default: `StopBits::One`
125    #[must_use]
126    pub fn stop_bits(mut self, stop_bits: StopBits) -> Self {
127        self.stop_bits = stop_bits;
128        self
129    }
130
131    /// Sets the timeout for read and write operations.
132    ///
133    /// Default: `Duration::from_millis(0)` (non-blocking)
134    #[must_use]
135    pub fn timeout(mut self, timeout: Duration) -> Self {
136        self.timeout = timeout;
137        self
138    }
139
140    /// Sets the DTR (Data Terminal Ready) signal state when opening the port.
141    ///
142    /// If not called, the DTR state is preserved from the previous port state.
143    #[must_use]
144    pub fn dtr_on_open(mut self, state: bool) -> Self {
145        self.dtr_on_open = Some(state);
146        self
147    }
148
149    /// Preserves the current DTR state when opening the port.
150    ///
151    /// This is the default behavior.
152    #[must_use]
153    pub fn preserve_dtr_on_open(mut self) -> Self {
154        self.dtr_on_open = None;
155        self
156    }
157
158    /// Opens the serial port and creates the stream.
159    ///
160    pub fn open(self) -> std::io::Result<SerialPortStream> {
161        let inner = Arc::new(EventsInner::new());
162        Ok(SerialPortStream {
163            platform: PlatformStream::new(self, inner.clone())?,
164            inner,
165        })
166    }
167}
168
169/// Creates a new serial port stream builder.
170///
171/// This is the main entry point for creating a serial port stream. After creating
172/// the builder, you can chain configuration methods and call `.open()` to create
173/// the stream.
174///
175pub fn new<'a>(
176    path: impl Into<std::borrow::Cow<'a, str>>,
177    baud_rate: u32,
178) -> SerialPortStreamBuilder {
179    SerialPortStreamBuilder {
180        path: path.into().into_owned(),
181        baud_rate,
182        data_bits: DataBits::Eight,
183        flow_control: FlowControl::None,
184        parity: Parity::None,
185        stop_bits: StopBits::One,
186        timeout: Duration::from_millis(0),
187        dtr_on_open: None,
188    }
189}
190
/// An async stream for reading from a serial port.
///
/// This struct provides both synchronous and asynchronous I/O on a serial port:
/// - Implements `std::io::Read` and `std::io::Write` for synchronous operations
/// - Implements `futures::Stream` for asynchronous streaming of incoming data
///
#[derive(Debug)]
pub struct SerialPortStream {
    /// Platform-specific backend that every port operation is forwarded to.
    platform: PlatformStream,
    /// Event state shared with the backend: receive buffer, error slot, waker.
    inner: Arc<EventsInner>,
}
202
203impl SerialPortStream {
204    pub fn clear(&mut self, buffer_to_clear: serialport::ClearBuffer) -> std::io::Result<()> {
205        self.platform.clear(buffer_to_clear)
206    }
207
208    pub fn set_break(&mut self) -> std::io::Result<()> {
209        self.platform.set_break()
210    }
211
212    pub fn clear_break(&mut self) -> std::io::Result<()> {
213        self.platform.clear_break()
214    }
215
216    pub fn write_request_to_send(&mut self, level: bool) -> std::io::Result<()> {
217        self.platform.write_request_to_send(level)
218    }
219
220    pub fn write_data_terminal_ready(&mut self, level: bool) -> std::io::Result<()> {
221        self.platform.write_data_terminal_ready(level)
222    }
223
224    pub fn read_clear_to_send(&mut self) -> std::io::Result<bool> {
225        self.platform.read_clear_to_send()
226    }
227
228    pub fn read_data_set_ready(&mut self) -> std::io::Result<bool> {
229        self.platform.read_data_set_ready()
230    }
231
232    pub fn read_ring_indicator(&mut self) -> std::io::Result<bool> {
233        self.platform.read_ring_indicator()
234    }
235
236    pub fn read_carrier_detect(&mut self) -> std::io::Result<bool> {
237        self.platform.read_carrier_detect()
238    }
239
240    pub fn bytes_to_read(&self) -> std::io::Result<u32> {
241        self.platform.bytes_to_read()
242    }
243
244    pub fn try_poll_next(
245        &mut self,
246        cx: &mut Context<'_>,
247    ) -> Poll<Option<Result<Vec<u8>, std::io::Error>>> {
248        self.inner.waker.register(cx.waker());
249
250        if let Some(err) = self.inner.stream_error.lock().unwrap().as_ref() {
251            return Poll::Ready(Some(Err(std::io::Error::new(err.kind(), err.to_string()))));
252        }
253
254        if !self.platform.is_thread_started() {
255            self.platform.start_thread();
256            return Poll::Pending;
257        }
258
259        let mut buffer = self.inner.in_buffer.lock().unwrap();
260        if !buffer.is_empty() {
261            // Drain all available data
262            let data = buffer.drain(..).collect();
263            return Poll::Ready(Some(Ok(data)));
264        }
265
266        Poll::Pending
267    }
268}
269
// SAFETY: NOTE(review): no justification is visible in this file. The `inner`
// field is an `Arc` over `Mutex`/`AtomicWaker` state, so soundness hinges on
// `PlatformStream` (defined in the `platform` module) being safe to move to
// another thread — confirm there, or drop this impl if the auto trait applies.
unsafe impl Send for SerialPortStream {}
271
// SAFETY: NOTE(review): no justification is visible in this file. This asserts
// that `&SerialPortStream` may be used from several threads at once; the only
// `&self` method here is `bytes_to_read`, which calls into `PlatformStream`,
// so that backend call must be safe under concurrent use — confirm in `platform`.
unsafe impl Sync for SerialPortStream {}
273
/// Synchronous reads, forwarded unchanged to the platform backend.
impl std::io::Read for SerialPortStream {
    /// Delegates to the platform backend's `read` and returns its result as-is.
    ///
    /// NOTE(review): whether bytes already moved into the stream's internal
    /// buffer are still visible to this call is decided inside `platform` —
    /// confirm before mixing `Read` with `Stream` polling.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        self.platform.read(buf)
    }
}
279
/// Synchronous writes, forwarded unchanged to the platform backend.
impl std::io::Write for SerialPortStream {
    /// Delegates to the platform backend's `write` and returns its result as-is.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.platform.write(buf)
    }

    /// Delegates to the platform backend's `flush` and returns its result as-is.
    fn flush(&mut self) -> std::io::Result<()> {
        self.platform.flush()
    }
}
289
290impl Stream for SerialPortStream {
291    type Item = Result<Vec<u8>, std::io::Error>;
292
293    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
294        self.try_poll_next(cx)
295    }
296}