1#[cfg(test)]
2mod dummy;
3mod runtime;
4
5use runtime::{AsyncBufRead, AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter, TcpStream};
6use std::future::Future;
7use std::net::SocketAddr;
8use std::pin::Pin;
9
/// End-of-line sequence a freshly connected device appends to each request:
/// carriage return followed by line feed.
const DEFAULT_EOL: &[u8] = &[b'\r', b'\n'];
11
12#[derive(thiserror::Error, Debug)]
13pub enum Error {
14 #[error(transparent)]
15 IO(#[from] runtime::Error),
16
17 #[error("Invalid response data (lossy decoding from UTF-8): {0}")]
18 ResponseDataInvalid(String),
19
20 #[error(transparent)]
21 UserCallbackError(#[from] anyhow::Error),
22}
23
/// Strips a single trailing line terminator from `text` in place.
///
/// A trailing `"\n"` is removed, together with the `'\r'` directly in front of
/// it when there is one. Text that does not end in `'\n'` (including text that
/// ends in a lone `'\r'`) is left untouched, and at most one terminator is
/// removed per call.
fn remove_newline(text: &mut String) {
    if !text.ends_with('\n') {
        return;
    }
    text.pop();

    // Only a carriage return that directly preceded the line feed is dropped.
    if text.ends_with('\r') {
        text.pop();
    }
}
35
36pub struct LxiDevice {
37 stream: Pin<Box<BufReader<BufWriter<TcpStream>>>>,
38 eol: Vec<u8>,
39}
40
41impl LxiDevice {
42 pub async fn connect(addr: &SocketAddr) -> Result<Self, Error> {
43 Self::connect_with_buffer_capacity(addr, 1024, 128).await
44 }
45
46 pub async fn connect_with_buffer_capacity(
47 addr: &SocketAddr,
48 read_buffer_size: usize,
49 write_buffer_size: usize,
50 ) -> Result<Self, Error> {
51 let stream = BufReader::with_capacity(
52 read_buffer_size,
53 BufWriter::with_capacity(write_buffer_size, TcpStream::connect(&addr).await?),
54 );
55 Ok(Self {
56 stream: Box::pin(stream),
57 eol: DEFAULT_EOL.to_vec(),
58 })
59 }
60
61 pub fn set_eol(&mut self, eol: &[u8]) {
62 self.eol = eol.to_vec();
63 }
64
65 async fn write<T: AsRef<[u8]>>(&mut self, buf: T) -> Result<(), Error> {
66 self.stream.write_all(buf.as_ref()).await?;
67 Ok(())
68 }
69
70 pub async fn send(&mut self, req: &str) -> Result<(), Error> {
71 self.write(req).await?;
72 self.stream.write_all(&self.eol).await?;
73 self.stream.flush().await?;
74 Ok(())
75 }
76
77 pub async fn receive(&mut self) -> Result<String, Error> {
78 let mut buf = vec![];
79 self.stream.read_until(b'\n', &mut buf).await?;
80 let mut response = String::from_utf8(buf).map_err(|error| {
81 Error::ResponseDataInvalid(String::from_utf8_lossy(error.as_bytes()).into_owned())
82 })?;
83
84 remove_newline(&mut response);
85
86 Ok(response)
87 }
88
89 pub async fn receive_data<'a, T, F, P>(&'a mut self, parser: P) -> Result<T, Error>
90 where
91 F: Future<Output = Result<T, Error>> + Send,
92 P: FnOnce(Pin<&'a mut (dyn AsyncBufRead + Send)>) -> F,
93 {
94 let stream = self.stream.as_mut();
95 Ok(parser(stream).await?)
96 }
97
98 pub async fn request(&mut self, req: &str) -> Result<String, Error> {
99 self.send(req).await?;
100 self.receive().await
101 }
102
103 pub async fn request_data<'a, T, F, P>(&'a mut self, req: &str, parser: P) -> Result<T, Error>
104 where
105 F: Future<Output = Result<T, Error>> + Send,
106 P: FnOnce(Pin<&'a mut (dyn AsyncBufRead + Send)>) -> F,
107 {
108 self.send(req).await?;
109 self.receive_data(parser).await
110 }
111}
112
#[cfg(test)]
#[cfg(feature = "runtime-tokio")]
mod tests {
    use super::*;

    use dummy::DummyEmulator;
    use runtime::{AsyncReadExt, BufReader, TcpListener};
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback address every test binds to or connects to.
    pub static LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::LOCALHOST);

    /// Round-trips a message through a plain TCP echo server, without
    /// involving `LxiDevice` or the emulator.
    #[tokio::test]
    async fn client_server() {
        // Port 0 lets the OS pick a free port; the real address is read back.
        let listener: TcpListener = TcpListener::bind(&SocketAddr::new(LOCALHOST, 0))
            .await
            .unwrap();
        let bound_addr = listener.local_addr().unwrap();

        let echo_server = async move {
            let (mut socket, _): (TcpStream, _) = listener.accept().await.unwrap();
            let (mut rx, mut tx) = socket.split();
            tokio::io::copy(&mut rx, &mut tx).await.unwrap();
        };

        let echo_client = async move {
            let mut socket: TcpStream = TcpStream::connect(&bound_addr).await.unwrap();
            let (mut rx, mut tx) = socket.split();
            let payload = b"hello, server\n";
            tx.write_all(payload).await.unwrap();
            let mut echoed = vec![0; payload.len()];
            rx.read_exact(&mut echoed).await.unwrap();

            assert_eq!(&payload[..], &echoed[..]);
        };

        let (_, _) = tokio::join!(echo_server, echo_client);
    }

    /// Queries the emulator over a raw `TcpStream` and checks the exact bytes
    /// of the reply, line terminator included.
    #[tokio::test]
    async fn dummy_idn_stream() {
        let mut emulator = DummyEmulator::start(LOCALHOST).await;
        let emulator_addr = emulator.address().unwrap();
        let emulator_task = emulator.run(1);

        let client_task = async move {
            let mut socket: TcpStream = TcpStream::connect(&emulator_addr).await.unwrap();
            let (rx, mut tx) = socket.split();
            tx.write_all(b"IDN?\r\n").await.unwrap();
            let mut buffered_rx = BufReader::new(rx);
            let mut line = vec![];
            buffered_rx.read_until(b'\n', &mut line).await.unwrap();

            assert_eq!(&b"DummyEmulator\r\n"[..], &line[..]);
        };

        tokio::join!(emulator_task, client_task);
    }

    /// Queries the emulator through `LxiDevice` and checks that the response
    /// comes back with the line terminator stripped.
    #[tokio::test]
    async fn dummy_idn_device() {
        let mut emulator = DummyEmulator::start(LOCALHOST).await;
        let emulator_addr = emulator.address().unwrap();
        let emulator_task = emulator.run(1);

        let client_task = async move {
            let mut lxi = LxiDevice::connect(&emulator_addr).await.unwrap();
            lxi.send("IDN?").await.unwrap();
            let answer = lxi.receive().await.unwrap();
            assert_eq!("DummyEmulator", answer);
        };

        tokio::join!(emulator_task, client_task);
    }
}