Skip to main content

rustrade_data/exchange/bitfinex/
message.rs

1use super::trade::BitfinexTrade;
2use crate::{Identifier, event::MarketIter, subscription::trade::PublicTrade};
3use rustrade_instrument::exchange::ExchangeId;
4use rustrade_integration::{serde::de::extract_next, subscription::SubscriptionId};
5use serde::Serialize;
6
7/// [`Bitfinex`](super::Bitfinex) message received over
8/// [`WebSocket`](rustrade_integration::protocol::websocket::WebSocket) relating to an active
9/// [`Subscription`](crate::Subscription).
10///
11/// The message is associated with the original [`Subscription`](crate::Subscription) using the
12/// `channel_id` field as the [`SubscriptionId`].
13///
14/// ### Raw Payload Examples
15/// #### Heartbeat
16/// See docs: <https://docs.bitfinex.com/docs/ws-general#heartbeating>
17/// ```json
18/// [420191,"hb"]
19/// ```
20///
21/// #### Side::Buy Trade
22/// See docs: <https://docs.bitfinex.com/reference/ws-public-trades>
23/// ```json
24/// [420191,"te",[1225484398,1665452200022,0.08980641,19027.02807752]]
25/// ```
26///
27/// #### Side::Sell Trade
28/// See docs: <https://docs.bitfinex.com/reference/ws-public-trades>
29/// ```json
30/// [420191,"te",[1225484398,1665452200022,-0.08980641,19027.02807752]]
31/// ```
32#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
33pub struct BitfinexMessage {
34    pub channel_id: u32,
35    pub payload: BitfinexPayload,
36}
37
38/// [`Bitfinex`](super::Bitfinex) market data variants associated with an
39/// active [`Subscription`](crate::Subscription).
40///
41/// See [`BitfinexMessage`] for full raw payload examples.
42///
43/// See docs: <https://docs.bitfinex.com/docs/ws-general>
44#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
45pub enum BitfinexPayload {
46    Heartbeat,
47    Trade(BitfinexTrade),
48}
49
50impl Identifier<Option<SubscriptionId>> for BitfinexMessage {
51    fn id(&self) -> Option<SubscriptionId> {
52        match self.payload {
53            BitfinexPayload::Heartbeat => None,
54            BitfinexPayload::Trade(_) => Some(SubscriptionId::from(self.channel_id.to_string())),
55        }
56    }
57}
58
59impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BitfinexMessage)>
60    for MarketIter<InstrumentKey, PublicTrade>
61{
62    fn from(
63        (exchange_id, instrument, message): (ExchangeId, InstrumentKey, BitfinexMessage),
64    ) -> Self {
65        match message.payload {
66            BitfinexPayload::Heartbeat => Self(vec![]),
67            BitfinexPayload::Trade(trade) => Self::from((exchange_id, instrument, trade)),
68        }
69    }
70}
71
72impl<'de> serde::Deserialize<'de> for BitfinexMessage {
73    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74    where
75        D: serde::de::Deserializer<'de>,
76    {
77        struct SeqVisitor;
78
79        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
80            type Value = BitfinexMessage;
81
82            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83                formatter.write_str("BitfinexMessage struct from the Bitfinex WebSocket API")
84            }
85
86            fn visit_seq<SeqAccessor>(
87                self,
88                mut seq: SeqAccessor,
89            ) -> Result<Self::Value, SeqAccessor::Error>
90            where
91                SeqAccessor: serde::de::SeqAccess<'de>,
92            {
93                // Trade: [CHANNEL_ID, <"te", "tu">, [ID, TIME, AMOUNT, PRICE]]
94                // Heartbeat: [ CHANNEL_ID, "hb" ]
95                // Candle: [CHANNEL_ID, [MTS, OPEN, CLOSE, HIGH, LOW, VOLUME]]
96
97                // Extract CHANNEL_ID used to identify SubscriptionId: 1st element of the sequence
98                let channel_id: u32 = extract_next(&mut seq, "channel_id")?;
99
100                // Extract message tag to identify payload type: 2nd element of the sequence
101                let message_tag: String = extract_next(&mut seq, "message_tag")?;
102
103                // Use message tag to extract the payload: 3rd element of sequence
104                let payload = match message_tag.as_str() {
105                    // Filter "tu" Trades since they are identical but slower
106                    // '--> use as additional Heartbeat
107                    "hb" | "tu" => BitfinexPayload::Heartbeat,
108                    "te" => BitfinexPayload::Trade(extract_next(&mut seq, "BitfinexTrade")?),
109                    other => {
110                        return Err(serde::de::Error::unknown_variant(
111                            other,
112                            &["heartbeat (hb)", "trade (te | tu)"],
113                        ));
114                    }
115                };
116
117                // Ignore any additional elements or SerDe will fail
118                //  '--> Bitfinex may add fields without warning
119                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
120                Ok(BitfinexMessage {
121                    channel_id,
122                    payload,
123                })
124            }
125        }
126
127        // Use Visitor implementation to deserialise the WebSocket BitfinexMessage
128        deserializer.deserialize_seq(SeqVisitor)
129    }
130}
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135    use rust_decimal_macros::dec;
136    use rustrade_instrument::Side;
137    use rustrade_integration::{error::SocketError, serde::de::datetime_utc_from_epoch_duration};
138    use std::time::Duration;
139
140    #[test]
141    fn test_de_bitfinex_message() {
142        struct TestCase {
143            input: &'static str,
144            expected: Result<BitfinexMessage, SocketError>,
145        }
146
147        // Trade: [CHANNEL_ID, <"te", "tu">, [ID, TIME, AMOUNT, PRICE]]
148        // Heartbeat: [ CHANNEL_ID, "hb" ]
149        // Candle: [CHANNEL_ID, [MTS, OPEN, CLOSE, HIGH, LOW, VOLUME]]
150
151        let cases = vec![
152            // TC0: Trade message te Sell
153            TestCase {
154                input: r#"[420191,"te",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
155                expected: Ok(BitfinexMessage {
156                    channel_id: 420191,
157                    payload: BitfinexPayload::Trade(BitfinexTrade {
158                        id: 1225484398,
159                        time: datetime_utc_from_epoch_duration(Duration::from_millis(
160                            1665452200022,
161                        )),
162                        side: Side::Sell,
163                        price: dec!(19027.02807752),
164                        amount: dec!(0.08980641),
165                    }),
166                }),
167            },
168            // TC1: Trade message te Buy
169            TestCase {
170                input: r#"[420191,"te",[1225484398,1665452200022,0.08980641,19027.02807752]]"#,
171                expected: Ok(BitfinexMessage {
172                    channel_id: 420191,
173                    payload: BitfinexPayload::Trade(BitfinexTrade {
174                        id: 1225484398,
175                        time: datetime_utc_from_epoch_duration(Duration::from_millis(
176                            1665452200022,
177                        )),
178                        side: Side::Buy,
179                        price: dec!(19027.02807752),
180                        amount: dec!(0.08980641),
181                    }),
182                }),
183            },
184            // TC2: Trade tu --> Should be marked as a heartbeat
185            TestCase {
186                input: r#"[420191,"tu",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
187                expected: Ok(BitfinexMessage {
188                    channel_id: 420191,
189                    payload: BitfinexPayload::Heartbeat,
190                }),
191            },
192            // TC3: Heartbeat message
193            TestCase {
194                input: r#"[420191,"hb"]"#,
195                expected: Ok(BitfinexMessage {
196                    channel_id: 420191,
197                    payload: BitfinexPayload::Heartbeat,
198                }),
199            },
200        ];
201
202        for (index, test) in cases.into_iter().enumerate() {
203            let actual = serde_json::from_str::<BitfinexMessage>(test.input);
204            match (actual, test.expected) {
205                (Ok(actual), Ok(expected)) => {
206                    assert_eq!(actual, expected, "TC{} failed", index)
207                }
208                (Err(_), Err(_)) => {
209                    // Test passed
210                }
211                (actual, expected) => {
212                    // Test failed
213                    panic!(
214                        "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
215                    );
216                }
217            }
218        }
219    }
220}