rustrade_data/exchange/bitfinex/
message.rs1use super::trade::BitfinexTrade;
2use crate::{Identifier, event::MarketIter, subscription::trade::PublicTrade};
3use rustrade_instrument::exchange::ExchangeId;
4use rustrade_integration::{serde::de::extract_next, subscription::SubscriptionId};
5use serde::Serialize;
6
7#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
33pub struct BitfinexMessage {
34 pub channel_id: u32,
35 pub payload: BitfinexPayload,
36}
37
38#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)]
45pub enum BitfinexPayload {
46 Heartbeat,
47 Trade(BitfinexTrade),
48}
49
50impl Identifier<Option<SubscriptionId>> for BitfinexMessage {
51 fn id(&self) -> Option<SubscriptionId> {
52 match self.payload {
53 BitfinexPayload::Heartbeat => None,
54 BitfinexPayload::Trade(_) => Some(SubscriptionId::from(self.channel_id.to_string())),
55 }
56 }
57}
58
59impl<InstrumentKey> From<(ExchangeId, InstrumentKey, BitfinexMessage)>
60 for MarketIter<InstrumentKey, PublicTrade>
61{
62 fn from(
63 (exchange_id, instrument, message): (ExchangeId, InstrumentKey, BitfinexMessage),
64 ) -> Self {
65 match message.payload {
66 BitfinexPayload::Heartbeat => Self(vec![]),
67 BitfinexPayload::Trade(trade) => Self::from((exchange_id, instrument, trade)),
68 }
69 }
70}
71
72impl<'de> serde::Deserialize<'de> for BitfinexMessage {
73 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74 where
75 D: serde::de::Deserializer<'de>,
76 {
77 struct SeqVisitor;
78
79 impl<'de> serde::de::Visitor<'de> for SeqVisitor {
80 type Value = BitfinexMessage;
81
82 fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83 formatter.write_str("BitfinexMessage struct from the Bitfinex WebSocket API")
84 }
85
86 fn visit_seq<SeqAccessor>(
87 self,
88 mut seq: SeqAccessor,
89 ) -> Result<Self::Value, SeqAccessor::Error>
90 where
91 SeqAccessor: serde::de::SeqAccess<'de>,
92 {
93 let channel_id: u32 = extract_next(&mut seq, "channel_id")?;
99
100 let message_tag: String = extract_next(&mut seq, "message_tag")?;
102
103 let payload = match message_tag.as_str() {
105 "hb" | "tu" => BitfinexPayload::Heartbeat,
108 "te" => BitfinexPayload::Trade(extract_next(&mut seq, "BitfinexTrade")?),
109 other => {
110 return Err(serde::de::Error::unknown_variant(
111 other,
112 &["heartbeat (hb)", "trade (te | tu)"],
113 ));
114 }
115 };
116
117 while seq.next_element::<serde::de::IgnoredAny>()?.is_some() {}
120 Ok(BitfinexMessage {
121 channel_id,
122 payload,
123 })
124 }
125 }
126
127 deserializer.deserialize_seq(SeqVisitor)
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use rust_decimal_macros::dec;
136 use rustrade_instrument::Side;
137 use rustrade_integration::{error::SocketError, serde::de::datetime_utc_from_epoch_duration};
138 use std::time::Duration;
139
140 #[test]
141 fn test_de_bitfinex_message() {
142 struct TestCase {
143 input: &'static str,
144 expected: Result<BitfinexMessage, SocketError>,
145 }
146
147 let cases = vec![
152 TestCase {
154 input: r#"[420191,"te",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
155 expected: Ok(BitfinexMessage {
156 channel_id: 420191,
157 payload: BitfinexPayload::Trade(BitfinexTrade {
158 id: 1225484398,
159 time: datetime_utc_from_epoch_duration(Duration::from_millis(
160 1665452200022,
161 )),
162 side: Side::Sell,
163 price: dec!(19027.02807752),
164 amount: dec!(0.08980641),
165 }),
166 }),
167 },
168 TestCase {
170 input: r#"[420191,"te",[1225484398,1665452200022,0.08980641,19027.02807752]]"#,
171 expected: Ok(BitfinexMessage {
172 channel_id: 420191,
173 payload: BitfinexPayload::Trade(BitfinexTrade {
174 id: 1225484398,
175 time: datetime_utc_from_epoch_duration(Duration::from_millis(
176 1665452200022,
177 )),
178 side: Side::Buy,
179 price: dec!(19027.02807752),
180 amount: dec!(0.08980641),
181 }),
182 }),
183 },
184 TestCase {
186 input: r#"[420191,"tu",[1225484398,1665452200022,-0.08980641,19027.02807752]]"#,
187 expected: Ok(BitfinexMessage {
188 channel_id: 420191,
189 payload: BitfinexPayload::Heartbeat,
190 }),
191 },
192 TestCase {
194 input: r#"[420191,"hb"]"#,
195 expected: Ok(BitfinexMessage {
196 channel_id: 420191,
197 payload: BitfinexPayload::Heartbeat,
198 }),
199 },
200 ];
201
202 for (index, test) in cases.into_iter().enumerate() {
203 let actual = serde_json::from_str::<BitfinexMessage>(test.input);
204 match (actual, test.expected) {
205 (Ok(actual), Ok(expected)) => {
206 assert_eq!(actual, expected, "TC{} failed", index)
207 }
208 (Err(_), Err(_)) => {
209 }
211 (actual, expected) => {
212 panic!(
214 "TC{index} failed because actual != expected. \nActual: {actual:?}\nExpected: {expected:?}\n"
215 );
216 }
217 }
218 }
219 }
220}