tokio_serial_pacing/
lib.rs

1// Author: D.S. Ljungmark <spider@skuggor.se>, Modio FA AB
2// SPDX-License-Identifier: MIT
//! This crate is a simple wrapper around
//! [tokio_serial::SerialStream](https://docs.rs/tokio-serial/latest/tokio_serial/struct.SerialStream.html)
6//! for use with [tokio-modbus](https://docs.rs/tokio-modbus/latest/tokio_modbus/) in RTU mode.
7//! The wrappers can ensure that an application obeys the inter frame delay of 3.5 characters
8//! between reading and writing.
9//!
10//! The helper `wait_time` can attempt to calculate the proper delay from the serial port settings.
11//! In practice, the timers in tokio are not very exact, but since in theory, it's always okay to
12//! have a longer delay, that is not considered a big concern.
13//!
//! The `SerialPacing` trait just wraps the `set_delay` function as shared functionality, and then
//! `SerialReadPacing` and `SerialWritePacing` implement the pacing around the `AsyncRead` and
//! `AsyncWrite` traits.
17//!
//! # Example
19//! ```rust
20//! #[tokio::main(flavor="current_thread")]
21//! async fn main() -> std::io::Result<()> {
22//!   use tokio_serial::{SerialPort, SerialStream};
23//!   use tokio_serial_pacing::{SerialPacing, SerialWritePacing};
24//!
25//!   let (tx, mut rx) = SerialStream::pair().expect("Failed to open PTY");
26//!   let mut rx: SerialWritePacing<SerialStream> = rx.into();
27//!   rx.set_delay(std::time::Duration::from_millis(3));
28//!   Ok(())
29//! }
30//! ```
31mod wrp;
32pub use wrp::wait_time;
33pub use wrp::{SerialPacing, SerialReadPacing, SerialWritePacing};
34
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
    use tokio::time::{Duration, Instant};
    use tokio_serial::SerialStream;

    /// Delay installed on the pacing wrappers under test. The same value is used as the
    /// threshold that separates an unpaced exchange from a paced one.
    const PACING_DELAY: Duration = Duration::from_micros(1000);

    /// `wait_time` on a freshly opened PTY pair must report the fixed 1.75ms delay.
    #[tokio::test]
    async fn check_wait_time() {
        let (s1, _s2) = SerialStream::pair().expect("Failed to open PTY");
        let out = wait_time(&s1);

        // 1.75ms is from the modbus spec, magic number is magic, but all baudrates > 19200 are
        // expected to use that.
        assert_eq!(out, Duration::from_micros(1750));
    }

    /// Performs one request/acknowledge exchange between the two ends of a serial pair.
    ///
    /// `tx` writes a 128 byte payload, flushes, and then waits for a single ack byte.
    /// `rx` reads the payload and answers with the ack byte `1`. Both sides run as separate
    /// tasks; the function panics if either side fails or if the received payload differs
    /// from the one that was sent.
    async fn read_write<T, U>(mut tx: T, mut rx: U)
    where
        // The ends are moved into spawned tasks, hence `Send + 'static`.
        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        U: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    {
        let write_buf = b"The implementation of RTU reception driver may imply the management of a lot of interruptions due to the t 1.5 and t 3.5 timers.";
        /* With high communication baud rates, this leads to a heavy CPU load. Consequently these two timers must be strictly respected when the
        baud rate is equal or lower than 19200 Bps. For baud rates greater than 19200 Bps, fixed values for the 2 timers should be used: it is
        recommended to use a value of 750us for the inter-character time-out (t 1.5 ) and a value of 1.750ms for inter-frame delay (t 3.5 ).*/
        const DATA_LEN: usize = 128;
        assert_eq!(
            write_buf.len(),
            DATA_LEN,
            "check that the buffer we work with is around"
        );

        let tx_task = tokio::spawn(async move {
            eprintln!("TX=>RX:  Writing large buf");
            tx.write_all(write_buf)
                .await
                .expect("TX=>RX Failed to write bytes to PTY");
            eprintln!("TX=>RX Flushing");
            tx.flush().await.expect("TX: can flush fail? on a PTY");
            // Wait for the one-byte ack that the receiving side writes back.
            eprintln!("TX<=RX Reading ack");
            let mut ack = [0; 1];
            tx.read_exact(&mut ack)
                .await
                .expect("TX<=RX Reading failed?");
            assert_eq!(ack[0], 1);
            write_buf
        });

        let rx_task = tokio::spawn(async move {
            eprintln!("RX>=TX Reading large(?) buf");
            let mut read_buf = [0; 256];
            assert!(read_buf.len() >= DATA_LEN);
            // A single `read` is allowed to return fewer bytes than were written, which would
            // make the length check below fail spuriously. `read_exact` keeps reading until
            // the whole payload has arrived.
            let read_num = rx
                .read_exact(&mut read_buf[..DATA_LEN])
                .await
                .expect("RX<=TX Failed to eat bytes from PTY");

            eprintln!("RX=>TX, Writing ack data");
            rx.write_all(&[1])
                .await
                .expect("RX=>TX Failed to write ack");
            rx.flush().await.expect("RX=>TX Failed to flush");
            (read_buf, read_num)
        });

        let (read_buf, read_num) = rx_task.await.expect("Error in rx side");
        let write_buf = tx_task.await.expect("Error in tx side");
        assert_eq!(DATA_LEN, read_num);
        assert_eq!(write_buf[..], read_buf[..DATA_LEN]);
        eprintln!("Test cycle complete");
    }

    /// Runs one [`read_write`] exchange and returns how long it took, in microseconds.
    async fn timed_read_write<T, U>(tx: T, rx: U) -> u128
    where
        T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
        U: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    {
        let start = Instant::now();
        read_write(tx, rx).await;
        start.elapsed().as_micros()
    }

    /// Measures the exchange on a plain, unwrapped PTY pair as the baseline for the pacing tests.
    async fn unpaced_micros() -> u128 {
        let (tx, rx) = SerialStream::pair().expect("Failed to open PTY");
        timed_read_write(tx, rx).await
    }

    /// Wrapping the receiving end in `SerialWritePacing` must slow the exchange down to at
    /// least the configured delay.
    #[tokio::test]
    async fn check_write_pacing() {
        let time_before = unpaced_micros().await;
        assert!(
            time_before < PACING_DELAY.as_micros(),
            "It should not take a millisecond normally."
        );

        let time_after = {
            let (tx, rx) = SerialStream::pair().expect("Failed to open PTY");
            // Wrap the _rx_ in the delay code, as it must ensure that it only writes a reply
            // after the elapsed timeout has happened.
            let mut rx: SerialWritePacing<SerialStream> = rx.into();
            rx.set_delay(PACING_DELAY);
            timed_read_write(tx, rx).await
        };
        println!("time_before={time_before} time_after={time_after}");
        // Taking exactly the configured delay is acceptable, hence `>=`.
        assert!(
            time_after >= PACING_DELAY.as_micros(),
            "It should take a millisecond with our pacing code installed"
        );
    }

    /// Wrapping the sending end in `SerialReadPacing` must slow the exchange down to at
    /// least the configured delay.
    #[tokio::test]
    async fn check_read_pacing() {
        let time_before = unpaced_micros().await;
        assert!(
            time_before < PACING_DELAY.as_micros(),
            "It should not take a millisecond normally."
        );

        let time_after = {
            let (tx, rx) = SerialStream::pair().expect("Failed to open PTY");
            // Wrap the tx side in a delay code, making it wait between writing and reading.
            let mut tx: SerialReadPacing<SerialStream> = tx.into();
            tx.set_delay(PACING_DELAY);
            timed_read_write(tx, rx).await
        };
        println!("time_before={time_before} time_after={time_after}");
        // Taking exactly the configured delay is acceptable, hence `>=`.
        assert!(
            time_after >= PACING_DELAY.as_micros(),
            "It should take a millisecond with our pacing code installed"
        );
    }

    /// Sanity check that a PTY pair works with plain blocking I/O, without tokio involved.
    #[test]
    fn ensure_sync_port_works() {
        // As I had some troubles during development to make async read/write to a serial port
        // work, this opens the PTY naked, sync, and checks that read-write works.
        use serialport::TTYPort;
        use std::io::{Read, Write};
        let (mut tx, mut rx) = TTYPort::pair().expect("Unable to create pseudo-terminal pair");
        let mut buf = [0u8; 512];
        for x in 1..6 {
            let msg = format!("Message #{x}");
            assert_eq!(tx.write(msg.as_bytes()).unwrap(), msg.len());
            // Receive on the slave
            let bytes_recvd = rx.read(&mut buf).unwrap();
            assert_eq!(bytes_recvd, msg.len());
            let msg_recvd = std::str::from_utf8(&buf[..bytes_recvd]).unwrap();
            assert_eq!(msg_recvd, msg);
        }
    }
}
189        }
190    }
191}