exc_binance/websocket/protocol/frame/
mod.rs1#![allow(clippy::large_enum_variant)]
2
3use std::fmt;
4
5use exc_core::ExchangeError;
6use futures::{future, stream, Sink, SinkExt, Stream, TryStreamExt};
7use serde::{Deserialize, Serialize};
8use serde_with::{serde_as, DisplayFromStr};
9
10use crate::websocket::error::WsError;
11
12use self::{account::AccountEvent, agg_trade::AggTrade, book_ticker::BookTicker};
13
14pub mod agg_trade;
16
17pub mod trade;
19
20pub mod book_ticker;
22
23pub mod depth;
25
26pub mod account;
28
29#[derive(Debug, Clone, Copy, Serialize)]
31#[serde(rename_all = "UPPERCASE")]
32pub enum Op {
33 Subscribe,
35 Unsubscribe,
37}
38
/// Name of a stream: an optional instrument plus a channel.
///
/// The [`fmt::Display`] form is `<inst>@<channel>` when an instrument is set
/// and the bare `<channel>` otherwise.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct Name {
    inst: Option<String>,
    channel: String,
}

impl Name {
    /// Creates a name with the given channel and no instrument.
    pub fn new(channel: &str) -> Self {
        Self {
            inst: None,
            channel: channel.to_string(),
        }
    }

    /// Sets (or replaces) the instrument of this name.
    pub fn with_inst(mut self, inst: &str) -> Self {
        self.inst = Some(inst.to_string());
        self
    }

    /// Builds a name that is bound to an instrument.
    fn scoped(inst: &str, channel: String) -> Self {
        Self {
            inst: Some(inst.to_string()),
            channel,
        }
    }

    /// Name of the `aggTrade` channel of `inst`.
    pub fn agg_trade(inst: &str) -> Self {
        Self::scoped(inst, "aggTrade".to_string())
    }

    /// Name of the `trade` channel of `inst`.
    pub fn trade(inst: &str) -> Self {
        Self::scoped(inst, "trade".to_string())
    }

    /// Name of the `bookTicker` channel of `inst`.
    pub fn book_ticker(inst: &str) -> Self {
        Self::scoped(inst, "bookTicker".to_string())
    }

    /// Name of the `depth<levels>@<rate>` channel of `inst`.
    ///
    /// An empty `rate` produces `depth<levels>` without the `@<rate>` suffix,
    /// so the channel never ends in a dangling `@`.
    pub fn depth(inst: &str, levels: &str, rate: &str) -> Self {
        let channel = if rate.is_empty() {
            format!("depth{levels}")
        } else {
            format!("depth{levels}@{rate}")
        };
        Self::scoped(inst, channel)
    }

    /// Name of the `listenKeyExpired` channel (no instrument).
    pub fn listen_key_expired() -> Self {
        Self::new("listenKeyExpired")
    }

    /// Name of the `orderTradeUpdate` channel of `inst`.
    pub fn order_trade_update(inst: &str) -> Self {
        Self::new("orderTradeUpdate").with_inst(inst)
    }
}

impl fmt::Display for Name {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self.inst.as_deref() {
            Some(inst) => write!(f, "{}@{}", inst, self.channel),
            None => write!(f, "{}", self.channel),
        }
    }
}
113
114#[serde_as]
116#[derive(Debug, Clone, Serialize)]
117pub struct RequestFrame {
118 pub id: usize,
120 pub method: Op,
122 #[serde_as(as = "Vec<DisplayFromStr>")]
124 pub(super) params: Vec<Name>,
125}
126
127impl RequestFrame {
128 pub fn subscribe(id: usize, stream: Name) -> Self {
130 Self {
131 id,
132 method: Op::Subscribe,
133 params: vec![stream],
134 }
135 }
136
137 pub fn unsubscribe(id: usize, stream: Name) -> Self {
139 Self {
140 id,
141 method: Op::Unsubscribe,
142 params: vec![stream],
143 }
144 }
145}
146
147#[derive(Debug, Clone, Deserialize)]
149pub struct ResponseFrame {
150 pub id: usize,
152 #[serde(default)]
154 pub result: Option<serde_json::Value>,
155}
156
157impl ResponseFrame {
158 pub(super) fn is_close_stream(&self) -> bool {
159 false
160 }
161}
162
163#[derive(Debug, Clone, Deserialize)]
165#[serde(untagged)]
166pub enum ServerFrame {
167 Response(ResponseFrame),
169 Stream(StreamFrame),
171 Empty,
173}
174
175impl ServerFrame {
176 fn health(self) -> Result<Self, WsError> {
177 match &self {
178 Self::Stream(f) => match &f.data {
179 StreamFrameKind::AccountEvent(AccountEvent::ListenKeyExpired { ts }) => {
180 Err(WsError::ListenKeyExpired(*ts))
181 }
182 _ => Ok(self),
183 },
184 _ => Ok(self),
185 }
186 }
187
188 fn break_down(self) -> Vec<Self> {
189 match &self {
190 Self::Empty | Self::Response(_) => vec![self],
191 Self::Stream(f) => match &f.data {
192 StreamFrameKind::OptionsOrderUpdate(_) => {
193 let Self::Stream(f) = self else {
194 unreachable!()
195 };
196 let StreamFrameKind::OptionsOrderUpdate(update) = f.data else {
197 unreachable!()
198 };
199 let stream = f.stream;
200 update
201 .order
202 .into_iter()
203 .map(|o| {
204 let frame = StreamFrame {
205 stream: stream.clone(),
206 data: StreamFrameKind::OptionsOrder(o),
207 };
208 Self::Stream(frame)
209 })
210 .collect()
211 }
212 _ => vec![self],
213 },
214 }
215 }
216}
217
218pub trait Nameable {
220 fn to_name(&self) -> Name;
222}
223
224#[derive(Debug, Clone, Deserialize)]
226#[serde(untagged)]
227#[non_exhaustive]
228pub enum StreamFrameKind {
229 AggTrade(AggTrade),
231 Trade(trade::Trade),
233 BookTicker(BookTicker),
235 Depth(depth::Depth),
237 AccountEvent(AccountEvent),
239 OptionsOrder(account::OptionsOrder),
241 OptionsOrderUpdate(account::OptionsOrderUpdate),
243 Unknwon(serde_json::Value),
245}
246
247#[derive(Debug, Clone, Deserialize)]
249pub struct StreamFrame {
250 pub stream: String,
252 pub data: StreamFrameKind,
254}
255
256impl StreamFrame {
257 pub fn to_name(&self) -> Option<Name> {
259 match &self.data {
260 StreamFrameKind::AggTrade(f) => Some(f.to_name()),
261 StreamFrameKind::Trade(f) => Some(f.to_name()),
262 StreamFrameKind::BookTicker(f) => Some(f.to_name()),
263 StreamFrameKind::Depth(_) => {
264 let (inst, channel) = self.stream.split_once('@')?;
265 Some(Name {
266 inst: Some(inst.to_string()),
267 channel: channel.to_string(),
268 })
269 }
270 StreamFrameKind::AccountEvent(e) => Some(e.to_name()),
271 StreamFrameKind::OptionsOrder(e) => Some(e.to_name()),
272 StreamFrameKind::OptionsOrderUpdate(_) => None,
273 StreamFrameKind::Unknwon(_) => {
274 let (inst, channel) = self.stream.split_once('@')?;
275 Some(Name {
276 inst: Some(inst.to_string()),
277 channel: channel.to_string(),
278 })
279 }
280 }
281 }
282}
283
284impl TryFrom<StreamFrame> for serde_json::Value {
285 type Error = WsError;
286
287 fn try_from(frame: StreamFrame) -> Result<Self, Self::Error> {
288 match frame.data {
289 StreamFrameKind::Unknwon(v) => Ok(v),
290 _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))),
291 }
292 }
293}
294
295#[derive(Debug, Clone, Deserialize)]
297#[non_exhaustive]
298pub enum TradeFrame {
299 AggTrade(AggTrade),
301 Trade(trade::Trade),
303}
304
305impl TryFrom<StreamFrame> for TradeFrame {
306 type Error = WsError;
307
308 fn try_from(frame: StreamFrame) -> Result<Self, Self::Error> {
309 match frame.data {
310 StreamFrameKind::AggTrade(trade) => Ok(Self::AggTrade(trade)),
311 StreamFrameKind::Trade(trade) => Ok(Self::Trade(trade)),
312 _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))),
313 }
314 }
315}
316
317impl TryFrom<TradeFrame> for exc_core::types::Trade {
318 type Error = ExchangeError;
319
320 fn try_from(value: TradeFrame) -> Result<Self, Self::Error> {
321 match value {
322 TradeFrame::AggTrade(trade) => Ok(exc_core::types::Trade {
323 ts: crate::types::adaptations::from_timestamp(trade.trade_timestamp)?,
324 price: trade.price.normalize(),
325 size: trade.size.normalize(),
326 buy: !trade.buy_maker,
327 }),
328 TradeFrame::Trade(trade) => Ok(exc_core::types::Trade {
329 ts: crate::types::adaptations::from_timestamp(trade.trade_timestamp)?,
330 price: trade.price.normalize(),
331 size: trade.size.normalize(),
332 buy: trade.is_taker_buy(),
333 }),
334 }
335 }
336}
337
338#[derive(Debug, Clone, Deserialize)]
340#[non_exhaustive]
341pub enum DepthFrame {
342 BookTicker(BookTicker),
344 Depth(depth::Depth),
346}
347
348impl TryFrom<StreamFrame> for DepthFrame {
349 type Error = WsError;
350
351 fn try_from(frame: StreamFrame) -> Result<Self, Self::Error> {
352 match frame.data {
353 StreamFrameKind::BookTicker(t) => Ok(Self::BookTicker(t)),
354 StreamFrameKind::Depth(t) => Ok(Self::Depth(t)),
355 _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))),
356 }
357 }
358}
359
360impl TryFrom<DepthFrame> for exc_core::types::BidAsk {
361 type Error = ExchangeError;
362
363 fn try_from(value: DepthFrame) -> Result<Self, Self::Error> {
364 match value {
365 DepthFrame::BookTicker(t) => Ok(exc_core::types::BidAsk {
366 ts: t
367 .trade_timestamp
368 .map(crate::types::adaptations::from_timestamp)
369 .transpose()?
370 .unwrap_or_else(time::OffsetDateTime::now_utc),
371 bid: Some((t.bid.normalize(), t.bid_size.normalize())),
372 ask: Some((t.ask.normalize(), t.ask_size.normalize())),
373 }),
374 DepthFrame::Depth(t) => Ok(exc_core::types::BidAsk {
375 ts: crate::types::adaptations::from_timestamp(t.trade_timestamp)?,
376 bid: t.bids.first().map(|b| (b.0.normalize(), b.1.normalize())),
377 ask: t.asks.first().map(|a| (a.0.normalize(), a.1.normalize())),
378 }),
379 }
380 }
381}
382
383pub fn layer<T>(
385 transport: T,
386) -> impl Sink<RequestFrame, Error = WsError> + Stream<Item = Result<ServerFrame, WsError>>
387where
388 T: Sink<String, Error = WsError>,
389 T: Stream<Item = Result<String, WsError>>,
390{
391 transport
392 .with_flat_map(|f| {
393 let msg = serde_json::to_string(&f).map_err(WsError::from);
394 stream::once(future::ready(msg))
395 })
396 .and_then(|msg| {
397 let f = serde_json::from_str::<ServerFrame>(&msg)
398 .map_err(WsError::from)
399 .and_then(ServerFrame::health)
400 .map(|f| stream::iter(f.break_down().into_iter().map(Ok)));
401 future::ready(f)
402 })
403 .try_flatten()
404}
405
#[cfg(test)]
mod test {
    use futures::{pin_mut, TryStreamExt};
    use tower::ServiceExt;

    use crate::{types::Name, Binance, Request};

    use super::agg_trade::AggTrade;
    use super::book_ticker::BookTicker;

    /// Subscribes to `btcusdt` aggregated trades and prints the first item.
    #[tokio::test]
    async fn test_aggregate_trade() -> anyhow::Result<()> {
        let mut api = Binance::usd_margin_futures().connect();
        let request = Request::subscribe(Name::agg_trade("btcusdt"));
        let response = (&mut api).oneshot(request).await?;
        let stream = response.into_stream::<AggTrade>()?;
        pin_mut!(stream);
        let trade = stream.try_next().await?.unwrap();
        println!("{trade:?}");
        Ok(())
    }

    /// Subscribes to the `btcusdt` book ticker and prints the first item.
    #[tokio::test]
    async fn test_book_ticker() -> anyhow::Result<()> {
        let mut api = Binance::usd_margin_futures().connect();
        let request = Request::subscribe(Name::book_ticker("btcusdt"));
        let response = (&mut api).oneshot(request).await?;
        let stream = response.into_stream::<BookTicker>()?;
        pin_mut!(stream);
        let trade = stream.try_next().await?.unwrap();
        println!("{trade:?}");
        Ok(())
    }
}