webrtc_connection/
rtc_connection.rs

1use std::sync::Arc;
2use cs_trace::{Tracer, create_trace};
3use cs_utils::futures::GenericCodec;
4use tokio::{io::{AsyncRead, AsyncWrite}, sync::{Mutex, mpsc::{Sender, Receiver}}};
5use tokio_util::codec::Framed;
6use futures::{StreamExt, stream::{SplitStream, SplitSink}};
7use webrtc::{peer_connection::{configuration::RTCConfiguration, RTCPeerConnection}, api::{APIBuilder, media_engine::MediaEngine, interceptor_registry::register_default_interceptors}, interceptor::registry::Registry};
8use connection_utils::{Channel, Disconnected};
9
10use crate::types::RTCSignalingMessage;
11
12mod event_handlers;
13
14mod disconnected;
15mod connected;
16
17pub struct RtcConnection<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> {
18    trace: Box<dyn Tracer>,
19    signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
20    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
21    peer_connection: Arc<RTCPeerConnection>,
22    on_data_channel_sink: Arc<Mutex<Sender<Box<dyn Channel>>>>,
23    on_data_channel_source: Option<Receiver<Box<dyn Channel>>>,
24}
25
26impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> RtcConnection<TAsyncDuplexStream> {
27    pub async fn new(
28        stream: Box<TAsyncDuplexStream>,
29        config: RTCConfiguration,
30    ) -> anyhow::Result<Box<dyn Disconnected>> {
31        let trace = create_trace!("rtc-connection");
32        let codec = GenericCodec::new();
33        let framed_stream = Framed::new(stream, codec);
34        let (
35            signaling_sink,
36            signaling_source
37        ) = framed_stream.split();
38
39        // create a MediaEngine object to configure the supported codec
40        let mut media_engine = MediaEngine::default();
41        // register default codecs
42        media_engine.register_default_codecs()?;
43
44        // create a InterceptorRegistry. this is the user configurable RTP/RTCP Pipeline.
45        // this provides NACKs, RTCP Reports and other features. if you use `webrtc.NewPeerConnection`
46        // this is enabled by default. if you are manually managing You MUST create a interceptorRegistry
47        // for each peerConnection.
48        let mut registry = Registry::new();
49        // use the default set of Interceptors
50        registry = register_default_interceptors(registry, &mut media_engine).await?;
51
52        // create the API object with the MediaEngine
53        let api = APIBuilder::new()
54            .with_media_engine(media_engine)
55            .with_interceptor_registry(registry)
56            .build();
57
58        // create a new RTCPeerConnection
59        let peer_connection = Arc::new(
60            api.new_peer_connection(config).await?
61        );
62
63        let (
64            on_data_channel_sink,
65            on_data_channel_source,
66        ) = tokio::sync::mpsc::channel(10);
67        
68        return Ok(
69            Box::new(
70                RtcConnection {
71                    trace,
72                    signaling_source: Arc::new(Mutex::new(signaling_source)),
73                    signaling_sink: Arc::new(Mutex::new(signaling_sink)),
74                    peer_connection,
75                    on_data_channel_sink: Arc::new(Mutex::new(on_data_channel_sink)),
76                    on_data_channel_source: Some(on_data_channel_source),
77                },
78            ),
79        );
80    }
81}