webrtc_connection/rtc_connection/
disconnected.rs

1use std::sync::Arc;
2
3use anyhow::{Result, bail};
4use async_trait::async_trait;
5use cs_trace::{Tracer, child};
6use tokio_util::codec::Framed;
7use tokio::sync::mpsc::Receiver;
8use cs_utils::futures::GenericCodec;
9use futures::{stream::{SplitSink, SplitStream}, SinkExt};
10use tokio::{join, io::{AsyncRead, AsyncWrite}, sync::{Mutex, mpsc::Sender}, try_join};
11use webrtc::{peer_connection::RTCPeerConnection, data_channel::RTCDataChannel};
12use connection_utils::{Channel, Disconnected, Connected};
13
14use crate::{types::RTCSignalingMessage, rtc_connection::event_handlers::{on_peer_connection_state_change, on_signal, on_ice_candidate}, RtcChannel};
15
16use super::{RtcConnection, connected::RtcConnectionConnected};
17
18async fn on_connection(
19    mut connection_signal_source: Receiver<()>,
20) -> anyhow::Result<()> {
21    while let Some(_) = connection_signal_source.recv().await {
22        return Ok(());
23    }
24
25    bail!("Failed to receive single connection signal message.");
26}
27
28async fn initiate_connection<AsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
29    trace: Box<dyn Tracer>,
30    peer_connection: Arc<RTCPeerConnection>,
31    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<AsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
32) -> Result<Arc<RTCDataChannel>> {
33    // Create a datachannel with label 'data'
34    let data_channel = peer_connection
35        .create_data_channel("default_channel", None)
36        .await
37        .expect("Cannot create data channel.");
38
39    // trace.trace("data channel created");
40
41    // register text message handling
42    // let on_message_trace = trace.child("on-channel-msg");
43    // data_channel
44    //     .on_message(Box::new(move |msg: DataChannelMessage| {
45    //         let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
46    //         on_message_trace.info(
47    //             &format!("<< {}", msg_str),
48    //         );
49    //         Box::pin(async {})
50    //     }))
51    //     .await;
52
53    trace.trace("channel on_message handler set up, creating offer");
54
55    // Create an offer to send to the other process
56    let offer = peer_connection
57        .create_offer(None)
58        .await
59        .expect("Cannot create initial offer.");
60
61    trace.trace(
62        &format!("offer created: {:?}", &offer),
63    );
64
65    // Sets the LocalDescription, and starts our UDP listeners
66    // Note: this will start the gathering of ICE candidates
67    peer_connection
68        .set_local_description(offer.clone())
69        .await
70        .expect("Cannot set initial offer local description.");
71
72    trace.trace("local description set");
73
74    signaling_sink
75        .lock()
76        .await
77        .send(RTCSignalingMessage::SessionDescription(offer))
78        .await
79        .expect("Cannot send inital offer.");
80
81    trace.trace("offer sent");
82
83    return Ok(data_channel);
84}
85
86async fn listen_impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
87    trace: &Box<dyn Tracer>,
88    should_reply: bool,
89    peer_connection: Arc<RTCPeerConnection>,
90    connection_signal_sink: Arc<tokio::sync::mpsc::Sender<()>>,
91    signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
92    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
93) -> anyhow::Result<()> {
94    let pending_candidates = Arc::new(Mutex::new(vec![]));
95
96    let on_ice_candidate_trace = child!(trace, "on-ice-candidate");
97    let on_ice_candidate_peer_connection = Arc::clone(&peer_connection);
98    let on_ice_candidate_signaling_sink = Arc::clone(&signaling_sink);
99    let on_ice_candidate_pending_candidates = Arc::clone(&pending_candidates);
100
101    let on_peer_state_trace = child!(trace, "on-peer-state");
102    let on_peer_state_peer_connection = Arc::clone(&peer_connection);
103
104    let on_signal_trace = child!(trace, "on-signal");
105    let on_signal_peer_connection = Arc::clone(&peer_connection);
106    let on_signal_signaling_sink = Arc::clone(&signaling_sink);
107    let on_signal_pending_candidates = Arc::clone(&pending_candidates);
108
109    // setup peer_connection event listeners
110    let _res = join!(
111        tokio::spawn(on_ice_candidate(
112            on_ice_candidate_trace,
113            on_ice_candidate_peer_connection,
114            on_ice_candidate_signaling_sink,
115            on_ice_candidate_pending_candidates,
116        )),
117        tokio::spawn(on_peer_connection_state_change(
118            on_peer_state_trace,
119            on_peer_state_peer_connection,
120            connection_signal_sink,
121        )),
122        tokio::spawn(on_signal(
123            on_signal_trace,
124            on_signal_peer_connection,
125            signaling_source,
126            on_signal_signaling_sink,
127            on_signal_pending_candidates,
128            should_reply,
129        )),
130    );
131
132    // TODO: handle errors [@legomushroom]
133
134    trace.info("Done.");
135
136    return Ok(());
137}
138
139async fn listen_until_connected<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
140    trace: Box<dyn Tracer>,
141    should_reply: bool,
142    peer_connection: Arc<RTCPeerConnection>,
143    on_data_channel_sink: Arc<Mutex<Sender<Box<dyn Channel>>>>,
144    signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
145    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
146) -> anyhow::Result<()> {
147    let (connection_signal_sink, connection_signal_source) = tokio::sync::mpsc::channel::<()>(1);
148
149    tokio::select! {
150        _ = listen_impl(&trace, should_reply, Arc::clone(&peer_connection), Arc::new(connection_signal_sink), signaling_source, signaling_sink) => {
151            trace.trace("listeners complete");
152
153            bail!("Connection listeners stopped before connection is established.");
154        },
155        connection_signal_result = on_connection(connection_signal_source) => {
156            trace.info("connection complete");
157
158            // let on_data_channel_sink = Arc::clone(&on_data_channel_sink);
159            let on_data_channel_trace = child!(trace, "channel");
160            peer_connection
161                .on_data_channel(Box::new(move |data_channel| {
162                    let on_data_channel_sink = Arc::clone(&on_data_channel_sink);
163
164                    let trace = child!(on_data_channel_trace, "");
165                    return Box::pin(async move {
166                        let data_stream = RtcChannel::new(
167                            &trace,
168                            data_channel,
169                        ).await;
170
171                        let _res = on_data_channel_sink
172                            .lock()
173                            .await
174                            .try_send(Box::new(data_stream));
175                    });
176                }))
177                .await;
178
179            trace.info("on_data_channel handler set up");
180
181            return Ok(connection_signal_result?);
182        },
183    };
184}
185
186#[async_trait]
187impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> Disconnected for RtcConnection<TAsyncDuplexStream> {
188    async fn connect(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
189        let trace  =&self.trace;
190        let trace = child!(trace, "connect");
191        let peer_connection = Arc::clone(&self.peer_connection);
192        let signaling_sink = Arc::clone(&self.signaling_sink);
193
194        let (data_channel, _) = try_join!(
195            // initialize the connection
196            initiate_connection(child!(trace, "init"), peer_connection, signaling_sink),
197            // setup peer_connection event listeners
198            listen_until_connected(
199                child!(trace, "events"),
200                    false,
201                    Arc::clone(&self.peer_connection),
202                    Arc::clone(&self.on_data_channel_sink),
203                    Arc::clone(&self.signaling_source),
204                    Arc::clone(&self.signaling_sink),
205            ),
206        )?;
207
208        let inital_channel = Box::new(
209            RtcChannel::new(
210                &self.trace,
211                data_channel,
212            ).await,
213        );
214
215        return Ok(
216            Box::new(
217                RtcConnectionConnected::new(
218                    &self.trace,
219                    Arc::clone(&self.peer_connection),
220                    Some(inital_channel),
221                    self.on_data_channel_source.take().unwrap(),
222                ).unwrap(),
223            ),
224        ); 
225    }
226
227    async fn listen(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
228        let trace = &self.trace;
229        listen_until_connected(
230            child!(trace, "listen"),
231            true,
232            Arc::clone(&self.peer_connection),
233            Arc::clone(&self.on_data_channel_sink),
234            Arc::clone(&self.signaling_source),
235            Arc::clone(&self.signaling_sink),
236        ).await?;
237    
238        return Ok(
239            Box::new(
240                RtcConnectionConnected::new(
241                    &self.trace,
242                    Arc::clone(&self.peer_connection),
243                    None,
244                    self.on_data_channel_source.take().unwrap(),
245                ).unwrap(),
246            ),
247        );
248    }
249}