tx5 0.0.10-alpha

The main holochain tx5 webrtc networking crate
Documentation
use crate::*;

use tx5_connection::*;

enum MaybeReady {
    Ready(Arc<FramedConn>),
    Wait(Arc<tokio::sync::Semaphore>),
}

pub(crate) struct Peer {
    ready: Arc<Mutex<MaybeReady>>,
    task: tokio::task::JoinHandle<()>,
    pub(crate) pub_key: PubKey,
    pub(crate) opened_at_s: u64,
}

fn timestamp() -> u64 {
    std::time::SystemTime::UNIX_EPOCH
        .elapsed()
        .expect("failed to get time")
        .as_secs()
}

impl Drop for Peer {
    fn drop(&mut self) {
        self.task.abort();
    }
}

impl Peer {
    pub fn new_connect(
        config: Arc<Config>,
        recv_limit: Arc<tokio::sync::Semaphore>,
        ep: Weak<Mutex<EpInner>>,
        peer_url: PeerUrl,
        evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
    ) -> Arc<Self> {
        Arc::new_cyclic(|_this| {
            let wait = Arc::new(tokio::sync::Semaphore::new(0));
            let ready = Arc::new(Mutex::new(MaybeReady::Wait(wait)));
            let pub_key = peer_url.pub_key().clone();

            let task = tokio::task::spawn(connect(
                config,
                recv_limit,
                ep,
                peer_url,
                evt_send,
                ready.clone(),
            ));

            let opened_at_s = timestamp();

            Self {
                ready,
                task,
                pub_key,
                opened_at_s,
            }
        })
    }

    pub fn new_accept(
        config: Arc<Config>,
        recv_limit: Arc<tokio::sync::Semaphore>,
        ep: Weak<Mutex<EpInner>>,
        peer_url: PeerUrl,
        conn: Arc<Conn>,
        conn_recv: ConnRecv,
        evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
    ) -> Arc<Self> {
        Arc::new_cyclic(|_this| {
            let wait = Arc::new(tokio::sync::Semaphore::new(0));
            let ready = Arc::new(Mutex::new(MaybeReady::Wait(wait)));
            let pub_key = peer_url.pub_key().clone();

            let task = tokio::task::spawn(task(
                config,
                recv_limit,
                ep,
                Some((conn, conn_recv)),
                peer_url,
                evt_send,
                ready.clone(),
            ));

            let opened_at_s = timestamp();

            Self {
                ready,
                task,
                pub_key,
                opened_at_s,
            }
        })
    }

    pub fn is_using_webrtc(&self) -> bool {
        if let MaybeReady::Ready(r) = &*self.ready.lock().unwrap() {
            r.is_using_webrtc()
        } else {
            false
        }
    }

    pub fn get_stats(&self) -> ConnStats {
        if let MaybeReady::Ready(r) = &*self.ready.lock().unwrap() {
            r.get_stats()
        } else {
            ConnStats::default()
        }
    }

    pub async fn ready(&self) {
        let w = match &*self.ready.lock().unwrap() {
            MaybeReady::Ready(_) => return,
            MaybeReady::Wait(w) => w.clone(),
        };

        let _ = w.acquire().await;
    }

    pub async fn send(&self, msg: Vec<u8>) -> Result<()> {
        let conn = match &*self.ready.lock().unwrap() {
            MaybeReady::Ready(c) => c.clone(),
            _ => return Err(Error::other("not ready")),
        };
        conn.send(msg).await
    }
}

async fn connect(
    config: Arc<Config>,
    recv_limit: Arc<tokio::sync::Semaphore>,
    ep: Weak<Mutex<EpInner>>,
    peer_url: PeerUrl,
    evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
    ready: Arc<Mutex<MaybeReady>>,
) {
    tracing::trace!(?peer_url, "peer try connect");

    let conn = if let Some(ep) = ep.upgrade() {
        let connect_fut = async {
            let sig =
                ep.lock()
                    .unwrap()
                    .assert_sig(peer_url.to_sig(), false, None);
            sig.ready().await;
            sig.connect(peer_url.pub_key().clone()).await
        };

        match tokio::time::timeout(config.timeout, connect_fut)
            .await
            .map_err(Error::other)
        {
            Ok(Ok(conn)) => Some(conn),
            Err(err) | Ok(Err(err)) => {
                tracing::debug!(?err, "peer connect error");
                None
            }
        }
    } else {
        None
    };

    task(config, recv_limit, ep, conn, peer_url, evt_send, ready).await;
}

struct DropPeer {
    ep: Weak<Mutex<EpInner>>,
    peer_url: PeerUrl,
    evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
}

impl Drop for DropPeer {
    fn drop(&mut self) {
        tracing::debug!(?self.peer_url, "peer closed");

        if let Some(ep_inner) = self.ep.upgrade() {
            ep_inner.lock().unwrap().drop_peer_url(&self.peer_url);
        }

        let evt_send = self.evt_send.clone();
        let peer_url = self.peer_url.clone();
        tokio::task::spawn(async move {
            let _ = evt_send
                .send(EndpointEvent::Disconnected { peer_url })
                .await;
        });
    }
}

#[allow(clippy::too_many_arguments)]
async fn task(
    config: Arc<Config>,
    recv_limit: Arc<tokio::sync::Semaphore>,
    ep: Weak<Mutex<EpInner>>,
    conn: Option<(Arc<Conn>, ConnRecv)>,
    peer_url: PeerUrl,
    evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
    ready: Arc<Mutex<MaybeReady>>,
) {
    let _drop = DropPeer {
        ep,
        peer_url: peer_url.clone(),
        evt_send: evt_send.clone(),
    };

    let (conn, conn_recv) = match conn {
        None => return,
        Some(conn) => conn,
    };

    conn.ready().await;

    let (conn, mut conn_recv) =
        match FramedConn::new(conn, conn_recv, recv_limit).await {
            Ok(conn) => conn,
            Err(_) => return,
        };

    if let Some((pf_send, pf_check)) = &config.preflight {
        let pf_data = match pf_send(&peer_url).await {
            Ok(pf_data) => pf_data,
            Err(err) => {
                tracing::debug!(?err, "preflight get send error");
                return;
            }
        };

        if let Err(err) = conn.send(pf_data).await {
            tracing::debug!(?err, "preflight send error");
            return;
        }

        let pf_data = match conn_recv.recv().await {
            Some(pf_data) => pf_data,
            None => {
                tracing::debug!("closed awaiting preflight data");
                return;
            }
        };

        if let Err(err) = pf_check(&peer_url, pf_data).await {
            tracing::debug!(?err, "preflight check error");
            return;
        }
    }

    {
        let mut lock = ready.lock().unwrap();
        if let MaybeReady::Wait(w) = &*lock {
            w.close();
        }
        *lock = MaybeReady::Ready(Arc::new(conn));
    }

    drop(ready);

    let _ = evt_send
        .send(EndpointEvent::Connected {
            peer_url: peer_url.clone(),
        })
        .await;

    tracing::info!(?peer_url, "peer connected");

    while let Some(msg) = conn_recv.recv().await {
        let _ = evt_send
            .send(EndpointEvent::Message {
                peer_url: peer_url.clone(),
                message: msg,
            })
            .await;
    }

    let _ = evt_send
        .send(EndpointEvent::Disconnected {
            peer_url: peer_url.clone(),
        })
        .await;
}