datachannel 0.16.0

Rust wrappers for libdatachannel.
Documentation
use std::collections::HashSet;
use std::thread;
use std::time::Duration;

use crossbeam_channel::{self as chan, select};

use datachannel::{
    ConnectionState, DataChannelHandler, DataChannelInfo, GatheringState, IceCandidate,
    PeerConnectionHandler, RtcConfig, RtcDataChannel, RtcPeerConnection, SessionDescription,
};

#[cfg(feature = "log")]
use log as logger;
#[cfg(feature = "tracing")]
use tracing as logger;

enum ConnectionMsg {
    RemoteDescription { sess_desc: SessionDescription },
    RemoteCandidate { cand: IceCandidate },
    Stop,
}

struct Ping {
    output: chan::Sender<String>,
    ready: chan::Sender<()>,
}

impl Ping {
    fn new(output: chan::Sender<String>, ready: chan::Sender<()>) -> Self {
        Ping { output, ready }
    }
}

impl DataChannelHandler for Ping {
    fn on_open(&mut self) {
        logger::info!("DataChannel PING: Open");
        self.ready.send(()).ok();
    }

    fn on_message(&mut self, msg: &[u8]) {
        let msg = String::from_utf8_lossy(msg).to_string();
        logger::info!("DataChannel PING: Received message: {}", &msg);
        self.output.send(msg).ok();
    }
}

#[derive(Clone)]
struct Pong {
    output: chan::Sender<String>,
}

impl Pong {
    fn new(output: chan::Sender<String>) -> Self {
        Pong { output }
    }
}

impl DataChannelHandler for Pong {
    fn on_message(&mut self, msg: &[u8]) {
        let msg = String::from_utf8_lossy(msg).to_string();
        logger::info!("DataChannel PONG: Received message: {}", &msg);
        self.output.send(msg).ok();
    }
}

struct LocalConn {
    id: usize,
    signaling: chan::Sender<ConnectionMsg>,
    pong: Pong,
    dc: Option<Box<RtcDataChannel<Pong>>>,
}

impl LocalConn {
    fn new(id: usize, pong: Pong, signaling: chan::Sender<ConnectionMsg>) -> Self {
        LocalConn {
            id,
            signaling,
            pong,
            dc: None,
        }
    }
}

impl PeerConnectionHandler for LocalConn {
    type DCH = Pong;

    fn data_channel_handler(&mut self, _info: DataChannelInfo) -> Pong {
        self.pong.clone()
    }

    fn on_description(&mut self, sess_desc: SessionDescription) {
        logger::info!("Description {}: {:?}", self.id, &sess_desc);
        self.signaling
            .send(ConnectionMsg::RemoteDescription { sess_desc })
            .ok();
    }

    fn on_candidate(&mut self, cand: IceCandidate) {
        logger::info!("Candidate {}: {} {}", self.id, &cand.candidate, &cand.mid);
        self.signaling
            .send(ConnectionMsg::RemoteCandidate { cand })
            .ok();
    }

    fn on_connection_state_change(&mut self, state: ConnectionState) {
        logger::info!("State {}: {:?}", self.id, state);
    }

    fn on_gathering_state_change(&mut self, state: GatheringState) {
        logger::info!("Gathering state {}: {:?}", self.id, state);
    }

    fn on_data_channel(&mut self, mut dc: Box<RtcDataChannel<Pong>>) {
        logger::info!(
            "PeerConnection {}: Received DataChannel with label={}, protocol={:?}, reliability={:?}",
            self.id,
            dc.label(),
            dc.protocol(),
            dc.reliability()
        );
        dc.send(format!("PONG from {}", self.id).as_bytes()).ok();
        self.dc.replace(dc);
    }
}

#[test]
fn test_connectivity() {
    #[cfg(feature = "tracing")]
    {
        tracing::subscriber::set_global_default(
            tracing_subscriber::FmtSubscriber::builder()
                .with_max_level(tracing::Level::INFO)
                .finish(),
        )
        .ok();

        datachannel::configure_logging(tracing::Level::INFO);
    }
    #[cfg(feature = "log")]
    {
        std::env::set_var("RUST_LOG", "info");
        let _ = env_logger::try_init();
    }

    let (tx_res, rx_res) = chan::unbounded::<String>();
    let (tx_peer1, rx_peer1) = chan::unbounded::<ConnectionMsg>();
    let (tx_peer2, rx_peer2) = chan::unbounded::<ConnectionMsg>();

    let id1 = 1;
    let id2 = 2;

    let pong1 = Pong::new(tx_res.clone());
    let pong2 = Pong::new(tx_res.clone());

    let conn1 = LocalConn::new(id1, pong1, tx_peer2.clone());
    let conn2 = LocalConn::new(id2, pong2, tx_peer1.clone());

    let ice_servers = vec!["stun:stun.l.google.com:19302"];
    let conf = RtcConfig::new(&ice_servers);

    let mut pc1 = RtcPeerConnection::new(&conf, conn1).unwrap();
    let mut pc2 = RtcPeerConnection::new(&conf, conn2).unwrap();

    let t2 = thread::spawn(move || {
        while let Ok(msg) = rx_peer2.recv() {
            match msg {
                ConnectionMsg::RemoteDescription { sess_desc } => {
                    pc2.set_remote_description(&sess_desc).ok();
                }
                ConnectionMsg::RemoteCandidate { cand } => {
                    pc2.add_remote_candidate(&cand).ok();
                }
                ConnectionMsg::Stop => break,
            }
        }
    });

    let t1 = thread::spawn(move || {
        let (tx_ready, rx_ready) = chan::unbounded();
        let ping = Ping::new(tx_res.clone(), tx_ready);
        let mut dc = pc1.create_data_channel("ping-pong", ping).unwrap();

        loop {
            select! {
                recv(rx_peer1) -> msg => {
                    match msg.unwrap() {
                        ConnectionMsg::RemoteDescription { sess_desc } => {
                            pc1.set_remote_description(&sess_desc).ok();
                        }
                        ConnectionMsg::RemoteCandidate { cand } => {
                            pc1.add_remote_candidate(&cand).ok();
                        },
                        ConnectionMsg::Stop => break,
                    }
                },
                recv(rx_ready) -> _ => {
                    dc.send(format!("PING from {}", id1).as_bytes()).ok();
                }
            }
        }
    });

    let mut expected = HashSet::new();
    expected.insert("PING from 1".to_string());
    expected.insert("PONG from 2".to_string());

    let mut res = HashSet::new();
    res.insert(rx_res.recv_timeout(Duration::from_secs(10)).unwrap());
    res.insert(rx_res.recv_timeout(Duration::from_secs(10)).unwrap());

    assert_eq!(expected, res);

    tx_peer1.send(ConnectionMsg::Stop).unwrap();
    tx_peer2.send(ConnectionMsg::Stop).unwrap();

    t2.join().unwrap();
    t1.join().unwrap();
}