1#[cfg(feature = "jsonrpc")]
2use std::os::unix::net::{UnixListener, UnixStream};
3#[cfg(feature = "jsonrpc")]
4use std::sync::atomic::Ordering;
5use std::sync::{atomic::AtomicBool, Arc};
6use std::time;
7#[cfg(feature = "jsonrpc")]
8use std::{io::Write, net::Shutdown, path::PathBuf, thread};
9
10use parking_lot::{Mutex, MutexGuard};
11use ssp::{Error, Result};
12#[cfg(feature = "jsonrpc")]
13use ssp::{Event, Method};
14
15use crate::DeviceHandle;
16#[cfg(feature = "jsonrpc")]
17use crate::{continue_on_err, PollMode, PushEventReceiver};
18
/// Maximum time, in milliseconds, to wait when acquiring the device handle lock.
const HANDLE_TIMEOUT_MS: u128 = 5_000;
/// Maximum number of device resets attempted while trying to enable the device.
const MAX_RESETS: u64 = 10;
21
/// Process-wide flag shared by all client connection threads.
///
/// It is set when a client sends a `Disable` request while the flag is clear,
/// and cleared on `Enable` / `StackerFull`. A `Disable` request received while
/// the flag is already set closes that client's connection and clears the flag.
#[cfg(feature = "jsonrpc")]
static STOP_SERVING_CLIENT: AtomicBool = AtomicBool::new(false);
24
/// Reports whether the `STOP_SERVING_CLIENT` flag is currently set.
#[cfg(feature = "jsonrpc")]
fn stop_serving_client() -> bool {
    let flag = &STOP_SERVING_CLIENT;
    flag.load(Ordering::Relaxed)
}
29
/// Stores `s` into the `STOP_SERVING_CLIENT` flag.
#[cfg(feature = "jsonrpc")]
fn set_stop_serving_client(s: bool) {
    let flag = &STOP_SERVING_CLIENT;
    flag.store(s, Ordering::SeqCst);
}
34
35pub struct Server {
39 pub handle: Arc<Mutex<DeviceHandle>>,
40 #[cfg(feature = "jsonrpc")]
41 socket_path: Option<String>,
42 #[cfg(feature = "jsonrpc")]
43 push_queue: Option<PushEventReceiver>,
44 #[cfg(feature = "jsonrpc")]
45 listener: Option<UnixListener>,
46 #[cfg(feature = "jsonrpc")]
47 bus: Option<bus::Bus<Event>>,
48}
49
50impl Server {
51 pub fn new_auto(
53 serial_path: &str,
54 stop_polling: Arc<AtomicBool>,
55 protocol_version: ssp::ProtocolVersion,
56 ) -> Result<Self> {
57 let handle = DeviceHandle::new(serial_path)?;
58 handle.start_background_polling(stop_polling)?;
59 handle.enable_device(protocol_version)?;
61
62 Ok(Self {
63 handle: Arc::new(Mutex::new(handle)),
64 #[cfg(feature = "jsonrpc")]
65 socket_path: None,
66 #[cfg(feature = "jsonrpc")]
67 push_queue: None,
68 #[cfg(feature = "jsonrpc")]
69 listener: None,
70 #[cfg(feature = "jsonrpc")]
71 bus: None,
72 })
73 }
74
75 #[cfg(feature = "jsonrpc")]
78 #[cfg_attr(doc_cfg, doc(cfg(feature = "jsonrpc")))]
79 pub fn new_uds(
80 serial_path: &str,
81 socket_path: &str,
82 stop_polling: Arc<AtomicBool>,
83 protocol_version: ssp::ProtocolVersion,
84 encrypt: bool,
85 ) -> Result<Self> {
86 let mut handle = DeviceHandle::new(serial_path)?;
87
88 if encrypt {
89 handle.sync()?;
90 handle.renegotiate_key()?;
91 thread::sleep(time::Duration::from_millis(500));
92 }
93
94 let mut reset_count = 0;
96 while let Err(err) = handle.enable_device(protocol_version) {
97 log::error!("error enabling device: {err}, resetting");
98
99 if let Err(err) = handle.full_reset() {
100 log::error!("error resetting device: {err}");
101 }
102
103 reset_count += 1;
104
105 if reset_count >= MAX_RESETS {
106 log::error!("maximum resets reached: {MAX_RESETS}");
107 break;
108 }
109 }
110
111 if encrypt {
113 handle.disable_payout()?;
114 } else {
115 handle.disable()?;
116 }
117
118 let push_queue =
119 Some(handle.start_background_polling_with_queue(stop_polling, PollMode::Interactive)?);
120
121 let path_dir = PathBuf::from(socket_path);
123 let sock_path = path_dir
124 .parent()
125 .ok_or(Error::Io("creating PathBuf from socket path".into()))?;
126 std::fs::create_dir_all(sock_path)?;
127
128 let listener = Some(UnixListener::bind(socket_path)?);
129 let bus = Some(bus::Bus::new(1024));
130
131 Ok(Self {
132 handle: Arc::new(Mutex::new(handle)),
133 socket_path: Some(socket_path.into()),
134 push_queue,
135 listener,
136 bus,
137 })
138 }
139
140 pub fn handle(&self) -> Result<MutexGuard<'_, DeviceHandle>> {
142 Self::lock_handle(&self.handle)
143 }
144
145 pub fn lock_handle(handle: &Arc<Mutex<DeviceHandle>>) -> Result<MutexGuard<'_, DeviceHandle>> {
149 let now = time::Instant::now();
150
151 while now.elapsed().as_millis() < HANDLE_TIMEOUT_MS {
152 if let Some(lock) = handle.try_lock() {
153 return Ok(lock);
154 }
155 }
156
157 Err(Error::Timeout("waiting for DeviceHandle".into()))
158 }
159
160 #[cfg(feature = "jsonrpc")]
164 pub fn bus(&self) -> Result<&bus::Bus<Event>> {
165 self.bus
166 .as_ref()
167 .ok_or(ssp::Error::JsonRpc("unset event queue Bus".into()))
168 }
169
170 #[cfg(feature = "jsonrpc")]
174 pub fn bus_mut(&mut self) -> Result<&mut bus::Bus<Event>> {
175 self.bus
176 .as_mut()
177 .ok_or(ssp::Error::JsonRpc("unset event queue Bus".into()))
178 }
179
180 #[cfg(feature = "jsonrpc")]
184 pub fn push_queue(&self) -> Result<&PushEventReceiver> {
185 self.push_queue
186 .as_ref()
187 .ok_or(ssp::Error::JsonRpc("unset push event queue".into()))
188 }
189
190 #[cfg(feature = "jsonrpc")]
194 pub fn push_queue_mut(&mut self) -> Result<&mut PushEventReceiver> {
195 self.push_queue
196 .as_mut()
197 .ok_or(ssp::Error::JsonRpc("unset push event queue".into()))
198 }
199
200 #[cfg(feature = "jsonrpc")]
204 pub fn listener(&self) -> Result<&UnixListener> {
205 self.listener
206 .as_ref()
207 .ok_or(ssp::Error::JsonRpc("unset unix listener".into()))
208 }
209
210 #[cfg(feature = "jsonrpc")]
214 pub fn listener_mut(&mut self) -> Result<&mut UnixListener> {
215 self.listener
216 .as_mut()
217 .ok_or(ssp::Error::JsonRpc("unset unix listener".into()))
218 }
219
220 #[cfg(feature = "jsonrpc")]
221 pub fn accept(&mut self, stop: Arc<AtomicBool>) -> Result<()> {
222 self.listener()?.set_nonblocking(true)?;
223
224 while !stop.load(Ordering::Relaxed) {
225 if let Ok((mut stream, _)) = self.listener_mut()?.accept() {
226 log::debug!("Accepted new connection");
227
228 let socket_timeout = std::env::var("SSP_SOCKET_TIMEOUT")
229 .unwrap_or("1".into())
230 .parse::<u64>()
231 .unwrap_or(1);
232
233 stream.set_read_timeout(Some(time::Duration::from_secs(socket_timeout)))?;
234 stream.set_write_timeout(Some(time::Duration::from_secs(socket_timeout)))?;
235
236 let handle = Arc::clone(&self.handle);
237 let stop_stream = Arc::clone(&stop);
238 let mut rx = self.bus_mut()?.add_rx();
239
240 thread::spawn(move || -> Result<()> {
241 while !stop_stream.load(Ordering::Relaxed) {
242 let mut lock = continue_on_err!(
243 Self::lock_handle(&handle),
244 "lock handle in accept loop"
245 );
246 match Self::receive(&mut lock, &mut stream) {
247 Ok(method) => match method {
248 Method::Enable | Method::StackerFull => {
249 set_stop_serving_client(false);
250 }
251 Method::Disable => {
252 if stop_serving_client() {
253 let _ = stream.shutdown(Shutdown::Both);
254 set_stop_serving_client(false);
255 log::debug!("Shutting down stream");
256 return Ok(());
257 } else {
258 set_stop_serving_client(true);
259 continue;
260 }
261 }
262 Method::Shutdown => {
263 log::debug!("Shutting down the socket connection");
264 stream.shutdown(Shutdown::Both)?;
265 return Ok(());
266 }
267 _ => log::debug!("Handled method: {method}"),
268 },
269 Err(err) => {
270 log::warn!("Error handling request: {err}");
271 stream.shutdown(Shutdown::Both)?;
272 return Err(err);
273 }
274 }
275
276 while let Ok(msg) = rx.try_recv() {
277 Self::send(&mut stream, &msg)?;
278 }
279 }
280
281 Ok(())
282 });
283 }
284
285 while let Ok(msg) = self.push_queue()?.pop_event() {
286 log::trace!("Sending message from push queue: {msg}");
287
288 self.bus_mut()?.broadcast(msg);
289 }
290
291 thread::sleep(time::Duration::from_millis(250));
293 }
294
295 Ok(())
296 }
297
298 #[cfg(feature = "jsonrpc")]
299 fn receive(handle: &mut DeviceHandle, stream: &mut UnixStream) -> Result<Method> {
300 handle.on_message(stream)
301 }
302
303 #[cfg(feature = "jsonrpc")]
304 fn send(stream: &mut UnixStream, msg: &Event) -> Result<()> {
305 log::debug!("Sending push event: {msg}");
306
307 let push_req = smol_jsonrpc::Request::new()
308 .with_method(msg.method().to_str())
309 .with_params(msg);
310
311 let mut json_str = serde_json::to_string(&push_req)?;
312 json_str += "\n";
313
314 stream.write_all(json_str.as_bytes())?;
315 stream.flush()?;
316
317 Ok(())
318 }
319}
320
#[cfg(feature = "jsonrpc")]
impl Drop for Server {
    /// Removes the Unix socket file, if a socket path was recorded.
    fn drop(&mut self) {
        if let Some(path) = self.socket_path.as_deref() {
            // Best-effort cleanup: a failure to remove the file is ignored.
            let _ = std::fs::remove_file(path);
        }
    }
}