1use std::sync::Arc;
16
17use cxx::SharedPtr;
18use parking_lot::Mutex;
19use tokio::sync::{mpsc, oneshot};
20use webrtc_sys::{
21 data_channel as sys_dc, jsep as sys_jsep, peer_connection as sys_pc,
22 peer_connection_factory as sys_pcf, rtc_error as sys_err,
23};
24
25use crate::{
26 data_channel::{DataChannel, DataChannelInit},
27 ice_candidate::IceCandidate,
28 imp::{
29 data_channel as imp_dc, ice_candidate as imp_ic, media_stream as imp_ms,
30 media_stream_track as imp_mst, rtp_receiver as imp_rr, rtp_sender as imp_rs,
31 rtp_transceiver as imp_rt, session_description as imp_sdp,
32 },
33 media_stream::MediaStream,
34 media_stream_track::MediaStreamTrack,
35 peer_connection::{
36 AnswerOptions, IceCandidateError, IceConnectionState, IceGatheringState, OfferOptions,
37 OnConnectionChange, OnDataChannel, OnIceCandidate, OnIceCandidateError,
38 OnIceConnectionChange, OnIceGatheringChange, OnNegotiationNeeded, OnSignalingChange,
39 OnTrack, PeerConnectionState, SignalingState, TrackEvent,
40 },
41 peer_connection_factory::{
42 ContinualGatheringPolicy, IceServer, IceTransportsType, RtcConfiguration,
43 },
44 rtp_receiver::RtpReceiver,
45 rtp_sender::RtpSender,
46 rtp_transceiver::{RtpTransceiver, RtpTransceiverInit},
47 session_description::SessionDescription,
48 stats::RtcStats,
49 MediaType, RtcError, RtcErrorType,
50};
51
52impl From<OfferOptions> for sys_pc::ffi::RtcOfferAnswerOptions {
53 fn from(options: OfferOptions) -> Self {
54 Self {
55 ice_restart: options.ice_restart,
56 offer_to_receive_audio: options.offer_to_receive_audio as i32,
57 offer_to_receive_video: options.offer_to_receive_video as i32,
58 ..Default::default()
59 }
60 }
61}
62
63impl From<AnswerOptions> for sys_pc::ffi::RtcOfferAnswerOptions {
64 fn from(_options: AnswerOptions) -> Self {
65 Self::default()
66 }
67}
68
69impl From<sys_pc::ffi::PeerConnectionState> for PeerConnectionState {
70 fn from(state: sys_pc::ffi::PeerConnectionState) -> Self {
71 match state {
72 sys_pc::ffi::PeerConnectionState::New => PeerConnectionState::New,
73 sys_pc::ffi::PeerConnectionState::Connecting => PeerConnectionState::Connecting,
74 sys_pc::ffi::PeerConnectionState::Connected => PeerConnectionState::Connected,
75 sys_pc::ffi::PeerConnectionState::Disconnected => PeerConnectionState::Disconnected,
76 sys_pc::ffi::PeerConnectionState::Failed => PeerConnectionState::Failed,
77 sys_pc::ffi::PeerConnectionState::Closed => PeerConnectionState::Closed,
78 _ => panic!("unknown PeerConnectionState"),
79 }
80 }
81}
82
83impl From<sys_pc::ffi::IceConnectionState> for IceConnectionState {
84 fn from(state: sys_pc::ffi::IceConnectionState) -> Self {
85 match state {
86 sys_pc::ffi::IceConnectionState::IceConnectionNew => IceConnectionState::New,
87 sys_pc::ffi::IceConnectionState::IceConnectionChecking => IceConnectionState::Checking,
88 sys_pc::ffi::IceConnectionState::IceConnectionConnected => {
89 IceConnectionState::Connected
90 }
91 sys_pc::ffi::IceConnectionState::IceConnectionCompleted => {
92 IceConnectionState::Completed
93 }
94 sys_pc::ffi::IceConnectionState::IceConnectionFailed => IceConnectionState::Failed,
95 sys_pc::ffi::IceConnectionState::IceConnectionDisconnected => {
96 IceConnectionState::Disconnected
97 }
98 sys_pc::ffi::IceConnectionState::IceConnectionClosed => IceConnectionState::Closed,
99 sys_pc::ffi::IceConnectionState::IceConnectionMax => IceConnectionState::Max,
100 _ => panic!("unknown IceConnectionState"),
101 }
102 }
103}
104
105impl From<sys_pc::ffi::IceGatheringState> for IceGatheringState {
106 fn from(state: sys_pc::ffi::IceGatheringState) -> Self {
107 match state {
108 sys_pc::ffi::IceGatheringState::IceGatheringNew => IceGatheringState::New,
109 sys_pc::ffi::IceGatheringState::IceGatheringGathering => IceGatheringState::Gathering,
110 sys_pc::ffi::IceGatheringState::IceGatheringComplete => IceGatheringState::Complete,
111 _ => panic!("unknown IceGatheringState"),
112 }
113 }
114}
115
116impl From<sys_pc::ffi::SignalingState> for SignalingState {
117 fn from(state: sys_pc::ffi::SignalingState) -> Self {
118 match state {
119 sys_pc::ffi::SignalingState::Stable => SignalingState::Stable,
120 sys_pc::ffi::SignalingState::HaveLocalOffer => SignalingState::HaveLocalOffer,
121 sys_pc::ffi::SignalingState::HaveRemoteOffer => SignalingState::HaveRemoteOffer,
122 sys_pc::ffi::SignalingState::HaveLocalPrAnswer => SignalingState::HaveLocalPrAnswer,
123 sys_pc::ffi::SignalingState::HaveRemotePrAnswer => SignalingState::HaveRemotePrAnswer,
124 sys_pc::ffi::SignalingState::Closed => SignalingState::Closed,
125 _ => panic!("unknown SignalingState"),
126 }
127 }
128}
129
130impl From<IceServer> for sys_pc::ffi::IceServer {
131 fn from(value: IceServer) -> Self {
132 sys_pc::ffi::IceServer {
133 urls: value.urls,
134 username: value.username,
135 password: value.password,
136 }
137 }
138}
139
140impl From<ContinualGatheringPolicy> for sys_pc::ffi::ContinualGatheringPolicy {
141 fn from(value: ContinualGatheringPolicy) -> Self {
142 match value {
143 ContinualGatheringPolicy::GatherOnce => {
144 sys_pc::ffi::ContinualGatheringPolicy::GatherOnce
145 }
146 ContinualGatheringPolicy::GatherContinually => {
147 sys_pc::ffi::ContinualGatheringPolicy::GatherContinually
148 }
149 }
150 }
151}
152
153impl From<IceTransportsType> for sys_pc::ffi::IceTransportsType {
154 fn from(value: IceTransportsType) -> Self {
155 match value {
156 IceTransportsType::Relay => sys_pc::ffi::IceTransportsType::Relay,
157 IceTransportsType::NoHost => sys_pc::ffi::IceTransportsType::NoHost,
158 IceTransportsType::All => sys_pc::ffi::IceTransportsType::All,
159 }
160 }
161}
162
163impl From<RtcConfiguration> for sys_pc::ffi::RtcConfiguration {
164 fn from(value: RtcConfiguration) -> Self {
165 Self {
166 ice_servers: value.ice_servers.into_iter().map(Into::into).collect(),
167 continual_gathering_policy: value.continual_gathering_policy.into(),
168 ice_transport_type: value.ice_transport_type.into(),
169 }
170 }
171}
172
173#[derive(Clone)]
174pub struct PeerConnection {
175 observer: Arc<PeerObserver>,
176 pub(crate) sys_handle: SharedPtr<sys_pc::ffi::PeerConnection>,
177}
178
179impl PeerConnection {
180 pub fn configure(
181 sys_handle: SharedPtr<sys_pc::ffi::PeerConnection>,
182 observer: Arc<PeerObserver>,
183 ) -> Self {
184 Self { sys_handle, observer }
185 }
186
187 pub fn set_configuration(&self, config: RtcConfiguration) -> Result<(), RtcError> {
188 let res = self.sys_handle.set_configuration(config.into());
189
190 match res {
191 Ok(_) => Ok(()),
192 Err(e) => unsafe { Err(sys_err::ffi::RtcError::from(e.what()).into()) },
193 }
194 }
195
196 pub async fn create_offer(
197 &self,
198 options: OfferOptions,
199 ) -> Result<SessionDescription, RtcError> {
200 let (tx, mut rx) = mpsc::channel::<Result<SessionDescription, RtcError>>(1);
201 let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
202 type CtxType = mpsc::Sender<Result<SessionDescription, RtcError>>;
203
204 self.sys_handle.create_offer(
205 options.into(),
206 ctx,
207 |ctx, sdp| {
208 let tx = *ctx.0.downcast::<CtxType>().unwrap();
209 let _ = tx.blocking_send(Ok(SessionDescription {
210 handle: imp_sdp::SessionDescription { sys_handle: sdp },
211 }));
212 },
213 |ctx, error| {
214 let tx = *ctx.0.downcast::<CtxType>().unwrap();
215 let _ = tx.blocking_send(Err(error.into()));
216 },
217 );
218
219 rx.recv().await.unwrap()
220 }
221
222 pub async fn create_answer(
223 &self,
224 options: AnswerOptions,
225 ) -> Result<SessionDescription, RtcError> {
226 let (tx, mut rx) = mpsc::channel::<Result<SessionDescription, RtcError>>(1);
227 let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
228 type CtxType = mpsc::Sender<Result<SessionDescription, RtcError>>;
229
230 self.sys_handle.create_answer(
231 options.into(),
232 ctx,
233 |ctx, sdp| {
234 let tx = *ctx.0.downcast::<CtxType>().unwrap();
235 let _ = tx.blocking_send(Ok(SessionDescription {
236 handle: imp_sdp::SessionDescription { sys_handle: sdp },
237 }));
238 },
239 |ctx, error| {
240 let tx = *ctx.0.downcast::<CtxType>().unwrap();
241 let _ = tx.blocking_send(Err(error.into()));
242 },
243 );
244
245 rx.recv().await.unwrap()
246 }
247
248 pub async fn set_local_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
249 let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
250 let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
251
252 self.sys_handle.set_local_description(desc.handle.sys_handle, ctx, |ctx, err| {
253 let tx = ctx.0.downcast::<oneshot::Sender<Result<(), RtcError>>>().unwrap();
254
255 if err.ok() {
256 let _ = tx.send(Ok(()));
257 } else {
258 let _ = tx.send(Err(err.into()));
259 }
260 });
261
262 rx.await.unwrap()
263 }
264
265 pub async fn set_remote_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
266 let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
267 let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
268
269 self.sys_handle.set_remote_description(desc.handle.sys_handle, ctx, |ctx, err| {
270 let tx = ctx.0.downcast::<oneshot::Sender<Result<(), RtcError>>>().unwrap();
271
272 if err.ok() {
273 let _ = tx.send(Ok(()));
274 } else {
275 let _ = tx.send(Err(err.into()));
276 }
277 });
278
279 rx.await.map_err(|_| RtcError {
280 error_type: RtcErrorType::Internal,
281 message: "set_remote_description cancelled".to_owned(),
282 })?
283 }
284
285 pub async fn add_ice_candidate(&self, candidate: IceCandidate) -> Result<(), RtcError> {
286 let (tx, rx) = oneshot::channel::<Result<(), RtcError>>();
287 let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
288
289 self.sys_handle.add_ice_candidate(candidate.handle.sys_handle, ctx, |ctx, err| {
290 let tx = ctx.0.downcast::<oneshot::Sender<Result<(), RtcError>>>().unwrap();
291
292 if err.ok() {
293 let _ = tx.send(Ok(()));
294 } else {
295 let _ = tx.send(Err(err.into()));
296 }
297 });
298
299 rx.await.map_err(|_| RtcError {
300 error_type: RtcErrorType::Internal,
301 message: "add_ice_candidate cancelled".to_owned(),
302 })?
303 }
304
305 pub fn create_data_channel(
306 &self,
307 label: &str,
308 init: DataChannelInit,
309 ) -> Result<DataChannel, RtcError> {
310 let res = self.sys_handle.create_data_channel(label.to_string(), init.into());
311
312 match res {
313 Ok(sys_handle) => {
314 Ok(DataChannel { handle: imp_dc::DataChannel::configure(sys_handle) })
315 }
316 Err(e) => Err(unsafe { sys_err::ffi::RtcError::from(e.what()).into() }),
317 }
318 }
319
320 pub fn add_track<T: AsRef<str>>(
321 &self,
322 track: MediaStreamTrack,
323 stream_ids: &[T],
324 ) -> Result<RtpSender, RtcError> {
325 let stream_ids = stream_ids.iter().map(|s| s.as_ref().to_owned()).collect();
326 let res = self.sys_handle.add_track(track.sys_handle(), &stream_ids);
327
328 match res {
329 Ok(sys_handle) => Ok(RtpSender { handle: imp_rs::RtpSender { sys_handle } }),
330 Err(e) => unsafe { Err(sys_err::ffi::RtcError::from(e.what()).into()) },
331 }
332 }
333
334 pub fn add_transceiver(
335 &self,
336 track: MediaStreamTrack,
337 init: RtpTransceiverInit,
338 ) -> Result<RtpTransceiver, RtcError> {
339 let res = self.sys_handle.add_transceiver(track.sys_handle(), init.into());
340
341 match res {
342 Ok(sys_handle) => Ok(RtpTransceiver { handle: imp_rt::RtpTransceiver { sys_handle } }),
343 Err(e) => unsafe { Err(sys_err::ffi::RtcError::from(e.what()).into()) },
344 }
345 }
346
347 pub fn add_transceiver_for_media(
348 &self,
349 media_type: MediaType,
350 init: RtpTransceiverInit,
351 ) -> Result<RtpTransceiver, RtcError> {
352 let res = self.sys_handle.add_transceiver_for_media(media_type.into(), init.into());
353
354 match res {
355 Ok(cxx_handle) => {
356 Ok(RtpTransceiver { handle: imp_rt::RtpTransceiver { sys_handle: cxx_handle } })
357 }
358 Err(e) => unsafe { Err(sys_err::ffi::RtcError::from(e.what()).into()) },
359 }
360 }
361
362 pub fn restart_ice(&self) {
363 self.sys_handle.restart_ice();
364 }
365
366 pub fn close(&self) {
367 self.sys_handle.close();
368 }
369
370 pub fn connection_state(&self) -> PeerConnectionState {
371 self.sys_handle.connection_state().into()
372 }
373
374 pub fn ice_connection_state(&self) -> IceConnectionState {
375 self.sys_handle.ice_connection_state().into()
376 }
377
378 pub fn ice_gathering_state(&self) -> IceGatheringState {
379 self.sys_handle.ice_gathering_state().into()
380 }
381
382 pub fn signaling_state(&self) -> SignalingState {
383 self.sys_handle.signaling_state().into()
384 }
385
386 pub fn current_local_description(&self) -> Option<SessionDescription> {
387 let sdp = self.sys_handle.current_local_description();
388 if sdp.is_null() {
389 return None;
390 }
391
392 Some(SessionDescription { handle: imp_sdp::SessionDescription { sys_handle: sdp } })
393 }
394
395 pub fn current_remote_description(&self) -> Option<SessionDescription> {
396 let sdp = self.sys_handle.current_remote_description();
397 if sdp.is_null() {
398 return None;
399 }
400
401 Some(SessionDescription { handle: imp_sdp::SessionDescription { sys_handle: sdp } })
402 }
403
404 pub fn remove_track(&self, sender: RtpSender) -> Result<(), RtcError> {
405 self.sys_handle
406 .remove_track(sender.handle.sys_handle)
407 .map_err(|e| unsafe { sys_err::ffi::RtcError::from(e.what()).into() })
408 }
409
410 pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
411 let (tx, rx) = oneshot::channel::<Result<Vec<RtcStats>, RtcError>>();
412 let ctx = Box::new(sys_pc::PeerContext(Box::new(tx)));
413
414 self.sys_handle.get_stats(ctx, |ctx, stats| {
415 let tx = ctx.0.downcast::<oneshot::Sender<Result<Vec<RtcStats>, RtcError>>>().unwrap();
416
417 if stats.is_empty() {
418 let _ = tx.send(Ok(vec![]));
419 return;
420 }
421
422 let vec = serde_json::from_str(&stats).unwrap();
424 let _ = tx.send(Ok(vec));
425 });
426
427 rx.await.map_err(|_| RtcError {
428 error_type: RtcErrorType::Internal,
429 message: "get_stats cancelled".to_owned(),
430 })?
431 }
432
433 pub fn senders(&self) -> Vec<RtpSender> {
434 self.sys_handle
435 .get_senders()
436 .into_iter()
437 .map(|sender| RtpSender { handle: imp_rs::RtpSender { sys_handle: sender.ptr } })
438 .collect()
439 }
440
441 pub fn receivers(&self) -> Vec<RtpReceiver> {
442 self.sys_handle
443 .get_receivers()
444 .into_iter()
445 .map(|receiver| RtpReceiver {
446 handle: imp_rr::RtpReceiver { sys_handle: receiver.ptr },
447 })
448 .collect()
449 }
450
451 pub fn transceivers(&self) -> Vec<RtpTransceiver> {
452 self.sys_handle
453 .get_transceivers()
454 .into_iter()
455 .map(|transceiver| RtpTransceiver {
456 handle: imp_rt::RtpTransceiver { sys_handle: transceiver.ptr },
457 })
458 .collect()
459 }
460
461 pub fn on_connection_state_change(&self, f: Option<OnConnectionChange>) {
462 *self.observer.connection_change_handler.lock() = f;
463 }
464
465 pub fn on_data_channel(&self, f: Option<OnDataChannel>) {
466 *self.observer.data_channel_handler.lock() = f;
467 }
468
469 pub fn on_ice_candidate(&self, f: Option<OnIceCandidate>) {
470 *self.observer.ice_candidate_handler.lock() = f;
471 }
472
473 pub fn on_ice_candidate_error(&self, f: Option<OnIceCandidateError>) {
474 *self.observer.ice_candidate_error_handler.lock() = f;
475 }
476
477 pub fn on_ice_connection_state_change(&self, f: Option<OnIceConnectionChange>) {
478 *self.observer.ice_connection_change_handler.lock() = f;
479 }
480
481 pub fn on_ice_gathering_state_change(&self, f: Option<OnIceGatheringChange>) {
482 *self.observer.ice_gathering_change_handler.lock() = f;
483 }
484
485 pub fn on_negotiation_needed(&self, f: Option<OnNegotiationNeeded>) {
486 *self.observer.negotiation_needed_handler.lock() = f;
487 }
488
489 pub fn on_signaling_state_change(&self, f: Option<OnSignalingChange>) {
490 *self.observer.signaling_change_handler.lock() = f;
491 }
492
493 pub fn on_track(&self, f: Option<OnTrack>) {
494 *self.observer.track_handler.lock() = f;
495 }
496}
497
498#[derive(Default)]
499pub struct PeerObserver {
500 pub connection_change_handler: Mutex<Option<OnConnectionChange>>,
501 pub data_channel_handler: Mutex<Option<OnDataChannel>>,
502 pub ice_candidate_handler: Mutex<Option<OnIceCandidate>>,
503 pub ice_candidate_error_handler: Mutex<Option<OnIceCandidateError>>,
504 pub ice_connection_change_handler: Mutex<Option<OnIceConnectionChange>>,
505 pub ice_gathering_change_handler: Mutex<Option<OnIceGatheringChange>>,
506 pub negotiation_needed_handler: Mutex<Option<OnNegotiationNeeded>>,
507 pub signaling_change_handler: Mutex<Option<OnSignalingChange>>,
508 pub track_handler: Mutex<Option<OnTrack>>,
509}
510
511impl sys_pcf::PeerConnectionObserver for PeerObserver {
512 fn on_signaling_change(&self, new_state: sys_pc::ffi::SignalingState) {
513 if let Some(f) = self.signaling_change_handler.lock().as_mut() {
514 f(new_state.into());
515 }
516 }
517
518 fn on_add_stream(&self, _stream: SharedPtr<webrtc_sys::media_stream::ffi::MediaStream>) {}
519
520 fn on_remove_stream(&self, _stream: SharedPtr<webrtc_sys::media_stream::ffi::MediaStream>) {}
521
522 fn on_data_channel(&self, data_channel: SharedPtr<sys_dc::ffi::DataChannel>) {
523 if let Some(f) = self.data_channel_handler.lock().as_mut() {
524 f(DataChannel { handle: imp_dc::DataChannel::configure(data_channel) });
525 }
526 }
527
528 fn on_renegotiation_needed(&self) {}
529
530 fn on_negotiation_needed_event(&self, event: u32) {
531 if let Some(f) = self.negotiation_needed_handler.lock().as_mut() {
532 f(event);
533 }
534 }
535
536 fn on_ice_connection_change(&self, _new_state: sys_pc::ffi::IceConnectionState) {}
537
538 fn on_standardized_ice_connection_change(&self, new_state: sys_pc::ffi::IceConnectionState) {
539 if let Some(f) = self.ice_connection_change_handler.lock().as_mut() {
540 f(new_state.into());
541 }
542 }
543
544 fn on_connection_change(&self, new_state: sys_pc::ffi::PeerConnectionState) {
545 if let Some(f) = self.connection_change_handler.lock().as_mut() {
546 f(new_state.into());
547 }
548 }
549
550 fn on_ice_gathering_change(&self, new_state: sys_pc::ffi::IceGatheringState) {
551 if let Some(f) = self.ice_gathering_change_handler.lock().as_mut() {
552 f(new_state.into());
553 }
554 }
555
556 fn on_ice_candidate(&self, candidate: SharedPtr<sys_jsep::ffi::IceCandidate>) {
557 if let Some(f) = self.ice_candidate_handler.lock().as_mut() {
558 f(IceCandidate { handle: imp_ic::IceCandidate { sys_handle: candidate } });
559 }
560 }
561
562 fn on_ice_candidate_error(
563 &self,
564 address: String,
565 port: i32,
566 url: String,
567 error_code: i32,
568 error_text: String,
569 ) {
570 if let Some(f) = self.ice_candidate_error_handler.lock().as_mut() {
571 f(IceCandidateError { address, port, url, error_code, error_text });
572 }
573 }
574
575 fn on_ice_candidates_removed(
576 &self,
577 _removed: Vec<SharedPtr<webrtc_sys::candidate::ffi::Candidate>>,
578 ) {
579 }
580
581 fn on_ice_connection_receiving_change(&self, _receiving: bool) {}
582
583 fn on_ice_selected_candidate_pair_changed(
584 &self,
585 _event: sys_pcf::ffi::CandidatePairChangeEvent,
586 ) {
587 }
588
589 fn on_add_track(
590 &self,
591 _receiver: SharedPtr<webrtc_sys::rtp_receiver::ffi::RtpReceiver>,
592 _streams: Vec<SharedPtr<webrtc_sys::media_stream::ffi::MediaStream>>,
593 ) {
594 }
595
596 fn on_track(&self, transceiver: SharedPtr<webrtc_sys::rtp_transceiver::ffi::RtpTransceiver>) {
597 if let Some(f) = self.track_handler.lock().as_mut() {
598 let receiver = transceiver.receiver();
599 let streams = receiver.streams();
600 let track = receiver.track();
601
602 f(TrackEvent {
603 receiver: RtpReceiver { handle: imp_rr::RtpReceiver { sys_handle: receiver } },
604 streams: streams
605 .into_iter()
606 .map(|s| MediaStream { handle: imp_ms::MediaStream { sys_handle: s.ptr } })
607 .collect(),
608 track: imp_mst::new_media_stream_track(track),
609 transceiver: RtpTransceiver {
610 handle: imp_rt::RtpTransceiver { sys_handle: transceiver },
611 },
612 });
613 }
614 }
615
616 fn on_remove_track(&self, _receiver: SharedPtr<webrtc_sys::rtp_receiver::ffi::RtpReceiver>) {}
617
618 fn on_interesting_usage(&self, _usage_pattern: i32) {}
619}