hashtree-network 0.2.47

Mesh networking stack for hashtree: routing, signaling, peer links, and stores
Documentation
use anyhow::Result;
use async_trait::async_trait;
use std::sync::Arc;
use std::time::Duration;

use nostr_sdk::nostr::{Event, Filter};

use super::bluetooth_peer::BluetoothPeer;
use super::peer::Peer;
use super::types::{MeshNostrFrame, PeerHTLConfig};
use crate::mesh_session::MeshSession;
use crate::runtime_peer::PeerTransport;

#[derive(Clone)]
pub enum MeshPeer {
    WebRtc(Arc<Peer>),
    Bluetooth(Arc<BluetoothPeer>),
    #[cfg(test)]
    Mock(Arc<TestMeshPeer>),
}

impl MeshPeer {
    pub fn is_ready(&self) -> bool {
        match self {
            Self::WebRtc(peer) => peer.has_data_channel(),
            Self::Bluetooth(peer) => peer.is_connected(),
            #[cfg(test)]
            Self::Mock(peer) => peer.ready,
        }
    }

    pub fn is_connected(&self) -> bool {
        match self {
            Self::WebRtc(peer) => peer.is_connected(),
            Self::Bluetooth(peer) => peer.is_connected(),
            #[cfg(test)]
            Self::Mock(peer) => peer.connected,
        }
    }

    pub fn htl_config(&self) -> PeerHTLConfig {
        match self {
            Self::WebRtc(peer) => *peer.htl_config(),
            Self::Bluetooth(peer) => *peer.htl_config(),
            #[cfg(test)]
            Self::Mock(peer) => peer.htl_config,
        }
    }

    pub fn transport(&self) -> PeerTransport {
        match self {
            Self::WebRtc(_) => PeerTransport::WebRtc,
            Self::Bluetooth(_) => PeerTransport::Bluetooth,
            #[cfg(test)]
            Self::Mock(_) => PeerTransport::WebRtc,
        }
    }

    pub async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>> {
        match self {
            Self::WebRtc(peer) => peer.request_with_timeout(hash_hex, timeout).await,
            Self::Bluetooth(peer) => peer.request_with_timeout(hash_hex, timeout).await,
            #[cfg(test)]
            Self::Mock(peer) => peer.request(hash_hex, timeout).await,
        }
    }

    pub async fn query_nostr_events(
        &self,
        filters: Vec<Filter>,
        timeout: Duration,
    ) -> Result<Vec<Event>> {
        match self {
            Self::WebRtc(peer) => peer.query_nostr_events(filters, timeout).await,
            Self::Bluetooth(peer) => peer.query_nostr_events(filters, timeout).await,
            #[cfg(test)]
            Self::Mock(peer) => peer.query_nostr_events(filters, timeout).await,
        }
    }

    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
        match self {
            Self::WebRtc(peer) => peer.send_mesh_frame_text(frame).await,
            Self::Bluetooth(peer) => peer.send_mesh_frame_text(frame).await,
            #[cfg(test)]
            Self::Mock(peer) => peer.send_mesh_frame_text(frame).await,
        }
    }

    pub async fn close(&self) -> Result<()> {
        match self {
            Self::WebRtc(peer) => peer.close().await,
            Self::Bluetooth(peer) => peer.close().await,
            #[cfg(test)]
            Self::Mock(peer) => peer.close().await,
        }
    }

    pub fn as_webrtc(&self) -> Option<&Arc<Peer>> {
        match self {
            Self::WebRtc(peer) => Some(peer),
            Self::Bluetooth(_) => None,
            #[cfg(test)]
            Self::Mock(_) => None,
        }
    }
}

#[async_trait]
impl MeshSession for MeshPeer {
    fn is_ready(&self) -> bool {
        Self::is_ready(self)
    }

    fn is_connected(&self) -> bool {
        Self::is_connected(self)
    }

    fn htl_config(&self) -> PeerHTLConfig {
        Self::htl_config(self)
    }

    async fn request(&self, hash_hex: &str, timeout: Duration) -> Result<Option<Vec<u8>>> {
        Self::request(self, hash_hex, timeout).await
    }

    async fn query_nostr_events(
        &self,
        filters: Vec<Filter>,
        timeout: Duration,
    ) -> Result<Vec<Event>> {
        Self::query_nostr_events(self, filters, timeout).await
    }

    async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
        Self::send_mesh_frame_text(self, frame).await
    }

    async fn close(&self) -> Result<()> {
        Self::close(self).await
    }

    fn transport_debug_state(&self) -> Option<String> {
        match self {
            Self::WebRtc(peer) => Some(format!("{:?}", peer.state())),
            Self::Bluetooth(_) => None,
            #[cfg(test)]
            Self::Mock(_) => None,
        }
    }
}

#[cfg(test)]
use anyhow::anyhow;

#[cfg(test)]
pub struct TestMeshPeer {
    pub ready: bool,
    pub connected: bool,
    pub htl_config: PeerHTLConfig,
    request_response: tokio::sync::Mutex<Option<Vec<u8>>>,
    response_delay: Duration,
    query_events: tokio::sync::Mutex<Vec<Event>>,
    query_delay: Duration,
    sent_frames: tokio::sync::Mutex<Vec<MeshNostrFrame>>,
    close_delay: Duration,
    closed: std::sync::atomic::AtomicBool,
}

#[cfg(test)]
impl TestMeshPeer {
    pub fn with_response(response: Option<Vec<u8>>) -> Self {
        Self {
            ready: true,
            connected: true,
            htl_config: PeerHTLConfig::from_flags(false, false),
            request_response: tokio::sync::Mutex::new(response),
            response_delay: Duration::ZERO,
            query_events: tokio::sync::Mutex::new(Vec::new()),
            query_delay: Duration::ZERO,
            sent_frames: tokio::sync::Mutex::new(Vec::new()),
            close_delay: Duration::ZERO,
            closed: std::sync::atomic::AtomicBool::new(false),
        }
    }

    pub fn with_delayed_response(response: Option<Vec<u8>>, response_delay: Duration) -> Self {
        Self {
            ready: true,
            connected: true,
            htl_config: PeerHTLConfig::from_flags(false, false),
            request_response: tokio::sync::Mutex::new(response),
            response_delay,
            query_events: tokio::sync::Mutex::new(Vec::new()),
            query_delay: Duration::ZERO,
            sent_frames: tokio::sync::Mutex::new(Vec::new()),
            close_delay: Duration::ZERO,
            closed: std::sync::atomic::AtomicBool::new(false),
        }
    }

    pub fn with_events(events: Vec<Event>) -> Self {
        Self {
            ready: true,
            connected: true,
            htl_config: PeerHTLConfig::from_flags(false, false),
            request_response: tokio::sync::Mutex::new(None),
            response_delay: Duration::ZERO,
            query_events: tokio::sync::Mutex::new(events),
            query_delay: Duration::ZERO,
            sent_frames: tokio::sync::Mutex::new(Vec::new()),
            close_delay: Duration::ZERO,
            closed: std::sync::atomic::AtomicBool::new(false),
        }
    }

    pub fn with_delayed_events(events: Vec<Event>, query_delay: Duration) -> Self {
        Self {
            ready: true,
            connected: true,
            htl_config: PeerHTLConfig::from_flags(false, false),
            request_response: tokio::sync::Mutex::new(None),
            response_delay: Duration::ZERO,
            query_events: tokio::sync::Mutex::new(events),
            query_delay,
            sent_frames: tokio::sync::Mutex::new(Vec::new()),
            close_delay: Duration::ZERO,
            closed: std::sync::atomic::AtomicBool::new(false),
        }
    }

    pub fn with_delayed_close(close_delay: Duration) -> Self {
        Self {
            ready: true,
            connected: false,
            htl_config: PeerHTLConfig::from_flags(false, false),
            request_response: tokio::sync::Mutex::new(None),
            response_delay: Duration::ZERO,
            query_events: tokio::sync::Mutex::new(Vec::new()),
            query_delay: Duration::ZERO,
            sent_frames: tokio::sync::Mutex::new(Vec::new()),
            close_delay,
            closed: std::sync::atomic::AtomicBool::new(false),
        }
    }

    pub async fn request(&self, _hash_hex: &str, _timeout: Duration) -> Result<Option<Vec<u8>>> {
        if !self.response_delay.is_zero() {
            tokio::time::sleep(self.response_delay).await;
        }
        Ok(self.request_response.lock().await.clone())
    }

    pub async fn query_nostr_events(
        &self,
        _filters: Vec<Filter>,
        _timeout: Duration,
    ) -> Result<Vec<Event>> {
        if !self.query_delay.is_zero() {
            tokio::time::sleep(self.query_delay).await;
        }
        Ok(self.query_events.lock().await.clone())
    }

    pub async fn send_mesh_frame_text(&self, frame: &MeshNostrFrame) -> Result<()> {
        self.sent_frames.lock().await.push(frame.clone());
        Ok(())
    }

    pub async fn close(&self) -> Result<()> {
        if !self.close_delay.is_zero() {
            tokio::time::sleep(self.close_delay).await;
        }
        self.closed
            .store(true, std::sync::atomic::Ordering::Relaxed);
        Ok(())
    }

    pub async fn sent_frame_count(&self) -> usize {
        self.sent_frames.lock().await.len()
    }

    pub fn is_closed(&self) -> bool {
        self.closed.load(std::sync::atomic::Ordering::Relaxed)
    }
}

#[cfg(test)]
impl MeshPeer {
    pub fn mock_for_tests(peer: TestMeshPeer) -> Self {
        Self::Mock(Arc::new(peer))
    }

    pub fn mock_ref(&self) -> Result<&Arc<TestMeshPeer>> {
        match self {
            Self::Mock(peer) => Ok(peer),
            _ => Err(anyhow!("mesh peer is not a mock")),
        }
    }
}