libwebrtc/
peer_connection.rs

1// Copyright 2023 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Debug;
16
17use crate::{
18    data_channel::{DataChannel, DataChannelInit},
19    ice_candidate::IceCandidate,
20    imp::peer_connection as imp_pc,
21    media_stream::MediaStream,
22    media_stream_track::MediaStreamTrack,
23    peer_connection_factory::RtcConfiguration,
24    rtp_receiver::RtpReceiver,
25    rtp_sender::RtpSender,
26    rtp_transceiver::{RtpTransceiver, RtpTransceiverInit},
27    session_description::SessionDescription,
28    stats::RtcStats,
29    MediaType, RtcError,
30};
31
/// Overall state of a [`PeerConnection`].
///
/// Returned by [`PeerConnection::connection_state`] and passed to
/// [`OnConnectionChange`] callbacks.
///
/// The type is `Copy`, comparable and hashable, so it can be stored in sets or
/// used as a map key (for example to count state transitions).
///
/// NOTE(review): the variant names appear to mirror the W3C
/// `RTCPeerConnectionState` values — confirm against the native binding before
/// relying on exact semantics.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum PeerConnectionState {
    New,
    Connecting,
    Connected,
    Disconnected,
    Failed,
    Closed,
}
41
/// ICE connection state of a [`PeerConnection`].
///
/// Returned by [`PeerConnection::ice_connection_state`] and passed to
/// [`OnIceConnectionChange`] callbacks.
///
/// The type is `Copy`, comparable and hashable, so it can be stored in sets or
/// used as a map key.
///
/// NOTE(review): the variant names appear to mirror the native WebRTC ICE
/// connection states — confirm against the native binding.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IceConnectionState {
    New,
    Checking,
    Connected,
    Completed,
    Failed,
    Disconnected,
    Closed,
    /// NOTE(review): presumably the upper-bound sentinel carried over from the
    /// native enum rather than a state a connection can actually be in — confirm.
    Max,
}
53
/// ICE candidate gathering state of a [`PeerConnection`].
///
/// Returned by [`PeerConnection::ice_gathering_state`] and passed to
/// [`OnIceGatheringChange`] callbacks.
///
/// The type is `Copy`, comparable and hashable, so it can be stored in sets or
/// used as a map key.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IceGatheringState {
    New,
    Gathering,
    Complete,
}
60
/// Signaling (offer/answer) state of a [`PeerConnection`].
///
/// Returned by [`PeerConnection::signaling_state`] and passed to
/// [`OnSignalingChange`] callbacks.
///
/// The type is `Copy`, comparable and hashable, so it can be stored in sets or
/// used as a map key.
///
/// NOTE(review): the variant names appear to mirror the W3C `RTCSignalingState`
/// values — confirm against the native binding.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum SignalingState {
    Stable,
    HaveLocalOffer,
    HaveLocalPrAnswer,
    HaveRemoteOffer,
    HaveRemotePrAnswer,
    Closed,
}
70
/// Options accepted by [`PeerConnection::create_offer`].
///
/// `OfferOptions::default()` leaves every flag `false`. Values compare
/// field-by-field, which makes them easy to assert on in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct OfferOptions {
    /// NOTE(review): presumably asks for an ICE restart when the offer is
    /// generated — confirm against the native binding.
    pub ice_restart: bool,
    /// NOTE(review): presumably requests that the offer be able to receive
    /// audio — confirm against the native binding.
    pub offer_to_receive_audio: bool,
    /// NOTE(review): presumably requests that the offer be able to receive
    /// video — confirm against the native binding.
    pub offer_to_receive_video: bool,
}
77
/// Options accepted by [`PeerConnection::create_answer`].
///
/// The struct currently has no fields; construct it with
/// `AnswerOptions::default()`. All values compare equal.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct AnswerOptions {}
80
/// Payload passed to [`OnIceCandidateError`] callbacks.
///
/// Plain data: values can be cloned and compared field-by-field.
///
/// NOTE(review): the fields appear to mirror the W3C
/// `RTCPeerConnectionIceErrorEvent` attributes — confirm against the native
/// binding before relying on exact semantics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IceCandidateError {
    /// NOTE(review): presumably the local address used to reach the server — confirm.
    pub address: String,
    /// NOTE(review): presumably the local port used to reach the server — confirm.
    pub port: i32,
    /// NOTE(review): presumably the STUN/TURN server URL that failed — confirm.
    pub url: String,
    /// NOTE(review): presumably the numeric STUN error code — confirm.
    pub error_code: i32,
    /// NOTE(review): presumably the human-readable error reason — confirm.
    pub error_text: String,
}
89
90#[derive(Debug, Clone)]
91pub struct TrackEvent {
92    pub receiver: RtpReceiver,
93    pub streams: Vec<MediaStream>,
94    pub track: MediaStreamTrack,
95    pub transceiver: RtpTransceiver,
96}
97
98pub type OnConnectionChange = Box<dyn FnMut(PeerConnectionState) + Send + Sync>;
99pub type OnDataChannel = Box<dyn FnMut(DataChannel) + Send + Sync>;
100pub type OnIceCandidate = Box<dyn FnMut(IceCandidate) + Send + Sync>;
101pub type OnIceCandidateError = Box<dyn FnMut(IceCandidateError) + Send + Sync>;
102pub type OnIceConnectionChange = Box<dyn FnMut(IceConnectionState) + Send + Sync>;
103pub type OnIceGatheringChange = Box<dyn FnMut(IceGatheringState) + Send + Sync>;
104pub type OnNegotiationNeeded = Box<dyn FnMut(u32) + Send + Sync>;
105pub type OnSignalingChange = Box<dyn FnMut(SignalingState) + Send + Sync>;
106pub type OnTrack = Box<dyn FnMut(TrackEvent) + Send + Sync>;
107
108#[derive(Clone)]
109pub struct PeerConnection {
110    pub(crate) handle: imp_pc::PeerConnection,
111}
112
113impl PeerConnection {
114    pub fn set_configuration(&self, config: RtcConfiguration) -> Result<(), RtcError> {
115        self.handle.set_configuration(config)
116    }
117
118    pub async fn create_offer(
119        &self,
120        options: OfferOptions,
121    ) -> Result<SessionDescription, RtcError> {
122        self.handle.create_offer(options).await
123    }
124
125    pub async fn create_answer(
126        &self,
127        options: AnswerOptions,
128    ) -> Result<SessionDescription, RtcError> {
129        self.handle.create_answer(options).await
130    }
131
132    pub async fn set_local_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
133        self.handle.set_local_description(desc).await
134    }
135
136    pub async fn set_remote_description(&self, desc: SessionDescription) -> Result<(), RtcError> {
137        self.handle.set_remote_description(desc).await
138    }
139
140    pub async fn add_ice_candidate(&self, candidate: IceCandidate) -> Result<(), RtcError> {
141        self.handle.add_ice_candidate(candidate).await
142    }
143
144    pub fn create_data_channel(
145        &self,
146        label: &str,
147        init: DataChannelInit,
148    ) -> Result<DataChannel, RtcError> {
149        self.handle.create_data_channel(label, init)
150    }
151
152    pub fn add_track<T: AsRef<str>>(
153        &self,
154        track: MediaStreamTrack,
155        streams_ids: &[T],
156    ) -> Result<RtpSender, RtcError> {
157        self.handle.add_track(track, streams_ids)
158    }
159
160    pub fn remove_track(&self, sender: RtpSender) -> Result<(), RtcError> {
161        self.handle.remove_track(sender)
162    }
163
164    pub async fn get_stats(&self) -> Result<Vec<RtcStats>, RtcError> {
165        self.handle.get_stats().await
166    }
167
168    pub fn add_transceiver(
169        &self,
170        track: MediaStreamTrack,
171        init: RtpTransceiverInit,
172    ) -> Result<RtpTransceiver, RtcError> {
173        self.handle.add_transceiver(track, init)
174    }
175
176    pub fn add_transceiver_for_media(
177        &self,
178        media_type: MediaType,
179        init: RtpTransceiverInit,
180    ) -> Result<RtpTransceiver, RtcError> {
181        self.handle.add_transceiver_for_media(media_type, init)
182    }
183
184    pub fn close(&self) {
185        self.handle.close()
186    }
187
188    pub fn restart_ice(&self) {
189        self.handle.restart_ice()
190    }
191
192    pub fn connection_state(&self) -> PeerConnectionState {
193        self.handle.connection_state()
194    }
195
196    pub fn ice_connection_state(&self) -> IceConnectionState {
197        self.handle.ice_connection_state()
198    }
199
200    pub fn ice_gathering_state(&self) -> IceGatheringState {
201        self.handle.ice_gathering_state()
202    }
203
204    pub fn signaling_state(&self) -> SignalingState {
205        self.handle.signaling_state()
206    }
207
208    pub fn current_local_description(&self) -> Option<SessionDescription> {
209        self.handle.current_local_description()
210    }
211
212    pub fn current_remote_description(&self) -> Option<SessionDescription> {
213        self.handle.current_remote_description()
214    }
215
216    pub fn senders(&self) -> Vec<RtpSender> {
217        self.handle.senders()
218    }
219
220    pub fn receivers(&self) -> Vec<RtpReceiver> {
221        self.handle.receivers()
222    }
223
224    pub fn transceivers(&self) -> Vec<RtpTransceiver> {
225        self.handle.transceivers()
226    }
227
228    pub fn on_connection_state_change(&self, f: Option<OnConnectionChange>) {
229        self.handle.on_connection_state_change(f)
230    }
231
232    pub fn on_data_channel(&self, f: Option<OnDataChannel>) {
233        self.handle.on_data_channel(f)
234    }
235
236    pub fn on_ice_candidate(&self, f: Option<OnIceCandidate>) {
237        self.handle.on_ice_candidate(f)
238    }
239
240    pub fn on_ice_candidate_error(&self, f: Option<OnIceCandidateError>) {
241        self.handle.on_ice_candidate_error(f)
242    }
243
244    pub fn on_ice_connection_state_change(&self, f: Option<OnIceConnectionChange>) {
245        self.handle.on_ice_connection_state_change(f)
246    }
247
248    pub fn on_ice_gathering_state_change(&self, f: Option<OnIceGatheringChange>) {
249        self.handle.on_ice_gathering_state_change(f)
250    }
251
252    pub fn on_negotiation_needed(&self, f: Option<OnNegotiationNeeded>) {
253        self.handle.on_negotiation_needed(f)
254    }
255
256    pub fn on_signaling_state_change(&self, f: Option<OnSignalingChange>) {
257        self.handle.on_signaling_state_change(f)
258    }
259
260    pub fn on_track(&self, f: Option<OnTrack>) {
261        self.handle.on_track(f)
262    }
263}
264
265impl Debug for PeerConnection {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        f.debug_struct("PeerConnection")
268            .field("state", &self.connection_state())
269            .field("ice_state", &self.ice_connection_state())
270            .finish()
271    }
272}
273
#[cfg(test)]
mod tests {
    use log::trace;
    use tokio::sync::mpsc;

    use crate::{peer_connection::*, peer_connection_factory::*};

    /// End-to-end smoke test: two peer connections ("bob" and "alice") created
    /// from one factory negotiate an offer/answer, swap one ICE candidate each,
    /// and exchange a single message over a data channel opened by bob.
    ///
    /// Each awaited step panics on failure, so any negotiation or transport
    /// error fails the test.
    #[tokio::test]
    async fn create_pc() {
        // `try_init` fails if a logger is already installed; that is harmless here.
        let _ = env_logger::builder().is_test(true).try_init();

        let factory = PeerConnectionFactory::default();
        let config = RtcConfiguration {
            ice_servers: vec![IceServer {
                urls: vec!["stun:stun1.l.google.com:19302".to_string()],
                username: "".into(),
                password: "".into(),
            }],
            continual_gathering_policy: ContinualGatheringPolicy::GatherOnce,
            ice_transport_type: IceTransportsType::All,
        };

        let bob = factory.create_peer_connection(config.clone()).unwrap();
        // Last use of `config`: move it instead of cloning a second time.
        let alice = factory.create_peer_connection(config).unwrap();

        // Channels bridge the connection callbacks back into the async test body.
        let (bob_ice_tx, mut bob_ice_rx) = mpsc::unbounded_channel::<IceCandidate>();
        let (alice_ice_tx, mut alice_ice_rx) = mpsc::unbounded_channel::<IceCandidate>();
        let (alice_dc_tx, mut alice_dc_rx) = mpsc::unbounded_channel::<DataChannel>();

        bob.on_ice_candidate(Some(Box::new(move |candidate| {
            bob_ice_tx.send(candidate).unwrap();
        })));

        alice.on_ice_candidate(Some(Box::new(move |candidate| {
            alice_ice_tx.send(candidate).unwrap();
        })));

        alice.on_data_channel(Some(Box::new(move |dc| {
            alice_dc_tx.send(dc).unwrap();
        })));

        // The data channel is created before the offer is generated.
        let bob_dc = bob.create_data_channel("test_dc", DataChannelInit::default()).unwrap();

        let offer = bob.create_offer(OfferOptions::default()).await.unwrap();
        trace!("Bob offer: {:?}", offer);
        bob.set_local_description(offer.clone()).await.unwrap();
        alice.set_remote_description(offer).await.unwrap();

        let answer = alice.create_answer(AnswerOptions::default()).await.unwrap();
        trace!("Alice answer: {:?}", answer);
        alice.set_local_description(answer.clone()).await.unwrap();
        bob.set_remote_description(answer).await.unwrap();

        // Only the first candidate from each side is exchanged.
        let bob_ice = bob_ice_rx.recv().await.unwrap();
        let alice_ice = alice_ice_rx.recv().await.unwrap();

        bob.add_ice_candidate(alice_ice).await.unwrap();
        alice.add_ice_candidate(bob_ice).await.unwrap();

        let (data_tx, mut data_rx) = mpsc::unbounded_channel::<String>();
        let alice_dc = alice_dc_rx.recv().await.unwrap();
        alice_dc.on_message(Some(Box::new(move |buffer| {
            // `into_owned` turns the `Cow` into a `String` without going through `Display`.
            data_tx.send(String::from_utf8_lossy(buffer.data).into_owned()).unwrap();
        })));

        bob_dc.send(b"This is a test", true).unwrap();
        assert_eq!(data_rx.recv().await.unwrap(), "This is a test");

        alice.close();
        bob.close();
    }
}