wasm_peers/one_to_many/mod.rs
1/*!
2Library module for the one-to-many topology in client-server architecture.
3There can be exactly one instance of [`MiniServer`] and arbitrary number of [`MiniClient`]'s
4connected to the same session.
5
6A `RtcPeerConnection` with an accompanying `RtcDataChannel` will be established between the [`MiniServer`]
7and each of the [`MiniClient`]'s. [`MiniServer`] can decide whether to send a message to a single peer,
8identified by [`UserId`] returned by signaling server during connection establishment method,
with [`MiniServer::send_message`], or to fire to all clients with [`MiniServer::send_message_to_all`].
10
[`MiniClient`] only has an option to message the host with [`MiniClient::send_message_to_host`].
12
13# Example
14
15This example shows three peers connecting, with one being a dedicated host.
16Host waits for both peers to connect and only then sends `ping` messages to both
17and clients independently respond with `pong` messages.
18
19```
20use wasm_peers::one_to_many::{MiniClient, MiniServer};
21use wasm_peers::ConnectionType;
22use std::cell::RefCell;
23use std::rc::Rc;
24use wasm_peers_protocol::SessionId;
25use web_sys::console;
26
27const SIGNALING_SERVER_URL: &str = "ws://0.0.0.0:9001/one-to-many";
28const STUN_SERVER_URL: &str = "stun:openrelay.metered.ca:80";
29
30let mut server = MiniServer::new(
31 SIGNALING_SERVER_URL,
32 SessionId::new("dummy-session-id".to_string()),
33 ConnectionType::Stun { urls: STUN_SERVER_URL.to_string() },
34)
35.unwrap();
36let server_open_connections_count = Rc::new(RefCell::new(0));
37
38let server_clone = server.clone();
39let server_on_open = {
40 let server_open_connections_count = server_open_connections_count.clone();
41 move |user_id| {
42 console::log_1(&format!("connection to user established: {:?}", user_id).into());
43 *server_open_connections_count.borrow_mut() += 1;
44 if *server_open_connections_count.borrow() == 2 {
45 server_clone.send_message_to_all("ping!");
46 }
47 }
48};
49let server_on_message = {
50 move |user_id, message| {
51 console::log_1(
52 &format!(
53 "server received message from client {:?}: {}",
54 user_id, message
55 )
56 .into(),
57 );
58 }
59};
60server.start(server_on_open, server_on_message);
61
62let client_generator = || {
63 let mut client = MiniClient::new(
64 SIGNALING_SERVER_URL,
65 SessionId::new("dummy-session-id".to_string()),
66 ConnectionType::Stun { urls: STUN_SERVER_URL.to_string() },
67 )
68 .unwrap();
69 let client_on_open = || { /* do nothing */ };
70 let client_clone = client.clone();
71 let client_on_message = {
72 move |message| {
73 console::log_1(&format!("client received message: {}", message).into());
74 client_clone.send_message_to_host("pong!");
75 }
76 };
77 client.start(client_on_open, client_on_message);
78};
79client_generator();
80client_generator();
81```
82*/
83
84mod callbacks;
85mod websocket_handler;
86
87use std::cell::RefCell;
88use std::collections::HashMap;
89use std::rc::Rc;
90
91use wasm_bindgen::JsValue;
92use wasm_peers_protocol::{SessionId, UserId};
93use web_sys::{RtcDataChannel, RtcPeerConnection, WebSocket};
94
95use crate::one_to_many::callbacks::{set_websocket_on_message, set_websocket_on_open};
96use crate::ConnectionType;
97
98#[derive(Debug, Clone)]
99struct Connection {
100 peer_connection: RtcPeerConnection,
101 data_channel: Option<RtcDataChannel>,
102}
103
104impl Connection {
105 fn new(peer_connection: RtcPeerConnection, data_channel: Option<RtcDataChannel>) -> Self {
106 Connection {
107 peer_connection,
108 data_channel,
109 }
110 }
111}
112
113#[derive(Debug)]
114struct NetworkManagerInner {
115 session_id: SessionId,
116 websocket: WebSocket,
117 connection_type: ConnectionType,
118 is_host: bool,
119 connections: HashMap<UserId, Connection>,
120}
121
122#[derive(Debug, Clone)]
123pub(crate) struct NetworkManager {
124 inner: Rc<RefCell<NetworkManagerInner>>,
125}
126
127impl NetworkManager {
128 pub(crate) fn new(
129 signaling_server_url: &str,
130 session_id: SessionId,
131 connection_type: ConnectionType,
132 is_host: bool,
133 ) -> Result<Self, JsValue> {
134 let websocket = WebSocket::new(signaling_server_url)?;
135 websocket.set_binary_type(web_sys::BinaryType::Arraybuffer);
136
137 Ok(NetworkManager {
138 inner: Rc::new(RefCell::new(NetworkManagerInner {
139 session_id,
140 websocket,
141 connection_type,
142 is_host,
143 connections: HashMap::new(),
144 })),
145 })
146 }
147
148 pub(crate) fn start(
149 &mut self,
150 on_open_callback: impl FnMut(UserId) + Clone + 'static,
151 on_message_callback: impl FnMut(UserId, String) + Clone + 'static,
152 ) {
153 let websocket = self.inner.borrow().websocket.clone();
154 let session_id = self.inner.borrow().session_id.clone();
155 let is_host = self.inner.borrow().is_host;
156
157 set_websocket_on_open(&websocket, session_id, is_host);
158 set_websocket_on_message(
159 &websocket,
160 self.clone(),
161 on_open_callback,
162 on_message_callback,
163 is_host,
164 );
165 }
166
167 pub(crate) fn send_message(&self, user_id: UserId, message: &str) -> Result<(), JsValue> {
168 self.inner
169 .borrow()
170 .connections
171 .get(&user_id)
172 .ok_or_else(|| JsValue::from_str(&format!("no connection for user {}", user_id)))?
173 .data_channel
174 .as_ref()
175 .ok_or_else(|| {
176 JsValue::from_str(&format!("no data channel setup yet for user {}", user_id))
177 })?
178 // this is an ugly fix to the fact, that if you send empty string as message
179 // webrtc fails with a cryptic "The operation failed for an operation-specific reason"
180 // message
181 .send_with_str(&format!("x{}", message))
182 }
183
184 pub(crate) fn send_message_to_all(&self, message: &str) {
185 for data_channel in self
186 .inner
187 .borrow()
188 .connections
189 .values()
190 .filter_map(|connection| connection.data_channel.as_ref())
191 {
192 // TODO(tkarwowski): some may fail, should we return a list results?
193 let _ = data_channel
194 // this is an ugly fix to the fact, that if you send empty string as message
195 // webrtc fails with a cryptic "The operation failed for an operation-specific reason"
196 // message
197 .send_with_str(&format!("x{}", message));
198 }
199 }
200}
201
202/// Abstraction over `WebRTC` peer-to-peer connection.
203/// Structure representing server in client-server topology.
204///
205/// `WebRTC` data channel communication abstracted to a single class.
206/// All setup is handled internally, you must only provide callbacks
207/// for when the connection opens and for handling incoming messages.
208/// It also provides a method of sending data to the other end of the connection.
209///
210/// Only works with [wasm-peers-signaling-server](https://docs.rs/wasm-peers-signaling-server/latest/wasm_peers_signaling_server/) instance,
211/// whose full address must be provided.
212///
213/// Start-up flow is divided into two methods [::new] and [::start]
214/// to allow possibility of referring to network manger itself from the callbacks.
215///
216/// This class is a pointer to the underlying resource and can be cloned freely.
217#[derive(Debug, Clone)]
218pub struct MiniServer {
219 inner: NetworkManager,
220}
221
222impl MiniServer {
223 /// Creates an instance with all resources required to create a connections to client-peers.
224 /// Requires an address of an signaling server instance,
225 /// session id by which it will identify connecting pair of peers and type of connection.
226 pub fn new(
227 signaling_server_url: &str,
228 session_id: SessionId,
229 connection_type: ConnectionType,
230 ) -> Result<Self, JsValue> {
231 Ok(MiniServer {
232 inner: NetworkManager::new(signaling_server_url, session_id, connection_type, true)?,
233 })
234 }
235
236 /// Second part of the setup that begins the actual connection.
237 /// Requires specifying a callbacks that are guaranteed to run
238 /// when the connection opens and on each message received.
239 /// It takes [`UserId`] as an argument which helps identify which client-peer.
240 pub fn start(
241 &mut self,
242 on_open_callback: impl FnMut(UserId) + Clone + 'static,
243 on_message_callback: impl FnMut(UserId, String) + Clone + 'static,
244 ) {
245 self.inner.start(on_open_callback, on_message_callback);
246 }
247
248 /// Sends message over established data channel with a single client-peer represented by
249 /// the [`UserId`] returned by signaling server during connection establishment.
250 pub fn send_message(&self, user_id: UserId, message: &str) -> Result<(), JsValue> {
251 self.inner.send_message(user_id, message)
252 }
253
254 /// Convenience function that sends the same message to all connected client-peers.
255 pub fn send_message_to_all(&self, message: &str) {
256 self.inner.send_message_to_all(message)
257 }
258}
259
260/// Abstraction over `WebRTC` peer-to-peer connection.
261/// Same as [`MiniServer`], but representing clients in client-server topology.
262#[derive(Debug, Clone)]
263pub struct MiniClient {
264 inner: NetworkManager,
265}
266
267impl MiniClient {
268 /// Same as [::new]
269 pub fn new(
270 signaling_server_url: &str,
271 session_id: SessionId,
272 connection_type: ConnectionType,
273 ) -> Result<Self, JsValue> {
274 Ok(MiniClient {
275 inner: NetworkManager::new(signaling_server_url, session_id, connection_type, false)?,
276 })
277 }
278
279 /// Same as [::start], but callbacks don't take `UserId` argument, as it will always be host.
280 pub fn start(
281 &mut self,
282 mut on_open_callback: impl FnMut() + Clone + 'static,
283 mut on_message_callback: impl FnMut(String) + Clone + 'static,
284 ) {
285 let on_open_callback = move |_| on_open_callback();
286 let on_message_callback = move |_, message| on_message_callback(message);
287 self.inner.start(on_open_callback, on_message_callback);
288 }
289
290 /// Way of communicating with peer-server
291 pub fn send_message_to_host(&self, message: &str) -> Result<(), JsValue> {
292 self.inner.send_message_to_all(message);
293 // TODO(tkarwowski): we always return success, but this is subject to change
294 Ok(())
295 }
296}