use std::sync::Arc;
use anyhow::{Result, bail};
use async_trait::async_trait;
use cs_trace::{Tracer, child};
use tokio_util::codec::Framed;
use tokio::sync::mpsc::Receiver;
use cs_utils::futures::GenericCodec;
use futures::{stream::{SplitSink, SplitStream}, SinkExt};
use tokio::{join, io::{AsyncRead, AsyncWrite}, sync::{Mutex, mpsc::Sender}, try_join};
use webrtc::{peer_connection::RTCPeerConnection, data_channel::RTCDataChannel};
use connection_utils::{Channel, Disconnected, Connected};
use crate::{types::RTCSignalingMessage, rtc_connection::event_handlers::{on_peer_connection_state_change, on_signal, on_ice_candidate}, RtcChannel};
use super::{RtcConnection, connected::RtcConnectionConnected};
async fn on_connection(
mut connection_signal_source: Receiver<()>,
) -> anyhow::Result<()> {
while let Some(_) = connection_signal_source.recv().await {
return Ok(());
}
bail!("Failed to receive single connection signal message.");
}
async fn initiate_connection<AsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
trace: Box<dyn Tracer>,
peer_connection: Arc<RTCPeerConnection>,
signaling_sink: Arc<Mutex<SplitSink<Framed<Box<AsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
) -> Result<Arc<RTCDataChannel>> {
let data_channel = peer_connection
.create_data_channel("default_channel", None)
.await
.expect("Cannot create data channel.");
trace.trace("channel on_message handler set up, creating offer");
let offer = peer_connection
.create_offer(None)
.await
.expect("Cannot create initial offer.");
trace.trace(
&format!("offer created: {:?}", &offer),
);
peer_connection
.set_local_description(offer.clone())
.await
.expect("Cannot set initial offer local description.");
trace.trace("local description set");
signaling_sink
.lock()
.await
.send(RTCSignalingMessage::SessionDescription(offer))
.await
.expect("Cannot send inital offer.");
trace.trace("offer sent");
return Ok(data_channel);
}
async fn listen_impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
trace: &Box<dyn Tracer>,
should_reply: bool,
peer_connection: Arc<RTCPeerConnection>,
connection_signal_sink: Arc<tokio::sync::mpsc::Sender<()>>,
signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
) -> anyhow::Result<()> {
let pending_candidates = Arc::new(Mutex::new(vec![]));
let on_ice_candidate_trace = child!(trace, "on-ice-candidate");
let on_ice_candidate_peer_connection = Arc::clone(&peer_connection);
let on_ice_candidate_signaling_sink = Arc::clone(&signaling_sink);
let on_ice_candidate_pending_candidates = Arc::clone(&pending_candidates);
let on_peer_state_trace = child!(trace, "on-peer-state");
let on_peer_state_peer_connection = Arc::clone(&peer_connection);
let on_signal_trace = child!(trace, "on-signal");
let on_signal_peer_connection = Arc::clone(&peer_connection);
let on_signal_signaling_sink = Arc::clone(&signaling_sink);
let on_signal_pending_candidates = Arc::clone(&pending_candidates);
let _res = join!(
tokio::spawn(on_ice_candidate(
on_ice_candidate_trace,
on_ice_candidate_peer_connection,
on_ice_candidate_signaling_sink,
on_ice_candidate_pending_candidates,
)),
tokio::spawn(on_peer_connection_state_change(
on_peer_state_trace,
on_peer_state_peer_connection,
connection_signal_sink,
)),
tokio::spawn(on_signal(
on_signal_trace,
on_signal_peer_connection,
signaling_source,
on_signal_signaling_sink,
on_signal_pending_candidates,
should_reply,
)),
);
trace.info("Done.");
return Ok(());
}
async fn listen_until_connected<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
trace: Box<dyn Tracer>,
should_reply: bool,
peer_connection: Arc<RTCPeerConnection>,
on_data_channel_sink: Arc<Mutex<Sender<Box<dyn Channel>>>>,
signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
) -> anyhow::Result<()> {
let (connection_signal_sink, connection_signal_source) = tokio::sync::mpsc::channel::<()>(1);
tokio::select! {
_ = listen_impl(&trace, should_reply, Arc::clone(&peer_connection), Arc::new(connection_signal_sink), signaling_source, signaling_sink) => {
trace.trace("listeners complete");
bail!("Connection listeners stopped before connection is established.");
},
connection_signal_result = on_connection(connection_signal_source) => {
trace.info("connection complete");
let on_data_channel_trace = child!(trace, "channel");
peer_connection
.on_data_channel(Box::new(move |data_channel| {
let on_data_channel_sink = Arc::clone(&on_data_channel_sink);
let trace = child!(on_data_channel_trace, "");
return Box::pin(async move {
let data_stream = RtcChannel::new(
&trace,
data_channel,
).await;
let _res = on_data_channel_sink
.lock()
.await
.try_send(Box::new(data_stream));
});
}))
.await;
trace.info("on_data_channel handler set up");
return Ok(connection_signal_result?);
},
};
}
#[async_trait]
impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> Disconnected for RtcConnection<TAsyncDuplexStream> {
async fn connect(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
let trace =&self.trace;
let trace = child!(trace, "connect");
let peer_connection = Arc::clone(&self.peer_connection);
let signaling_sink = Arc::clone(&self.signaling_sink);
let (data_channel, _) = try_join!(
initiate_connection(child!(trace, "init"), peer_connection, signaling_sink),
listen_until_connected(
child!(trace, "events"),
false,
Arc::clone(&self.peer_connection),
Arc::clone(&self.on_data_channel_sink),
Arc::clone(&self.signaling_source),
Arc::clone(&self.signaling_sink),
),
)?;
let inital_channel = Box::new(
RtcChannel::new(
&self.trace,
data_channel,
).await,
);
return Ok(
Box::new(
RtcConnectionConnected::new(
&self.trace,
Arc::clone(&self.peer_connection),
Some(inital_channel),
self.on_data_channel_source.take().unwrap(),
).unwrap(),
),
);
}
async fn listen(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
let trace = &self.trace;
listen_until_connected(
child!(trace, "listen"),
true,
Arc::clone(&self.peer_connection),
Arc::clone(&self.on_data_channel_sink),
Arc::clone(&self.signaling_source),
Arc::clone(&self.signaling_sink),
).await?;
return Ok(
Box::new(
RtcConnectionConnected::new(
&self.trace,
Arc::clone(&self.peer_connection),
None,
self.on_data_channel_source.take().unwrap(),
).unwrap(),
),
);
}
}