Skip to main content

dartboard_client_ws/
lib.rs

1//! WebSocket [`Client`] implementation for dartboard.
2//!
3//! Runs a dedicated tokio runtime on its own OS thread. The runtime owns the
4//! ws read/write halves and two bridging channels so the sync `Client` trait
5//! (try_recv / submit_op) can talk to the async transport without forcing the
6//! caller into tokio.
7
8use std::sync::mpsc as stdmpsc;
9use std::thread;
10
11use futures_util::{SinkExt, StreamExt};
12use tokio::sync::mpsc as tkmpsc;
13use tokio_tungstenite::tungstenite::Message;
14
15use dartboard_core::{CanvasOp, Client, ClientMsg, ClientOpId, RgbColor, ServerMsg};
16
17/// The same Hello shape [`dartboard_local::Hello`] uses; defined here to
18/// avoid a server dep from the client-ws crate.
19#[derive(Debug, Clone)]
20pub struct Hello {
21    pub name: String,
22    pub color: RgbColor,
23}
24
25pub struct WebsocketClient {
26    outbound: tkmpsc::UnboundedSender<ClientMsg>,
27    inbound: stdmpsc::Receiver<ServerMsg>,
28    next_client_op_id: ClientOpId,
29    _runtime_thread: thread::JoinHandle<()>,
30}
31
32#[derive(Debug)]
33pub enum ConnectError {
34    Io(std::io::Error),
35    // Boxed to keep ConnectError small; the underlying type is ~130 bytes.
36    Ws(Box<tokio_tungstenite::tungstenite::Error>),
37    Rejected(String),
38}
39
40impl std::fmt::Display for ConnectError {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            Self::Io(e) => write!(f, "io error: {}", e),
44            Self::Ws(e) => write!(f, "ws error: {}", e),
45            Self::Rejected(reason) => write!(f, "{}", reason),
46        }
47    }
48}
49
50impl std::error::Error for ConnectError {}
51
52impl From<std::io::Error> for ConnectError {
53    fn from(e: std::io::Error) -> Self {
54        Self::Io(e)
55    }
56}
57
58impl From<tokio_tungstenite::tungstenite::Error> for ConnectError {
59    fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
60        Self::Ws(Box::new(e))
61    }
62}
63
64impl WebsocketClient {
65    pub fn connect(url: &str, hello: Hello) -> Result<Self, ConnectError> {
66        let url = url.to_string();
67        let (outbound_tx, outbound_rx) = tkmpsc::unbounded_channel::<ClientMsg>();
68        let (inbound_tx, inbound_rx) = stdmpsc::channel::<ServerMsg>();
69        let (ready_tx, ready_rx) = stdmpsc::channel();
70
71        let runtime_thread = thread::spawn(move || {
72            let runtime = match tokio::runtime::Builder::new_multi_thread()
73                .enable_all()
74                .build()
75            {
76                Ok(rt) => rt,
77                Err(e) => {
78                    let _ = ready_tx.send(Err(ConnectError::Io(e)));
79                    return;
80                }
81            };
82            runtime.block_on(async move {
83                match run_connection(url, hello, outbound_rx, inbound_tx, ready_tx).await {
84                    Ok(()) => {}
85                    Err(e) => eprintln!("ws client ended: {}", e),
86                }
87            });
88        });
89
90        match ready_rx.recv() {
91            Ok(Ok(())) => Ok(Self {
92                outbound: outbound_tx,
93                inbound: inbound_rx,
94                next_client_op_id: 1,
95                _runtime_thread: runtime_thread,
96            }),
97            Ok(Err(e)) => Err(e),
98            Err(_) => Err(ConnectError::Io(std::io::Error::other(
99                "ws thread disappeared",
100            ))),
101        }
102    }
103}
104
105async fn run_connection(
106    url: String,
107    hello: Hello,
108    mut outbound_rx: tkmpsc::UnboundedReceiver<ClientMsg>,
109    inbound_tx: stdmpsc::Sender<ServerMsg>,
110    ready_tx: stdmpsc::Sender<Result<(), ConnectError>>,
111) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
112    let (ws, _response) = match tokio_tungstenite::connect_async(&url).await {
113        Ok(v) => v,
114        Err(e) => {
115            let _ = ready_tx.send(Err(ConnectError::Ws(Box::new(e))));
116            return Ok(());
117        }
118    };
119    let (mut write, mut read) = ws.split();
120
121    let hello_text = serde_json::to_string(&ClientMsg::Hello {
122        name: hello.name,
123        color: hello.color,
124    })?;
125    write.send(Message::Text(hello_text)).await?;
126
127    // Wait for the handshake response so a full server fails fast instead of
128    // constructing a client that times out waiting for Welcome.
129    let first_msg = match read.next().await {
130        Some(Ok(Message::Text(text))) => match serde_json::from_str::<ServerMsg>(&text) {
131            Ok(msg) => msg,
132            Err(e) => {
133                let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(format!(
134                    "invalid server handshake: {e}"
135                )))));
136                return Ok(());
137            }
138        },
139        Some(Ok(other)) => {
140            let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(format!(
141                "expected server handshake text frame, got {other:?}"
142            )))));
143            return Ok(());
144        }
145        Some(Err(e)) => {
146            let _ = ready_tx.send(Err(ConnectError::Ws(Box::new(e))));
147            return Ok(());
148        }
149        None => {
150            let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(
151                "server closed before handshake completed",
152            ))));
153            return Ok(());
154        }
155    };
156    if let ServerMsg::ConnectRejected { reason } = first_msg {
157        let _ = ready_tx.send(Err(ConnectError::Rejected(reason)));
158        return Ok(());
159    }
160    if inbound_tx.send(first_msg).is_err() {
161        let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(
162            "client dropped before handshake completed",
163        ))));
164        return Ok(());
165    }
166    let _ = ready_tx.send(Ok(()));
167
168    let writer = tokio::spawn(async move {
169        while let Some(msg) = outbound_rx.recv().await {
170            let Ok(text) = serde_json::to_string(&msg) else {
171                break;
172            };
173            if write.send(Message::Text(text)).await.is_err() {
174                break;
175            }
176        }
177    });
178
179    while let Some(frame) = read.next().await {
180        let Ok(Message::Text(text)) = frame else {
181            break;
182        };
183        let Ok(msg) = serde_json::from_str::<ServerMsg>(&text) else {
184            continue;
185        };
186        if inbound_tx.send(msg).is_err() {
187            break;
188        }
189    }
190
191    writer.abort();
192    Ok(())
193}
194
195impl Client for WebsocketClient {
196    fn submit_op(&mut self, op: CanvasOp) -> ClientOpId {
197        let id = self.next_client_op_id;
198        self.next_client_op_id += 1;
199        let _ = self.outbound.send(ClientMsg::Op {
200            client_op_id: id,
201            op,
202        });
203        id
204    }
205
206    fn try_recv(&mut self) -> Option<ServerMsg> {
207        self.inbound.try_recv().ok()
208    }
209}