1use anyhow::anyhow;
2use anyhow::Result;
3use base64::engine::general_purpose::STANDARD;
4use base64::Engine;
5pub use bytes::Bytes;
6use std::future::Future;
7use std::mem;
8use std::sync::Arc;
9use tokio::sync::mpsc;
10use webrtc::api::interceptor_registry::register_default_interceptors;
11use webrtc::api::media_engine::MediaEngine;
12use webrtc::api::APIBuilder;
13pub use webrtc::data_channel::data_channel_init::RTCDataChannelInit;
14pub use webrtc::data_channel::data_channel_message::DataChannelMessage;
15pub use webrtc::data_channel::data_channel_state::RTCDataChannelState;
16pub use webrtc::data_channel::RTCDataChannel;
17use webrtc::ice_transport::ice_server::RTCIceServer;
18use webrtc::interceptor::registry::Registry;
19use webrtc::peer_connection::configuration::RTCConfiguration;
20pub use webrtc::peer_connection::peer_connection_state::RTCPeerConnectionState;
21use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
22use webrtc::peer_connection::RTCPeerConnection;
23
24pub struct Configuration {
25 stun_or_turn_urls: Vec<String>,
26}
27
28pub type DataChannel = Arc<RTCDataChannel>;
29
30pub struct Peer {
31 pub peer_id: u128,
32 peer_connection: Arc<RTCPeerConnection>,
33 abort: mpsc::UnboundedSender<()>,
34}
35
36pub enum PeerEvent {
37 PeerConnectionStateChange(RTCPeerConnectionState),
38 DataChannelStateChange(DataChannel),
39 DataChannelMessage(DataChannel, DataChannelMessage),
40}
41
42impl Peer {
43 pub async fn new<T>(
44 handle_message: impl Fn(u128, PeerEvent) -> T + Send + Sync + 'static,
45 ) -> Result<Peer>
46 where
47 T: Future<Output = ()> + Send + Sync,
48 {
49 Peer::new_with_configuration(
50 handle_message,
51 Configuration {
52 stun_or_turn_urls: vec!["stun:stun.l.google.com:19302".to_owned()],
53 },
54 )
55 .await
56 }
57
58 pub async fn new_with_configuration<T>(
59 handle_message: impl Fn(u128, PeerEvent) -> T + Send + Sync + 'static,
60 mut config: Configuration,
61 ) -> Result<Peer>
62 where
63 T: Future<Output = ()> + Send + Sync,
64 {
65 let mut m = MediaEngine::default();
66 m.register_default_codecs()?;
67 let mut registry = Registry::new();
68 registry = register_default_interceptors(registry, &mut m)?;
69
70 let api = APIBuilder::new()
71 .with_media_engine(m)
72 .with_interceptor_registry(registry)
73 .build();
74
75 let config = RTCConfiguration {
76 ice_servers: vec![RTCIceServer {
77 urls: mem::take(&mut config.stun_or_turn_urls),
78 ..Default::default()
79 }],
80 ..Default::default()
81 };
82
83 let peer_connection = Arc::new(api.new_peer_connection(config).await?);
84
85 let (tx, mut msg_rx) = mpsc::unbounded_channel::<(u128, PeerEvent)>();
86 let tx_clone = tx.clone();
87 let (abort_tx, mut abort_rx) = mpsc::unbounded_channel::<()>();
88 let abort_tx_clone = abort_tx.clone();
89
90 let peer_id = Peer::random_peer_id();
91 let c = Peer {
92 peer_id,
93 peer_connection,
94 abort: abort_tx,
95 };
96
97 tokio::spawn(async move {
98 loop {
99 tokio::select! {
100 val = msg_rx.recv() => {
101 if let Some(v) = val {
102 handle_message(v.0,v.1).await;
103 }
104 }
105 _ = abort_rx.recv() => {
106 break;
107 }
108 };
109 }
110 });
111
112 c.peer_connection.on_peer_connection_state_change(Box::new(
113 move |s: RTCPeerConnectionState| {
114 match tx_clone.send((peer_id, PeerEvent::PeerConnectionStateChange(s))) {
115 Ok(_) => (),
116 Err(error) => {
117 panic!("Error sending mpsc message: {:?}", error.to_string())
118 }
119 };
120 if s == RTCPeerConnectionState::Failed {
121 match abort_tx_clone.send(()) {
122 Ok(_) => (),
123 Err(error) => {
124 panic!("Error sending mpsc message: {:?}", error.to_string())
125 }
126 };
127 }
128 Box::pin(async {})
129 },
130 ));
131
132 c.peer_connection
133 .on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
134 let tx1 = tx.clone();
135 let tx2 = tx.clone();
136 let tx3 = tx.clone();
137
138 Box::pin(async move {
139 let data_cannel_clone1 = d.clone();
140 let data_cannel_clone2 = d.clone();
141 let data_cannel_clone3 = d.clone();
142 d.on_open(Box::new(move || {
143 match tx1.send((
144 peer_id,
145 PeerEvent::DataChannelStateChange(data_cannel_clone1.clone()),
146 )) {
147 Ok(_) => (),
148 Err(error) => {
149 panic!("Error sending mpsc message: {:?}", error.to_string())
150 }
151 };
152 Box::pin(async {})
153 }));
154
155 d.on_close(Box::new(move || {
156 match tx2.send((
157 peer_id,
158 PeerEvent::DataChannelStateChange(data_cannel_clone2.clone()),
159 )) {
160 Ok(_) => (),
161 Err(error) => {
162 panic!("Error sending mpsc message: {:?}", error.to_string())
163 }
164 };
165 Box::pin(async {})
166 }));
167
168 d.on_message(Box::new(move |msg: DataChannelMessage| {
169 match tx3.send((
170 peer_id,
171 PeerEvent::DataChannelMessage(data_cannel_clone3.clone(), msg),
172 )) {
173 Ok(_) => (),
174 Err(error) => {
175 panic!("Error sending mpsc message: {:?}", error.to_string())
176 }
177 };
178 Box::pin(async {})
179 }));
180 })
181 }));
182
183 Ok(c)
184 }
185
186 pub async fn create_offer(&mut self) -> Result<String> {
187 let offer = self.peer_connection.create_offer(None).await?;
188
189 let mut gather_complete = self.peer_connection.gathering_complete_promise().await;
192 self.peer_connection.set_local_description(offer).await?;
193 let _ = gather_complete.recv().await;
194
195 if let Some(local_desc) = self.peer_connection.local_description().await {
196 let json_str = serde_json::to_string(&local_desc)?;
197 let b64 = encode(&json_str);
198 Ok(b64)
199 } else {
200 Err(anyhow!("generate local_description failed!"))
201 }
202 }
203
204 pub async fn receive_offer(&mut self, offer: &str) -> Result<String> {
205 let desc_data = decode(offer)?.to_string();
206 let offer = serde_json::from_str::<RTCSessionDescription>(&desc_data)?;
207 self.peer_connection.set_remote_description(offer).await?;
208 let answer = self.peer_connection.create_answer(None).await?;
209 let mut gather_complete = self.peer_connection.gathering_complete_promise().await;
210 self.peer_connection.set_local_description(answer).await?;
211 let _ = gather_complete.recv().await;
212
213 if let Some(local_desc) = self.peer_connection.local_description().await {
214 let json_str = serde_json::to_string(&local_desc)?;
215 let b64 = encode(&json_str);
216 Ok(b64)
217 } else {
218 Err(anyhow!("generate local_description failed!"))
219 }
220 }
221
222 pub async fn create_channel(&mut self, name: &str) -> Result<(), webrtc::Error> {
223 match self.peer_connection.create_data_channel(name, None).await {
224 Ok(_) => Ok(()),
225 Err(e) => Err(e),
226 }
227 }
228
229 pub async fn create_channel_with_configuration(
230 &mut self,
231 name: &str,
232 config: RTCDataChannelInit,
233 ) -> Result<(), webrtc::Error> {
234 match self
235 .peer_connection
236 .create_data_channel(name, Some(config))
237 .await
238 {
239 Ok(_) => Ok(()),
240 Err(e) => Err(e),
241 }
242 }
243
244 pub async fn close(&mut self) -> Result<(), webrtc::Error> {
245 self.abort.send(())?;
246 self.peer_connection.close().await
247 }
248
249 pub fn connection_state(&self) -> RTCPeerConnectionState {
250 self.peer_connection.connection_state()
251 }
252
253 pub fn random_peer_id() -> u128 {
254 rand::random()
255 }
256}
257
258impl Drop for Peer {
259 fn drop(&mut self) {
260 self.abort.send(()).expect("could not stop task on drop");
261 }
262}
263
264fn encode(b: &str) -> String {
265 STANDARD.encode(b)
266}
267
268fn decode(s: &str) -> Result<String> {
269 let b = STANDARD.decode(s)?;
270 let s = String::from_utf8(b)?;
271 Ok(s)
272}