barter_data/exchange/bitfinex/
subscription.rs

1use barter_integration::{Validator, error::SocketError};
2use serde::{Deserialize, Serialize};
3
4/// [`Bitfinex`](super::Bitfinex) platform event detailing the variants expected to be received
5/// while connecting and subscribing.
6///
7/// ### Raw Payload Examples
8/// See docs: <https://docs.bitfinex.com/docs/ws-general>
9/// #### Platform Status Online
10/// ``` json
11/// {
12///   "event": "info",
13///   "version": VERSION,
14///   "platform": {
15///     "status": 1
16///   }
17/// }
18/// ```
19///
20/// #### Subscription Trades Success
21/// ``` json
22/// {
23///   event: "subscribed",
24///   channel: "trades",
25///   chanId: CHANNEL_ID,
26///   symbol: "tBTCUSD"
27///   pair: "BTCUSD"
28/// }
29/// ```
30///
31/// #### Subscription Failure
32/// ``` json
33/// {
34///    "event": "error",
35///    "msg": ERROR_MSG,
36///    "code": ERROR_CODE
37/// }
38#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
39#[serde(tag = "event", rename_all = "lowercase")]
40pub enum BitfinexPlatformEvent {
41    #[serde(rename = "info")]
42    PlatformStatus(BitfinexPlatformStatus),
43    Subscribed(BitfinexSubResponse),
44    Error(BitfinexError),
45}
46
47impl Validator for BitfinexPlatformEvent {
48    fn validate(self) -> Result<Self, SocketError>
49    where
50        Self: Sized,
51    {
52        match &self {
53            BitfinexPlatformEvent::PlatformStatus(status) => match status.status {
54                Status::Operative => Ok(self),
55                Status::Maintenance => Err(SocketError::Subscribe(format!(
56                    "exchange version: {} with server_id: {} is in maintenance mode",
57                    status.api_version, status.server_id,
58                ))),
59            },
60            BitfinexPlatformEvent::Subscribed(_) => Ok(self),
61            BitfinexPlatformEvent::Error(error) => Err(SocketError::Subscribe(format!(
62                "received failure subscription response code: {} with message: {}",
63                error.code, error.msg,
64            ))),
65        }
66    }
67}
68
69/// [`Bitfinex`](super::Bitfinex) platform status message containing the server we are connecting
70/// to, the version of the API, and if it is in maintenance mode.
71///
72/// ### Raw Payload Examples
73/// See docs: <https://docs.bitfinex.com/docs/ws-general#info-messages>
74/// #### Platform Status Operative
75/// ``` json
76/// {
77///   "event": "info",
78///   "version": 2,
79///   "serverId": ""
80///   "platform": {
81///     "status": 1
82///   }
83/// }
84/// ```
85///
86/// #### Platform Status In Maintenance
87/// ``` json
88/// {
89///   "event": "info",
90///   "version": 2,
91///   "serverId": ""
92///   "platform": {
93///     "status": 0
94///   }
95/// }
96/// ```
97#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
98pub struct BitfinexPlatformStatus {
99    #[serde(rename = "version")]
100    api_version: u8,
101    #[serde(rename = "serverId")]
102    server_id: String,
103    #[serde(rename = "platform")]
104    status: Status,
105}
106
107/// [`Bitfinex`](super::Bitfinex) platform [`Status`] indicating if the API is in maintenance mode.
108///
109/// See [`BitfinexPlatformStatus`] for full raw payload examples.
110///
111/// See docs: <https://docs.bitfinex.com/docs/ws-general#info-messages>
112#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize)]
113pub enum Status {
114    Maintenance,
115    Operative,
116}
117
118/// [`Bitfinex`](super::Bitfinex) subscription success response variants for each channel.
119///
120/// ### Raw Payload Examples
121/// See docs: <https://docs.bitfinex.com/docs/ws-general>
122/// #### Subscription Trades Success
123/// ``` json
124/// {
125///   event: "subscribed",
126///   channel: "trades",
127///   chanId: CHANNEL_ID,
128///   symbol: "tBTCUSD"
129///   pair: "BTCUSD"
130/// }
131/// ```
132///
133/// #### Subscription Failure
134/// ``` json
135/// {
136///    "event": "error",
137///    "msg": ERROR_MSG,
138///    "code": ERROR_CODE
139/// }
140#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
141pub struct BitfinexSubResponse {
142    pub channel: String,
143    #[serde(rename = "symbol")]
144    pub market: String,
145    #[serde(rename = "chanId")]
146    pub channel_id: BitfinexChannelId,
147}
148
149/// [`Bitfinex`](super::Bitfinex) channel identifier that is used to identify the subscription
150/// associated with incoming events. See the module level "SubscriptionId" documentation notes
151/// for more details.
152///
153/// See docs: <https://docs.bitfinex.com/docs/ws-general#subscribe-to-channels>
154#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
155pub struct BitfinexChannelId(pub u32);
156
157/// [`Bitfinex`](super::Bitfinex) error message that is received if a [`BitfinexSubResponse`]
158/// indicates a WebSocket subscription failure.
159///
160/// ### Subscription Error Codes:
161/// 10300: Generic failure
162/// 10301: Already subscribed
163/// 10302: Unknown channel
164///
165/// See [`BitfinexPlatformStatus`] for full raw payload examples.
166///
167/// See docs: <https://docs.bitfinex.com/docs/ws-general>
168#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
169pub struct BitfinexError {
170    msg: String,
171    code: u32,
172}
173
174impl<'de> Deserialize<'de> for Status {
175    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
176    where
177        D: serde::de::Deserializer<'de>,
178    {
179        #[derive(Deserialize)]
180        struct Outer {
181            #[serde(deserialize_with = "de_status_from_u8")]
182            status: Status,
183        }
184
185        // Deserialise Outer struct
186        let Outer { status } = Outer::deserialize(deserializer)?;
187
188        Ok(status)
189    }
190}
191
192/// Deserialize a `u8` as a `Bitfinex` platform [`Status`].
193///
194/// 0u8 => [`Status::Maintenance`](Status), <br>
195/// 1u8 => [`Status::Operative`](Status), <br>
196/// other => [`de::Error`]
197fn de_status_from_u8<'de, D>(deserializer: D) -> Result<Status, D::Error>
198where
199    D: serde::de::Deserializer<'de>,
200{
201    match Deserialize::deserialize(deserializer)? {
202        0 => Ok(Status::Maintenance),
203        1 => Ok(Status::Operative),
204        other => Err(serde::de::Error::invalid_value(
205            serde::de::Unexpected::Unsigned(other as u64),
206            &"0 or 1",
207        )),
208    }
209}
210
#[cfg(test)]
mod tests {
    use super::*;

    /// Compares the outcome of one table-driven case against its expectation.
    ///
    /// Two `Ok` values must be equal. Two `Err` values count as a match without their contents
    /// being compared. An `Ok`/`Err` mismatch fails the test.
    fn check_case<T, ActualErr, ExpectedErr>(
        index: usize,
        actual: Result<T, ActualErr>,
        expected: Result<T, ExpectedErr>,
    ) where
        T: PartialEq + std::fmt::Debug,
        ActualErr: std::fmt::Debug,
        ExpectedErr: std::fmt::Debug,
    {
        match (actual, expected) {
            (Ok(actual), Ok(expected)) => {
                assert_eq!(actual, expected, "TC{} failed", index)
            }
            (Err(_), Err(_)) => {
                // Both sides failed, so the case passes.
            }
            (actual, expected) => {
                panic!(
                    "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                );
            }
        }
    }

    #[test]
    fn test_de_bitfinex_platform_event() {
        // Builds the platform status event expected for the server id used in the payloads.
        let platform_status = |status| {
            BitfinexPlatformEvent::PlatformStatus(BitfinexPlatformStatus {
                api_version: 2,
                server_id: "5b73a436-19ca-4a15-8160-9069bdd7f181".to_string(),
                status,
            })
        };

        // (raw payload, expected decoding)
        let cases: Vec<(&'static str, Result<BitfinexPlatformEvent, SocketError>)> = vec![
            // TC0: platform status is online
            (
                r#"{"event": "info", "version": 2, "serverId": "5b73a436-19ca-4a15-8160-9069bdd7f181", "platform": { "status": 1 }}"#,
                Ok(platform_status(Status::Operative)),
            ),
            // TC1: platform status is offline
            (
                r#"{"event": "info", "version": 2, "serverId": "5b73a436-19ca-4a15-8160-9069bdd7f181", "platform": { "status": 0 }}"#,
                Ok(platform_status(Status::Maintenance)),
            ),
            // TC2: successful trades channel subscription
            (
                r#"{"event": "subscribed", "channel": "trades", "chanId": 2203, "symbol": "tBTCUSD", "pair": "BTCUSD"}"#,
                Ok(BitfinexPlatformEvent::Subscribed(BitfinexSubResponse {
                    channel: "trades".to_string(),
                    market: "tBTCUSD".to_owned(),
                    channel_id: BitfinexChannelId(2203),
                })),
            ),
            // TC3: input response is error
            (
                r#"{"event": "error", "msg": "Already subscribed", "code": 10202}"#,
                Ok(BitfinexPlatformEvent::Error(BitfinexError {
                    msg: "Already subscribed".to_owned(),
                    code: 10202,
                })),
            ),
        ];

        for (index, (input, expected)) in cases.into_iter().enumerate() {
            let actual = serde_json::from_str::<BitfinexPlatformEvent>(input);
            check_case(index, actual, expected);
        }
    }

    #[test]
    fn test_bitfinex_platform_sub_response_validate() {
        // Builds a platform status event with the fixed fixture version and server id.
        let platform_status = |status| {
            BitfinexPlatformEvent::PlatformStatus(BitfinexPlatformStatus {
                api_version: 2,
                server_id: "server_id".to_string(),
                status,
            })
        };

        let subscribed = BitfinexPlatformEvent::Subscribed(BitfinexSubResponse {
            channel: "channel".to_string(),
            market: "market".to_string(),
            channel_id: BitfinexChannelId(1),
        });

        // (event under validation, expected outcome)
        let cases: Vec<(BitfinexPlatformEvent, Result<BitfinexPlatformEvent, SocketError>)> = vec![
            // TC0: bitfinex server is offline
            (
                platform_status(Status::Maintenance),
                Err(SocketError::Subscribe(format!(
                    "exchange version: {} with server_id: {} is in maintenance mode",
                    2, "server_id",
                ))),
            ),
            // TC1: bitfinex server is online
            (
                platform_status(Status::Operative),
                Ok(platform_status(Status::Operative)),
            ),
            // TC2: subscription success
            (subscribed.clone(), Ok(subscribed)),
            // TC3: subscription error
            (
                BitfinexPlatformEvent::Error(BitfinexError {
                    msg: "error message".to_string(),
                    code: 0,
                }),
                Err(SocketError::Subscribe(format!(
                    "received failure subscription response code: {} with message: {}",
                    0, "error message",
                ))),
            ),
        ];

        for (index, (input, expected)) in cases.into_iter().enumerate() {
            check_case(index, input.validate(), expected);
        }
    }
}