balter-runtime 0.3.1

A load/stress testing framework.
Documentation
use super::{message::Message, GossipError};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::future::Future;
use tokio::io::{AsyncRead, AsyncWrite};

#[allow(dead_code)]
#[trait_variant::make(GossipStream: Send)]
pub trait LocalGossipStream {
    async fn recv_bytes(&mut self) -> Option<Result<Vec<u8>, GossipError>>;
    async fn send_bytes(&mut self, bytes: Vec<u8>) -> Result<(), GossipError>;

    fn try_recv<M: for<'a> Deserialize<'a>>(
        &mut self,
    ) -> impl Future<Output = Option<Result<Message<M>, GossipError>>> {
        async {
            let bytes = self.recv_bytes().await?;

            Some((|| match bytes {
                Ok(bytes) => Ok(Message::from_bytes(&bytes)?),
                Err(err) => Err(err),
            })())
        }
    }

    fn recv<M: for<'a> Deserialize<'a>>(
        &mut self,
    ) -> impl Future<Output = Result<Message<M>, GossipError>> {
        async {
            if let Some(res) = self.try_recv().await {
                res
            } else {
                Err(GossipError::NoData)
            }
        }
    }

    fn send<M: Serialize + Send>(
        &mut self,
        message: Message<M>,
    ) -> impl Future<Output = Result<(), GossipError>> {
        async move {
            let bytes = message.to_bytes()?;
            self.send_bytes(bytes).await
        }
    }
}

impl GossipStream for axum::extract::ws::WebSocket {
    async fn recv_bytes(&mut self) -> Option<Result<Vec<u8>, GossipError>> {
        let message = self.recv().await?;

        use axum::extract::ws::Message as AxumMessage;
        Some(match message {
            Ok(AxumMessage::Binary(bytes)) => Ok(bytes),
            Ok(_) => Err(GossipError::InvalidType),
            Err(err) => Err(GossipError::from(err)),
        })
    }

    async fn send_bytes(&mut self, bytes: Vec<u8>) -> Result<(), GossipError> {
        use axum::extract::ws::Message as AxumMessage;
        self.send(AxumMessage::Binary(bytes)).await?;
        Ok(())
    }
}

impl<T> GossipStream for tokio_tungstenite::WebSocketStream<T>
where
    T: AsyncRead + AsyncWrite + Unpin + Send,
{
    async fn recv_bytes(&mut self) -> Option<Result<Vec<u8>, GossipError>> {
        let message = self.next().await?;
        use tungstenite::protocol::Message as TMessage;
        Some(match message {
            Ok(TMessage::Binary(bytes)) => Ok(bytes),
            Ok(_) => Err(GossipError::InvalidType),
            Err(err) => Err(GossipError::from(err)),
        })
    }

    async fn send_bytes(&mut self, bytes: Vec<u8>) -> Result<(), GossipError> {
        use tungstenite::protocol::Message as TMessage;
        <Self as SinkExt<TMessage>>::send(self, TMessage::Binary(bytes)).await?;
        Ok(())
    }
}