exc_binance/websocket/protocol/frame/
mod.rs

1#![allow(clippy::large_enum_variant)]
2
3use std::fmt;
4
5use exc_core::ExchangeError;
6use futures::{future, stream, Sink, SinkExt, Stream, TryStreamExt};
7use serde::{Deserialize, Serialize};
8use serde_with::{serde_as, DisplayFromStr};
9
10use crate::websocket::error::WsError;
11
12use self::{account::AccountEvent, agg_trade::AggTrade, book_ticker::BookTicker};
13
14/// Aggregate trade.
15pub mod agg_trade;
16
17/// Trade.
18pub mod trade;
19
20/// Book ticker.
21pub mod book_ticker;
22
23/// Depth.
24pub mod depth;
25
26/// Account.
27pub mod account;
28
29/// Operations.
30#[derive(Debug, Clone, Copy, Serialize)]
31#[serde(rename_all = "UPPERCASE")]
32pub enum Op {
33    /// Subscribe.
34    Subscribe,
35    /// Unsubscribe.
36    Unsubscribe,
37}
38
39/// Stream name.
40#[derive(Debug, Clone, Hash, PartialEq, Eq)]
41pub struct Name {
42    inst: Option<String>,
43    channel: String,
44}
45
46impl Name {
47    /// Create a new stream name.
48    pub fn new(channel: &str) -> Self {
49        Self {
50            inst: None,
51            channel: channel.to_string(),
52        }
53    }
54
55    /// Set instrument.
56    pub fn with_inst(mut self, inst: &str) -> Self {
57        self.inst = Some(inst.to_string());
58        self
59    }
60
61    /// Aggrated trade
62    pub fn agg_trade(inst: &str) -> Self {
63        Self {
64            inst: Some(inst.to_string()),
65            channel: "aggTrade".to_string(),
66        }
67    }
68
69    /// Trade
70    pub fn trade(inst: &str) -> Self {
71        Self {
72            inst: Some(inst.to_string()),
73            channel: "trade".to_string(),
74        }
75    }
76
77    /// Book ticker
78    pub fn book_ticker(inst: &str) -> Self {
79        Self {
80            inst: Some(inst.to_string()),
81            channel: "bookTicker".to_string(),
82        }
83    }
84
85    /// Depth
86    pub fn depth(inst: &str, levels: &str, rate: &str) -> Self {
87        Self {
88            inst: Some(inst.to_string()),
89            channel: format!("depth{levels}@{rate}"),
90        }
91    }
92
93    /// Listen key expired.
94    pub fn listen_key_expired() -> Self {
95        Self::new("listenKeyExpired")
96    }
97
98    /// Order trade update.
99    pub fn order_trade_update(inst: &str) -> Self {
100        Self::new("orderTradeUpdate").with_inst(inst)
101    }
102}
103
104impl fmt::Display for Name {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        if let Some(inst) = self.inst.as_ref() {
107            write!(f, "{}@{}", inst, self.channel)
108        } else {
109            write!(f, "{}", self.channel)
110        }
111    }
112}
113
114/// Request frame.
115#[serde_as]
116#[derive(Debug, Clone, Serialize)]
117pub struct RequestFrame {
118    /// Id.
119    pub id: usize,
120    /// Method.
121    pub method: Op,
122    /// Params.
123    #[serde_as(as = "Vec<DisplayFromStr>")]
124    pub(super) params: Vec<Name>,
125}
126
127impl RequestFrame {
128    /// Subscribe to a stream.
129    pub fn subscribe(id: usize, stream: Name) -> Self {
130        Self {
131            id,
132            method: Op::Subscribe,
133            params: vec![stream],
134        }
135    }
136
137    /// Unssubscribe a stream.
138    pub fn unsubscribe(id: usize, stream: Name) -> Self {
139        Self {
140            id,
141            method: Op::Unsubscribe,
142            params: vec![stream],
143        }
144    }
145}
146
147/// Response frame.
148#[derive(Debug, Clone, Deserialize)]
149pub struct ResponseFrame {
150    /// Id.
151    pub id: usize,
152    /// Result.
153    #[serde(default)]
154    pub result: Option<serde_json::Value>,
155}
156
157impl ResponseFrame {
158    pub(super) fn is_close_stream(&self) -> bool {
159        false
160    }
161}
162
163/// Server frame.
164#[derive(Debug, Clone, Deserialize)]
165#[serde(untagged)]
166pub enum ServerFrame {
167    /// Response.
168    Response(ResponseFrame),
169    /// Stream.
170    Stream(StreamFrame),
171    /// Empty.
172    Empty,
173}
174
175impl ServerFrame {
176    fn health(self) -> Result<Self, WsError> {
177        match &self {
178            Self::Stream(f) => match &f.data {
179                StreamFrameKind::AccountEvent(AccountEvent::ListenKeyExpired { ts }) => {
180                    Err(WsError::ListenKeyExpired(*ts))
181                }
182                _ => Ok(self),
183            },
184            _ => Ok(self),
185        }
186    }
187
188    fn break_down(self) -> Vec<Self> {
189        match &self {
190            Self::Empty | Self::Response(_) => vec![self],
191            Self::Stream(f) => match &f.data {
192                StreamFrameKind::OptionsOrderUpdate(_) => {
193                    let Self::Stream(f) = self else {
194                        unreachable!()
195                    };
196                    let StreamFrameKind::OptionsOrderUpdate(update) = f.data else {
197                        unreachable!()
198                    };
199                    let stream = f.stream;
200                    update
201                        .order
202                        .into_iter()
203                        .map(|o| {
204                            let frame = StreamFrame {
205                                stream: stream.clone(),
206                                data: StreamFrameKind::OptionsOrder(o),
207                            };
208                            Self::Stream(frame)
209                        })
210                        .collect()
211                }
212                _ => vec![self],
213            },
214        }
215    }
216}
217
218/// Payload that with stream name.
219pub trait Nameable {
220    /// Get name.
221    fn to_name(&self) -> Name;
222}
223
224/// Stream frame kind.
225#[derive(Debug, Clone, Deserialize)]
226#[serde(untagged)]
227#[non_exhaustive]
228pub enum StreamFrameKind {
229    /// Aggregate trade.
230    AggTrade(AggTrade),
231    /// Trade.
232    Trade(trade::Trade),
233    /// Book ticker.
234    BookTicker(BookTicker),
235    /// Depth.
236    Depth(depth::Depth),
237    /// Account event.
238    AccountEvent(AccountEvent),
239    /// Options Order Update.
240    OptionsOrder(account::OptionsOrder),
241    /// Options Order Trade Update.
242    OptionsOrderUpdate(account::OptionsOrderUpdate),
243    /// Unknwon.
244    Unknwon(serde_json::Value),
245}
246
247/// Stream frame.
248#[derive(Debug, Clone, Deserialize)]
249pub struct StreamFrame {
250    /// Stream name.
251    pub stream: String,
252    /// Kind.
253    pub data: StreamFrameKind,
254}
255
256impl StreamFrame {
257    /// Get stream name.
258    pub fn to_name(&self) -> Option<Name> {
259        match &self.data {
260            StreamFrameKind::AggTrade(f) => Some(f.to_name()),
261            StreamFrameKind::Trade(f) => Some(f.to_name()),
262            StreamFrameKind::BookTicker(f) => Some(f.to_name()),
263            StreamFrameKind::Depth(_) => {
264                let (inst, channel) = self.stream.split_once('@')?;
265                Some(Name {
266                    inst: Some(inst.to_string()),
267                    channel: channel.to_string(),
268                })
269            }
270            StreamFrameKind::AccountEvent(e) => Some(e.to_name()),
271            StreamFrameKind::OptionsOrder(e) => Some(e.to_name()),
272            StreamFrameKind::OptionsOrderUpdate(_) => None,
273            StreamFrameKind::Unknwon(_) => {
274                let (inst, channel) = self.stream.split_once('@')?;
275                Some(Name {
276                    inst: Some(inst.to_string()),
277                    channel: channel.to_string(),
278                })
279            }
280        }
281    }
282}
283
284impl TryFrom<StreamFrame> for serde_json::Value {
285    type Error = WsError;
286
287    fn try_from(frame: StreamFrame) -> Result<Self, Self::Error> {
288        match frame.data {
289            StreamFrameKind::Unknwon(v) => Ok(v),
290            _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))),
291        }
292    }
293}
294
295/// Trade frame.
296#[derive(Debug, Clone, Deserialize)]
297#[non_exhaustive]
298pub enum TradeFrame {
299    /// Aggregate trade.
300    AggTrade(AggTrade),
301    /// Trade.
302    Trade(trade::Trade),
303}
304
305impl TryFrom<StreamFrame> for TradeFrame {
306    type Error = WsError;
307
308    fn try_from(frame: StreamFrame) -> Result<Self, Self::Error> {
309        match frame.data {
310            StreamFrameKind::AggTrade(trade) => Ok(Self::AggTrade(trade)),
311            StreamFrameKind::Trade(trade) => Ok(Self::Trade(trade)),
312            _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))),
313        }
314    }
315}
316
317impl TryFrom<TradeFrame> for exc_core::types::Trade {
318    type Error = ExchangeError;
319
320    fn try_from(value: TradeFrame) -> Result<Self, Self::Error> {
321        match value {
322            TradeFrame::AggTrade(trade) => Ok(exc_core::types::Trade {
323                ts: crate::types::adaptations::from_timestamp(trade.trade_timestamp)?,
324                price: trade.price.normalize(),
325                size: trade.size.normalize(),
326                buy: !trade.buy_maker,
327            }),
328            TradeFrame::Trade(trade) => Ok(exc_core::types::Trade {
329                ts: crate::types::adaptations::from_timestamp(trade.trade_timestamp)?,
330                price: trade.price.normalize(),
331                size: trade.size.normalize(),
332                buy: trade.is_taker_buy(),
333            }),
334        }
335    }
336}
337
338/// Depth frame.
339#[derive(Debug, Clone, Deserialize)]
340#[non_exhaustive]
341pub enum DepthFrame {
342    /// Book ticker.
343    BookTicker(BookTicker),
344    /// Depth.
345    Depth(depth::Depth),
346}
347
348impl TryFrom<StreamFrame> for DepthFrame {
349    type Error = WsError;
350
351    fn try_from(frame: StreamFrame) -> Result<Self, Self::Error> {
352        match frame.data {
353            StreamFrameKind::BookTicker(t) => Ok(Self::BookTicker(t)),
354            StreamFrameKind::Depth(t) => Ok(Self::Depth(t)),
355            _ => Err(WsError::UnexpectedFrame(anyhow::anyhow!("{frame:?}"))),
356        }
357    }
358}
359
360impl TryFrom<DepthFrame> for exc_core::types::BidAsk {
361    type Error = ExchangeError;
362
363    fn try_from(value: DepthFrame) -> Result<Self, Self::Error> {
364        match value {
365            DepthFrame::BookTicker(t) => Ok(exc_core::types::BidAsk {
366                ts: t
367                    .trade_timestamp
368                    .map(crate::types::adaptations::from_timestamp)
369                    .transpose()?
370                    .unwrap_or_else(time::OffsetDateTime::now_utc),
371                bid: Some((t.bid.normalize(), t.bid_size.normalize())),
372                ask: Some((t.ask.normalize(), t.ask_size.normalize())),
373            }),
374            DepthFrame::Depth(t) => Ok(exc_core::types::BidAsk {
375                ts: crate::types::adaptations::from_timestamp(t.trade_timestamp)?,
376                bid: t.bids.first().map(|b| (b.0.normalize(), b.1.normalize())),
377                ask: t.asks.first().map(|a| (a.0.normalize(), a.1.normalize())),
378            }),
379        }
380    }
381}
382
383/// Frame protocol layer.
384pub fn layer<T>(
385    transport: T,
386) -> impl Sink<RequestFrame, Error = WsError> + Stream<Item = Result<ServerFrame, WsError>>
387where
388    T: Sink<String, Error = WsError>,
389    T: Stream<Item = Result<String, WsError>>,
390{
391    transport
392        .with_flat_map(|f| {
393            let msg = serde_json::to_string(&f).map_err(WsError::from);
394            stream::once(future::ready(msg))
395        })
396        .and_then(|msg| {
397            let f = serde_json::from_str::<ServerFrame>(&msg)
398                .map_err(WsError::from)
399                .and_then(ServerFrame::health)
400                .map(|f| stream::iter(f.break_down().into_iter().map(Ok)));
401            future::ready(f)
402        })
403        .try_flatten()
404}
405
406#[cfg(test)]
407mod test {
408    use futures::{pin_mut, TryStreamExt};
409    use tower::ServiceExt;
410
411    use crate::{types::Name, Binance, Request};
412
413    use super::agg_trade::AggTrade;
414    use super::book_ticker::BookTicker;
415
416    #[tokio::test]
417    async fn test_aggregate_trade() -> anyhow::Result<()> {
418        let mut api = Binance::usd_margin_futures().connect();
419        let stream = (&mut api)
420            .oneshot(Request::subscribe(Name::agg_trade("btcusdt")))
421            .await?
422            .into_stream::<AggTrade>()?;
423        pin_mut!(stream);
424        let trade = stream.try_next().await?.unwrap();
425        println!("{trade:?}");
426        Ok(())
427    }
428
429    #[tokio::test]
430    async fn test_book_ticker() -> anyhow::Result<()> {
431        let mut api = Binance::usd_margin_futures().connect();
432        let stream = (&mut api)
433            .oneshot(Request::subscribe(Name::book_ticker("btcusdt")))
434            .await?
435            .into_stream::<BookTicker>()?;
436        pin_mut!(stream);
437        let trade = stream.try_next().await?.unwrap();
438        println!("{trade:?}");
439        Ok(())
440    }
441}