dartboard_client_ws/
lib.rs1use std::sync::mpsc as stdmpsc;
9use std::thread;
10
11use futures_util::{SinkExt, StreamExt};
12use tokio::sync::mpsc as tkmpsc;
13use tokio_tungstenite::tungstenite::Message;
14
15use dartboard_core::{CanvasOp, Client, ClientMsg, ClientOpId, RgbColor, ServerMsg};
16
17#[derive(Debug, Clone)]
20pub struct Hello {
21 pub name: String,
22 pub color: RgbColor,
23}
24
25pub struct WebsocketClient {
26 outbound: tkmpsc::UnboundedSender<ClientMsg>,
27 inbound: stdmpsc::Receiver<ServerMsg>,
28 next_client_op_id: ClientOpId,
29 _runtime_thread: thread::JoinHandle<()>,
30}
31
32#[derive(Debug)]
33pub enum ConnectError {
34 Io(std::io::Error),
35 Ws(Box<tokio_tungstenite::tungstenite::Error>),
37 Rejected(String),
38}
39
40impl std::fmt::Display for ConnectError {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 Self::Io(e) => write!(f, "io error: {}", e),
44 Self::Ws(e) => write!(f, "ws error: {}", e),
45 Self::Rejected(reason) => write!(f, "{}", reason),
46 }
47 }
48}
49
50impl std::error::Error for ConnectError {}
51
52impl From<std::io::Error> for ConnectError {
53 fn from(e: std::io::Error) -> Self {
54 Self::Io(e)
55 }
56}
57
58impl From<tokio_tungstenite::tungstenite::Error> for ConnectError {
59 fn from(e: tokio_tungstenite::tungstenite::Error) -> Self {
60 Self::Ws(Box::new(e))
61 }
62}
63
64impl WebsocketClient {
65 pub fn connect(url: &str, hello: Hello) -> Result<Self, ConnectError> {
66 let url = url.to_string();
67 let (outbound_tx, outbound_rx) = tkmpsc::unbounded_channel::<ClientMsg>();
68 let (inbound_tx, inbound_rx) = stdmpsc::channel::<ServerMsg>();
69 let (ready_tx, ready_rx) = stdmpsc::channel();
70
71 let runtime_thread = thread::spawn(move || {
72 let runtime = match tokio::runtime::Builder::new_multi_thread()
73 .enable_all()
74 .build()
75 {
76 Ok(rt) => rt,
77 Err(e) => {
78 let _ = ready_tx.send(Err(ConnectError::Io(e)));
79 return;
80 }
81 };
82 runtime.block_on(async move {
83 match run_connection(url, hello, outbound_rx, inbound_tx, ready_tx).await {
84 Ok(()) => {}
85 Err(e) => eprintln!("ws client ended: {}", e),
86 }
87 });
88 });
89
90 match ready_rx.recv() {
91 Ok(Ok(())) => Ok(Self {
92 outbound: outbound_tx,
93 inbound: inbound_rx,
94 next_client_op_id: 1,
95 _runtime_thread: runtime_thread,
96 }),
97 Ok(Err(e)) => Err(e),
98 Err(_) => Err(ConnectError::Io(std::io::Error::other(
99 "ws thread disappeared",
100 ))),
101 }
102 }
103}
104
105async fn run_connection(
106 url: String,
107 hello: Hello,
108 mut outbound_rx: tkmpsc::UnboundedReceiver<ClientMsg>,
109 inbound_tx: stdmpsc::Sender<ServerMsg>,
110 ready_tx: stdmpsc::Sender<Result<(), ConnectError>>,
111) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
112 let (ws, _response) = match tokio_tungstenite::connect_async(&url).await {
113 Ok(v) => v,
114 Err(e) => {
115 let _ = ready_tx.send(Err(ConnectError::Ws(Box::new(e))));
116 return Ok(());
117 }
118 };
119 let (mut write, mut read) = ws.split();
120
121 let hello_text = serde_json::to_string(&ClientMsg::Hello {
122 name: hello.name,
123 color: hello.color,
124 })?;
125 write.send(Message::Text(hello_text)).await?;
126
127 let first_msg = match read.next().await {
130 Some(Ok(Message::Text(text))) => match serde_json::from_str::<ServerMsg>(&text) {
131 Ok(msg) => msg,
132 Err(e) => {
133 let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(format!(
134 "invalid server handshake: {e}"
135 )))));
136 return Ok(());
137 }
138 },
139 Some(Ok(other)) => {
140 let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(format!(
141 "expected server handshake text frame, got {other:?}"
142 )))));
143 return Ok(());
144 }
145 Some(Err(e)) => {
146 let _ = ready_tx.send(Err(ConnectError::Ws(Box::new(e))));
147 return Ok(());
148 }
149 None => {
150 let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(
151 "server closed before handshake completed",
152 ))));
153 return Ok(());
154 }
155 };
156 if let ServerMsg::ConnectRejected { reason } = first_msg {
157 let _ = ready_tx.send(Err(ConnectError::Rejected(reason)));
158 return Ok(());
159 }
160 if inbound_tx.send(first_msg).is_err() {
161 let _ = ready_tx.send(Err(ConnectError::Io(std::io::Error::other(
162 "client dropped before handshake completed",
163 ))));
164 return Ok(());
165 }
166 let _ = ready_tx.send(Ok(()));
167
168 let writer = tokio::spawn(async move {
169 while let Some(msg) = outbound_rx.recv().await {
170 let Ok(text) = serde_json::to_string(&msg) else {
171 break;
172 };
173 if write.send(Message::Text(text)).await.is_err() {
174 break;
175 }
176 }
177 });
178
179 while let Some(frame) = read.next().await {
180 let Ok(Message::Text(text)) = frame else {
181 break;
182 };
183 let Ok(msg) = serde_json::from_str::<ServerMsg>(&text) else {
184 continue;
185 };
186 if inbound_tx.send(msg).is_err() {
187 break;
188 }
189 }
190
191 writer.abort();
192 Ok(())
193}
194
195impl Client for WebsocketClient {
196 fn submit_op(&mut self, op: CanvasOp) -> ClientOpId {
197 let id = self.next_client_op_id;
198 self.next_client_op_id += 1;
199 let _ = self.outbound.send(ClientMsg::Op {
200 client_op_id: id,
201 op,
202 });
203 id
204 }
205
206 fn try_recv(&mut self) -> Option<ServerMsg> {
207 self.inbound.try_recv().ok()
208 }
209}