barter_data/exchange/bitfinex/
message.rs

1use super::trade::BitfinexTrade;
2use crate::{Identifier, event::MarketIter, subscription::trade::PublicTrade};
3use barter_instrument::exchange::ExchangeId;
4use barter_integration::{de::extract_next, subscription::SubscriptionId};
5use serde::Serialize;
6
7/// [`Bitfinex`](super::Bitfinex) message received over
8/// [`WebSocket`](barter_integration::protocol::websocket::WebSocket) relating to an active
9/// [`Subscription`](crate::Subscription).
10///
11/// The message is associated with the original [`Subscription`](crate::Subscription) using the
12/// `channel_id` field as the [`SubscriptionId`].
13///
14/// ### Raw Payload Examples
15/// #### Heartbeat
16/// See docs: <https://docs.bitfinex.com/docs/ws-general#heartbeating>
17/// ```json
18/// [420191,"hb"]
19/// ```
20///
21/// #### Side::Buy Trade
22/// See docs: <https://docs.bitfinex.com/reference/ws-public-trades>
23/// ```json
24/// [420191,"te",[1225484398,1665452200022,0.08980641,19027.02807752]]
25/// ```
26///
27/// #### Side::Sell Trade
28/// See docs: <https://docs.bitfinex.com/reference/ws-public-trades>
29/// ```json
30/// [420191,"te",[1225484398,1665452200022,-0.08980641,19027.02807752]]
31/// ```
32#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
33pub struct BitfinexMessage {
34    pub channel_id: u32,
35    pub payload: BitfinexPayload,
36}
37
38/// [`Bitfinex`](super::Bitfinex) market data variants associated with an
39/// active [`Subscription`](crate::Subscription).
40///
41/// See [`BitfinexMessage`] for full raw payload examples.
42///
43/// See docs: <https://docs.bitfinex.com/docs/ws-general>
44#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
45pub enum BitfinexPayload {
46    Heartbeat,
47    Trade(BitfinexTrade),
48}
49
50impl Identifier<Option<SubscriptionId>> for BitfinexMessage {
51    fn id(&self) -> Option<SubscriptionId> {
52        match self.payload {
53            BitfinexPayload::Heartbeat => None,
54            BitfinexPayload::Trade(_) => Some(SubscriptionId::from(self.channel_id.to_string())),
55        }
56    }
57}
58
59impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BitfinexMessage)>
60    for MarketIter<InstrumentKey, PublicTrade>
61{
62    fn from(
63        (exchange_id, instrument, message): (ExchangeId, InstrumentKey, BitfinexMessage),
64    ) -> Self {
65        match message.payload {
66            BitfinexPayload::Heartbeat => Self(vec![]),
67            BitfinexPayload::Trade(trade) => Self::from((exchange_id, instrument, trade)),
68        }
69    }
70}
71
72impl<'de> serde::Deserialize<'de> for BitfinexMessage {
73    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74    where
75        D: serde::de::Deserializer<'de>,
76    {
77        struct SeqVisitor;
78
79        impl<'de> serde::de::Visitor<'de> for SeqVisitor {
80            type Value = BitfinexMessage;
81
82            fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83                formatter.write_str("BitfinexMessage struct from the Bitfinex WebSocket API")
84            }
85
86            fn visit_seq<SeqAccessor>(
87                self,
88                mut seq: SeqAccessor,
89            ) -> Result<Self::Value, SeqAccessor::Error>
90            where
91                SeqAccessor: serde::de::SeqAccess<'de>,
92            {
93                // Trade: [CHANNEL_ID, <"te", "tu">, [ID, TIME, AMOUNT, PRICE]]
94                // Heartbeat: [ CHANNEL_ID, "hb" ]
95                // Candle: [CHANNEL_ID, [MTS, OPEN, CLOSE, HIGH, LOW, VOLUME]]
96
97                // Extract CHANNEL_ID used to identify SubscriptionId: 1st element of the sequence
98                let channel_id: u32 = extract_next(&mut seq, "channel_id")?;
99
100                // Extract message tag to identify payload type: 2nd element of the sequence
101                let message_tag: String = extract_next(&mut seq, "message_tag")?;
102
103                // Use message tag to extract the payload: 3rd element of sequence
104                let payload = match message_tag.as_str() {
105                    // Filter "tu" Trades since they are identical but slower
106                    // '--> use as additional Heartbeat
107                    "hb" | "tu" => BitfinexPayload::Heartbeat,
108                    "te" => BitfinexPayload::Trade(extract_next(&mut seq, "BitfinexTrade")?),
109                    other => {
110                        return Err(serde::de::Error::unknown_variant(
111                            other,
112                            &["heartbeat (hb)", "trade (te | tu)"],
113                        ));
114                    }
115                };
116
117                // Ignore any additional elements or SerDe will fail
118                //  '--> Bitfinex may add fields without warning
119                while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
120                Ok(BitfinexMessage {
121                    channel_id,
122                    payload,
123                })
124            }
125        }
126
127        // Use Visitor implementation to deserialise the WebSocket BitfinexMessage
128        deserializer.deserialize_seq(SeqVisitor)
129    }
130}
131
#[cfg(test)]
mod tests {
    use super::*;
    use barter_instrument::Side;
    use barter_integration::{de::datetime_utc_from_epoch_duration, error::SocketError};
    use std::time::Duration;

    /// Channel id present in every fixture payload.
    const CHANNEL_ID: u32 = 420191;

    /// Message expected from the "te" fixtures, parameterised by the trade `side`.
    fn trade_message(side: Side) -> BitfinexMessage {
        BitfinexMessage {
            channel_id: CHANNEL_ID,
            payload: BitfinexPayload::Trade(BitfinexTrade {
                id: 1225484398,
                time: datetime_utc_from_epoch_duration(Duration::from_millis(1665452200022)),
                side,
                price: 19027.02807752,
                amount: 0.08980641,
            }),
        }
    }

    /// Message expected from the "hb" and "tu" fixtures.
    fn heartbeat_message() -> BitfinexMessage {
        BitfinexMessage {
            channel_id: CHANNEL_ID,
            payload: BitfinexPayload::Heartbeat,
        }
    }

    #[test]
    fn test_de_bitfinex_message() {
        struct TestCase {
            input: &'static str,
            expected: Result<BitfinexMessage, SocketError>,
        }

        // Payload layouts:
        //  Trade: [CHANNEL_ID, <"te", "tu">, [ID, TIME, AMOUNT, PRICE]]
        //  Heartbeat: [ CHANNEL_ID, "hb" ]

        let cases = vec![
            // TC0: "te" trade with negative amount --> Side::Sell
            TestCase {
                input: r#"[420191,"te",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
                expected: Ok(trade_message(Side::Sell)),
            },
            // TC1: "te" trade with positive amount --> Side::Buy
            TestCase {
                input: r#"[420191,"te",[1225484398,1665452200022,0.08980641,19027.02807752]]"#,
                expected: Ok(trade_message(Side::Buy)),
            },
            // TC2: "tu" trade --> should be reported as a heartbeat
            TestCase {
                input: r#"[420191,"tu",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
                expected: Ok(heartbeat_message()),
            },
            // TC3: plain heartbeat
            TestCase {
                input: r#"[420191,"hb"]"#,
                expected: Ok(heartbeat_message()),
            },
        ];

        for (index, TestCase { input, expected }) in cases.into_iter().enumerate() {
            let actual = serde_json::from_str::<BitfinexMessage>(input);

            match (actual, expected) {
                // Both sides failed: the case passes
                (Err(_), Err(_)) => {}
                // Both sides succeeded: the values must agree
                (Ok(actual), Ok(expected)) => {
                    assert_eq!(actual, expected, "TC{} failed", index)
                }
                // One side succeeded while the other failed: the case fails
                (actual, expected) => {
                    panic!(
                        "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }
}