1use crate::{serde::deserialize_json, spot::KlineInterval};
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3use std::fmt;
4
5#[derive(PartialEq, Deserialize, Serialize, Debug)]
9#[serde(untagged)]
10pub enum MessageID {
11 Str(String),
12 Int(i64),
13}
14
15impl From<String> for MessageID {
16 fn from(s: String) -> Self {
17 MessageID::Str(s)
18 }
19}
20
21impl From<&str> for MessageID {
22 fn from(s: &str) -> Self {
23 MessageID::Str(s.to_string())
24 }
25}
26
27impl From<i64> for MessageID {
28 fn from(n: i64) -> Self {
29 MessageID::Int(n)
30 }
31}
32
33#[derive(Debug, Clone, PartialEq)]
34pub enum StreamName {
35 AggTrade { symbol: String },
37 Trade { symbol: String },
39 PartialBookDepth {
42 symbol: String,
43 levels: PartialBookDepthLevels,
44 update_speed: Option<PartialBookDepthUpdateSpeed>,
45 },
46 DiffDepth {
49 symbol: String,
50 update_speed: Option<DiffDepthUpdateSpeed>,
51 },
52 Kline {
54 symbol: String,
55 interval: KlineInterval,
56 },
57 MiniTicker24 { symbol: String },
59 ServerShutdownRaw,
61 ServerShutdownCombined,
63}
64
65impl Serialize for StreamName {
66 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
67 where
68 S: Serializer,
69 {
70 let s = match self {
71 Self::AggTrade { symbol } => format!("{symbol}@aggTrade"),
72 Self::Trade { symbol } => format!("{symbol}@trade"),
73 Self::PartialBookDepth {
74 symbol,
75 levels,
76 update_speed,
77 } => update_speed.as_ref().map_or_else(
78 || format!("{symbol}@depth{levels}"),
79 |speed| format!("{symbol}@depth{levels}@{speed}"),
80 ),
81 Self::DiffDepth {
82 symbol,
83 update_speed,
84 } => update_speed.as_ref().map_or_else(
85 || format!("{symbol}@depth"),
86 |speed| format!("{symbol}@depth@{speed}"),
87 ),
88 Self::Kline { symbol, interval } => format!("{symbol}@kline_{interval}"),
89 Self::MiniTicker24 { symbol } => format!("{symbol}@24hrMiniTicker"),
90 Self::ServerShutdownRaw => String::from("serverShutdown"),
91 Self::ServerShutdownCombined => String::from("!serverShutdown"),
92 };
93 serializer.serialize_str(&s)
94 }
95}
96
97impl<'de> Deserialize<'de> for StreamName {
98 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
99 where
100 D: Deserializer<'de>,
101 {
102 let s: &str = Deserialize::deserialize(deserializer)?;
103 if let Some((symbol, kind)) = s.split_once('@') {
104 match kind {
105 "aggTrade" => Ok(Self::AggTrade {
106 symbol: symbol.to_owned(),
107 }),
108 "trade" => Ok(Self::Trade {
109 symbol: symbol.to_owned(),
110 }),
111 "24hrMiniTicker" => Ok(Self::MiniTicker24 {
112 symbol: symbol.to_owned(),
113 }),
114 kind => {
115 if let Some(params) = kind.strip_prefix("depth") {
116 let symbol = symbol.to_string();
117 return if let Some((levels, update_speed)) = params.split_once('@') {
118 if !levels.is_empty() {
119 let levels = format!("\"{levels}\"");
121 let levels = deserialize_json(&levels).map_err(|_| {
122 serde::de::Error::custom("invalid stream format")
123 })?;
124
125 let update_speed = format!("\"{update_speed}\"");
126 let update_speed =
127 deserialize_json(&update_speed).map_err(|_| {
128 serde::de::Error::custom("invalid stream format")
129 })?;
130 let stream = Self::PartialBookDepth {
131 symbol,
132 levels,
133 update_speed: Some(update_speed),
134 };
135 Ok(stream)
136 } else {
137 let update_speed = format!("\"{update_speed}\"");
139 let update_speed =
140 deserialize_json(&update_speed).map_err(|_| {
141 serde::de::Error::custom("invalid stream format")
142 })?;
143 let stream = Self::DiffDepth {
144 symbol,
145 update_speed: Some(update_speed),
146 };
147 Ok(stream)
148 }
149 } else {
150 let levels = params;
151 if !levels.is_empty() {
152 let levels = format!("\"{levels}\"");
154 let levels = deserialize_json(&levels).map_err(|_| {
155 serde::de::Error::custom("invalid stream format")
156 })?;
157 let stream = Self::PartialBookDepth {
158 symbol,
159 levels,
160 update_speed: None,
161 };
162 Ok(stream)
163 } else {
164 let stream = Self::DiffDepth {
166 symbol,
167 update_speed: None,
168 };
169 Ok(stream)
170 }
171 };
172 }
173
174 if let Some((kind, params)) = kind.split_once('_') {
175 match kind {
176 "kline" => {
177 let interval = format!("\"{params}\"");
178 let interval = match deserialize_json(&interval) {
179 Ok(interval) => interval,
180 Err(_) => {
181 return Err(serde::de::Error::custom(
182 "invalid stream format",
183 ));
184 }
185 };
186 Ok(Self::Kline {
187 symbol: symbol.to_owned(),
188 interval,
189 })
190 }
191 _ => Err(serde::de::Error::custom(format!(
192 "unknown stream type: {kind}"
193 ))),
194 }
195 } else {
196 Err(serde::de::Error::custom("invalid stream format"))
197 }
198 }
199 }
200 } else {
201 match s {
202 "serverShutdown" => Ok(Self::ServerShutdownRaw),
203 "!serverShutdown" => Ok(Self::ServerShutdownCombined),
204 _ => Err(serde::de::Error::custom("invalid stream format")),
205 }
206 }
207 }
208}
209
210#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
212pub enum PartialBookDepthLevels {
213 #[serde(rename = "5")]
214 Level5,
215 #[serde(rename = "10")]
216 Level10,
217 #[serde(rename = "20")]
218 Level20,
219}
220
221impl fmt::Display for PartialBookDepthLevels {
222 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
223 let value = match self {
224 Self::Level5 => "5",
225 Self::Level10 => "10",
226 Self::Level20 => "20",
227 };
228 write!(f, "{value}")
229 }
230}
231
232#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
234pub enum PartialBookDepthUpdateSpeed {
235 #[serde(rename = "1000ms")]
236 Speed1000ms,
237 #[serde(rename = "100ms")]
238 Speed100ms,
239}
240
241impl fmt::Display for PartialBookDepthUpdateSpeed {
242 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
243 let value = match self {
244 Self::Speed1000ms => "1000ms",
245 Self::Speed100ms => "100ms",
246 };
247 write!(f, "{value}")
248 }
249}
250
251#[derive(Debug, Deserialize, Serialize, Clone, PartialEq)]
253pub enum DiffDepthUpdateSpeed {
254 #[serde(rename = "1000ms")]
255 Speed1000ms,
256 #[serde(rename = "100ms")]
257 Speed100ms,
258}
259
260impl fmt::Display for DiffDepthUpdateSpeed {
261 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
262 let value = match self {
263 Self::Speed1000ms => "1000ms",
264 Self::Speed100ms => "100ms",
265 };
266 write!(f, "{value}")
267 }
268}
269
270#[derive(Serialize, Debug)]
271#[serde(tag = "method")]
272pub enum OutgoingMessage {
273 Empty,
274 #[serde(rename = "SUBSCRIBE")]
275 Subscribe {
276 id: Option<MessageID>,
277 params: Vec<StreamName>,
278 },
279 #[serde(rename = "UNSUBSCRIBE")]
280 Unsubscribe {
281 id: Option<MessageID>,
282 params: Vec<StreamName>,
283 },
284 #[serde(rename = "LIST_SUBSCRIPTIONS")]
285 ListSubscriptions {
286 id: Option<MessageID>,
287 },
288 #[serde(rename = "SET_PROPERTY")]
289 SetProperty {
290 id: Option<MessageID>,
291 params: (String, bool), },
293 #[serde(rename = "GET_PROPERTY")]
294 GetProperty {
295 id: Option<MessageID>,
296 params: String, },
298}
299
300#[cfg(test)]
301mod tests {
302 use crate::serde::{deserialize_json, serialize_json};
303
304 use super::*;
305
306 #[test]
307 fn test_message_id_serializes_as_bare_value() {
308 assert_eq!(
309 serialize_json(&MessageID::Str("req-0001".into())).unwrap(),
310 r#""req-0001""#,
311 );
312 assert_eq!(serialize_json(&MessageID::Int(42)).unwrap(), r#"42"#);
313 assert_eq!(
314 deserialize_json::<MessageID>(r#""req-0001""#).unwrap(),
315 MessageID::Str("req-0001".into()),
316 );
317 assert_eq!(
318 deserialize_json::<MessageID>(r#"42"#).unwrap(),
319 MessageID::Int(42),
320 );
321 }
322
323 #[test]
324 fn test_serialize_stream_name() {
325 let cases = vec![
326 (
327 StreamName::AggTrade {
328 symbol: String::from("btcusdt"),
329 },
330 r#""btcusdt@aggTrade""#,
331 ),
332 (
333 StreamName::Trade {
334 symbol: String::from("btcusdt"),
335 },
336 r#""btcusdt@trade""#,
337 ),
338 (
339 StreamName::Kline {
340 symbol: String::from("btcusdt"),
341 interval: KlineInterval::Minute1,
342 },
343 r#""btcusdt@kline_1m""#,
344 ),
345 (
346 StreamName::MiniTicker24 {
347 symbol: String::from("btcusdt"),
348 },
349 r#""btcusdt@24hrMiniTicker""#,
350 ),
351 (
352 StreamName::PartialBookDepth {
353 symbol: String::from("btcusdt"),
354 levels: PartialBookDepthLevels::Level20,
355 update_speed: None,
356 },
357 r#""btcusdt@depth20""#,
358 ),
359 (
360 StreamName::PartialBookDepth {
361 symbol: String::from("btcusdt"),
362 levels: PartialBookDepthLevels::Level5,
363 update_speed: Some(PartialBookDepthUpdateSpeed::Speed100ms),
364 },
365 r#""btcusdt@depth5@100ms""#,
366 ),
367 (
368 StreamName::DiffDepth {
369 symbol: String::from("btcusdt"),
370 update_speed: None,
371 },
372 r#""btcusdt@depth""#,
373 ),
374 (
375 StreamName::DiffDepth {
376 symbol: String::from("btcusdt"),
377 update_speed: Some(DiffDepthUpdateSpeed::Speed100ms),
378 },
379 r#""btcusdt@depth@100ms""#,
380 ),
381 (StreamName::ServerShutdownRaw, r#""serverShutdown""#),
382 (StreamName::ServerShutdownCombined, r#""!serverShutdown""#),
383 ];
384
385 cases.into_iter().for_each(|(stream, expected)| {
386 let serialized = serialize_json(&stream).unwrap();
387 assert_eq!(expected, serialized);
388 });
389 }
390
391 #[test]
392 fn test_deserialize_stream_name() {
393 let cases = vec![
394 (
395 r#""btcusdt@aggTrade""#,
396 StreamName::AggTrade {
397 symbol: String::from("btcusdt"),
398 },
399 ),
400 (
401 r#""btcusdt@trade""#,
402 StreamName::Trade {
403 symbol: String::from("btcusdt"),
404 },
405 ),
406 (
407 r#""btcusdt@kline_1m""#,
408 StreamName::Kline {
409 symbol: String::from("btcusdt"),
410 interval: KlineInterval::Minute1,
411 },
412 ),
413 (
414 r#""btcusdt@24hrMiniTicker""#,
415 StreamName::MiniTicker24 {
416 symbol: String::from("btcusdt"),
417 },
418 ),
419 (
420 r#""btcusdt@depth20""#,
421 StreamName::PartialBookDepth {
422 symbol: String::from("btcusdt"),
423 levels: PartialBookDepthLevels::Level20,
424 update_speed: None,
425 },
426 ),
427 (
428 r#""btcusdt@depth5@100ms""#,
429 StreamName::PartialBookDepth {
430 symbol: String::from("btcusdt"),
431 levels: PartialBookDepthLevels::Level5,
432 update_speed: Some(PartialBookDepthUpdateSpeed::Speed100ms),
433 },
434 ),
435 (
436 r#""btcusdt@depth""#,
437 StreamName::DiffDepth {
438 symbol: String::from("btcusdt"),
439 update_speed: None,
440 },
441 ),
442 (
443 r#""btcusdt@depth@100ms""#,
444 StreamName::DiffDepth {
445 symbol: String::from("btcusdt"),
446 update_speed: Some(DiffDepthUpdateSpeed::Speed100ms),
447 },
448 ),
449 (r#""serverShutdown""#, StreamName::ServerShutdownRaw),
450 (r#""!serverShutdown""#, StreamName::ServerShutdownCombined),
451 ];
452
453 cases.into_iter().for_each(|(serialized, expected)| {
454 let stream = deserialize_json(serialized).unwrap();
455 assert_eq!(expected, stream);
456 });
457 }
458}