mexc-rs 0.3.0

Rust client for the mexc.com exchange
Documentation
use std::sync::Arc;
use rust_decimal::Decimal;
use chrono::{DateTime, Utc};
use futures::{StreamExt};
use futures::stream::{BoxStream, SplitSink};
use tokio::net::TcpStream;
use tokio::sync::{Mutex, MutexGuard};
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use tokio_tungstenite::tungstenite::Message;
use crate::spot::ws::spot_deals::{channel_message_to_spot_deals_message, SpotDealsMessage};

pub mod subscription;
pub mod spot_deals;

/// WebSocket endpoint the spot client connects to.
///
/// The type name is spelled `Sport` rather than `Spot`; it is left unchanged because the
/// name is part of the public API.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MexcSportWsEndpoint {
    /// The built-in endpoint URL (see the `AsRef<str>` implementation for the value).
    Base,
    /// A caller-supplied WebSocket URL, used verbatim.
    Custom(String),
}

impl AsRef<str> for MexcSportWsEndpoint {
    fn as_ref(&self) -> &str {
        match self {
            MexcSportWsEndpoint::Base => "wss://wbs.mexc.com/ws",
            MexcSportWsEndpoint::Custom(endpoint) => endpoint,
        }
    }
}

/// Connection state of the client, kept behind the client's mutex.
struct MexcSpotWsClientInner {
    /// Write half of the WebSocket; `None` until a connection has been established.
    websocket_sink: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
}

/// WebSocket client for the MEXC spot API.
///
/// Incoming messages are fanned out over two broadcast channels: one carrying every
/// deserialized message as received ("raw"), and one carrying the typed messages
/// exposed to API consumers.
pub struct MexcSpotWsClient {
    /// URL to connect to.
    endpoint: MexcSportWsEndpoint,
    /// Sender for typed messages; cloned into the reader task.
    ws_message_tx: async_broadcast::Sender<Arc<MexcSpotWsMessage>>,
    /// Receiver for typed messages; cloned to build each consumer stream.
    ws_message_rx: async_broadcast::Receiver<Arc<MexcSpotWsMessage>>,
    /// Sender for raw messages; cloned into the reader task.
    ws_raw_message_tx: async_broadcast::Sender<Arc<RawMexcSpotWsMessage>>,
    /// Receiver for raw messages; cloned to build each raw stream.
    ws_raw_message_rx: async_broadcast::Receiver<Arc<RawMexcSpotWsMessage>>,
    /// Shared connection state (the socket's write half), guarded by an async mutex.
    inner: Arc<Mutex<MexcSpotWsClientInner>>,
}

impl MexcSpotWsClient {
    pub fn new(endpoint: MexcSportWsEndpoint) -> Self {
        let (ws_message_tx, ws_message_rx) = async_broadcast::broadcast(1000000);
        let (ws_raw_message_tx, ws_raw_message_rx) = async_broadcast::broadcast(1000000);
        let inner = Arc::new(Mutex::new(MexcSpotWsClientInner {
            websocket_sink: None,
        }));

        Self { endpoint, ws_message_tx, ws_message_rx, ws_raw_message_tx, ws_raw_message_rx, inner }
    }

    async fn acquire_websocket(&self) -> Result<AcquireWebsocketOutput, AcquireWebsocketError> {
        let mut inner = self.inner.lock().await;
        if inner.websocket_sink.is_some() {
            return Ok(AcquireWebsocketOutput { inner_guard: inner });
        }

        let (ws, _) = tokio_tungstenite::connect_async(self.endpoint.as_ref()).await?;
        let (ws_tx, mut ws_rx) = ws.split();
        inner.websocket_sink = Some(ws_tx);

        tokio::spawn({
            let ws_message_tx = self.ws_message_tx.clone();
            let ws_raw_message_tx = self.ws_raw_message_tx.clone();
            async move {
                while let Some(message_result) = ws_rx.next().await {
                    let message = match message_result {
                        Ok(message) => message,
                        Err(err) => {
                            tracing::error!("Failed to receive message: {:?}", err);
                            break;
                        }
                    };
                    let text = match message {
                        Message::Text(text) => text,
                        _ => {
                            tracing::debug!("Received non-text ws message: {:?}", message);
                            continue;
                        }
                    };
                    let raw_mexc_ws_message = match serde_json::from_str::<RawMexcSpotWsMessage>(&text) {
                        Ok(raw_mexc_ws_message) => Arc::new(raw_mexc_ws_message),
                        Err(err) => {
                            tracing::error!("Failed to parse mexc ws message: {:?}", err);
                            break;
                        }
                    };
                    let mexc_message_opt = match raw_mexc_ws_message.as_ref() {
                        RawMexcSpotWsMessage::ChannelMessage(channel_message) => {
                            let channel = &channel_message.channel;
                            if channel.starts_with("spot@public.deals.v3.api@") {
                                let result = channel_message_to_spot_deals_message(channel_message);
                                match result {
                                    Ok(spot_deals_message) => Some(Arc::new(MexcSpotWsMessage::SpotDeals(spot_deals_message))),
                                    Err(err) => {
                                        tracing::error!("Failed to convert channel message to spot deals message: {:?}", err);
                                        break;
                                    }
                                }
                            } else {
                                None
                            }
                        }
                        _ => {
                            tracing::debug!("Received raw mexc ws message that won't be used for direct consumption by api consumer");
                            None
                        }
                    };

                    match ws_raw_message_tx.broadcast(raw_mexc_ws_message).await {
                        Ok(_) => {}
                        Err(err) => {
                            tracing::error!("Failed to broadcast raw message: {:?}", err);
                            break;
                        }
                    };

                    if let Some(mexc_message) = mexc_message_opt {
                        match ws_message_tx.broadcast(mexc_message).await {
                            Ok(_) => {}
                            Err(err) => {
                                tracing::error!("Failed to broadcast message: {:?}", err);
                                break;
                            }
                        }
                    }
                }
            }
        });

        Ok(AcquireWebsocketOutput { inner_guard: inner })
    }

    pub fn stream(&self) -> BoxStream<Arc<MexcSpotWsMessage>> {
        let mut mr = self.ws_message_rx.clone();
        let stream = async_stream::stream! {
            while let Ok(message) = mr.recv().await {
                yield message;
            }
        };
        stream.boxed()
    }

    fn stream_raw(&self) -> BoxStream<Arc<RawMexcSpotWsMessage>> {
        let mut mr = self.ws_raw_message_rx.clone();
        let stream = async_stream::stream! {
            while let Ok(message) = mr.recv().await {
                tracing::debug!("Received raw mexc ws message: {:?}", &message);
                yield message;
            }
        };
        stream.boxed()
    }
}

/// Result of acquiring the WebSocket: holds the lock on the client's connection state
/// for as long as this value is alive.
pub struct AcquireWebsocketOutput<'a> {
    /// Guard over the locked connection state; the lock is released when this is dropped.
    inner_guard: MutexGuard<'a, MexcSpotWsClientInner>,
}

impl<'a> AcquireWebsocketOutput<'a> {
    /// Gives mutable access to the WebSocket write half held under the lock.
    ///
    /// # Panics
    ///
    /// Panics if the guarded state holds no sink.
    pub fn websocket_sink(&mut self) -> &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message> {
        let sink_slot = self.inner_guard.websocket_sink.as_mut();
        match sink_slot {
            Some(sink) => sink,
            None => panic!("Websocket should be set here"),
        }
    }
}

/// Error returned when the WebSocket could not be acquired.
#[derive(Debug, thiserror::Error)]
pub enum AcquireWebsocketError {
    /// The underlying tungstenite error; converted automatically via `From`.
    ///
    /// The variant name is spelled `Tungestenite`; it is part of the public API.
    #[error("Failed to connect to websocket: {0}")]
    TungesteniteError(#[from] tokio_tungstenite::tungstenite::Error),
}

impl Default for MexcSpotWsClient {
    /// Creates a client that targets the built-in `Base` endpoint.
    fn default() -> Self {
        let endpoint = MexcSportWsEndpoint::Base;
        Self::new(endpoint)
    }
}

/// Serializable envelope for a client-to-server message: a method name plus its
/// parameters.
///
/// The `camelCase` renaming leaves both single-word field names unchanged, so the
/// serialized keys are `method` and `params`.
#[derive(Debug, serde::Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ClientMessagePayload<'a, T> {
    /// Name of the method being invoked.
    pub method: &'a str,
    /// Method parameters; any type the caller chooses.
    pub params: T,
}

/// Typed message delivered to API consumers through the client's message stream.
#[derive(Debug)]
pub enum MexcSpotWsMessage {
    /// Payload converted from a spot-deals channel message.
    SpotDeals(SpotDealsMessage),
}

/// Any message received from the server, deserialized without a type tag.
///
/// Because the enum is `untagged`, serde tries the variants in declaration order and
/// keeps the first one that matches, so the order of the variants matters.
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
enum RawMexcSpotWsMessage {
    /// Object with `id`, `code` and `msg` fields.
    /// NOTE(review): presumably the server's reply to a client request; confirm.
    IdCodeMsg { id: i64, code: i32, msg: String },
    /// Message published on a subscribed channel.
    ChannelMessage(ChannelMessage),
}

/// Envelope of a channel message, read from the short wire keys `c`, `s`, `t` and `d`.
#[derive(Debug, serde::Deserialize)]
pub(crate) struct ChannelMessage {
    /// Channel name (wire key `c`).
    #[serde(rename = "c")]
    pub channel: String,
    /// Symbol the message refers to (wire key `s`).
    #[serde(rename = "s")]
    pub symbol: String,
    /// Message timestamp (wire key `t`), parsed from epoch milliseconds.
    #[serde(rename = "t", with = "chrono::serde::ts_milliseconds")]
    pub timestamp: DateTime<Utc>,
    /// Channel-specific payload (wire key `d`).
    #[serde(rename = "d")]
    pub data: ChannelMessageData,
}

/// Payload carried in the `d` field of a channel message.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ChannelMessageData {
    /// List of deals (wire key `deals`); `None` when the payload carries none.
    pub deals: Option<Vec<ChannelMessageDeal>>,
    /// Event type string (wire key `e`).
    #[serde(rename = "e")]
    pub event_type: String,
}

/// A single deal entry inside a channel message payload.
#[derive(Debug, serde::Deserialize)]
#[serde(rename_all = "camelCase")]
pub(crate) struct ChannelMessageDeal {
    /// Deal price (wire key `p`).
    #[serde(rename = "p")]
    pub price: Decimal,
    /// Deal quantity (wire key `v`).
    #[serde(rename = "v")]
    pub quantity: Decimal,
    /// Deal timestamp (wire key `t`), parsed from epoch milliseconds.
    #[serde(rename = "t", with = "chrono::serde::ts_milliseconds")]
    pub timestamp: DateTime<Utc>,
    /// Numeric trade-type code (wire key `S`).
    /// NOTE(review): the meaning of each code is not visible here; confirm against the API docs.
    #[serde(rename = "S")]
    pub trade_type: i32,
}