webrtc-connection 0.2.0

WebRTC connection.
Documentation
use std::sync::Arc;

use anyhow::{Result, bail};
use async_trait::async_trait;
use cs_trace::{Tracer, child};
use tokio_util::codec::Framed;
use tokio::sync::mpsc::Receiver;
use cs_utils::futures::GenericCodec;
use futures::{stream::{SplitSink, SplitStream}, SinkExt};
use tokio::{join, io::{AsyncRead, AsyncWrite}, sync::{Mutex, mpsc::Sender}, try_join};
use webrtc::{peer_connection::RTCPeerConnection, data_channel::RTCDataChannel};
use connection_utils::{Channel, Disconnected, Connected};

use crate::{types::RTCSignalingMessage, rtc_connection::event_handlers::{on_peer_connection_state_change, on_signal, on_ice_candidate}, RtcChannel};

use super::{RtcConnection, connected::RtcConnectionConnected};

async fn on_connection(
    mut connection_signal_source: Receiver<()>,
) -> anyhow::Result<()> {
    while let Some(_) = connection_signal_source.recv().await {
        return Ok(());
    }

    bail!("Failed to receive single connection signal message.");
}

async fn initiate_connection<AsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
    trace: Box<dyn Tracer>,
    peer_connection: Arc<RTCPeerConnection>,
    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<AsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
) -> Result<Arc<RTCDataChannel>> {
    // Create a datachannel with label 'data'
    let data_channel = peer_connection
        .create_data_channel("default_channel", None)
        .await
        .expect("Cannot create data channel.");

    // trace.trace("data channel created");

    // register text message handling
    // let on_message_trace = trace.child("on-channel-msg");
    // data_channel
    //     .on_message(Box::new(move |msg: DataChannelMessage| {
    //         let msg_str = String::from_utf8(msg.data.to_vec()).unwrap();
    //         on_message_trace.info(
    //             &format!("<< {}", msg_str),
    //         );
    //         Box::pin(async {})
    //     }))
    //     .await;

    trace.trace("channel on_message handler set up, creating offer");

    // Create an offer to send to the other process
    let offer = peer_connection
        .create_offer(None)
        .await
        .expect("Cannot create initial offer.");

    trace.trace(
        &format!("offer created: {:?}", &offer),
    );

    // Sets the LocalDescription, and starts our UDP listeners
    // Note: this will start the gathering of ICE candidates
    peer_connection
        .set_local_description(offer.clone())
        .await
        .expect("Cannot set initial offer local description.");

    trace.trace("local description set");

    signaling_sink
        .lock()
        .await
        .send(RTCSignalingMessage::SessionDescription(offer))
        .await
        .expect("Cannot send inital offer.");

    trace.trace("offer sent");

    return Ok(data_channel);
}

async fn listen_impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
    trace: &Box<dyn Tracer>,
    should_reply: bool,
    peer_connection: Arc<RTCPeerConnection>,
    connection_signal_sink: Arc<tokio::sync::mpsc::Sender<()>>,
    signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
) -> anyhow::Result<()> {
    let pending_candidates = Arc::new(Mutex::new(vec![]));

    let on_ice_candidate_trace = child!(trace, "on-ice-candidate");
    let on_ice_candidate_peer_connection = Arc::clone(&peer_connection);
    let on_ice_candidate_signaling_sink = Arc::clone(&signaling_sink);
    let on_ice_candidate_pending_candidates = Arc::clone(&pending_candidates);

    let on_peer_state_trace = child!(trace, "on-peer-state");
    let on_peer_state_peer_connection = Arc::clone(&peer_connection);

    let on_signal_trace = child!(trace, "on-signal");
    let on_signal_peer_connection = Arc::clone(&peer_connection);
    let on_signal_signaling_sink = Arc::clone(&signaling_sink);
    let on_signal_pending_candidates = Arc::clone(&pending_candidates);

    // setup peer_connection event listeners
    let _res = join!(
        tokio::spawn(on_ice_candidate(
            on_ice_candidate_trace,
            on_ice_candidate_peer_connection,
            on_ice_candidate_signaling_sink,
            on_ice_candidate_pending_candidates,
        )),
        tokio::spawn(on_peer_connection_state_change(
            on_peer_state_trace,
            on_peer_state_peer_connection,
            connection_signal_sink,
        )),
        tokio::spawn(on_signal(
            on_signal_trace,
            on_signal_peer_connection,
            signaling_source,
            on_signal_signaling_sink,
            on_signal_pending_candidates,
            should_reply,
        )),
    );

    // TODO: handle errors [@legomushroom]

    trace.info("Done.");

    return Ok(());
}

async fn listen_until_connected<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static>(
    trace: Box<dyn Tracer>,
    should_reply: bool,
    peer_connection: Arc<RTCPeerConnection>,
    on_data_channel_sink: Arc<Mutex<Sender<Box<dyn Channel>>>>,
    signaling_source: Arc<Mutex<SplitStream<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>>>>,
    signaling_sink: Arc<Mutex<SplitSink<Framed<Box<TAsyncDuplexStream>, GenericCodec<RTCSignalingMessage>>, RTCSignalingMessage>>>,
) -> anyhow::Result<()> {
    let (connection_signal_sink, connection_signal_source) = tokio::sync::mpsc::channel::<()>(1);

    tokio::select! {
        _ = listen_impl(&trace, should_reply, Arc::clone(&peer_connection), Arc::new(connection_signal_sink), signaling_source, signaling_sink) => {
            trace.trace("listeners complete");

            bail!("Connection listeners stopped before connection is established.");
        },
        connection_signal_result = on_connection(connection_signal_source) => {
            trace.info("connection complete");

            // let on_data_channel_sink = Arc::clone(&on_data_channel_sink);
            let on_data_channel_trace = child!(trace, "channel");
            peer_connection
                .on_data_channel(Box::new(move |data_channel| {
                    let on_data_channel_sink = Arc::clone(&on_data_channel_sink);

                    let trace = child!(on_data_channel_trace, "");
                    return Box::pin(async move {
                        let data_stream = RtcChannel::new(
                            &trace,
                            data_channel,
                        ).await;

                        let _res = on_data_channel_sink
                            .lock()
                            .await
                            .try_send(Box::new(data_stream));
                    });
                }))
                .await;

            trace.info("on_data_channel handler set up");

            return Ok(connection_signal_result?);
        },
    };
}

#[async_trait]
impl<TAsyncDuplexStream: AsyncRead + AsyncWrite + Unpin + Send + 'static> Disconnected for RtcConnection<TAsyncDuplexStream> {
    async fn connect(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
        let trace  =&self.trace;
        let trace = child!(trace, "connect");
        let peer_connection = Arc::clone(&self.peer_connection);
        let signaling_sink = Arc::clone(&self.signaling_sink);

        let (data_channel, _) = try_join!(
            // initialize the connection
            initiate_connection(child!(trace, "init"), peer_connection, signaling_sink),
            // setup peer_connection event listeners
            listen_until_connected(
                child!(trace, "events"),
                    false,
                    Arc::clone(&self.peer_connection),
                    Arc::clone(&self.on_data_channel_sink),
                    Arc::clone(&self.signaling_source),
                    Arc::clone(&self.signaling_sink),
            ),
        )?;

        let inital_channel = Box::new(
            RtcChannel::new(
                &self.trace,
                data_channel,
            ).await,
        );

        return Ok(
            Box::new(
                RtcConnectionConnected::new(
                    &self.trace,
                    Arc::clone(&self.peer_connection),
                    Some(inital_channel),
                    self.on_data_channel_source.take().unwrap(),
                ).unwrap(),
            ),
        ); 
    }

    async fn listen(mut self: Box<Self>) -> Result<Box<dyn Connected>> {
        let trace = &self.trace;
        listen_until_connected(
            child!(trace, "listen"),
            true,
            Arc::clone(&self.peer_connection),
            Arc::clone(&self.on_data_channel_sink),
            Arc::clone(&self.signaling_source),
            Arc::clone(&self.signaling_sink),
        ).await?;
    
        return Ok(
            Box::new(
                RtcConnectionConnected::new(
                    &self.trace,
                    Arc::clone(&self.peer_connection),
                    None,
                    self.on_data_channel_source.take().unwrap(),
                ).unwrap(),
            ),
        );
    }
}