1use crate::error::{ProblemJson, ViiperError};
4use crate::types::*;
5use std::io::{Read, Write};
6use std::net::{SocketAddr, TcpStream, Shutdown};
7
8enum StreamWrapper {
10 Plain(TcpStream),
11 Encrypted(crate::auth::EncryptedStream),
12}
13
14impl StreamWrapper {
15 fn try_clone(&self) -> std::io::Result<Self> {
16 match self {
17 StreamWrapper::Plain(s) => Ok(StreamWrapper::Plain(s.try_clone()?)),
18 StreamWrapper::Encrypted(s) => Ok(StreamWrapper::Encrypted(s.try_clone()?)),
19 }
20 }
21
22 fn shutdown(&self, how: Shutdown) -> std::io::Result<()> {
23 match self {
24 StreamWrapper::Plain(s) => s.shutdown(how),
25 StreamWrapper::Encrypted(s) => s.shutdown(how),
26 }
27 }
28}
29
30impl Read for StreamWrapper {
31 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
32 match self {
33 StreamWrapper::Plain(s) => s.read(buf),
34 StreamWrapper::Encrypted(s) => s.read(buf),
35 }
36 }
37}
38
39impl Write for StreamWrapper {
40 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
41 match self {
42 StreamWrapper::Plain(s) => s.write(buf),
43 StreamWrapper::Encrypted(s) => s.write(buf),
44 }
45 }
46
47 fn flush(&mut self) -> std::io::Result<()> {
48 match self {
49 StreamWrapper::Plain(s) => s.flush(),
50 StreamWrapper::Encrypted(s) => s.flush(),
51 }
52 }
53}
54
/// Blocking client for the server's request/response API.
///
/// The client holds no open connection; it only remembers where and how to
/// connect.
pub struct ViiperClient {
    /// Address of the server to connect to.
    addr: SocketAddr,
    /// Password for the encrypted handshake; `None` means plain TCP is used.
    password: Option<String>,
}
60
61impl ViiperClient {
62 pub fn new(addr: SocketAddr) -> Self {
64 Self { addr, password: None }
65 }
66
67 pub fn new_with_password(addr: SocketAddr, password: String) -> Self {
70 let password = if password.is_empty() { None } else { Some(password) };
71 Self { addr, password }
72 }
73
74 fn do_request<T: for<'de> serde::Deserialize<'de>>(
75 &self,
76 path: &str,
77 payload: Option<&str>,
78 ) -> Result<T, ViiperError> {
79 let tcp_stream = TcpStream::connect(self.addr)?;
80 tcp_stream.set_nodelay(true)?;
81
82 let mut stream = if let Some(ref pwd) = self.password {
83 StreamWrapper::Encrypted(crate::auth::perform_handshake(tcp_stream, pwd)?)
84 } else {
85 StreamWrapper::Plain(tcp_stream)
86 };
87
88 stream.write_all(path.as_bytes())?;
89 if let Some(p) = payload {
90 stream.write_all(b" ")?;
91 stream.write_all(p.as_bytes())?;
92 }
93 stream.write_all(b"\0")?;
94
95 let mut buf = Vec::new();
96 stream.read_to_end(&mut buf)?;
97
98 let response = String::from_utf8(buf)
99 .map_err(|_| ViiperError::UnexpectedResponse("invalid UTF-8".into()))?
100 .trim_end_matches('\n')
101 .to_string();
102
103 if response.starts_with("{\"status\":") {
104 let problem: ProblemJson = serde_json::from_str(&response)?;
105 return Err(ViiperError::Protocol(problem));
106 }
107
108 serde_json::from_str(&response).map_err(Into::into)
109 }
110
111 pub fn ping(&self) -> Result<PingResponse, ViiperError> {
113 let path = "ping".to_string();
114 let payload: Option<String> = None;
115 self.do_request(&path, payload.as_deref())
116 }
117
118 pub fn bus_list(&self) -> Result<BusListResponse, ViiperError> {
120 let path = "bus/list".to_string();
121 let payload: Option<String> = None;
122 self.do_request(&path, payload.as_deref())
123 }
124
125 pub fn bus_create(&self, uint32: Option<u32>) -> Result<BusCreateResponse, ViiperError> {
127 let path = "bus/create".to_string();
128 let payload = uint32.map(|v| v.to_string());
129 self.do_request(&path, payload.as_deref())
130 }
131
132 pub fn bus_remove(&self, uint32: Option<u32>) -> Result<BusRemoveResponse, ViiperError> {
134 let path = "bus/remove".to_string();
135 let payload = uint32.map(|v| v.to_string());
136 self.do_request(&path, payload.as_deref())
137 }
138
139 pub fn bus_devices_list(&self, id: u32) -> Result<DevicesListResponse, ViiperError> {
141 let path = format!("bus/{}/list", id);
142 let payload: Option<String> = None;
143 self.do_request(&path, payload.as_deref())
144 }
145
146 pub fn bus_device_add(&self, id: u32, device_create_request: &DeviceCreateRequest) -> Result<Device, ViiperError> {
148 let path = format!("bus/{}/add", id);
149 let payload = Some(serde_json::to_string(&device_create_request)?);
150 self.do_request(&path, payload.as_deref())
151 }
152
153 pub fn bus_device_remove(&self, id: u32, string: Option<&str>) -> Result<DeviceRemoveResponse, ViiperError> {
155 let path = format!("bus/{}/remove", id);
156 let payload = string.map(|s| s.to_string());
157 self.do_request(&path, payload.as_deref())
158 }
159
160 pub fn connect_device(&self, bus_id: u32, dev_id: &str) -> Result<DeviceStream, ViiperError> {
162 DeviceStream::connect(self.addr, bus_id, dev_id, self.password.as_deref())
163 }
164}
165
166pub struct DeviceStream {
168 stream: StreamWrapper,
169 output_thread: Option<std::thread::JoinHandle<()>>,
170 disconnect_callback: Option<Box<dyn FnOnce() + Send + 'static>>,
171}
172
173impl DeviceStream {
174 pub fn connect(addr: SocketAddr, bus_id: u32, dev_id: &str, password: Option<&str>) -> Result<Self, ViiperError> {
175 let tcp_stream = TcpStream::connect(addr)?;
176 tcp_stream.set_nodelay(true)?;
177
178 let mut stream = if let Some(pwd) = password {
179 StreamWrapper::Encrypted(crate::auth::perform_handshake(tcp_stream, pwd)?)
180 } else {
181 StreamWrapper::Plain(tcp_stream)
182 };
183
184 let handshake = format!("bus/{}/{}\0", bus_id, dev_id);
185 stream.write_all(handshake.as_bytes())?;
186 Ok(Self {
187 stream,
188 output_thread: None,
189 disconnect_callback: None,
190 })
191 }
192
193 pub fn send<T: crate::wire::DeviceInput>(&mut self, input: &T) -> Result<(), ViiperError> {
195 let bytes = input.to_bytes();
196 self.stream.write_all(&bytes)?;
197 Ok(())
198 }
199
200 pub fn on_output<F>(&mut self, mut callback: F) -> Result<(), ViiperError>
205 where
206 F: FnMut(&mut dyn std::io::BufRead) -> std::io::Result<()> + Send + 'static,
207 {
208 if self.output_thread.is_some() {
209 return Err(ViiperError::UnexpectedResponse("Output callback already registered".into()));
210 }
211
212 let stream = self.stream.try_clone()?;
213 let disconnect = self.disconnect_callback.take();
214 let handle = std::thread::spawn(move || {
215 let mut reader = std::io::BufReader::new(stream);
216 while callback(&mut reader).is_ok() {}
217 if let Some(on_disconnect) = disconnect {
218 on_disconnect();
219 }
220 });
221 self.output_thread = Some(handle);
222 Ok(())
223 }
224
225 pub fn on_disconnect<F>(&mut self, callback: F) -> Result<(), ViiperError>
226 where
227 F: FnOnce() + Send + 'static,
228 {
229 self.disconnect_callback = Some(Box::new(callback));
230 Ok(())
231 }
232
233 pub fn send_raw(&mut self, data: &[u8]) -> Result<(), ViiperError> {
235 self.stream.write_all(data)?;
236 Ok(())
237 }
238
239 pub fn read_raw(&mut self, buf: &mut [u8]) -> Result<usize, ViiperError> {
241 self.stream.read(buf).map_err(Into::into)
242 }
243
244 pub fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), ViiperError> {
246 self.stream.read_exact(buf).map_err(Into::into)
247 }
248}
249
250impl Drop for DeviceStream {
251 fn drop(&mut self) {
252 let _ = self.stream.shutdown(std::net::Shutdown::Both);
253 if let Some(handle) = self.output_thread.take() {
254 let _ = handle.join();
255 }
256 }
257}