webrtc_connection/rtc_connection/
disconnected.rs1use std::sync::Arc;
2
3use anyhow::{Result, bail};
4use async_trait::async_trait;
5use cs_trace::{Tracer, child};
6use tokio_util::codec::Framed;
7use tokio::sync::mpsc::Receiver;
8use cs_utils::futures::GenericCodec;
9use futures::{stream::{SplitSink, SplitStream}, SinkExt};
10use tokio::{join, io::{AsyncRead, AsyncWrite}, sync::{Mutex, mpsc::Sender}, try_join};
11use webrtc::{peer_connection::RTCPeerConnection, data_channel::RTCDataChannel};
12use connection_utils::{Channel, Disconnected, Connected};
13
14use crate::{types::RTCSignalingMessage, rtc_connection::event_handlers::{on_peer_connection_state_change, on_signal, on_ice_candidate}, RtcChannel};
15
16use super::{RtcConnection, connected::RtcConnectionConnected};
17
18async fn on_connection(
19 mut connection_signal_source: Receiver<()>,
20) -> anyhow::Result<()> {
21 while let Some(_) = connection_signal_source.recv().await {
22 return Ok(());
23 }
24
25 bail!("Failed to receive single connection signal message.");
26}
27
28async fn initiate_connection<AsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
29 trace: Box<dyn Tracer>,
30 peer_connection: Arc<RTCPeerConnection>,
31 signaling_sink: Arc<Mutex<SplitSink<Framed<Box<AsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
32) -> Result<Arc<RTCDataChannel>> {
33 let data_channel = peer_connection
35 .create_data_channel("default_channel", None)
36 .await
37 .expect("Cannot create data channel.");
38
39 trace.trace("channel on_message handler set up, creating offer");
54
55 let offer = peer_connection
57 .create_offer(None)
58 .await
59 .expect("Cannot create initial offer.");
60
61 trace.trace(
62 &format!("offer created: {:?}", &offer),
63 );
64
65 peer_connection
68 .set_local_description(offer.clone())
69 .await
70 .expect("Cannot set initial offer local description.");
71
72 trace.trace("local description set");
73
74 signaling_sink
75 .lock()
76 .await
77 .send(RTCSignalingMessage::SessionDescription(offer))
78 .await
79 .expect("Cannot send inital offer.");
80
81 trace.trace("offer sent");
82
83 return Ok(data_channel);
84}
85
86async fn listen_impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
87 trace: &Box<dyn Tracer>,
88 should_reply: bool,
89 peer_connection: Arc<RTCPeerConnection>,
90 connection_signal_sink: Arc<tokio::sync::mpsc::Sender<()>>,
91 signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
92 signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
93) -> anyhow::Result<()> {
94 let pending_candidates = Arc::new(Mutex::new(vec![]));
95
96 let on_ice_candidate_trace = child!(trace, "on-ice-candidate");
97 let on_ice_candidate_peer_connection = Arc::clone(&peer_connection);
98 let on_ice_candidate_signaling_sink = Arc::clone(&signaling_sink);
99 let on_ice_candidate_pending_candidates = Arc::clone(&pending_candidates);
100
101 let on_peer_state_trace = child!(trace, "on-peer-state");
102 let on_peer_state_peer_connection = Arc::clone(&peer_connection);
103
104 let on_signal_trace = child!(trace, "on-signal");
105 let on_signal_peer_connection = Arc::clone(&peer_connection);
106 let on_signal_signaling_sink = Arc::clone(&signaling_sink);
107 let on_signal_pending_candidates = Arc::clone(&pending_candidates);
108
109 let _res = join!(
111 tokio::spawn(on_ice_candidate(
112 on_ice_candidate_trace,
113 on_ice_candidate_peer_connection,
114 on_ice_candidate_signaling_sink,
115 on_ice_candidate_pending_candidates,
116 )),
117 tokio::spawn(on_peer_connection_state_change(
118 on_peer_state_trace,
119 on_peer_state_peer_connection,
120 connection_signal_sink,
121 )),
122 tokio::spawn(on_signal(
123 on_signal_trace,
124 on_signal_peer_connection,
125 signaling_source,
126 on_signal_signaling_sink,
127 on_signal_pending_candidates,
128 should_reply,
129 )),
130 );
131
132 trace.info("Done.");
135
136 return Ok(());
137}
138
139async fn listen_until_connected<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
140 trace: Box<dyn Tracer>,
141 should_reply: bool,
142 peer_connection: Arc<RTCPeerConnection>,
143 on_data_channel_sink: Arc<Mutex<Sender<Box<dyn Channel>>>>,
144 signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
145 signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
146) -> anyhow::Result<()> {
147 let (connection_signal_sink, connection_signal_source) = tokio::sync::mpsc::channel::<()>(1);
148
149 tokio::select! {
150 _ = listen_impl(&trace, should_reply, Arc::clone(&peer_connection), Arc::new(connection_signal_sink), signaling_source, signaling_sink) => {
151 trace.trace("listeners complete");
152
153 bail!("Connection listeners stopped before connection is established.");
154 },
155 connection_signal_result = on_connection(connection_signal_source) => {
156 trace.info("connection complete");
157
158 let on_data_channel_trace = child!(trace, "channel");
160 peer_connection
161 .on_data_channel(Box::new(move |data_channel| {
162 let on_data_channel_sink = Arc::clone(&on_data_channel_sink);
163
164 let trace = child!(on_data_channel_trace, "");
165 return Box::pin(async move {
166 let data_stream = RtcChannel::new(
167 &trace,
168 data_channel,
169 ).await;
170
171 let _res = on_data_channel_sink
172 .lock()
173 .await
174 .try_send(Box::new(data_stream));
175 });
176 }))
177 .await;
178
179 trace.info("on_data_channel handler set up");
180
181 return Ok(connection_signal_result?);
182 },
183 };
184}
185
186#[async_trait]
187impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> Disconnected for RtcConnection<TAsyncDuplexStream> {
188 async fn connect(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
189 let trace =&self.trace;
190 let trace = child!(trace, "connect");
191 let peer_connection = Arc::clone(&self.peer_connection);
192 let signaling_sink = Arc::clone(&self.signaling_sink);
193
194 let (data_channel, _) = try_join!(
195 initiate_connection(child!(trace, "init"), peer_connection, signaling_sink),
197 listen_until_connected(
199 child!(trace, "events"),
200 false,
201 Arc::clone(&self.peer_connection),
202 Arc::clone(&self.on_data_channel_sink),
203 Arc::clone(&self.signaling_source),
204 Arc::clone(&self.signaling_sink),
205 ),
206 )?;
207
208 let inital_channel = Box::new(
209 RtcChannel::new(
210 &self.trace,
211 data_channel,
212 ).await,
213 );
214
215 return Ok(
216 Box::new(
217 RtcConnectionConnected::new(
218 &self.trace,
219 Arc::clone(&self.peer_connection),
220 Some(inital_channel),
221 self.on_data_channel_source.take().unwrap(),
222 ).unwrap(),
223 ),
224 );
225 }
226
227 async fn listen(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
228 let trace = &self.trace;
229 listen_until_connected(
230 child!(trace, "listen"),
231 true,
232 Arc::clone(&self.peer_connection),
233 Arc::clone(&self.on_data_channel_sink),
234 Arc::clone(&self.signaling_source),
235 Arc::clone(&self.signaling_sink),
236 ).await?;
237
238 return Ok(
239 Box::new(
240 RtcConnectionConnected::new(
241 &self.trace,
242 Arc::clone(&self.peer_connection),
243 None,
244 self.on_data_channel_source.take().unwrap(),
245 ).unwrap(),
246 ),
247 );
248 }
249}