tokio_lxi/
lib.rs

1#[cfg(test)]
2mod dummy;
3mod runtime;
4
5use runtime::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter, TcpStream};
6use std::future::Future;
7use std::net::SocketAddr;
8use std::pin::Pin;
9
/// End-of-line byte sequence (carriage return followed by line feed) that a
/// freshly connected device appends to every outgoing request.
const DEFAULT_EOL: &[u8] = b"\r\n";
11
12#[derive(thiserror::Error, Debug)]
13pub enum Error {
14    #[error(transparent)]
15    IO(#[from] runtime::Error),
16
17    #[error("Invalid response data (lossy decoding from UTF-8): {0}")]
18    ResponseDataInvalid(String),
19
20    #[error(transparent)]
21    UserCallbackError(#[from] anyhow::Error),
22}
23
/// Strips a single trailing line terminator from `text` in place.
///
/// A trailing `"\r\n"` or a bare `"\n"` is removed. Anything else, including a
/// lone trailing `'\r'`, is left untouched, and at most one terminator is
/// removed per call.
fn remove_newline(text: &mut String) {
    // Length of the text once the terminator is cut off, or `None` when the
    // text does not end in a line feed at all.
    let stripped_len = text
        .strip_suffix('\n')
        .map(|head| head.strip_suffix('\r').unwrap_or(head).len());

    if let Some(len) = stripped_len {
        // `len` is the length of a prefix `&str`, so it is a char boundary.
        text.truncate(len);
    }
}
35
36pub struct LxiDevice {
37    stream: Pin<Box<BufReader<BufWriter<TcpStream>>>>,
38    eol: Vec<u8>,
39}
40
41impl LxiDevice {
42    pub async fn connect(addr: &SocketAddr) -> Result<Self, Error> {
43        Self::connect_with_buffer_capacity(addr, 1024, 128).await
44    }
45
46    pub async fn connect_with_buffer_capacity(
47        addr: &SocketAddr,
48        read_buffer_size: usize,
49        write_buffer_size: usize,
50    ) -> Result<Self, Error> {
51        let stream = BufReader::with_capacity(
52            read_buffer_size,
53            BufWriter::with_capacity(write_buffer_size, TcpStream::connect(&addr).await?),
54        );
55        Ok(Self {
56            stream: Box::pin(stream),
57            eol: DEFAULT_EOL.to_vec(),
58        })
59    }
60
61    pub fn set_eol(&mut self, eol: &[u8]) {
62        self.eol = eol.to_vec();
63    }
64
65    async fn write<T: AsRef<[u8]>>(&mut self, buf: T) -> Result<(), Error> {
66        self.stream.write_all(buf.as_ref()).await?;
67        Ok(())
68    }
69
70    pub async fn send(&mut self, req: &str) -> Result<(), Error> {
71        self.write(req).await?;
72        self.stream.write_all(&self.eol).await?;
73        self.stream.flush().await?;
74        Ok(())
75    }
76
77    pub async fn receive(&mut self) -> Result<String, Error> {
78        let mut buf = vec![];
79        self.stream.read_until(b'\n', &mut buf).await?;
80        let mut response = String::from_utf8(buf).map_err(|error| {
81            Error::ResponseDataInvalid(String::from_utf8_lossy(error.as_bytes()).into_owned())
82        })?;
83
84        remove_newline(&mut response);
85
86        Ok(response)
87    }
88
89    pub async fn receive_data<'a, T, F, P>(&'a mut self, parser: P) -> Result<T, Error>
90    where
91        F: Future<Output = Result<T, Error>> + Send,
92        P: FnOnce(Pin<&'a mut (dyn AsyncBufRead + Send)>) -> F,
93    {
94        let stream = self.stream.as_mut();
95        Ok(parser(stream).await?)
96    }
97
98    pub async fn request(&mut self, req: &str) -> Result<String, Error> {
99        self.send(req).await?;
100        self.receive().await
101    }
102
103    pub async fn request_data<'a, T, F, P>(&'a mut self, req: &str, parser: P) -> Result<T, Error>
104    where
105        F: Future<Output = Result<T, Error>> + Send,
106        P: FnOnce(Pin<&'a mut (dyn AsyncBufRead + Send)>) -> F,
107    {
108        self.send(req).await?;
109        self.receive_data(parser).await
110    }
111}
112
#[cfg(test)]
#[cfg(feature = "runtime-tokio")]
mod tests {
    use super::*;

    use dummy::DummyEmulator;
    use runtime::{AsyncReadExt, BufReader, TcpListener};
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback address used by every test in this module.
    pub static LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);

    /// Checks the runtime's TCP primitives on their own: a server that copies
    /// its input back to the sender must return exactly what the client wrote.
    #[tokio::test]
    async fn client_server() {
        // Port 0 asks for any free port; the real one is read back below.
        let bind_addr = SocketAddr::new(LOCALHOST, 0);
        let listener: TcpListener = TcpListener::bind(&bind_addr).await.unwrap();
        let address = listener.local_addr().unwrap();

        let echo_server = async move {
            let (mut socket, _): (TcpStream, _) = listener.accept().await.unwrap();
            let (mut incoming, mut outgoing) = socket.split();
            tokio::io::copy(&mut incoming, &mut outgoing).await.unwrap();
        };

        let echo_client = async move {
            let mut socket: TcpStream = TcpStream::connect(&address).await.unwrap();
            let (mut incoming, mut outgoing) = socket.split();

            let request = b"hello, server\n";
            outgoing.write_all(request).await.unwrap();

            let mut reply = vec![0; request.len()];
            incoming.read_exact(&mut reply).await.unwrap();

            assert_eq!(&request[..], &reply[..]);
        };

        tokio::join!(echo_server, echo_client);
    }

    /// Talks to the dummy emulator over a raw TCP stream and expects the
    /// identification reply, line terminator included.
    #[tokio::test]
    async fn dummy_idn_stream() {
        let mut emulator = DummyEmulator::start(LOCALHOST).await;
        let address = emulator.address().unwrap();
        // NOTE(review): the argument presumably limits how much the emulator
        // serves before stopping; confirm against `DummyEmulator::run`.
        let server_side = emulator.run(1);

        let client_side = async move {
            let mut socket: TcpStream = TcpStream::connect(&address).await.unwrap();
            let (incoming, mut outgoing) = socket.split();
            outgoing.write_all(b"IDN?\r\n").await.unwrap();

            let mut line_reader = BufReader::new(incoming);
            let mut reply = Vec::new();
            line_reader.read_until(b'\n', &mut reply).await.unwrap();

            assert_eq!(&b"DummyEmulator\r\n"[..], &reply[..]);
        };

        tokio::join!(server_side, client_side);
    }

    /// Talks to the dummy emulator through `LxiDevice` and expects the
    /// identification reply with the line terminator already removed.
    #[tokio::test]
    async fn dummy_idn_device() {
        let mut emulator = DummyEmulator::start(LOCALHOST).await;
        let address = emulator.address().unwrap();
        let server_side = emulator.run(1);

        let client_side = async move {
            let mut lxi = LxiDevice::connect(&address).await.unwrap();
            lxi.send("IDN?").await.unwrap();
            let answer = lxi.receive().await.unwrap();
            assert_eq!("DummyEmulator", answer);
        };

        tokio::join!(server_side, client_side);
    }
}
186}