cyberdeck/
lib.rs

1use anyhow::anyhow;
2use anyhow::Result;
3use base64::engine::general_purpose::STANDARD;
4use base64::Engine;
5pub use bytes::Bytes;
6use std::future::Future;
7use std::mem;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use webrtc::api::interceptor_registry::register_default_interceptors;
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::APIBuilder;
13pub use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14pub use webrtc::data_channel::data_channel_message::DataChannelMessage;
15pub use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16pub use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_server::RTCIceServer;
18use webrtc::interceptor::registry::Registry;
19use webrtc::peer_connection::configuration::RTCConfiguration;
20pub use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
21use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
22use webrtc::peer_connection::RTCPeerConnection;
23
/// Connection settings for a [`Peer`].
///
/// The field is public so that code outside this crate can build a value to
/// pass to [`Peer::new_with_configuration`]; there is no other constructor.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// STUN/TURN server URLs (e.g. `"stun:stun.l.google.com:19302"`) placed
    /// in the single ICE server entry of the peer connection configuration.
    pub stun_or_turn_urls: Vec<String>,
}
27
/// Shared handle to a WebRTC data channel, as delivered inside [`PeerEvent`].
pub type DataChannel = Arc<RTCDataChannel>;
29
/// One end of a WebRTC peer connection together with the handle used to stop
/// the background task that delivers its events.
pub struct Peer {
    /// Identifier passed to the event handler with every event; generated by
    /// `Peer::random_peer_id` when the peer is created.
    pub peer_id: u128,
    /// The underlying WebRTC peer connection.
    peer_connection: Arc<RTCPeerConnection>,
    /// Sending `()` on this channel makes the background event task exit.
    abort: mpsc::UnboundedSender<()>,
}
35
/// Events forwarded to the handler supplied when a [`Peer`] is created.
pub enum PeerEvent {
    /// The peer connection moved to the given state.
    PeerConnectionStateChange(RTCPeerConnectionState),
    /// A remotely created data channel was opened or closed. The same variant
    /// is used for both, so the handler has to inspect the channel to tell
    /// which one happened.
    DataChannelStateChange(DataChannel),
    /// A message arrived on the given remotely created data channel.
    DataChannelMessage(DataChannel, DataChannelMessage),
}
41
42impl Peer {
43    pub async fn new<T>(
44        handle_message: impl Fn(u128, PeerEvent) -> T + Send + Sync + 'static,
45    ) -> Result<Peer>
46    where
47        T: Future<Output = ()> + Send + Sync,
48    {
49        Peer::new_with_configuration(
50            handle_message,
51            Configuration {
52                stun_or_turn_urls: vec!["stun:stun.l.google.com:19302".to_owned()],
53            },
54        )
55        .await
56    }
57
58    pub async fn new_with_configuration<T>(
59        handle_message: impl Fn(u128, PeerEvent) -> T + Send + Sync + 'static,
60        mut config: Configuration,
61    ) -> Result<Peer>
62    where
63        T: Future<Output = ()> + Send + Sync,
64    {
65        let mut m = MediaEngine::default();
66        m.register_default_codecs()?;
67        let mut registry = Registry::new();
68        registry = register_default_interceptors(registry, &mut m)?;
69
70        let api = APIBuilder::new()
71            .with_media_engine(m)
72            .with_interceptor_registry(registry)
73            .build();
74
75        let config = RTCConfiguration {
76            ice_servers: vec![RTCIceServer {
77                urls: mem::take(&mut config.stun_or_turn_urls),
78                ..Default::default()
79            }],
80            ..Default::default()
81        };
82
83        let peer_connection = Arc::new(api.new_peer_connection(config).await?);
84
85        let (tx, mut msg_rx) = mpsc::unbounded_channel::<(u128, PeerEvent)>();
86        let tx_clone = tx.clone();
87        let (abort_tx, mut abort_rx) = mpsc::unbounded_channel::<()>();
88        let abort_tx_clone = abort_tx.clone();
89
90        let peer_id = Peer::random_peer_id();
91        let c = Peer {
92            peer_id,
93            peer_connection,
94            abort: abort_tx,
95        };
96
97        tokio::spawn(async move {
98            loop {
99                tokio::select! {
100                    val = msg_rx.recv() => {
101                        if let Some(v) = val {
102                            handle_message(v.0,v.1).await;
103                        }
104                    }
105                    _ = abort_rx.recv() => {
106                        break;
107                    }
108                };
109            }
110        });
111
112        c.peer_connection.on_peer_connection_state_change(Box::new(
113            move |s: RTCPeerConnectionState| {
114                match tx_clone.send((peer_id, PeerEvent::PeerConnectionStateChange(s))) {
115                    Ok(_) => (),
116                    Err(error) => {
117                        panic!("Error sending mpsc message: {:?}", error.to_string())
118                    }
119                };
120                if s == RTCPeerConnectionState::Failed {
121                    match abort_tx_clone.send(()) {
122                        Ok(_) => (),
123                        Err(error) => {
124                            panic!("Error sending mpsc message: {:?}", error.to_string())
125                        }
126                    };
127                }
128                Box::pin(async {})
129            },
130        ));
131
132        c.peer_connection
133            .on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
134                let tx1 = tx.clone();
135                let tx2 = tx.clone();
136                let tx3 = tx.clone();
137
138                Box::pin(async move {
139                    let data_cannel_clone1 = d.clone();
140                    let data_cannel_clone2 = d.clone();
141                    let data_cannel_clone3 = d.clone();
142                    d.on_open(Box::new(move || {
143                        match tx1.send((
144                            peer_id,
145                            PeerEvent::DataChannelStateChange(data_cannel_clone1.clone()),
146                        )) {
147                            Ok(_) => (),
148                            Err(error) => {
149                                panic!("Error sending mpsc message: {:?}", error.to_string())
150                            }
151                        };
152                        Box::pin(async {})
153                    }));
154
155                    d.on_close(Box::new(move || {
156                        match tx2.send((
157                            peer_id,
158                            PeerEvent::DataChannelStateChange(data_cannel_clone2.clone()),
159                        )) {
160                            Ok(_) => (),
161                            Err(error) => {
162                                panic!("Error sending mpsc message: {:?}", error.to_string())
163                            }
164                        };
165                        Box::pin(async {})
166                    }));
167
168                    d.on_message(Box::new(move |msg: DataChannelMessage| {
169                        match tx3.send((
170                            peer_id,
171                            PeerEvent::DataChannelMessage(data_cannel_clone3.clone(), msg),
172                        )) {
173                            Ok(_) => (),
174                            Err(error) => {
175                                panic!("Error sending mpsc message: {:?}", error.to_string())
176                            }
177                        };
178                        Box::pin(async {})
179                    }));
180                })
181            }));
182
183        Ok(c)
184    }
185
186    pub async fn create_offer(&mut self) -> Result<String> {
187        let offer = self.peer_connection.create_offer(None).await?;
188
189        // Sets the LocalDescription, and starts our UDP listeners
190        // Note: this will start the gathering of ICE candidates
191        let mut gather_complete = self.peer_connection.gathering_complete_promise().await;
192        self.peer_connection.set_local_description(offer).await?;
193        let _ = gather_complete.recv().await;
194
195        if let Some(local_desc) = self.peer_connection.local_description().await {
196            let json_str = serde_json::to_string(&local_desc)?;
197            let b64 = encode(&json_str);
198            Ok(b64)
199        } else {
200            Err(anyhow!("generate local_description failed!"))
201        }
202    }
203
204    pub async fn receive_offer(&mut self, offer: &str) -> Result<String> {
205        let desc_data = decode(offer)?.to_string();
206        let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
207        self.peer_connection.set_remote_description(offer).await?;
208        let answer = self.peer_connection.create_answer(None).await?;
209        let mut gather_complete = self.peer_connection.gathering_complete_promise().await;
210        self.peer_connection.set_local_description(answer).await?;
211        let _ = gather_complete.recv().await;
212
213        if let Some(local_desc) = self.peer_connection.local_description().await {
214            let json_str = serde_json::to_string(&local_desc)?;
215            let b64 = encode(&json_str);
216            Ok(b64)
217        } else {
218            Err(anyhow!("generate local_description failed!"))
219        }
220    }
221
222    pub async fn create_channel(&mut self, name: &str) -> Result<(), webrtc::Error> {
223        match self.peer_connection.create_data_channel(name, None).await {
224            Ok(_) => Ok(()),
225            Err(e) => Err(e),
226        }
227    }
228
229    pub async fn create_channel_with_configuration(
230        &mut self,
231        name: &str,
232        config: RTCDataChannelInit,
233    ) -> Result<(), webrtc::Error> {
234        match self
235            .peer_connection
236            .create_data_channel(name, Some(config))
237            .await
238        {
239            Ok(_) => Ok(()),
240            Err(e) => Err(e),
241        }
242    }
243
244    pub async fn close(&mut self) -> Result<(), webrtc::Error> {
245        self.abort.send(())?;
246        self.peer_connection.close().await
247    }
248
249    pub fn connection_state(&self) -> RTCPeerConnectionState {
250        self.peer_connection.connection_state()
251    }
252
253    pub fn random_peer_id() -> u128 {
254        rand::random()
255    }
256}
257
258impl Drop for Peer {
259    fn drop(&mut self) {
260        self.abort.send(()).expect("could not stop task on drop");
261    }
262}
263
264fn encode(b: &str) -> String {
265    STANDARD.encode(b)
266}
267
268fn decode(s: &str) -> Result<String> {
269    let b = STANDARD.decode(s)?;
270    let s = String::from_utf8(b)?;
271    Ok(s)
272}