tx5 0.8.1

The main holochain tx5 webrtc networking crate
Documentation
use crate::*;

/// Endpoint event.
pub enum EndpointEvent {
    /// We have a new listening address at which we can be contacted.
    ListeningAddressOpen {
        /// An address at which we can be reached.
        local_url: PeerUrl,
    },

    /// A listening address at which we were reachable has been closed.
    ListeningAddressClosed {
        /// The address at which we can no longer be reached.
        local_url: PeerUrl,
    },

    /// Connection established.
    Connected {
        /// Url of the remote peer.
        peer_url: PeerUrl,
    },

    /// Connection closed.
    Disconnected {
        /// Url of the remote peer.
        peer_url: PeerUrl,
    },

    /// Receiving an incoming message from a remote peer.
    Message {
        /// Url of the remote peer.
        peer_url: PeerUrl,

        /// Message sent by the remote peer.
        message: Vec<u8>,
    },
}

impl std::fmt::Debug for EndpointEvent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ListeningAddressOpen { local_url } => f
                .debug_struct("ListeningAddressOpen")
                .field("peer_url", local_url)
                .finish(),
            Self::ListeningAddressClosed { local_url } => f
                .debug_struct("ListeningAddressClosed")
                .field("peer_url", local_url)
                .finish(),
            Self::Connected { peer_url } => f
                .debug_struct("Connected")
                .field("peer_url", peer_url)
                .finish(),
            Self::Disconnected { peer_url } => f
                .debug_struct("Disconnected")
                .field("peer_url", peer_url)
                .finish(),
            Self::Message { peer_url, message } => {
                let byte_count = message.len();
                f.debug_struct("Message")
                    .field("peer_url", peer_url)
                    .field("byte_count", &byte_count)
                    .finish()
            }
        }
    }
}

/// Receiver for endpoint events.
pub struct EndpointRecv(tokio::sync::mpsc::Receiver<EndpointEvent>);

impl EndpointRecv {
    /// Receive an endpoint event.
    pub async fn recv(&mut self) -> Option<EndpointEvent> {
        self.0.recv().await
    }
}

pub(crate) struct EpInner {
    this: Weak<Mutex<EpInner>>,
    config: Arc<Config>,
    recv_limit: Arc<tokio::sync::Semaphore>,
    evt_send: tokio::sync::mpsc::Sender<EndpointEvent>,
    sig_map: HashMap<SigUrl, Arc<Sig>>,
    peer_map: HashMap<PeerUrl, Arc<Peer>>,
}

impl EpInner {
    /// In case a signal connection is closed, clean up our references to it.
    pub fn drop_sig(&mut self, sig: Arc<Sig>) {
        let sig_url = sig.sig_url.clone();
        let listener = sig.listener;

        let should_remove = match self.sig_map.get(&sig_url) {
            Some(s) => Arc::ptr_eq(s, &sig),
            None => false,
        };

        if should_remove {
            self.sig_map.remove(&sig_url);

            if listener {
                self.assert_sig(sig_url, listener, None);
            }
        }
    }

    /// Get an existing signal connection or create a new one.
    pub fn assert_sig(
        &mut self,
        sig_url: SigUrl,
        listener: bool,
        resp_url: Option<tokio::sync::oneshot::Sender<PeerUrl>>,
    ) -> Arc<Sig> {
        self.sig_map
            .entry(sig_url.clone())
            .or_insert_with(|| {
                Sig::new(
                    self.this.clone(),
                    self.config.clone(),
                    sig_url,
                    listener,
                    self.evt_send.clone(),
                    resp_url,
                )
            })
            .clone()
    }

    /// In case a peer connection is closed, clean up our references to it.
    pub fn drop_peer_url(&mut self, peer_url: &PeerUrl) {
        self.peer_map.remove(peer_url);
    }

    /// Get an existing peer connection if already established.
    pub fn peer_if_ready(&self, peer_url: &PeerUrl) -> Option<DynBackCon> {
        self.peer_map.get(peer_url).and_then(|p| p.get_if_ready())
    }

    /// Get an existing peer connection or create a new outgoing one.
    pub fn connect_peer(&mut self, peer_url: PeerUrl) -> Arc<Peer> {
        self.peer_map
            .entry(peer_url.clone())
            .or_insert_with(|| {
                Peer::new_connect(
                    self.config.clone(),
                    self.recv_limit.clone(),
                    self.this.clone(),
                    peer_url,
                    self.evt_send.clone(),
                )
            })
            .clone()
    }

    /// Get an existing peer connection or accept an incoming one.
    pub fn accept_peer(&mut self, peer_url: PeerUrl, wc: DynBackWaitCon) {
        self.peer_map.entry(peer_url.clone()).or_insert_with(|| {
            Peer::new_accept(
                self.config.clone(),
                self.recv_limit.clone(),
                self.this.clone(),
                peer_url,
                wc,
                self.evt_send.clone(),
            )
        });
    }

    /// Get all open peer connections.
    pub fn clone_peers(&mut self) -> Vec<Arc<Peer>> {
        self.peer_map.values().cloned().collect()
    }
}

/// Tx5 endpoint.
pub struct Endpoint {
    config: Arc<Config>,
    inner: Arc<Mutex<EpInner>>,
}

impl std::fmt::Debug for Endpoint {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Endpoint")
            .field("config", &self.config)
            .finish()
    }
}

impl Endpoint {
    /// Construct a new tx5 endpoint.
    pub fn new(config: Arc<Config>) -> (Self, EndpointRecv) {
        #[cfg(not(feature = "test-utils"))]
        if config.danger_force_signal_relay && config.danger_deny_signal_relay {
            tracing::warn!("Both `danger_force_signal_relay` and `danger_deny_signal_relay` are set to true. This is likely a misconfiguration, as these options are mutually exclusive.");
        }

        let recv_limit = Arc::new(tokio::sync::Semaphore::new(
            config.incoming_message_bytes_max as usize,
        ));

        let (evt_send, evt_recv) = tokio::sync::mpsc::channel(32);

        (
            Self {
                config: config.clone(),
                inner: Arc::new_cyclic(|this| {
                    Mutex::new(EpInner {
                        this: this.clone(),
                        config,
                        recv_limit,
                        evt_send,
                        sig_map: HashMap::default(),
                        peer_map: HashMap::default(),
                    })
                }),
            },
            EndpointRecv(evt_recv),
        )
    }

    /// Connect to a signal server as a listener, allowing incoming connections.
    /// You probably only want to call this once.
    pub async fn listen(&self, sig_url: SigUrl) -> Option<PeerUrl> {
        let (s, r) = tokio::sync::oneshot::channel();
        let _ = self
            .inner
            .lock()
            .unwrap()
            .assert_sig(sig_url, true, Some(s));
        r.await.ok()
    }

    /// Request that the peer connection identified by the given `peer_url`
    /// is closed.
    pub fn close(&self, peer_url: &PeerUrl) {
        self.inner.lock().unwrap().drop_peer_url(peer_url);
    }

    /// Send data to a remote on this tx5 endpoint.
    ///
    /// The future returned from this method will resolve when the data is handed off to the
    /// networking backend or the connection attempt to the peer has failed.
    pub async fn send(&self, peer_url: PeerUrl, data: Vec<u8>) -> Result<()> {
        let listening_addresses = self.get_listening_addresses();
        if listening_addresses.contains(&peer_url) {
            return Err(Error::other(
                "Endpoint trying to connect/send a message to itself",
            ));
        }
        tokio::time::timeout(self.config.timeout, async {
            let peer = self.inner.lock().unwrap().connect_peer(peer_url.clone());
            if let Some(conn) = peer.wait_for_ready().await {
                conn.send(data).await
            } else {
                // Check to see if a new incoming connection was established
                // in the mean time. If so, we can send over that.
                let maybe_peer = self.inner.lock().unwrap().peer_if_ready(
                    &peer_url
                );
                if let Some(conn) = maybe_peer {
                    conn.send(data).await
                } else {
                    tracing::debug!(?peer_url, "Dropping outgoing message to peer because the connection failed to establish");
                    Err(Error::other("Peer connection failed"))
                }
            }
        })
        .await?
    }

    /// Broadcast data to all connections that happen to be open.
    ///
    /// If no connections are open, no data will be broadcast.
    /// The future returned from this method will resolve when all
    /// broadcast messages have been handed off to our networking backend
    /// (or have timed out).
    pub async fn broadcast(&self, data: &[u8]) {
        let peers = self.inner.lock().unwrap().clone_peers();

        let timeout = self.config.timeout;

        let all = peers
            .into_iter()
            .map(|peer| {
                let data = data.to_vec();
                tokio::task::spawn(async move {
                    let _ = tokio::time::timeout(timeout, async {
                        if let Some(conn) = peer.wait_for_ready().await {
                            let _ = conn.send(data).await;
                        }
                    })
                    .await;
                })
            })
            .collect::<Vec<_>>();

        for task in all {
            let _ = task.await;
        }
    }

    /// Get a list of listening addresses (PeerUrls) at which this endpoint
    /// is currently reachable.
    pub fn get_listening_addresses(&self) -> Vec<PeerUrl> {
        let all_sigs = self
            .inner
            .lock()
            .unwrap()
            .sig_map
            .values()
            .cloned()
            .collect::<Vec<_>>();

        all_sigs
            .into_iter()
            .filter_map(|sig| {
                if !sig.listener {
                    return None;
                }
                sig.get_peer_url()
            })
            .collect()
    }

    /// Get the list of peers (PeerUrls) that this endpoint is currently connected to.
    pub fn get_connected_peer_addresses(&self) -> Vec<PeerUrl> {
        self.inner
            .lock()
            .unwrap()
            .peer_map
            .keys()
            .cloned()
            .collect::<Vec<_>>()
    }

    /// Get stats.
    pub fn get_stats(&self) -> stats::Stats {
        let backend = match self.config.backend_module {
            #[cfg(feature = "backend-libdatachannel")]
            BackendModule::LibDataChannel => {
                stats::StatsBackend::BackendLibDataChannel
            }
            #[cfg(feature = "backend-go-pion")]
            BackendModule::GoPion => stats::StatsBackend::BackendGoPion,
            BackendModule::Mem => stats::StatsBackend::BackendMem,
        };

        let connection_list = self
            .inner
            .lock()
            .unwrap()
            .peer_map
            .values()
            .map(|peer| {
                let stats = peer.get_stats();
                stats::StatsConnection {
                    pub_key: *peer.pub_key.0,
                    send_message_count: stats.send_msg_count,
                    send_bytes: stats.send_byte_count,
                    recv_message_count: stats.recv_msg_count,
                    recv_bytes: stats.recv_byte_count,
                    opened_at_s: peer.opened_at_s,
                    is_webrtc: peer.is_using_webrtc(),
                }
            })
            .collect();

        stats::Stats {
            backend,
            peer_url_list: self.get_listening_addresses(),
            connection_list,
        }
    }
}