arduino_plotter/
api.rs

1use std::{sync::Arc, task::Poll};
2
3use futures_util::{
4    stream::{SplitSink, SplitStream},
5    FutureExt, SinkExt, Stream, StreamExt,
6};
7use thiserror::Error;
8use tokio::{net::TcpStream, sync::Mutex};
9use tokio_websockets::{Error, Message, WebSocketStream};
10use tracing::{debug, trace};
11
12use crate::protocol::{ClientCommand, MiddlewareCommand, MonitorSettings};
13
14#[derive(Debug, Error)]
15pub enum ServerError {
16    /// A Websocket Error occurred
17    #[error(transparent)]
18    Ws(#[from] tokio_websockets::Error),
19    /// An error occurred during the deserializing of a JSON to a value
20    #[error(transparent)]
21    Json(#[from] serde_json::Error),
22    /// WebSocket Message response was not a text one.
23    ///
24    /// See [`tokio_websockets::Message::as_text`] for more details.
25    #[error("Text-based (json) client command is expected from the serial plotter")]
26    NonTextMessage,
27}
28
29/// Server is needed for receiving messages from the plotter app.
30///
31/// 2 messages are possible [`ClientCommand`] and a websocket closing message:
32/// - `SEND_MESSAGE` - sending message to the board through serial
33/// - `CHANGE_SETTINGS` - settings for [`EndOfLine`] has bee changed in the application
34///
35/// Cheap to clone as it has an internal Atomic reference counter ([`Arc`]) for the Websocket Stream
36///
37/// [`EndOfLine`]: crate::protocol::EndOfLine
38#[derive(Debug, Clone)]
39pub struct Server {
40    ws_stream: Arc<Mutex<SplitStream<WebSocketStream<TcpStream>>>>,
41}
42impl Server {
43    pub fn new(ws_stream: SplitStream<WebSocketStream<TcpStream>>) -> Self {
44        Self {
45            ws_stream: Arc::new(Mutex::new(ws_stream)),
46        }
47    }
48}
49
50impl Stream for Server {
51    type Item = Result<ClientCommand, ServerError>;
52
53    fn poll_next(
54        self: std::pin::Pin<&mut Self>,
55        cx: &mut std::task::Context<'_>,
56    ) -> Poll<Option<Self::Item>> {
57        let mut pin = Box::pin(self.ws_stream.lock());
58        match pin.poll_unpin(cx) {
59            Poll::Ready(mut guard) => guard.poll_next_unpin(cx).map(|next_value| {
60                next_value.and_then(|result| {
61                    let a = result.map_err(ServerError::Ws).and_then(|message| {
62                        if message.is_close() {
63                            debug!("Websocket closed");
64                            return Ok(None);
65                        }
66
67                        // causes unsafe precondition panic on Rust 1.78
68                        // match message.as_close() {
69                        //     Some((close_code, reason)) => {
70                        //         debug!(?close_code, reason, "Websocket closed");
71                        //         // todo: notify the client for the closed websocket
72                        //         return Ok(None);
73                        //     }
74                        //     None => {}
75                        // }
76
77                        let client_command = message
78                            .as_text()
79                            .ok_or(ServerError::NonTextMessage)
80                            .and_then(|text_payload| {
81                            trace!(text_payload, "Text WS message received");
82
83                            serde_json::from_str::<ClientCommand>(text_payload)
84                                .map_err(ServerError::Json)
85                        })?;
86
87                        Ok(Some(client_command))
88                    });
89
90                    a.transpose()
91                })
92            }),
93            Poll::Pending => Poll::Pending,
94        }
95    }
96}
97
98/// Client for sending Data message or [`MiddlewareCommand`] (i.e. [`MonitorSettings`])
99///
100/// Cheap to clone as it has an internal Atomic reference counter ([`Arc`]) for the Websocket Stream
101#[derive(Debug, Clone)]
102pub struct Client {
103    ws_sink: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
104}
105
106impl Client {
107    pub fn new(ws_sink: SplitSink<WebSocketStream<TcpStream>, Message>) -> Self {
108        Self {
109            ws_sink: Arc::new(Mutex::new(ws_sink)),
110        }
111    }
112
113    /// Send a [`MonitorSettings`] ([`MiddlewareCommand`]) to the Arduino Serial Plotter UI
114    /// through an already established connection.
115    pub async fn set_monitor_settings(
116        &self,
117        monitor_settings: MonitorSettings,
118    ) -> Result<(), Error> {
119        let settings = MiddlewareCommand(monitor_settings);
120
121        trace!("Settings to be sent: {settings:?}");
122        let command_json = serde_json::to_string(&settings).unwrap();
123        trace!("Settings command JSON to be sent: {command_json:?}");
124
125        self.ws_sink
126            .lock()
127            .await
128            .send(Message::text(command_json))
129            .await
130    }
131
132    /// Send a Data lines message to the Arduino Serial Plotter UI to plot.
133    pub async fn send(&self, data: &[&str]) -> Result<(), Error> {
134        let data_json = serde_json::to_string(data).expect("Should always be serializable!");
135
136        self.ws_sink
137            .lock()
138            .await
139            .send(Message::text(data_json))
140            .await
141    }
142}