1use std::fmt::Debug;
16
17use crate::{
18 data_channel::{DataChannel, DataChannelInit},
19 ice_candidate::IceCandidate,
20 imp::peer_connection as imp_pc,
21 media_stream::MediaStream,
22 media_stream_track::MediaStreamTrack,
23 peer_connection_factory::RtcConfiguration,
24 rtp_receiver::RtpReceiver,
25 rtp_sender::RtpSender,
26 rtp_transceiver::{RtpTransceiver, RtpTransceiverInit},
27 session_description::SessionDescription,
28 stats::RtcStats,
29 MediaType, RtcError,
30};
31
/// Overall state of a [`PeerConnection`], as returned by
/// [`PeerConnection::connection_state`] and delivered to an
/// [`OnConnectionChange`] callback.
///
/// The variant names correspond to the W3C `RTCPeerConnectionState`
/// enumeration; refer to that specification for the meaning of each state.
///
/// `Hash` is derived alongside `Eq` so a state can be used as a key in
/// `HashMap`/`HashSet` (for example to count transitions per state).
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum PeerConnectionState {
    New,
    Connecting,
    Connected,
    Disconnected,
    Failed,
    Closed,
}
41
/// State of the ICE agent of a [`PeerConnection`], as returned by
/// [`PeerConnection::ice_connection_state`] and delivered to an
/// [`OnIceConnectionChange`] callback.
///
/// Apart from `Max`, the variant names correspond to the W3C
/// `RTCIceConnectionState` enumeration.
///
/// `Hash` is derived alongside `Eq` so a state can be used as a key in
/// `HashMap`/`HashSet`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IceConnectionState {
    New,
    Checking,
    Connected,
    Completed,
    Failed,
    Disconnected,
    Closed,
    // NOTE(review): `Max` has no counterpart in the W3C enumeration; it is
    // presumably a sentinel carried over from the native library. Confirm
    // before relying on ever observing it.
    Max,
}
53
/// ICE candidate gathering state of a [`PeerConnection`], as returned by
/// [`PeerConnection::ice_gathering_state`] and delivered to an
/// [`OnIceGatheringChange`] callback.
///
/// The variant names correspond to the W3C `RTCIceGatheringState`
/// enumeration.
///
/// `Hash` is derived alongside `Eq` so a state can be used as a key in
/// `HashMap`/`HashSet`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IceGatheringState {
    New,
    Gathering,
    Complete,
}
60
/// Signaling (offer/answer) state of a [`PeerConnection`], as returned by
/// [`PeerConnection::signaling_state`] and delivered to an
/// [`OnSignalingChange`] callback.
///
/// The variant names correspond to the W3C `RTCSignalingState` enumeration.
///
/// `Hash` is derived alongside `Eq` so a state can be used as a key in
/// `HashMap`/`HashSet`.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum SignalingState {
    Stable,
    HaveLocalOffer,
    HaveLocalPrAnswer,
    HaveRemoteOffer,
    HaveRemotePrAnswer,
    Closed,
}
70
/// Options passed to [`PeerConnection::create_offer`].
///
/// `Default` yields all flags set to `false`. `PartialEq`/`Eq` are derived so
/// option values can be compared directly (e.g. in tests or change detection).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OfferOptions {
    /// Request an ICE restart with this offer.
    pub ice_restart: bool,
    /// Offer to receive audio from the remote peer.
    pub offer_to_receive_audio: bool,
    /// Offer to receive video from the remote peer.
    pub offer_to_receive_video: bool,
}
77
/// Options passed to [`PeerConnection::create_answer`].
///
/// Currently carries no fields. `PartialEq`/`Eq` are derived for parity with
/// [`OfferOptions`] so both option types can be compared the same way.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AnswerOptions {}
80
/// Details of an ICE candidate gathering error, delivered to an
/// [`OnIceCandidateError`] callback.
///
/// `PartialEq`/`Eq` are derived so received errors can be compared or asserted
/// on directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IceCandidateError {
    /// Address associated with the error.
    pub address: String,
    /// Port associated with the error.
    pub port: i32,
    /// URL associated with the error.
    // NOTE(review): presumably the STUN/TURN server URL — confirm where the
    // implementation fills this in.
    pub url: String,
    /// Numeric error code reported by the implementation.
    pub error_code: i32,
    /// Human-readable error text reported by the implementation.
    pub error_text: String,
}
89
90#[derive(Debug, Clone)]
91pub struct TrackEvent {
92 pub receiver: RtpReceiver,
93 pub streams: Vec<MediaStream>,
94 pub track: MediaStreamTrack,
95 pub transceiver: RtpTransceiver,
96}
97
98pub type OnConnectionChange = Box<dyn FnMut(PeerConnectionState) + Send + Sync>;
99pub type OnDataChannel = Box<dyn FnMut(DataChannel) + Send + Sync>;
100pub type OnIceCandidate = Box<dyn FnMut(IceCandidate) + Send + Sync>;
101pub type OnIceCandidateError = Box<dyn FnMut(IceCandidateError) + Send + Sync>;
102pub type OnIceConnectionChange = Box<dyn FnMut(IceConnectionState) + Send + Sync>;
103pub type OnIceGatheringChange = Box<dyn FnMut(IceGatheringState) + Send + Sync>;
104pub type OnNegotiationNeeded = Box<dyn FnMut(u32) + Send + Sync>;
105pub type OnSignalingChange = Box<dyn FnMut(SignalingState) + Send + Sync>;
106pub type OnTrack = Box<dyn FnMut(TrackEvent) + Send + Sync>;
107
108#[derive(Clone)]
109pub struct PeerConnection {
110 pub(crate) handle: imp_pc::PeerConnection,
111}
112
113impl PeerConnection {
114 pub fn set_configuration(&self, config: RtcConfiguration) -> Result<(), RtcError> {
115 self.handle.set_configuration(config)
116 }
117
118 pub async fn create_offer(
119 &self,
120 options: OfferOptions,
121 ) -> Result<SessionDescription, RtcError> {
122 self.handle.create_offer(options).await
123 }
124
125 pub async fn create_answer(
126 &self,
127 options: AnswerOptions,
128 ) -> Result<SessionDescription, RtcError> {
129 self.handle.create_answer(options).await
130 }
131
132 pub async fn set_local_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
133 self.handle.set_local_description(desc).await
134 }
135
136 pub async fn set_remote_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
137 self.handle.set_remote_description(desc).await
138 }
139
140 pub async fn add_ice_candidate(&self, candidate: IceCandidate) -> Result<(), RtcError> {
141 self.handle.add_ice_candidate(candidate).await
142 }
143
144 pub fn create_data_channel(
145 &self,
146 label: &str,
147 init: DataChannelInit,
148 ) -> Result<DataChannel, RtcError> {
149 self.handle.create_data_channel(label, init)
150 }
151
152 pub fn add_track<T: AsRef<str>>(
153 &self,
154 track: MediaStreamTrack,
155 streams_ids: &[T],
156 ) -> Result<RtpSender, RtcError> {
157 self.handle.add_track(track, streams_ids)
158 }
159
160 pub fn remove_track(&self, sender: RtpSender) -> Result<(), RtcError> {
161 self.handle.remove_track(sender)
162 }
163
164 pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
165 self.handle.get_stats().await
166 }
167
168 pub fn add_transceiver(
169 &self,
170 track: MediaStreamTrack,
171 init: RtpTransceiverInit,
172 ) -> Result<RtpTransceiver, RtcError> {
173 self.handle.add_transceiver(track, init)
174 }
175
176 pub fn add_transceiver_for_media(
177 &self,
178 media_type: MediaType,
179 init: RtpTransceiverInit,
180 ) -> Result<RtpTransceiver, RtcError> {
181 self.handle.add_transceiver_for_media(media_type, init)
182 }
183
184 pub fn close(&self) {
185 self.handle.close()
186 }
187
188 pub fn restart_ice(&self) {
189 self.handle.restart_ice()
190 }
191
192 pub fn connection_state(&self) -> PeerConnectionState {
193 self.handle.connection_state()
194 }
195
196 pub fn ice_connection_state(&self) -> IceConnectionState {
197 self.handle.ice_connection_state()
198 }
199
200 pub fn ice_gathering_state(&self) -> IceGatheringState {
201 self.handle.ice_gathering_state()
202 }
203
204 pub fn signaling_state(&self) -> SignalingState {
205 self.handle.signaling_state()
206 }
207
208 pub fn current_local_description(&self) -> Option<SessionDescription> {
209 self.handle.current_local_description()
210 }
211
212 pub fn current_remote_description(&self) -> Option<SessionDescription> {
213 self.handle.current_remote_description()
214 }
215
216 pub fn senders(&self) -> Vec<RtpSender> {
217 self.handle.senders()
218 }
219
220 pub fn receivers(&self) -> Vec<RtpReceiver> {
221 self.handle.receivers()
222 }
223
224 pub fn transceivers(&self) -> Vec<RtpTransceiver> {
225 self.handle.transceivers()
226 }
227
228 pub fn on_connection_state_change(&self, f: Option<OnConnectionChange>) {
229 self.handle.on_connection_state_change(f)
230 }
231
232 pub fn on_data_channel(&self, f: Option<OnDataChannel>) {
233 self.handle.on_data_channel(f)
234 }
235
236 pub fn on_ice_candidate(&self, f: Option<OnIceCandidate>) {
237 self.handle.on_ice_candidate(f)
238 }
239
240 pub fn on_ice_candidate_error(&self, f: Option<OnIceCandidateError>) {
241 self.handle.on_ice_candidate_error(f)
242 }
243
244 pub fn on_ice_connection_state_change(&self, f: Option<OnIceConnectionChange>) {
245 self.handle.on_ice_connection_state_change(f)
246 }
247
248 pub fn on_ice_gathering_state_change(&self, f: Option<OnIceGatheringChange>) {
249 self.handle.on_ice_gathering_state_change(f)
250 }
251
252 pub fn on_negotiation_needed(&self, f: Option<OnNegotiationNeeded>) {
253 self.handle.on_negotiation_needed(f)
254 }
255
256 pub fn on_signaling_state_change(&self, f: Option<OnSignalingChange>) {
257 self.handle.on_signaling_state_change(f)
258 }
259
260 pub fn on_track(&self, f: Option<OnTrack>) {
261 self.handle.on_track(f)
262 }
263}
264
265impl Debug for PeerConnection {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 f.debug_struct("PeerConnection")
268 .field("state", &self.connection_state())
269 .field("ice_state", &self.ice_connection_state())
270 .finish()
271 }
272}
273
#[cfg(test)]
mod tests {
    use log::trace;
    use tokio::sync::mpsc;

    use crate::{peer_connection::*, peer_connection_factory::*};

    /// End-to-end smoke test: two in-process peers ("bob" offers, "alice"
    /// answers) complete an offer/answer exchange, swap one ICE candidate
    /// each, and pass a single message over a data channel.
    ///
    /// The configuration names a public STUN server, and every receive below
    /// waits without a timeout.
    #[tokio::test]
    async fn create_pc() {
        // The result is ignored: `try_init` returns a `Result`, and a failure
        // here should not fail the test.
        let _ = env_logger::builder().is_test(true).try_init();

        let factory = PeerConnectionFactory::default();
        let stun_server = IceServer {
            urls: vec!["stun:stun1.l.google.com:19302".to_string()],
            username: "".into(),
            password: "".into(),
        };
        let config = RtcConfiguration {
            ice_servers: vec![stun_server],
            continual_gathering_policy: ContinualGatheringPolicy::GatherOnce,
            ice_transport_type: IceTransportsType::All,
        };

        let bob = factory.create_peer_connection(config.clone()).unwrap();
        // Last use of `config`, so it is moved instead of cloned again.
        let alice = factory.create_peer_connection(config).unwrap();

        // Channels bridge the connection callbacks back into the async test body.
        let (bob_candidate_tx, mut bob_candidate_rx) = mpsc::unbounded_channel::<IceCandidate>();
        let (alice_candidate_tx, mut alice_candidate_rx) =
            mpsc::unbounded_channel::<IceCandidate>();
        let (alice_channel_tx, mut alice_channel_rx) = mpsc::unbounded_channel::<DataChannel>();

        bob.on_ice_candidate(Some(Box::new(move |found| {
            bob_candidate_tx.send(found).unwrap();
        })));

        alice.on_ice_candidate(Some(Box::new(move |found| {
            alice_candidate_tx.send(found).unwrap();
        })));

        alice.on_data_channel(Some(Box::new(move |channel| {
            alice_channel_tx.send(channel).unwrap();
        })));

        // The channel is created before the offer (presumably so the offer
        // already advertises it — confirm).
        let bob_dc = bob.create_data_channel("test_dc", DataChannelInit::default()).unwrap();

        // Offer: bob -> alice.
        let offer = bob.create_offer(OfferOptions::default()).await.unwrap();
        trace!("Bob offer: {:?}", offer);
        bob.set_local_description(offer.clone()).await.unwrap();
        alice.set_remote_description(offer).await.unwrap();

        // Answer: alice -> bob.
        let answer = alice.create_answer(AnswerOptions::default()).await.unwrap();
        trace!("Alice answer: {:?}", answer);
        alice.set_local_description(answer.clone()).await.unwrap();
        bob.set_remote_description(answer).await.unwrap();

        // Only the first candidate gathered on each side is exchanged.
        let candidate_from_bob = bob_candidate_rx.recv().await.unwrap();
        let candidate_from_alice = alice_candidate_rx.recv().await.unwrap();

        bob.add_ice_candidate(candidate_from_alice).await.unwrap();
        alice.add_ice_candidate(candidate_from_bob).await.unwrap();

        let (message_tx, mut message_rx) = mpsc::unbounded_channel::<String>();
        let alice_dc = alice_channel_rx.recv().await.unwrap();
        alice_dc.on_message(Some(Box::new(move |buffer| {
            let text = String::from_utf8_lossy(buffer.data).into_owned();
            message_tx.send(text).unwrap();
        })));

        // NOTE(review): the second argument (`true`) looks like a
        // "binary" flag — confirm against `DataChannel::send`.
        bob_dc.send(b"This is a test", true).unwrap();
        let received = message_rx.recv().await.unwrap();
        assert_eq!(received, "This is a test");

        alice.close();
        bob.close();
    }
}