ssp_server/
server.rs

1#[cfg(feature = "jsonrpc")]
2use std::os::unix::net::{UnixListener, UnixStream};
3#[cfg(feature = "jsonrpc")]
4use std::sync::atomic::Ordering;
5use std::sync::{atomic::AtomicBool, Arc};
6use std::time;
7#[cfg(feature = "jsonrpc")]
8use std::{io::Write, net::Shutdown, path::PathBuf, thread};
9
10use parking_lot::{Mutex, MutexGuard};
11use ssp::{Error, Result};
12#[cfg(feature = "jsonrpc")]
13use ssp::{Event, Method};
14
15use crate::DeviceHandle;
16#[cfg(feature = "jsonrpc")]
17use crate::{continue_on_err, PollMode, PushEventReceiver};
18
19const HANDLE_TIMEOUT_MS: u128 = 5_000;
20const MAX_RESETS: u64 = 10;
21
22#[cfg(feature = "jsonrpc")]
23static STOP_SERVING_CLIENT: AtomicBool = AtomicBool::new(false);
24
25#[cfg(feature = "jsonrpc")]
26fn stop_serving_client() -> bool {
27    STOP_SERVING_CLIENT.load(Ordering::Relaxed)
28}
29
30#[cfg(feature = "jsonrpc")]
31fn set_stop_serving_client(s: bool) {
32    STOP_SERVING_CLIENT.store(s, Ordering::SeqCst);
33}
34
35/// SSP/eSSP server for communicating with supported device over serial.
36///
37/// Optionally, can be used interactively with a client connection.
38pub struct Server {
39    pub handle: Arc<Mutex<DeviceHandle>>,
40    #[cfg(feature = "jsonrpc")]
41    socket_path: Option<String>,
42    #[cfg(feature = "jsonrpc")]
43    push_queue: Option<PushEventReceiver>,
44    #[cfg(feature = "jsonrpc")]
45    listener: Option<UnixListener>,
46    #[cfg(feature = "jsonrpc")]
47    bus: Option<bus::Bus<Event>>,
48}
49
50impl Server {
51    /// Creates a new [Server] that automatically handles device events.
52    pub fn new_auto(
53        serial_path: &str,
54        stop_polling: Arc<AtomicBool>,
55        protocol_version: ssp::ProtocolVersion,
56    ) -> Result<Self> {
57        let handle = DeviceHandle::new(serial_path)?;
58        handle.start_background_polling(stop_polling)?;
59        // enable the device to fully configure
60        handle.enable_device(protocol_version)?;
61
62        Ok(Self {
63            handle: Arc::new(Mutex::new(handle)),
64            #[cfg(feature = "jsonrpc")]
65            socket_path: None,
66            #[cfg(feature = "jsonrpc")]
67            push_queue: None,
68            #[cfg(feature = "jsonrpc")]
69            listener: None,
70            #[cfg(feature = "jsonrpc")]
71            bus: None,
72        })
73    }
74
75    /// Creates a new [Server] that interactively handles device events via
76    /// a client connects over Unix domain sockets.
77    #[cfg(feature = "jsonrpc")]
78    #[cfg_attr(doc_cfg, doc(cfg(feature = "jsonrpc")))]
79    pub fn new_uds(
80        serial_path: &str,
81        socket_path: &str,
82        stop_polling: Arc<AtomicBool>,
83        protocol_version: ssp::ProtocolVersion,
84        encrypt: bool,
85    ) -> Result<Self> {
86        let mut handle = DeviceHandle::new(serial_path)?;
87
88        if encrypt {
89            handle.sync()?;
90            handle.renegotiate_key()?;
91            thread::sleep(time::Duration::from_millis(500));
92        }
93
94        // enable the device to fully configure
95        let mut reset_count = 0;
96        while let Err(err) = handle.enable_device(protocol_version) {
97            log::error!("error enabling device: {err}, resetting");
98
99            if let Err(err) = handle.full_reset() {
100                log::error!("error resetting device: {err}");
101            }
102
103            reset_count += 1;
104
105            if reset_count >= MAX_RESETS {
106                log::error!("maximum resets reached: {MAX_RESETS}");
107                break;
108            }
109        }
110
111        // disable again to allow client to decide when to begin accepting notes
112        if encrypt {
113            handle.disable_payout()?;
114        } else {
115            handle.disable()?;
116        }
117
118        let push_queue =
119            Some(handle.start_background_polling_with_queue(stop_polling, PollMode::Interactive)?);
120
121        // ensure path exists for UNIX socket
122        let path_dir = PathBuf::from(socket_path);
123        let sock_path = path_dir
124            .parent()
125            .ok_or(Error::Io("creating PathBuf from socket path".into()))?;
126        std::fs::create_dir_all(sock_path)?;
127
128        let listener = Some(UnixListener::bind(socket_path)?);
129        let bus = Some(bus::Bus::new(1024));
130
131        Ok(Self {
132            handle: Arc::new(Mutex::new(handle)),
133            socket_path: Some(socket_path.into()),
134            push_queue,
135            listener,
136            bus,
137        })
138    }
139
140    /// Gets a reference to the [DeviceHandle].
141    pub fn handle(&self) -> Result<MutexGuard<'_, DeviceHandle>> {
142        Self::lock_handle(&self.handle)
143    }
144
145    /// Aquires a lock on the [DeviceHandle].
146    ///
147    /// Returns an `Err(_)` if the timeout expires before acquiring the lock.
148    pub fn lock_handle(handle: &Arc<Mutex<DeviceHandle>>) -> Result<MutexGuard<'_, DeviceHandle>> {
149        let now = time::Instant::now();
150
151        while now.elapsed().as_millis() < HANDLE_TIMEOUT_MS {
152            if let Some(lock) = handle.try_lock() {
153                return Ok(lock);
154            }
155        }
156
157        Err(Error::Timeout("waiting for DeviceHandle".into()))
158    }
159
160    /// Gets a reference to the event queue bus for broadcasting [Event]s.
161    ///
162    /// Returns `Err(_)` if the [Bus](bus::Bus) is unset.
163    #[cfg(feature = "jsonrpc")]
164    pub fn bus(&self) -> Result<&bus::Bus<Event>> {
165        self.bus
166            .as_ref()
167            .ok_or(ssp::Error::JsonRpc("unset event queue Bus".into()))
168    }
169
170    /// Gets a mutable reference to the event queue bus for broadcasting [Event]s.
171    ///
172    /// Returns `Err(_)` if the [Bus](bus::Bus) is unset.
173    #[cfg(feature = "jsonrpc")]
174    pub fn bus_mut(&mut self) -> Result<&mut bus::Bus<Event>> {
175        self.bus
176            .as_mut()
177            .ok_or(ssp::Error::JsonRpc("unset event queue Bus".into()))
178    }
179
180    /// Gets a reference to the push event queue.
181    ///
182    /// Returns `Err(_)` if the [PushEventReceiver](crate::PushEventReceiver) is unset.
183    #[cfg(feature = "jsonrpc")]
184    pub fn push_queue(&self) -> Result<&PushEventReceiver> {
185        self.push_queue
186            .as_ref()
187            .ok_or(ssp::Error::JsonRpc("unset push event queue".into()))
188    }
189
190    /// Gets a mutable reference to the push event queue.
191    ///
192    /// Returns `Err(_)` if the [PushEventReceiver](crate::PushEventReceiver) is unset.
193    #[cfg(feature = "jsonrpc")]
194    pub fn push_queue_mut(&mut self) -> Result<&mut PushEventReceiver> {
195        self.push_queue
196            .as_mut()
197            .ok_or(ssp::Error::JsonRpc("unset push event queue".into()))
198    }
199
200    /// Gets a reference to the [Listener](UnixListener).
201    ///
202    /// Returns `Err(_)` if the [Listener](UnixListener) is unset.
203    #[cfg(feature = "jsonrpc")]
204    pub fn listener(&self) -> Result<&UnixListener> {
205        self.listener
206            .as_ref()
207            .ok_or(ssp::Error::JsonRpc("unset unix listener".into()))
208    }
209
210    /// Gets a mutable reference to the push event queue.
211    ///
212    /// Returns `Err(_)` if the [PushEventReceiver](crate::PushEventReceiver) is unset.
213    #[cfg(feature = "jsonrpc")]
214    pub fn listener_mut(&mut self) -> Result<&mut UnixListener> {
215        self.listener
216            .as_mut()
217            .ok_or(ssp::Error::JsonRpc("unset unix listener".into()))
218    }
219
220    #[cfg(feature = "jsonrpc")]
221    pub fn accept(&mut self, stop: Arc<AtomicBool>) -> Result<()> {
222        self.listener()?.set_nonblocking(true)?;
223
224        while !stop.load(Ordering::Relaxed) {
225            if let Ok((mut stream, _)) = self.listener_mut()?.accept() {
226                log::debug!("Accepted new connection");
227
228                let socket_timeout = std::env::var("SSP_SOCKET_TIMEOUT")
229                    .unwrap_or("1".into())
230                    .parse::<u64>()
231                    .unwrap_or(1);
232
233                stream.set_read_timeout(Some(time::Duration::from_secs(socket_timeout)))?;
234                stream.set_write_timeout(Some(time::Duration::from_secs(socket_timeout)))?;
235
236                let handle = Arc::clone(&self.handle);
237                let stop_stream = Arc::clone(&stop);
238                let mut rx = self.bus_mut()?.add_rx();
239
240                thread::spawn(move || -> Result<()> {
241                    while !stop_stream.load(Ordering::Relaxed) {
242                        let mut lock = continue_on_err!(
243                            Self::lock_handle(&handle),
244                            "lock handle in accept loop"
245                        );
246                        match Self::receive(&mut lock, &mut stream) {
247                            Ok(method) => match method {
248                                Method::Enable | Method::StackerFull => {
249                                    set_stop_serving_client(false);
250                                }
251                                Method::Disable => {
252                                    if stop_serving_client() {
253                                        let _ = stream.shutdown(Shutdown::Both);
254                                        set_stop_serving_client(false);
255                                        log::debug!("Shutting down stream");
256                                        return Ok(());
257                                    } else {
258                                        set_stop_serving_client(true);
259                                        continue;
260                                    }
261                                }
262                                Method::Shutdown => {
263                                    log::debug!("Shutting down the socket connection");
264                                    stream.shutdown(Shutdown::Both)?;
265                                    return Ok(());
266                                }
267                                _ => log::debug!("Handled method: {method}"),
268                            },
269                            Err(err) => {
270                                log::warn!("Error handling request: {err}");
271                                stream.shutdown(Shutdown::Both)?;
272                                return Err(err);
273                            }
274                        }
275
276                        while let Ok(msg) = rx.try_recv() {
277                            Self::send(&mut stream, &msg)?;
278                        }
279                    }
280
281                    Ok(())
282                });
283            }
284
285            while let Ok(msg) = self.push_queue()?.pop_event() {
286                log::trace!("Sending message from push queue: {msg}");
287
288                self.bus_mut()?.broadcast(msg);
289            }
290
291            // Sleep for a bit to avoid a tight loop
292            thread::sleep(time::Duration::from_millis(250));
293        }
294
295        Ok(())
296    }
297
298    #[cfg(feature = "jsonrpc")]
299    fn receive(handle: &mut DeviceHandle, stream: &mut UnixStream) -> Result<Method> {
300        handle.on_message(stream)
301    }
302
303    #[cfg(feature = "jsonrpc")]
304    fn send(stream: &mut UnixStream, msg: &Event) -> Result<()> {
305        log::debug!("Sending push event: {msg}");
306
307        let push_req = smol_jsonrpc::Request::new()
308            .with_method(msg.method().to_str())
309            .with_params(msg);
310
311        let mut json_str = serde_json::to_string(&push_req)?;
312        json_str += "\n";
313
314        stream.write_all(json_str.as_bytes())?;
315        stream.flush()?;
316
317        Ok(())
318    }
319}
320
321#[cfg(feature = "jsonrpc")]
322impl Drop for Server {
323    fn drop(&mut self) {
324        if let Some(path) = self.socket_path.as_ref() {
325            let _ = std::fs::remove_file(path);
326        }
327    }
328}