1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use hashbrown::HashMap;
use serde_json::Value;
use cxmr_api_binance::private::parse::order_status;
use cxmr_balances::{points, Balance};
use cxmr_exchanges::{AccountInfo, OrderStatus};
use cxmr_feeds::{ExecutionReport, UserEvent};
use cxmr_ws_client::{Command, Error, Parser, Protocol, Subscription};
/// Binance Private feed WebSocket protocol.
///
/// Stateless marker type: it has no fields and exists only to carry the
/// `Protocol<Vec<UserEvent>>` implementation for the private (per-user) feed.
pub struct BinancePrivateProtocol;
// `Protocol` implementation for the Binance private (per-user) feed.
impl Protocol<Vec<UserEvent>> for BinancePrivateProtocol {
    /// Parser used to decode messages received on the private stream.
    type Parser = BinancePrivateParser;

    /// Maps a subscription onto the connect command and parser that serve it.
    ///
    /// Only `Subscription::User` is supported: its key is appended to the
    /// Binance stream endpoint to form the connection address, and a copy of
    /// the key is handed to the parser. `Subscription::Pairs` yields `None`.
    fn subscription(sub: &Subscription) -> Option<(Command, Self::Parser)> {
        match sub {
            // Pair subscriptions are not served by the private feed.
            Subscription::Pairs(_) => None,
            Subscription::User(key) => {
                let connect =
                    Command::Connect(format!("wss://stream.binance.com:9443/ws/{}", key));
                let parser = BinancePrivateParser(key.clone());
                Some((connect, parser))
            }
        }
    }
}
/// Binance Private feed WebSocket protocol parser.
///
/// The wrapped `String` is the key taken from `Subscription::User` when the
/// parser is created; it is cloned into the `account` field of every
/// `ExecutionReport` and `AccountInfo` the parser produces.
pub struct BinancePrivateParser(String);
// Implementation of Binance WebSocket protocol parser.
impl Parser<Vec<UserEvent>> for BinancePrivateParser {
/// Parses received WebSocket message.
fn parse(&mut self, msg: &str) -> Result<Option<Vec<UserEvent>>, Error> {
let msg = ::serde_json::from_str::<Value>(msg).map_err(|e| Error::Client(e.into()))?;
match msg.get("e")?.as_str()? {
"executionReport" => {
// "c": "mUvoqJxFIILMdfAW5iGSOW", // Client order ID
// "C": "null", // Original client order ID; This is the ID of the order being canceled
// "p": "0.10264410", // Order price
// "P": "0.00000000", // Stop price
// "z": "0.00000000", // Cumulative filled quantity
// "q": "1.00000000", // Order quantity
// "X": "NEW", // Current order status
// "T": 1499405658657, // Transaction time
// -------------------------------------------------
// "S": "BUY", // Side
// "o": "LIMIT", // Order type
// "f": "GTC", // Time in force
// "F": "0.00000000", // Iceberg quantity
// "x": "NEW", // Current execution type
// "r": "NONE", // Order reject reason; will be an error code.
// "i": 4293153, // Order ID
// "l": "0.00000000", // Last executed quantity
// "L": "0.00000000", // Last executed price
// "n": "0", // Commission amount
// "N": null, // Commission asset
// "t": -1, // Trade ID
// "I": 8641984, // Ignore
// "w": true, // Is the order working? Stops will have
// "m": false, // Is this trade the maker side?
// "O": 1499405658657, // Order creation time
// "Z": "0.00000000", // Cumulative quote asset transacted quantity
// "Y": "0.00000000" // Last quote asset transacted quantity (i.e. lastPrice * lastQty)
let status = order_status(msg.get("X")?.as_str()?)?;
let id = match status {
OrderStatus::Canceled => msg.get("C")?.as_str()?.to_string(),
_ => msg.get("c")?.as_str()?.to_string(),
};
let updated_at = msg.get("T")?.as_u64()?;
let report = ExecutionReport {
id: id,
account: self.0.clone(),
status: status,
rate: points(msg.get("p")?.as_str()?.parse()?),
stop: msg
.get("P")?
.as_str()?
.parse::<f64>()
.ok()
.map(|v| points(v))
.filter(|v| *v != 0),
executed: msg
.get("z")?
.as_str()?
.parse::<f64>()
.ok()
.map(|v| points(v))
.filter(|v| *v != 0),
amount: points(msg.get("q")?.as_str()?.parse()?),
updated_at: updated_at,
};
Ok(Some(vec![UserEvent::OrderExecution(report)]))
}
"outboundAccountInfo" => {
let updated_at = msg.get("E")?.as_u64()?;
let mut balances = HashMap::new();
msg.get("B")?
.as_array()?
.iter()
.map(|b| {
let free = points(b.get("f")?.as_str()?.parse()?);
let locked = points(b.get("l")?.as_str()?.parse()?);
balances.insert(
b.get("a")?.as_str()?.into(),
Balance {
total: free + locked,
unused: free,
in_use: locked,
total_btc: None,
},
);
Ok(())
})
.collect::<Result<_, Error>>()?;
let info = AccountInfo {
account: self.0.clone(),
can_trade: msg.get("T")?.as_bool()?,
balances: balances,
maker_fee_rate: msg.get("m")?.as_u64()?,
taker_fee_rate: msg.get("t")?.as_u64()?,
updated_at: updated_at,
};
Ok(Some(vec![UserEvent::UpdateAccount(info)]))
}
_ => Ok(None),
}
}
}