1use std::{sync::Arc, task::Poll};
2
3use futures_util::{
4 stream::{SplitSink, SplitStream},
5 FutureExt, SinkExt, Stream, StreamExt,
6};
7use thiserror::Error;
8use tokio::{net::TcpStream, sync::Mutex};
9use tokio_websockets::{Error, Message, WebSocketStream};
10use tracing::{debug, trace};
11
12use crate::protocol::{ClientCommand, MiddlewareCommand, MonitorSettings};
13
14#[derive(Debug, Error)]
15pub enum ServerError {
16 #[error(transparent)]
18 Ws(#[from] tokio_websockets::Error),
19 #[error(transparent)]
21 Json(#[from] serde_json::Error),
22 #[error("Text-based (json) client command is expected from the serial plotter")]
26 NonTextMessage,
27}
28
29#[derive(Debug, Clone)]
39pub struct Server {
40 ws_stream: Arc<Mutex<SplitStream<WebSocketStream<TcpStream>>>>,
41}
42impl Server {
43 pub fn new(ws_stream: SplitStream<WebSocketStream<TcpStream>>) -> Self {
44 Self {
45 ws_stream: Arc::new(Mutex::new(ws_stream)),
46 }
47 }
48}
49
50impl Stream for Server {
51 type Item = Result<ClientCommand, ServerError>;
52
53 fn poll_next(
54 self: std::pin::Pin<&mut Self>,
55 cx: &mut std::task::Context<'_>,
56 ) -> Poll<Option<Self::Item>> {
57 let mut pin = Box::pin(self.ws_stream.lock());
58 match pin.poll_unpin(cx) {
59 Poll::Ready(mut guard) => guard.poll_next_unpin(cx).map(|next_value| {
60 next_value.and_then(|result| {
61 let a = result.map_err(ServerError::Ws).and_then(|message| {
62 if message.is_close() {
63 debug!("Websocket closed");
64 return Ok(None);
65 }
66
67 let client_command = message
78 .as_text()
79 .ok_or(ServerError::NonTextMessage)
80 .and_then(|text_payload| {
81 trace!(text_payload, "Text WS message received");
82
83 serde_json::from_str::<ClientCommand>(text_payload)
84 .map_err(ServerError::Json)
85 })?;
86
87 Ok(Some(client_command))
88 });
89
90 a.transpose()
91 })
92 }),
93 Poll::Pending => Poll::Pending,
94 }
95 }
96}
97
98#[derive(Debug, Clone)]
102pub struct Client {
103 ws_sink: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
104}
105
106impl Client {
107 pub fn new(ws_sink: SplitSink<WebSocketStream<TcpStream>, Message>) -> Self {
108 Self {
109 ws_sink: Arc::new(Mutex::new(ws_sink)),
110 }
111 }
112
113 pub async fn set_monitor_settings(
116 &self,
117 monitor_settings: MonitorSettings,
118 ) -> Result<(), Error> {
119 let settings = MiddlewareCommand(monitor_settings);
120
121 trace!("Settings to be sent: {settings:?}");
122 let command_json = serde_json::to_string(&settings).unwrap();
123 trace!("Settings command JSON to be sent: {command_json:?}");
124
125 self.ws_sink
126 .lock()
127 .await
128 .send(Message::text(command_json))
129 .await
130 }
131
132 pub async fn send(&self, data: &[&str]) -> Result<(), Error> {
134 let data_json = serde_json::to_string(data).expect("Should always be serializable!");
135
136 self.ws_sink
137 .lock()
138 .await
139 .send(Message::text(data_json))
140 .await
141 }
142}