webrtc_connection/
rtc_connection.rs1use std::sync::Arc;
2use cs_trace::{Tracer, create_trace};
3use cs_utils::futures::GenericCodec;
4use tokio::{io::{AsyncRead, AsyncWrite}, sync::{Mutex, mpsc::{Sender, Receiver}}};
5use tokio_util::codec::Framed;
6use futures::{StreamExt, stream::{SplitStream, SplitSink}};
7use webrtc::{peer_connection::{configuration::RTCConfiguration, RTCPeerConnection}, api::{APIBuilder, media_engine::MediaEngine, interceptor_registry::register_default_interceptors}, interceptor::registry::Registry};
8use connection_utils::{Channel, Disconnected};
9
10use crate::types::RTCSignalingMessage;
11
12mod event_handlers;
13
14mod disconnected;
15mod connected;
16
17pub struct RtcConnection<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> {
18 trace: Box<dyn Tracer>,
19 signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
20 signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
21 peer_connection: Arc<RTCPeerConnection>,
22 on_data_channel_sink: Arc<Mutex<Sender<Box<dyn Channel>>>>,
23 on_data_channel_source: Option<Receiver<Box<dyn Channel>>>,
24}
25
26impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> RtcConnection<TAsyncDuplexStream> {
27 pub async fn new(
28 stream: Box<TAsyncDuplexStream>,
29 config: RTCConfiguration,
30 ) -> anyhow::Result<Box<dyn Disconnected>> {
31 let trace = create_trace!("rtc-connection");
32 let codec = GenericCodec::new();
33 let framed_stream = Framed::new(stream, codec);
34 let (
35 signaling_sink,
36 signaling_source
37 ) = framed_stream.split();
38
39 let mut media_engine = MediaEngine::default();
41 media_engine.register_default_codecs()?;
43
44 let mut registry = Registry::new();
49 registry = register_default_interceptors(registry, &mut media_engine).await?;
51
52 let api = APIBuilder::new()
54 .with_media_engine(media_engine)
55 .with_interceptor_registry(registry)
56 .build();
57
58 let peer_connection = Arc::new(
60 api.new_peer_connection(config).await?
61 );
62
63 let (
64 on_data_channel_sink,
65 on_data_channel_source,
66 ) = tokio::sync::mpsc::channel(10);
67
68 return Ok(
69 Box::new(
70 RtcConnection {
71 trace,
72 signaling_source: Arc::new(Mutex::new(signaling_source)),
73 signaling_sink: Arc::new(Mutex::new(signaling_sink)),
74 peer_connection,
75 on_data_channel_sink: Arc::new(Mutex::new(on_data_channel_sink)),
76 on_data_channel_source: Some(on_data_channel_source),
77 },
78 ),
79 );
80 }
81}