Skip to main content

binance/spot/ws/
outgoing_message.rs

1use crate::{serde::deserialize_json, spot::KlineInterval};
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3use std::fmt;
4
5/// The MessageID is used as an identifier to uniquely identify the messages going back and forth. The following formats are accepted:
6///     64-bit signed integer
7///     alphanumeric strings; max length 36
8#[derive(PartialEq, Deserialize, Serialize, Debug)]
9#[serde(untagged)]
10pub enum MessageID {
11    Str(String),
12    Int(i64),
13}
14
15impl From<String> for MessageID {
16    fn from(s: String) -> Self {
17        MessageID::Str(s)
18    }
19}
20
21impl From<&str> for MessageID {
22    fn from(s: &str) -> Self {
23        MessageID::Str(s.to_string())
24    }
25}
26
27impl From<i64> for MessageID {
28    fn from(n: i64) -> Self {
29        MessageID::Int(n)
30    }
31}
32
33#[derive(Debug, Clone, PartialEq)]
34pub enum StreamName {
35    /// "<symbol>@aggTrade"
36    AggTrade { symbol: String },
37    /// "<symbol>@trade"
38    Trade { symbol: String },
39    /// Top <levels> bids and asks, pushed every second. Valid <levels> are 5, 10, or 20.
40    /// Stream Names: <symbol>@depth<levels> OR <symbol>@depth<levels>@100ms
41    PartialBookDepth {
42        symbol: String,
43        levels: PartialBookDepthLevels,
44        update_speed: Option<PartialBookDepthUpdateSpeed>,
45    },
46    /// Order book price and quantity depth updates used to locally manage an order book.
47    /// Stream Names: <symbol>@depth OR <symbol>@depth@100ms
48    DiffDepth {
49        symbol: String,
50        update_speed: Option<DiffDepthUpdateSpeed>,
51    },
52    /// "<symbol>@kline_<interval>"
53    Kline {
54        symbol: String,
55        interval: KlineInterval,
56    },
57    /// "<symbol>@24hrMiniTicker"
58    MiniTicker24 { symbol: String },
59    /// "serverShutdown"
60    ServerShutdownRaw,
61    /// "!serverShutdown"
62    ServerShutdownCombined,
63}
64
65impl Serialize for StreamName {
66    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
67    where
68        S: Serializer,
69    {
70        let s = match self {
71            Self::AggTrade { symbol } => format!("{symbol}@aggTrade"),
72            Self::Trade { symbol } => format!("{symbol}@trade"),
73            Self::PartialBookDepth {
74                symbol,
75                levels,
76                update_speed,
77            } => update_speed.as_ref().map_or_else(
78                || format!("{symbol}@depth{levels}"),
79                |speed| format!("{symbol}@depth{levels}@{speed}"),
80            ),
81            Self::DiffDepth {
82                symbol,
83                update_speed,
84            } => update_speed.as_ref().map_or_else(
85                || format!("{symbol}@depth"),
86                |speed| format!("{symbol}@depth@{speed}"),
87            ),
88            Self::Kline { symbol, interval } => format!("{symbol}@kline_{interval}"),
89            Self::MiniTicker24 { symbol } => format!("{symbol}@24hrMiniTicker"),
90            Self::ServerShutdownRaw => String::from("serverShutdown"),
91            Self::ServerShutdownCombined => String::from("!serverShutdown"),
92        };
93        serializer.serialize_str(&s)
94    }
95}
96
97impl<'de> Deserialize<'de> for StreamName {
98    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
99    where
100        D: Deserializer<'de>,
101    {
102        let s: &str = Deserialize::deserialize(deserializer)?;
103        if let Some((symbol, kind)) = s.split_once('@') {
104            match kind {
105                "aggTrade" => Ok(Self::AggTrade {
106                    symbol: symbol.to_owned(),
107                }),
108                "trade" => Ok(Self::Trade {
109                    symbol: symbol.to_owned(),
110                }),
111                "24hrMiniTicker" => Ok(Self::MiniTicker24 {
112                    symbol: symbol.to_owned(),
113                }),
114                kind => {
115                    if let Some(params) = kind.strip_prefix("depth") {
116                        let symbol = symbol.to_string();
117                        return if let Some((levels, update_speed)) = params.split_once('@') {
118                            if !levels.is_empty() {
119                                // PartialBookDepth: <symbol>@depth<levels>@100ms
120                                let levels = format!("\"{levels}\"");
121                                let levels = deserialize_json(&levels).map_err(|_| {
122                                    serde::de::Error::custom("invalid stream format")
123                                })?;
124
125                                let update_speed = format!("\"{update_speed}\"");
126                                let update_speed =
127                                    deserialize_json(&update_speed).map_err(|_| {
128                                        serde::de::Error::custom("invalid stream format")
129                                    })?;
130                                let stream = Self::PartialBookDepth {
131                                    symbol,
132                                    levels,
133                                    update_speed: Some(update_speed),
134                                };
135                                Ok(stream)
136                            } else {
137                                // DiffDepth: <symbol>@depth@100ms
138                                let update_speed = format!("\"{update_speed}\"");
139                                let update_speed =
140                                    deserialize_json(&update_speed).map_err(|_| {
141                                        serde::de::Error::custom("invalid stream format")
142                                    })?;
143                                let stream = Self::DiffDepth {
144                                    symbol,
145                                    update_speed: Some(update_speed),
146                                };
147                                Ok(stream)
148                            }
149                        } else {
150                            let levels = params;
151                            if !levels.is_empty() {
152                                // PartialBookDepth: <symbol>@depth<levels>
153                                let levels = format!("\"{levels}\"");
154                                let levels = deserialize_json(&levels).map_err(|_| {
155                                    serde::de::Error::custom("invalid stream format")
156                                })?;
157                                let stream = Self::PartialBookDepth {
158                                    symbol,
159                                    levels,
160                                    update_speed: None,
161                                };
162                                Ok(stream)
163                            } else {
164                                // DiffDepth: <symbol>@depth
165                                let stream = Self::DiffDepth {
166                                    symbol,
167                                    update_speed: None,
168                                };
169                                Ok(stream)
170                            }
171                        };
172                    }
173
174                    if let Some((kind, params)) = kind.split_once('_') {
175                        match kind {
176                            "kline" => {
177                                let interval = format!("\"{params}\"");
178                                let interval = match deserialize_json(&interval) {
179                                    Ok(interval) => interval,
180                                    Err(_) => {
181                                        return Err(serde::de::Error::custom(
182                                            "invalid stream format",
183                                        ));
184                                    }
185                                };
186                                Ok(Self::Kline {
187                                    symbol: symbol.to_owned(),
188                                    interval,
189                                })
190                            }
191                            _ => Err(serde::de::Error::custom(format!(
192                                "unknown stream type: {kind}"
193                            ))),
194                        }
195                    } else {
196                        Err(serde::de::Error::custom("invalid stream format"))
197                    }
198                }
199            }
200        } else {
201            match s {
202                "serverShutdown" => Ok(Self::ServerShutdownRaw),
203                "!serverShutdown" => Ok(Self::ServerShutdownCombined),
204                _ => Err(serde::de::Error::custom("invalid stream format")),
205            }
206        }
207    }
208}
209
210/// Top <levels> bids and asks, pushed every second. Valid <levels> are 5, 10, or 20.
211#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
212pub enum PartialBookDepthLevels {
213    #[serde(rename = "5")]
214    Level5,
215    #[serde(rename = "10")]
216    Level10,
217    #[serde(rename = "20")]
218    Level20,
219}
220
221impl fmt::Display for PartialBookDepthLevels {
222    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
223        let value = match self {
224            Self::Level5 => "5",
225            Self::Level10 => "10",
226            Self::Level20 => "20",
227        };
228        write!(f, "{value}")
229    }
230}
231
232/// Update Speed: 1000ms or 100ms
233#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
234pub enum PartialBookDepthUpdateSpeed {
235    #[serde(rename = "1000ms")]
236    Speed1000ms,
237    #[serde(rename = "100ms")]
238    Speed100ms,
239}
240
241impl fmt::Display for PartialBookDepthUpdateSpeed {
242    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
243        let value = match self {
244            Self::Speed1000ms => "1000ms",
245            Self::Speed100ms => "100ms",
246        };
247        write!(f, "{value}")
248    }
249}
250
251/// Update Speed: 1000ms or 100ms
252#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
253pub enum DiffDepthUpdateSpeed {
254    #[serde(rename = "1000ms")]
255    Speed1000ms,
256    #[serde(rename = "100ms")]
257    Speed100ms,
258}
259
260impl fmt::Display for DiffDepthUpdateSpeed {
261    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
262        let value = match self {
263            Self::Speed1000ms => "1000ms",
264            Self::Speed100ms => "100ms",
265        };
266        write!(f, "{value}")
267    }
268}
269
270#[derive(Serialize, Debug)]
271#[serde(tag = "method")]
272pub enum OutgoingMessage {
273    Empty,
274    #[serde(rename = "SUBSCRIBE")]
275    Subscribe {
276        id: Option<MessageID>,
277        params: Vec<StreamName>,
278    },
279    #[serde(rename = "UNSUBSCRIBE")]
280    Unsubscribe {
281        id: Option<MessageID>,
282        params: Vec<StreamName>,
283    },
284    #[serde(rename = "LIST_SUBSCRIPTIONS")]
285    ListSubscriptions {
286        id: Option<MessageID>,
287    },
288    #[serde(rename = "SET_PROPERTY")]
289    SetProperty {
290        id: Option<MessageID>,
291        params: (String, bool), // ("combined", true | false)
292    },
293    #[serde(rename = "GET_PROPERTY")]
294    GetProperty {
295        id: Option<MessageID>,
296        params: String, // "combined"
297    },
298}
299
300#[cfg(test)]
301mod tests {
302    use crate::serde::{deserialize_json, serialize_json};
303
304    use super::*;
305
306    #[test]
307    fn test_message_id_serializes_as_bare_value() {
308        assert_eq!(
309            serialize_json(&MessageID::Str("req-0001".into())).unwrap(),
310            r#""req-0001""#,
311        );
312        assert_eq!(serialize_json(&MessageID::Int(42)).unwrap(), r#"42"#);
313        assert_eq!(
314            deserialize_json::<MessageID>(r#""req-0001""#).unwrap(),
315            MessageID::Str("req-0001".into()),
316        );
317        assert_eq!(
318            deserialize_json::<MessageID>(r#"42"#).unwrap(),
319            MessageID::Int(42),
320        );
321    }
322
323    #[test]
324    fn test_serialize_stream_name() {
325        let cases = vec![
326            (
327                StreamName::AggTrade {
328                    symbol: String::from("btcusdt"),
329                },
330                r#""btcusdt@aggTrade""#,
331            ),
332            (
333                StreamName::Trade {
334                    symbol: String::from("btcusdt"),
335                },
336                r#""btcusdt@trade""#,
337            ),
338            (
339                StreamName::Kline {
340                    symbol: String::from("btcusdt"),
341                    interval: KlineInterval::Minute1,
342                },
343                r#""btcusdt@kline_1m""#,
344            ),
345            (
346                StreamName::MiniTicker24 {
347                    symbol: String::from("btcusdt"),
348                },
349                r#""btcusdt@24hrMiniTicker""#,
350            ),
351            (
352                StreamName::PartialBookDepth {
353                    symbol: String::from("btcusdt"),
354                    levels: PartialBookDepthLevels::Level20,
355                    update_speed: None,
356                },
357                r#""btcusdt@depth20""#,
358            ),
359            (
360                StreamName::PartialBookDepth {
361                    symbol: String::from("btcusdt"),
362                    levels: PartialBookDepthLevels::Level5,
363                    update_speed: Some(PartialBookDepthUpdateSpeed::Speed100ms),
364                },
365                r#""btcusdt@depth5@100ms""#,
366            ),
367            (
368                StreamName::DiffDepth {
369                    symbol: String::from("btcusdt"),
370                    update_speed: None,
371                },
372                r#""btcusdt@depth""#,
373            ),
374            (
375                StreamName::DiffDepth {
376                    symbol: String::from("btcusdt"),
377                    update_speed: Some(DiffDepthUpdateSpeed::Speed100ms),
378                },
379                r#""btcusdt@depth@100ms""#,
380            ),
381            (StreamName::ServerShutdownRaw, r#""serverShutdown""#),
382            (StreamName::ServerShutdownCombined, r#""!serverShutdown""#),
383        ];
384
385        cases.into_iter().for_each(|(stream, expected)| {
386            let serialized = serialize_json(&stream).unwrap();
387            assert_eq!(expected, serialized);
388        });
389    }
390
391    #[test]
392    fn test_deserialize_stream_name() {
393        let cases = vec![
394            (
395                r#""btcusdt@aggTrade""#,
396                StreamName::AggTrade {
397                    symbol: String::from("btcusdt"),
398                },
399            ),
400            (
401                r#""btcusdt@trade""#,
402                StreamName::Trade {
403                    symbol: String::from("btcusdt"),
404                },
405            ),
406            (
407                r#""btcusdt@kline_1m""#,
408                StreamName::Kline {
409                    symbol: String::from("btcusdt"),
410                    interval: KlineInterval::Minute1,
411                },
412            ),
413            (
414                r#""btcusdt@24hrMiniTicker""#,
415                StreamName::MiniTicker24 {
416                    symbol: String::from("btcusdt"),
417                },
418            ),
419            (
420                r#""btcusdt@depth20""#,
421                StreamName::PartialBookDepth {
422                    symbol: String::from("btcusdt"),
423                    levels: PartialBookDepthLevels::Level20,
424                    update_speed: None,
425                },
426            ),
427            (
428                r#""btcusdt@depth5@100ms""#,
429                StreamName::PartialBookDepth {
430                    symbol: String::from("btcusdt"),
431                    levels: PartialBookDepthLevels::Level5,
432                    update_speed: Some(PartialBookDepthUpdateSpeed::Speed100ms),
433                },
434            ),
435            (
436                r#""btcusdt@depth""#,
437                StreamName::DiffDepth {
438                    symbol: String::from("btcusdt"),
439                    update_speed: None,
440                },
441            ),
442            (
443                r#""btcusdt@depth@100ms""#,
444                StreamName::DiffDepth {
445                    symbol: String::from("btcusdt"),
446                    update_speed: Some(DiffDepthUpdateSpeed::Speed100ms),
447                },
448            ),
449            (r#""serverShutdown""#, StreamName::ServerShutdownRaw),
450            (r#""!serverShutdown""#, StreamName::ServerShutdownCombined),
451        ];
452
453        cases.into_iter().for_each(|(serialized, expected)| {
454            let stream = deserialize_json(serialized).unwrap();
455            assert_eq!(expected, stream);
456        });
457    }
458}