Skip to main content

barter_data/exchange/bitfinex/
subscription.rs

1use barter_integration::{Validator, error::SocketError};
2use serde::{Deserialize, Serialize};
3
4/// [`Bitfinex`](super::Bitfinex) platform event detailing the variants expected to be received
5/// while connecting and subscribing.
6///
7/// ### Raw Payload Examples
8/// See docs: <https://docs.bitfinex.com/docs/ws-general>
9/// #### Platform Status Online
10/// ``` json
11/// {
12///   "event": "info",
13///   "version": VERSION,
14///   "platform": {
15///     "status": 1
16///   }
17/// }
18/// ```
19///
20/// #### Subscription Trades Success
21/// ``` json
22/// {
23///   event: "subscribed",
24///   channel: "trades",
25///   chanId: CHANNEL_ID,
26///   symbol: "tBTCUSD"
27///   pair: "BTCUSD"
28/// }
29/// ```
30///
31/// #### Subscription Failure
32/// ``` json
33/// {
34///    "event": "error",
35///    "msg": ERROR_MSG,
36///    "code": ERROR_CODE
37/// }
38#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
39#[serde(tag = "event", rename_all = "lowercase")]
40pub enum BitfinexPlatformEvent {
41    #[serde(rename = "info")]
42    PlatformStatus(BitfinexPlatformStatus),
43    Subscribed(BitfinexSubResponse),
44    Error(BitfinexError),
45}
46
47impl Validator for BitfinexPlatformEvent {
48    type Error = SocketError;
49
50    fn validate(self) -> Result<Self, SocketError>
51    where
52        Self: Sized,
53    {
54        match &self {
55            BitfinexPlatformEvent::PlatformStatus(status) => match status.status {
56                Status::Operative => Ok(self),
57                Status::Maintenance => Err(SocketError::Subscribe(format!(
58                    "exchange version: {} with server_id: {} is in maintenance mode",
59                    status.api_version, status.server_id,
60                ))),
61            },
62            BitfinexPlatformEvent::Subscribed(_) => Ok(self),
63            BitfinexPlatformEvent::Error(error) => Err(SocketError::Subscribe(format!(
64                "received failure subscription response code: {} with message: {}",
65                error.code, error.msg,
66            ))),
67        }
68    }
69}
70
71/// [`Bitfinex`](super::Bitfinex) platform status message containing the server we are connecting
72/// to, the version of the API, and if it is in maintenance mode.
73///
74/// ### Raw Payload Examples
75/// See docs: <https://docs.bitfinex.com/docs/ws-general#info-messages>
76/// #### Platform Status Operative
77/// ``` json
78/// {
79///   "event": "info",
80///   "version": 2,
81///   "serverId": ""
82///   "platform": {
83///     "status": 1
84///   }
85/// }
86/// ```
87///
88/// #### Platform Status In Maintenance
89/// ``` json
90/// {
91///   "event": "info",
92///   "version": 2,
93///   "serverId": ""
94///   "platform": {
95///     "status": 0
96///   }
97/// }
98/// ```
99#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
100pub struct BitfinexPlatformStatus {
101    #[serde(rename = "version")]
102    api_version: u8,
103    #[serde(rename = "serverId")]
104    server_id: String,
105    #[serde(rename = "platform")]
106    status: Status,
107}
108
109/// [`Bitfinex`](super::Bitfinex) platform [`Status`] indicating if the API is in maintenance mode.
110///
111/// See [`BitfinexPlatformStatus`] for full raw payload examples.
112///
113/// See docs: <https://docs.bitfinex.com/docs/ws-general#info-messages>
114#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Serialize)]
115pub enum Status {
116    Maintenance,
117    Operative,
118}
119
120/// [`Bitfinex`](super::Bitfinex) subscription success response variants for each channel.
121///
122/// ### Raw Payload Examples
123/// See docs: <https://docs.bitfinex.com/docs/ws-general>
124/// #### Subscription Trades Success
125/// ``` json
126/// {
127///   event: "subscribed",
128///   channel: "trades",
129///   chanId: CHANNEL_ID,
130///   symbol: "tBTCUSD"
131///   pair: "BTCUSD"
132/// }
133/// ```
134///
135/// #### Subscription Failure
136/// ``` json
137/// {
138///    "event": "error",
139///    "msg": ERROR_MSG,
140///    "code": ERROR_CODE
141/// }
142#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
143pub struct BitfinexSubResponse {
144    pub channel: String,
145    #[serde(rename = "symbol")]
146    pub market: String,
147    #[serde(rename = "chanId")]
148    pub channel_id: BitfinexChannelId,
149}
150
151/// [`Bitfinex`](super::Bitfinex) channel identifier that is used to identify the subscription
152/// associated with incoming events. See the module level "SubscriptionId" documentation notes
153/// for more details.
154///
155/// See docs: <https://docs.bitfinex.com/docs/ws-general#subscribe-to-channels>
156#[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
157pub struct BitfinexChannelId(pub u32);
158
159/// [`Bitfinex`](super::Bitfinex) error message that is received if a [`BitfinexSubResponse`]
160/// indicates a WebSocket subscription failure.
161///
162/// ### Subscription Error Codes:
163/// 10300: Generic failure
164/// 10301: Already subscribed
165/// 10302: Unknown channel
166///
167/// See [`BitfinexPlatformStatus`] for full raw payload examples.
168///
169/// See docs: <https://docs.bitfinex.com/docs/ws-general>
170#[derive(Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)]
171pub struct BitfinexError {
172    msg: String,
173    code: u32,
174}
175
176impl<'de> Deserialize<'de> for Status {
177    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
178    where
179        D: serde::de::Deserializer<'de>,
180    {
181        #[derive(Deserialize)]
182        struct Outer {
183            #[serde(deserialize_with = "de_status_from_u8")]
184            status: Status,
185        }
186
187        // Deserialise Outer struct
188        let Outer { status } = Outer::deserialize(deserializer)?;
189
190        Ok(status)
191    }
192}
193
194/// Deserialize a `u8` as a `Bitfinex` platform [`Status`].
195///
196/// 0u8 => [`Status::Maintenance`](Status), <br>
197/// 1u8 => [`Status::Operative`](Status), <br>
198/// other => [`de::Error`]
199fn de_status_from_u8<'de, D>(deserializer: D) -> Result<Status, D::Error>
200where
201    D: serde::de::Deserializer<'de>,
202{
203    match Deserialize::deserialize(deserializer)? {
204        0 => Ok(Status::Maintenance),
205        1 => Ok(Status::Operative),
206        other => Err(serde::de::Error::invalid_value(
207            serde::de::Unexpected::Unsigned(other as u64),
208            &"0 or 1",
209        )),
210    }
211}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that each raw JSON payload deserialises into the expected
    /// [`BitfinexPlatformEvent`] variant.
    #[test]
    fn test_de_bitfinex_platform_event() {
        struct TestCase {
            input: &'static str,
            expected: Result<BitfinexPlatformEvent, SocketError>,
        }

        let cases = vec![
            // TC0: platform status is online
            TestCase {
                input: r#"{"event": "info", "version": 2, "serverId": "5b73a436-19ca-4a15-8160-9069bdd7f181", "platform": { "status": 1 }}"#,
                expected: Ok(BitfinexPlatformEvent::PlatformStatus(
                    BitfinexPlatformStatus {
                        api_version: 2,
                        server_id: "5b73a436-19ca-4a15-8160-9069bdd7f181".to_string(),
                        status: Status::Operative,
                    },
                )),
            },
            // TC1: platform status is offline
            TestCase {
                input: r#"{"event": "info", "version": 2, "serverId": "5b73a436-19ca-4a15-8160-9069bdd7f181", "platform": { "status": 0 }}"#,
                expected: Ok(BitfinexPlatformEvent::PlatformStatus(
                    BitfinexPlatformStatus {
                        api_version: 2,
                        server_id: "5b73a436-19ca-4a15-8160-9069bdd7f181".to_string(),
                        status: Status::Maintenance,
                    },
                )),
            },
            // TC2: successful trades channel subscription
            TestCase {
                input: r#"{"event": "subscribed", "channel": "trades", "chanId": 2203, "symbol": "tBTCUSD", "pair": "BTCUSD"}"#,
                expected: Ok(BitfinexPlatformEvent::Subscribed(BitfinexSubResponse {
                    channel: "trades".to_string(),
                    channel_id: BitfinexChannelId(2203),
                    market: "tBTCUSD".to_owned(),
                })),
            },
            // TC3: Input response is error
            TestCase {
                input: r#"{"event": "error", "msg": "Already subscribed", "code": 10202}"#,
                expected: Ok(BitfinexPlatformEvent::Error(BitfinexError {
                    msg: "Already subscribed".to_owned(),
                    code: 10202,
                })),
            },
        ];

        for (index, test) in cases.into_iter().enumerate() {
            let actual = serde_json::from_str::<BitfinexPlatformEvent>(test.input);
            match (actual, test.expected) {
                (Ok(actual), Ok(expected)) => {
                    assert_eq!(actual, expected, "TC{index} failed")
                }
                (Err(_), Err(_)) => {
                    // Test passed
                }
                (actual, expected) => {
                    // Test failed
                    panic!(
                        "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }

    /// Checks that [`Validator::validate`] passes healthy events through unchanged and rejects
    /// maintenance / error events with the expected [`SocketError::Subscribe`] message.
    #[test]
    fn test_bitfinex_platform_sub_response_validate() {
        struct TestCase {
            input: BitfinexPlatformEvent,
            expected: Result<BitfinexPlatformEvent, SocketError>,
        }

        let tests = vec![
            TestCase {
                // TC0: bitfinex server is offline
                input: BitfinexPlatformEvent::PlatformStatus(BitfinexPlatformStatus {
                    api_version: 2,
                    server_id: "server_id".to_string(),
                    status: Status::Maintenance,
                }),
                expected: Err(SocketError::Subscribe(format!(
                    "exchange version: {} with server_id: {} is in maintenance mode",
                    2, "server_id",
                ))),
            },
            TestCase {
                // TC1: bitfinex server is online
                input: BitfinexPlatformEvent::PlatformStatus(BitfinexPlatformStatus {
                    api_version: 2,
                    server_id: "server_id".to_string(),
                    status: Status::Operative,
                }),
                expected: Ok(BitfinexPlatformEvent::PlatformStatus(
                    BitfinexPlatformStatus {
                        api_version: 2,
                        server_id: "server_id".to_string(),
                        status: Status::Operative,
                    },
                )),
            },
            TestCase {
                // TC2: subscription success
                input: BitfinexPlatformEvent::Subscribed(BitfinexSubResponse {
                    channel: "channel".to_string(),
                    market: "market".to_string(),
                    channel_id: BitfinexChannelId(1),
                }),
                expected: Ok(BitfinexPlatformEvent::Subscribed(BitfinexSubResponse {
                    channel: "channel".to_string(),
                    market: "market".to_string(),
                    channel_id: BitfinexChannelId(1),
                })),
            },
            TestCase {
                // TC3: subscription error
                input: BitfinexPlatformEvent::Error(BitfinexError {
                    msg: "error message".to_string(),
                    code: 0,
                }),
                expected: Err(SocketError::Subscribe(format!(
                    "received failure subscription response code: {} with message: {}",
                    0, "error message",
                ))),
            },
        ];

        for (index, test) in tests.into_iter().enumerate() {
            let actual = test.input.validate();
            match (actual, test.expected) {
                (Ok(actual), Ok(expected)) => {
                    assert_eq!(actual, expected, "TC{index} failed")
                }
                // Compare the error messages too, otherwise a wrong message would go unnoticed
                (
                    Err(SocketError::Subscribe(actual)),
                    Err(SocketError::Subscribe(expected)),
                ) => {
                    assert_eq!(actual, expected, "TC{index} failed")
                }
                (actual, expected) => {
                    // Test failed
                    panic!(
                        "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
                    );
                }
            }
        }
    }
}