barter_data/exchange/bitfinex/
message.rs1use super::trade::BitfinexTrade;
2use crate::{Identifier, event::MarketIter, subscription::trade::PublicTrade};
3use barter_instrument::exchange::ExchangeId;
4use barter_integration::{de::extract_next, subscription::SubscriptionId};
5use serde::Serialize;
6
7#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
33pub struct BitfinexMessage {
34 pub channel_id: u32,
35 pub payload: BitfinexPayload,
36}
37
38#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
45pub enum BitfinexPayload {
46 Heartbeat,
47 Trade(BitfinexTrade),
48}
49
50impl Identifier<Option<SubscriptionId>> for BitfinexMessage {
51 fn id(&self) -> Option<SubscriptionId> {
52 match self.payload {
53 BitfinexPayload::Heartbeat => None,
54 BitfinexPayload::Trade(_) => Some(SubscriptionId::from(self.channel_id.to_string())),
55 }
56 }
57}
58
59impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BitfinexMessage)>
60 for MarketIter<InstrumentKey, PublicTrade>
61{
62 fn from(
63 (exchange_id, instrument, message): (ExchangeId, InstrumentKey, BitfinexMessage),
64 ) -> Self {
65 match message.payload {
66 BitfinexPayload::Heartbeat => Self(vec![]),
67 BitfinexPayload::Trade(trade) => Self::from((exchange_id, instrument, trade)),
68 }
69 }
70}
71
72impl<'de> serde::Deserialize<'de> for BitfinexMessage {
73 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74 where
75 D: serde::de::Deserializer<'de>,
76 {
77 struct SeqVisitor;
78
79 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
80 type Value = BitfinexMessage;
81
82 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 formatter.write_str("BitfinexMessage struct from the Bitfinex WebSocket API")
84 }
85
86 fn visit_seq<SeqAccessor>(
87 self,
88 mut seq: SeqAccessor,
89 ) -> Result<Self::Value, SeqAccessor::Error>
90 where
91 SeqAccessor: serde::de::SeqAccess<'de>,
92 {
93 let channel_id: u32 = extract_next(&mut seq, "channel_id")?;
99
100 let message_tag: String = extract_next(&mut seq, "message_tag")?;
102
103 let payload = match message_tag.as_str() {
105 "hb" | "tu" => BitfinexPayload::Heartbeat,
108 "te" => BitfinexPayload::Trade(extract_next(&mut seq, "BitfinexTrade")?),
109 other => {
110 return Err(serde::de::Error::unknown_variant(
111 other,
112 &["heartbeat (hb)", "trade (te | tu)"],
113 ));
114 }
115 };
116
117 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
120 Ok(BitfinexMessage {
121 channel_id,
122 payload,
123 })
124 }
125 }
126
127 deserializer.deserialize_seq(SeqVisitor)
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use barter_instrument::Side;
136 use barter_integration::{de::datetime_utc_from_epoch_duration, error::SocketError};
137 use std::time::Duration;
138
139 #[test]
140 fn test_de_bitfinex_message() {
141 struct TestCase {
142 input: &'static str,
143 expected: Result<BitfinexMessage, SocketError>,
144 }
145
146 let cases = vec![
151 TestCase {
153 input: r#"[420191,"te",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
154 expected: Ok(BitfinexMessage {
155 channel_id: 420191,
156 payload: BitfinexPayload::Trade(BitfinexTrade {
157 id: 1225484398,
158 time: datetime_utc_from_epoch_duration(Duration::from_millis(
159 1665452200022,
160 )),
161 side: Side::Sell,
162 price: 19027.02807752,
163 amount: 0.08980641,
164 }),
165 }),
166 },
167 TestCase {
169 input: r#"[420191,"te",[1225484398,1665452200022,0.08980641,19027.02807752]]"#,
170 expected: Ok(BitfinexMessage {
171 channel_id: 420191,
172 payload: BitfinexPayload::Trade(BitfinexTrade {
173 id: 1225484398,
174 time: datetime_utc_from_epoch_duration(Duration::from_millis(
175 1665452200022,
176 )),
177 side: Side::Buy,
178 price: 19027.02807752,
179 amount: 0.08980641,
180 }),
181 }),
182 },
183 TestCase {
185 input: r#"[420191,"tu",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
186 expected: Ok(BitfinexMessage {
187 channel_id: 420191,
188 payload: BitfinexPayload::Heartbeat,
189 }),
190 },
191 TestCase {
193 input: r#"[420191,"hb"]"#,
194 expected: Ok(BitfinexMessage {
195 channel_id: 420191,
196 payload: BitfinexPayload::Heartbeat,
197 }),
198 },
199 ];
200
201 for (index, test) in cases.into_iter().enumerate() {
202 let actual = serde_json::from_str::<BitfinexMessage>(test.input);
203 match (actual, test.expected) {
204 (Ok(actual), Ok(expected)) => {
205 assert_eq!(actual, expected, "TC{} failed", index)
206 }
207 (Err(_), Err(_)) => {
208 }
210 (actual, expected) => {
211 panic!(
213 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
214 );
215 }
216 }
217 }
218 }
219}