bitmex 0.2.2

Rust Library for the BitMEX API (Async)
mod command;
mod message;
mod topic;

pub use self::command::Command;
pub use self::message::Message as BitMEXWsMessage;
pub use self::message::{
    Action, CancelAllAfterMessage, ErrorMessage, InfoMessage, Limit, SuccessMessage, TableFilter,
    TableMessage,
};
pub use self::topic::Topic;
use crate::consts::WS_URL;
use crate::BitMEX;
use failure::Fallible;
use futures::sink::Sink;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use log::trace;
use pin_project::pin_project;
use serde_json::{from_str, to_string};
use std::pin::Pin;
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tungstenite::protocol::Message as WSMessage;
use url::Url;

#[allow(dead_code)]
type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

impl BitMEX {
    pub async fn websocket(&self) -> Fallible<BitMEXWebsocket> {
        let (stream, _) = connect_async(Url::parse(&WS_URL).unwrap()).await?;
        Ok(BitMEXWebsocket::new(stream))
    }
}

#[pin_project]
pub struct BitMEXWebsocket {
    #[pin]
    inner: WSStream,
}

impl BitMEXWebsocket {
    fn new(ws: WSStream) -> Self {
        Self { inner: ws }
    }
}

impl Sink<Command> for BitMEXWebsocket {
    type Error = failure::Error;

    fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_ready(cx).map_err(|e| e.into())
    }

    fn start_send(self: Pin<&mut Self>, item: Command) -> Result<(), Self::Error> {
        let this = self.project();
        let command = match &item {
            &Command::Ping => "ping".to_string(),
            command => to_string(command)?,
        };
        trace!("Sending '{}' through websocket", command);
        Ok(this.inner.start_send(WSMessage::Text(command))?)
    }

    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_flush(cx).map_err(|e| e.into())
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        let this = self.project();
        this.inner.poll_close(cx).map_err(|e| e.into())
    }
}

impl Stream for BitMEXWebsocket {
    type Item = Fallible<BitMEXWsMessage>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();
        let poll = this.inner.poll_next(cx);
        match poll {
            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
            Poll::Ready(Some(Ok(m))) => Poll::Ready(Some(Ok(parse_message(m)))),
            Poll::Ready(None) => Poll::Ready(None),
            Poll::Pending => Poll::Pending,
        }
    }
}

fn parse_message(msg: WSMessage) -> BitMEXWsMessage {
    match msg {
        WSMessage::Text(message) => match message.as_str() {
            "pong" => BitMEXWsMessage::Pong,
            others => match from_str(others) {
                Ok(r) => r,
                Err(_) => unreachable!("Received message from BitMEX: '{}'", others),
            },
        },
        _ => unreachable!(),
    }
}