Skip to main content

binance/spot/ws/
outgoing_message.rs

1use serde::{Deserialize, Deserializer, Serialize, Serializer};
2
3use crate::{serde::deserialize_json, spot::KlineInterval};
4
5/// The MessageID is used as an identifier to uniquely identify the messages going back and forth. The following formats are accepted:
6///     64-bit signed integer
7///     alphanumeric strings; max length 36
8#[derive(PartialEq, Deserialize, Serialize, Debug)]
9#[serde(untagged)]
10pub enum MessageID {
11    Str(String),
12    Int(i64),
13}
14
15impl From<String> for MessageID {
16    fn from(s: String) -> Self {
17        MessageID::Str(s)
18    }
19}
20
21impl From<&str> for MessageID {
22    fn from(s: &str) -> Self {
23        MessageID::Str(s.to_string())
24    }
25}
26
27impl From<i64> for MessageID {
28    fn from(n: i64) -> Self {
29        MessageID::Int(n)
30    }
31}
32
33#[derive(Debug, Clone, PartialEq)]
34pub enum StreamName {
35    /// "<symbol>@aggTrade"
36    AggTrade { symbol: String },
37    /// "<symbol>@trade"
38    Trade { symbol: String },
39    /// "<symbol>@depth"
40    Depth { symbol: String },
41    /// "<symbol>@kline_<interval>"
42    Kline {
43        symbol: String,
44        interval: KlineInterval,
45    },
46    /// "<symbol>@24hrMiniTicker"
47    MiniTicker24 { symbol: String },
48    /// "serverShutdown"
49    ServerShutdownRaw,
50    /// "!serverShutdown"
51    ServerShutdownCombined,
52}
53
54impl Serialize for StreamName {
55    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
56    where
57        S: Serializer,
58    {
59        let s = match self {
60            Self::AggTrade { symbol } => format!("{symbol}@aggTrade"),
61            Self::Trade { symbol } => format!("{symbol}@trade"),
62            Self::Depth { symbol } => format!("{symbol}@depth"),
63            Self::Kline { symbol, interval } => format!("{symbol}@kline_{interval}"),
64            Self::MiniTicker24 { symbol } => format!("{symbol}@24hrMiniTicker"),
65            Self::ServerShutdownRaw => String::from("serverShutdown"),
66            Self::ServerShutdownCombined => String::from("!serverShutdown"),
67        };
68        serializer.serialize_str(&s)
69    }
70}
71
72impl<'de> Deserialize<'de> for StreamName {
73    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
74    where
75        D: Deserializer<'de>,
76    {
77        let s: &str = Deserialize::deserialize(deserializer)?;
78        if let Some((symbol, kind)) = s.split_once('@') {
79            match kind {
80                "aggTrade" => Ok(Self::AggTrade {
81                    symbol: symbol.to_owned(),
82                }),
83                "trade" => Ok(Self::Trade {
84                    symbol: symbol.to_owned(),
85                }),
86                "depth" => Ok(Self::Depth {
87                    symbol: symbol.to_owned(),
88                }),
89                "24hrMiniTicker" => Ok(Self::MiniTicker24 {
90                    symbol: symbol.to_owned(),
91                }),
92                kind => {
93                    if let Some((kind, params)) = kind.split_once('_') {
94                        match kind {
95                            "kline" => {
96                                let interval = format!("\"{params}\"");
97                                let interval = match deserialize_json(&interval) {
98                                    Ok(interval) => interval,
99                                    Err(_) => {
100                                        return Err(serde::de::Error::custom(
101                                            "invalid stream format",
102                                        ));
103                                    }
104                                };
105                                Ok(Self::Kline {
106                                    symbol: symbol.to_owned(),
107                                    interval,
108                                })
109                            }
110                            _ => Err(serde::de::Error::custom(format!(
111                                "unknown stream type: {kind}"
112                            ))),
113                        }
114                    } else {
115                        Err(serde::de::Error::custom("invalid stream format"))
116                    }
117                }
118            }
119        } else {
120            match s {
121                "serverShutdown" => Ok(Self::ServerShutdownRaw),
122                "!serverShutdown" => Ok(Self::ServerShutdownCombined),
123                _ => Err(serde::de::Error::custom("invalid stream format")),
124            }
125        }
126    }
127}
128
129#[derive(Serialize, Debug)]
130#[serde(tag = "method")]
131pub enum OutgoingMessage {
132    Empty,
133    #[serde(rename = "SUBSCRIBE")]
134    Subscribe {
135        id: Option<MessageID>,
136        params: Vec<StreamName>,
137    },
138    #[serde(rename = "UNSUBSCRIBE")]
139    Unsubscribe {
140        id: Option<MessageID>,
141        params: Vec<StreamName>,
142    },
143    #[serde(rename = "LIST_SUBSCRIPTIONS")]
144    ListSubscriptions {
145        id: Option<MessageID>,
146    },
147    #[serde(rename = "SET_PROPERTY")]
148    SetProperty {
149        id: Option<MessageID>,
150        params: (String, bool), // ("combined", true | false)
151    },
152    #[serde(rename = "GET_PROPERTY")]
153    GetProperty {
154        id: Option<MessageID>,
155        params: String, // "combined"
156    },
157}
158
#[cfg(test)]
mod tests {
    use crate::serde::{deserialize_json, serialize_json};

    use super::*;

    /// Every `StreamName` variant paired with the JSON string literal it is
    /// expected to serialize to and deserialize from.
    fn stream_fixtures() -> Vec<(StreamName, &'static str)> {
        let symbol = || String::from("btcusdt");
        vec![
            (
                StreamName::AggTrade { symbol: symbol() },
                r#""btcusdt@aggTrade""#,
            ),
            (StreamName::Trade { symbol: symbol() }, r#""btcusdt@trade""#),
            (StreamName::Depth { symbol: symbol() }, r#""btcusdt@depth""#),
            (
                StreamName::Kline {
                    symbol: symbol(),
                    interval: KlineInterval::Minute1,
                },
                r#""btcusdt@kline_1m""#,
            ),
            (
                StreamName::MiniTicker24 { symbol: symbol() },
                r#""btcusdt@24hrMiniTicker""#,
            ),
            (StreamName::ServerShutdownRaw, r#""serverShutdown""#),
            (StreamName::ServerShutdownCombined, r#""!serverShutdown""#),
        ]
    }

    /// A `MessageID` is written and read as a bare JSON string or number.
    #[test]
    fn test_message_id_serializes_as_bare_value() {
        let pairs = [
            (MessageID::Str("req-0001".into()), r#""req-0001""#),
            (MessageID::Int(42), r#"42"#),
        ];

        for (id, json) in pairs {
            assert_eq!(serialize_json(&id).unwrap(), json);
            assert_eq!(deserialize_json::<MessageID>(json).unwrap(), id);
        }
    }

    /// Each stream variant serializes to its expected JSON string.
    #[test]
    fn test_serialize_stream_name() {
        for (stream, expected) in stream_fixtures() {
            let serialized = serialize_json(&stream).unwrap();
            assert_eq!(expected, serialized);
        }
    }

    /// Each expected JSON string deserializes back to its stream variant.
    #[test]
    fn test_deserialize_stream_name() {
        for (expected, serialized) in stream_fixtures() {
            let stream: StreamName = deserialize_json(serialized).unwrap();
            assert_eq!(expected, stream);
        }
    }
}