collab-server 0.0.7

Nomad's collab server
Documentation
//! Symbols available in tests.

use common::tests::Global;
use common::{ClientMessage, PeerId, ServerMessage, SessionId};
use tokio::sync::{mpsc, oneshot};

use crate::*;

/// Test-side handle for talking to the server through its `SessionMessage`
/// channel.
pub struct TestSender {
    sender: mpsc::Sender<SessionMessage>,
}

impl TestSender {
    /// Wraps the sending half of the channel that carries `SessionMessage`s.
    pub(crate) fn new(sender: mpsc::Sender<SessionMessage>) -> Self {
        Self { sender }
    }

    /// Sends a `SessionMessage::Sessions` request and returns one
    /// [`SessionSnapshot`] for every active session in the reply.
    ///
    /// # Errors
    ///
    /// Returns an error if the request cannot be sent over the channel, or
    /// if awaiting the reply fails.
    pub async fn sessions(&self) -> anyhow::Result<Vec<SessionSnapshot>> {
        let (reply_tx, reply_rx) = oneshot::channel();
        let request = SessionMessage::Sessions(reply_tx);

        if self.sender.send(request).await.is_err() {
            return Err(anyhow::anyhow!("failed to send message"));
        }

        // Copy the ID and the peers out of every active session so that the
        // snapshots don't borrow from the reply.
        let mut snapshots = Vec::new();
        for (id, session) in reply_rx.await?.active().iter() {
            snapshots.push(SessionSnapshot {
                id: *id,
                peers: session.peers().iter().copied().collect(),
            });
        }

        Ok(snapshots)
    }

    /// Sends a `SessionMessage::Ping` and resolves once the reply arrives.
    ///
    /// NOTE(review): presumably used by tests to wait until the server is
    /// processing messages — confirm against the receiving side.
    ///
    /// # Errors
    ///
    /// Returns an error if the ping cannot be sent over the channel, or if
    /// awaiting the reply fails.
    pub async fn start(&self) -> anyhow::Result<()> {
        let (pong_tx, pong_rx) = oneshot::channel();

        if self.sender.send(SessionMessage::Ping(pong_tx)).await.is_err() {
            return Err(anyhow::anyhow!("failed to send message"));
        }

        pong_rx.await?;
        Ok(())
    }
}

/// Owned copy of a session's ID and peer list, as produced by
/// `TestSender::sessions`.
pub struct SessionSnapshot {
    id: SessionId,
    peers: Vec<PeerId>,
}

impl SessionSnapshot {
    /// Returns the ID of the session this snapshot was taken from.
    pub fn id(&self) -> SessionId {
        let Self { id, .. } = self;
        *id
    }

    /// Returns the peers stored in this snapshot.
    pub fn peers(&self) -> &[PeerId] {
        self.peers.as_slice()
    }
}

/// Boxed callback that is handed a reference to a `ClientMessage`.
type AfterReceiveMsg = Box<dyn Fn(&ClientMessage) + Send + Sync + 'static>;

/// Boxed callback that is handed a reference to a `ServerMessage`.
type BeforeSendMsg = Box<dyn Fn(&ServerMessage) + Send + Sync + 'static>;

/// Boxed callback that is handed a message as a byte slice.
type WithMsgBytes = Box<dyn Fn(&[u8]) + Send + Sync + 'static>;

/// Hook registered by `set_after_receive` and invoked by
/// `after_receive_bytes`.
static AFTER_RECEIVE: Global<WithMsgBytes> = Global::new();

/// Hook registered by `set_after_receive_msg` and invoked by
/// `after_receive_msg`.
static AFTER_RECEIVE_MSG: Global<AfterReceiveMsg> = Global::new();

/// Hook registered by `set_before_send` and invoked by `before_send_bytes`.
static BEFORE_SEND: Global<WithMsgBytes> = Global::new();

/// Hook registered by `set_before_send_msg` and invoked by
/// `before_send_msg`.
static BEFORE_SEND_MSG: Global<BeforeSendMsg> = Global::new();

/// Invokes the callback stored in `AFTER_RECEIVE` with `buf`.
///
/// Does nothing if no callback has been registered.
pub(crate) fn after_receive_bytes(buf: &[u8]) {
    AFTER_RECEIVE.with(|hook| match hook {
        Some(hook) => hook(buf),
        None => {}
    });
}

/// Invokes the callback stored in `AFTER_RECEIVE_MSG` with `msg`.
///
/// Does nothing if no callback has been registered.
pub(crate) fn after_receive_msg(msg: &ClientMessage) {
    AFTER_RECEIVE_MSG.with(|hook| match hook {
        Some(hook) => hook(msg),
        None => {}
    });
}

/// Invokes the callback stored in `BEFORE_SEND` with `buf`.
///
/// Does nothing if no callback has been registered.
pub(crate) fn before_send_bytes(buf: &[u8]) {
    BEFORE_SEND.with(|hook| match hook {
        Some(hook) => hook(buf),
        None => {}
    });
}

/// Invokes the callback stored in `BEFORE_SEND_MSG` with `msg`.
///
/// Does nothing if no callback has been registered.
pub(crate) fn before_send_msg(msg: &ServerMessage) {
    BEFORE_SEND_MSG.with(|hook| match hook {
        Some(hook) => hook(msg),
        None => {}
    });
}

/// Configures a callback to be called every time a message is received from
/// a peer.
///
/// The only argument passed to the callback is the byte slice holding the
/// message before it has been deserialized.
///
/// # Panics
///
/// This function panics if called more than once.
pub fn set_after_receive<Cb>(callback: Cb)
where
    Cb: Fn(&[u8]) + Send + Sync + 'static,
{
    // Box first; the box is coerced to the trait object stored in the
    // global at the call below.
    let hook = Box::new(callback);
    AFTER_RECEIVE.set(hook);
}

/// Registers the callback that `after_receive_msg` invokes with each
/// `ClientMessage` it is given.
///
/// NOTE(review): `set_after_receive` documents a panic when called more
/// than once; presumably the same holds here — confirm against
/// `Global::set`.
pub fn set_after_receive_msg<Cb>(callback: Cb)
where
    Cb: Fn(&ClientMessage) + Send + Sync + 'static,
{
    // Box first; the box is coerced to the trait object stored in the
    // global at the call below.
    let hook = Box::new(callback);
    AFTER_RECEIVE_MSG.set(hook);
}

/// Configures a callback to be called every time a message is sent to a
/// peer.
///
/// The only argument passed to the callback is the byte slice holding the
/// serialized message.
///
/// # Panics
///
/// This function panics if called more than once.
pub fn set_before_send<Cb>(callback: Cb)
where
    Cb: Fn(&[u8]) + Send + Sync + 'static,
{
    // Box first; the box is coerced to the trait object stored in the
    // global at the call below.
    let hook = Box::new(callback);
    BEFORE_SEND.set(hook);
}

/// Registers the callback that `before_send_msg` invokes with each
/// `ServerMessage` it is given.
///
/// NOTE(review): `set_before_send` documents a panic when called more than
/// once; presumably the same holds here — confirm against `Global::set`.
pub fn set_before_send_msg<Cb>(callback: Cb)
where
    Cb: Fn(&ServerMessage) + Send + Sync + 'static,
{
    // Box first; the box is coerced to the trait object stored in the
    // global at the call below.
    let hook = Box::new(callback);
    BEFORE_SEND_MSG.set(hook);
}