binance-async 0.3.0

Rust Library for the Binance API (Async)
Documentation
pub mod coinm;
mod models;
pub mod spot;
pub mod usdm;

use crate::{
    error::BinanceError::{self, *},
    models::Product,
    Config,
};
use fehler::{throw, throws};
use futures::{stream::Stream, StreamExt};
use log::debug;
pub use models::*;
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::{from_str, value::RawValue};
use std::{
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
use tungstenite::Message;

type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;

pub trait ParseMessage: Sized {
    const PRODUCT: Product;

    fn parse(stream: &str, data: &str) -> Result<Self, BinanceError>;
}

pub struct BinanceWebsocket<M> {
    stream: WSStream,
    _phantom: PhantomData<M>,
}

impl<M> BinanceWebsocket<M>
where
    M: ParseMessage,
{
    #[throws(BinanceError)]
    pub async fn new<I, S>(topics: I) -> BinanceWebsocket<M>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let config = Config::default();
        Self::with_config(&config, topics).await?
    }

    #[throws(BinanceError)]
    pub async fn with_config<I, S>(config: &Config, topics: I) -> BinanceWebsocket<M>
    where
        I: IntoIterator<Item = S>,
        S: AsRef<str>,
    {
        let mut combined = String::new();
        for topic in topics {
            if !combined.is_empty() {
                combined.push('/');
            }

            combined.push_str(topic.as_ref())
        }

        if combined.is_empty() {
            throw!(EmptyTopics)
        }

        let base = match M::PRODUCT {
            Product::Spot => &config.ws_endpoint,
            Product::UsdMFutures => &config.usdm_futures_ws_endpoint,
            Product::CoinMFutures => &config.coinm_futures_ws_endpoint,
            Product::EuropeanOptions => &config.european_options_ws_endpoint,
        };
        let endpoint = Url::parse(&format!("{}/stream?streams={}", base, combined)).unwrap();
        debug!("ws endpoint: {endpoint:?}");
        let (stream, _) = match connect_async(endpoint).await {
            Ok(v) => v,
            Err(tungstenite::Error::Http(ref http)) => throw!(StartWebsocketError(
                http.status(),
                String::from_utf8_lossy(http.body().as_deref().unwrap_or_default()).to_string()
            )),
            Err(e) => throw!(e),
        };
        Self {
            stream,
            _phantom: PhantomData,
        }
    }
}

#[derive(Deserialize)]
struct MessageWithTopic<'a> {
    stream: String,
    #[serde(borrow)]
    data: &'a RawValue,
}

impl<M> Stream for BinanceWebsocket<M>
where
    M: ParseMessage + Unpin + std::fmt::Debug,
{
    type Item = Result<M, BinanceError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let c = match self.stream.poll_next_unpin(cx) {
            Poll::Ready(Some(Ok(c))) => c,
            Poll::Ready(Some(Err(e))) => return dbg!(Poll::Ready(Some(Err(e.into())))),
            Poll::Pending => return Poll::Pending,
            Poll::Ready(None) => return Poll::Ready(None),
        };
        let msg = match c {
            Message::Text(msg) => msg,
            Message::Binary(_) | Message::Frame(_) | Message::Pong(..) | Message::Ping(..) => {
                return Poll::Pending
            }
            Message::Close(_) => return Poll::Ready(None),
        };

        let t: MessageWithTopic = match from_str(&msg) {
            Ok(v) => v,
            Err(e) => return Poll::Ready(Some(Err(e.into()))),
        };

        Poll::Ready(Some(M::parse(&t.stream, t.data.get())))
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
enum Either<L, R> {
    Left(L),
    Right(R),
}