cxmr-ws-client-binance 0.0.1

Binance WebSocket feed client.
Documentation
use hashbrown::HashMap;
use serde_json::Value;

use cxmr_api_binance::private::parse::order_status;
use cxmr_balances::{points, Balance};
use cxmr_exchanges::{AccountInfo, OrderStatus};
use cxmr_feeds::{ExecutionReport, UserEvent};
use cxmr_ws_client::{Command, Error, Parser, Protocol, Subscription};

/// Binance Private feed WebSocket protocol.
pub struct BinancePrivateProtocol;

// Implementation of Binance Private feed WebSocket protocol.
impl Protocol<Vec<UserEvent>> for BinancePrivateProtocol {
    /// Private protocol parser.
    type Parser = BinancePrivateParser;

    /// Creates protocol parser and stream address.
    fn subscription(sub: &Subscription) -> Option<(Command, Self::Parser)> {
        // let parser = BinancePrivateParser
        match sub {
            Subscription::User(key) => {
                let address = format!("wss://stream.binance.com:9443/ws/{}", key);
                Some((Command::Connect(address), BinancePrivateParser(key.clone())))
            }
            Subscription::Pairs(_) => None,
        }
    }
}

/// Binance Private feed WebSocket protocol parser.
pub struct BinancePrivateParser(String);

// Implementation of Binance WebSocket protocol parser.
impl Parser<Vec<UserEvent>> for BinancePrivateParser {
    /// Parses received WebSocket message.
    fn parse(&mut self, msg: &str) -> Result<Option<Vec<UserEvent>>, Error> {
        let msg = ::serde_json::from_str::<Value>(msg).map_err(|e| Error::Client(e.into()))?;
        match msg.get("e")?.as_str()? {
            "executionReport" => {
                // "c": "mUvoqJxFIILMdfAW5iGSOW", // Client order ID
                // "C": "null",                   // Original client order ID; This is the ID of the order being canceled
                // "p": "0.10264410",             // Order price
                // "P": "0.00000000",             // Stop price
                // "z": "0.00000000",             // Cumulative filled quantity
                // "q": "1.00000000",             // Order quantity
                // "X": "NEW",                    // Current order status
                // "T": 1499405658657,            // Transaction time
                // -------------------------------------------------

                // "S": "BUY",                    // Side
                // "o": "LIMIT",                  // Order type
                // "f": "GTC",                    // Time in force
                // "F": "0.00000000",             // Iceberg quantity
                // "x": "NEW",                    // Current execution type
                // "r": "NONE",                   // Order reject reason; will be an error code.
                // "i": 4293153,                  // Order ID
                // "l": "0.00000000",             // Last executed quantity
                // "L": "0.00000000",             // Last executed price
                // "n": "0",                      // Commission amount
                // "N": null,                     // Commission asset
                // "t": -1,                       // Trade ID
                // "I": 8641984,                  // Ignore
                // "w": true,                     // Is the order working? Stops will have
                // "m": false,                    // Is this trade the maker side?
                // "O": 1499405658657,            // Order creation time
                // "Z": "0.00000000",             // Cumulative quote asset transacted quantity
                // "Y": "0.00000000"              // Last quote asset transacted quantity (i.e. lastPrice * lastQty)
                let status = order_status(msg.get("X")?.as_str()?)?;
                let id = match status {
                    OrderStatus::Canceled => msg.get("C")?.as_str()?.to_string(),
                    _ => msg.get("c")?.as_str()?.to_string(),
                };
                let updated_at = msg.get("T")?.as_u64()?;
                let report = ExecutionReport {
                    id: id,
                    account: self.0.clone(),
                    status: status,
                    rate: points(msg.get("p")?.as_str()?.parse()?),
                    stop: msg
                        .get("P")?
                        .as_str()?
                        .parse::<f64>()
                        .ok()
                        .map(|v| points(v))
                        .filter(|v| *v != 0),
                    executed: msg
                        .get("z")?
                        .as_str()?
                        .parse::<f64>()
                        .ok()
                        .map(|v| points(v))
                        .filter(|v| *v != 0),
                    amount: points(msg.get("q")?.as_str()?.parse()?),
                    updated_at: updated_at,
                };
                Ok(Some(vec![UserEvent::OrderExecution(report)]))
            }
            "outboundAccountInfo" => {
                let updated_at = msg.get("E")?.as_u64()?;
                let mut balances = HashMap::new();
                msg.get("B")?
                    .as_array()?
                    .iter()
                    .map(|b| {
                        let free = points(b.get("f")?.as_str()?.parse()?);
                        let locked = points(b.get("l")?.as_str()?.parse()?);
                        balances.insert(
                            b.get("a")?.as_str()?.into(),
                            Balance {
                                total: free + locked,
                                unused: free,
                                in_use: locked,
                                total_btc: None,
                            },
                        );
                        Ok(())
                    })
                    .collect::<Result<_, Error>>()?;
                let info = AccountInfo {
                    account: self.0.clone(),
                    can_trade: msg.get("T")?.as_bool()?,
                    balances: balances,
                    maker_fee_rate: msg.get("m")?.as_u64()?,
                    taker_fee_rate: msg.get("t")?.as_u64()?,
                    updated_at: updated_at,
                };
                Ok(Some(vec![UserEvent::UpdateAccount(info)]))
            }
            _ => Ok(None),
        }
    }
}