wasm_peers/one_to_many/
mod.rs

1/*!
2Library module for the one-to-many topology in client-server architecture.
3There can be exactly one instance of [`MiniServer`] and arbitrary number of [`MiniClient`]'s
4connected to the same session.
5
A `RtcPeerConnection` with an accompanying `RtcDataChannel` will be established between the [`MiniServer`]
and each of the [`MiniClient`]'s. [`MiniServer`] can decide whether to send a message to a single peer,
identified by the [`UserId`] returned by the signaling server during connection establishment,
with [`MiniServer::send_message`], or to fire to all clients with [`MiniServer::send_message_to_all`].
10
[`MiniClient`] only has an option to message the host with [`MiniClient::send_message_to_host`].
12
13# Example
14
15This example shows three peers connecting, with one being a dedicated host.
16Host waits for both peers to connect and only then sends `ping` messages to both
17and clients independently respond with `pong` messages.
18
19```
20use wasm_peers::one_to_many::{MiniClient, MiniServer};
21use wasm_peers::ConnectionType;
22use std::cell::RefCell;
23use std::rc::Rc;
24use wasm_peers_protocol::SessionId;
25use web_sys::console;
26
27const SIGNALING_SERVER_URL: &str = "ws://0.0.0.0:9001/one-to-many";
28const STUN_SERVER_URL: &str = "stun:openrelay.metered.ca:80";
29
30let mut server = MiniServer::new(
31    SIGNALING_SERVER_URL,
32    SessionId::new("dummy-session-id".to_string()),
33    ConnectionType::Stun { urls: STUN_SERVER_URL.to_string() },
34)
35.unwrap();
36let server_open_connections_count = Rc::new(RefCell::new(0));
37
38let server_clone = server.clone();
39let server_on_open = {
40    let server_open_connections_count = server_open_connections_count.clone();
41    move |user_id| {
42        console::log_1(&format!("connection to user established: {:?}", user_id).into());
43        *server_open_connections_count.borrow_mut() += 1;
44        if *server_open_connections_count.borrow() == 2 {
45            server_clone.send_message_to_all("ping!");
46        }
47    }
48};
49let server_on_message = {
50    move |user_id, message| {
51        console::log_1(
52            &format!(
53                "server received message from client {:?}: {}",
54                user_id, message
55            )
56            .into(),
57        );
58    }
59};
60server.start(server_on_open, server_on_message);
61
62let client_generator = || {
63    let mut client = MiniClient::new(
64        SIGNALING_SERVER_URL,
65        SessionId::new("dummy-session-id".to_string()),
66        ConnectionType::Stun { urls: STUN_SERVER_URL.to_string() },
67    )
68    .unwrap();
69    let client_on_open = || { /* do nothing */ };
70    let client_clone = client.clone();
71    let client_on_message = {
72        move |message| {
73            console::log_1(&format!("client received message: {}", message).into());
74            client_clone.send_message_to_host("pong!");
75        }
76    };
77    client.start(client_on_open, client_on_message);
78};
79client_generator();
80client_generator();
81```
82*/
83
84mod callbacks;
85mod websocket_handler;
86
87use std::cell::RefCell;
88use std::collections::HashMap;
89use std::rc::Rc;
90
91use wasm_bindgen::JsValue;
92use wasm_peers_protocol::{SessionId, UserId};
93use web_sys::{RtcDataChannel, RtcPeerConnection, WebSocket};
94
95use crate::one_to_many::callbacks::{set_websocket_on_message, set_websocket_on_open};
96use crate::ConnectionType;
97
98#[derive(Debug, Clone)]
99struct Connection {
100    peer_connection: RtcPeerConnection,
101    data_channel: Option<RtcDataChannel>,
102}
103
104impl Connection {
105    fn new(peer_connection: RtcPeerConnection, data_channel: Option<RtcDataChannel>) -> Self {
106        Connection {
107            peer_connection,
108            data_channel,
109        }
110    }
111}
112
113#[derive(Debug)]
114struct NetworkManagerInner {
115    session_id: SessionId,
116    websocket: WebSocket,
117    connection_type: ConnectionType,
118    is_host: bool,
119    connections: HashMap<UserId, Connection>,
120}
121
122#[derive(Debug, Clone)]
123pub(crate) struct NetworkManager {
124    inner: Rc<RefCell<NetworkManagerInner>>,
125}
126
127impl NetworkManager {
128    pub(crate) fn new(
129        signaling_server_url: &str,
130        session_id: SessionId,
131        connection_type: ConnectionType,
132        is_host: bool,
133    ) -> Result<Self, JsValue> {
134        let websocket = WebSocket::new(signaling_server_url)?;
135        websocket.set_binary_type(web_sys::BinaryType::Arraybuffer);
136
137        Ok(NetworkManager {
138            inner: Rc::new(RefCell::new(NetworkManagerInner {
139                session_id,
140                websocket,
141                connection_type,
142                is_host,
143                connections: HashMap::new(),
144            })),
145        })
146    }
147
148    pub(crate) fn start(
149        &mut self,
150        on_open_callback: impl FnMut(UserId) + Clone + 'static,
151        on_message_callback: impl FnMut(UserId, String) + Clone + 'static,
152    ) {
153        let websocket = self.inner.borrow().websocket.clone();
154        let session_id = self.inner.borrow().session_id.clone();
155        let is_host = self.inner.borrow().is_host;
156
157        set_websocket_on_open(&websocket, session_id, is_host);
158        set_websocket_on_message(
159            &websocket,
160            self.clone(),
161            on_open_callback,
162            on_message_callback,
163            is_host,
164        );
165    }
166
167    pub(crate) fn send_message(&self, user_id: UserId, message: &str) -> Result<(), JsValue> {
168        self.inner
169            .borrow()
170            .connections
171            .get(&user_id)
172            .ok_or_else(|| JsValue::from_str(&format!("no connection for user {}", user_id)))?
173            .data_channel
174            .as_ref()
175            .ok_or_else(|| {
176                JsValue::from_str(&format!("no data channel setup yet for user {}", user_id))
177            })?
178            // this is an ugly fix to the fact, that if you send empty string as message
179            // webrtc fails with a cryptic "The operation failed for an operation-specific reason"
180            // message
181            .send_with_str(&format!("x{}", message))
182    }
183
184    pub(crate) fn send_message_to_all(&self, message: &str) {
185        for data_channel in self
186            .inner
187            .borrow()
188            .connections
189            .values()
190            .filter_map(|connection| connection.data_channel.as_ref())
191        {
192            // TODO(tkarwowski): some may fail, should we return a list results?
193            let _ = data_channel
194                // this is an ugly fix to the fact, that if you send empty string as message
195                // webrtc fails with a cryptic "The operation failed for an operation-specific reason"
196                // message
197                .send_with_str(&format!("x{}", message));
198        }
199    }
200}
201
202/// Abstraction over `WebRTC` peer-to-peer connection.
203/// Structure representing server in client-server topology.
204///
205/// `WebRTC` data channel communication abstracted to a single class.
206/// All setup is handled internally, you must only provide callbacks
207/// for when the connection opens and for handling incoming messages.
208/// It also provides a method of sending data to the other end of the connection.
209///
210/// Only works with [wasm-peers-signaling-server](https://docs.rs/wasm-peers-signaling-server/latest/wasm_peers_signaling_server/) instance,
211/// whose full  address must be provided.
212///
213/// Start-up flow is divided into two methods [::new] and [::start]
214/// to allow possibility of referring to network manger itself from the callbacks.
215///
216/// This class is a  pointer to the underlying resource and can be cloned freely.
217#[derive(Debug, Clone)]
218pub struct MiniServer {
219    inner: NetworkManager,
220}
221
222impl MiniServer {
223    /// Creates an instance with all resources required to create a connections to client-peers.
224    /// Requires an  address of an signaling server instance,
225    /// session id by which it will identify connecting pair of peers and type of connection.
226    pub fn new(
227        signaling_server_url: &str,
228        session_id: SessionId,
229        connection_type: ConnectionType,
230    ) -> Result<Self, JsValue> {
231        Ok(MiniServer {
232            inner: NetworkManager::new(signaling_server_url, session_id, connection_type, true)?,
233        })
234    }
235
236    /// Second part of the setup that begins the actual connection.
237    /// Requires specifying a callbacks that are guaranteed to run
238    /// when the connection opens and on each message received.
239    /// It takes [`UserId`] as an argument which helps identify which client-peer.
240    pub fn start(
241        &mut self,
242        on_open_callback: impl FnMut(UserId) + Clone + 'static,
243        on_message_callback: impl FnMut(UserId, String) + Clone + 'static,
244    ) {
245        self.inner.start(on_open_callback, on_message_callback);
246    }
247
248    /// Sends message over established data channel with a single client-peer represented by
249    /// the [`UserId`] returned by signaling server during connection establishment.
250    pub fn send_message(&self, user_id: UserId, message: &str) -> Result<(), JsValue> {
251        self.inner.send_message(user_id, message)
252    }
253
254    /// Convenience function that sends the same message to all connected client-peers.
255    pub fn send_message_to_all(&self, message: &str) {
256        self.inner.send_message_to_all(message)
257    }
258}
259
260/// Abstraction over `WebRTC` peer-to-peer connection.
261/// Same as [`MiniServer`], but representing clients in client-server topology.
262#[derive(Debug, Clone)]
263pub struct MiniClient {
264    inner: NetworkManager,
265}
266
267impl MiniClient {
268    /// Same as [::new]
269    pub fn new(
270        signaling_server_url: &str,
271        session_id: SessionId,
272        connection_type: ConnectionType,
273    ) -> Result<Self, JsValue> {
274        Ok(MiniClient {
275            inner: NetworkManager::new(signaling_server_url, session_id, connection_type, false)?,
276        })
277    }
278
279    /// Same as [::start], but callbacks don't take `UserId` argument, as it will always be host.
280    pub fn start(
281        &mut self,
282        mut on_open_callback: impl FnMut() + Clone + 'static,
283        mut on_message_callback: impl FnMut(String) + Clone + 'static,
284    ) {
285        let on_open_callback = move |_| on_open_callback();
286        let on_message_callback = move |_, message| on_message_callback(message);
287        self.inner.start(on_open_callback, on_message_callback);
288    }
289
290    /// Way of communicating with peer-server
291    pub fn send_message_to_host(&self, message: &str) -> Result<(), JsValue> {
292        self.inner.send_message_to_all(message);
293        // TODO(tkarwowski): we always return success, but this is subject to change
294        Ok(())
295    }
296}