bitbank_api/stream/
mod.rs

1use super::*;
2use std::{cell::RefCell, rc::Rc};
3use tokio_tungstenite::tungstenite::Message;
4
5/// Get depth diff in stream.
6pub mod depth_diff;
7/// Get whole depth in stream.
8pub mod depth_whole;
9/// Get tickers in stream.
10pub mod ticker;
11/// Get transactions in stream.
12pub mod transactions;
13
/// Outcome of successfully decoding one incoming text frame.
///
/// The type parameter carries no trait bound here on purpose: the enum only
/// stores `R`, so the deserialization bound lives on the functions that
/// actually build a value of this type.
enum ParsedMessage<R> {
    /// The frame carried a payload that was deserialized into `R`.
    Parsed(R),
    /// The frame was a keep-alive ping that the caller should answer.
    Ping,
}
18
19fn parse_message<R: serde::de::DeserializeOwned>(msg: Message) -> Option<ParsedMessage<R>> {
20    use std::str::FromStr;
21
22    let txt = msg.into_text().ok()?;
23
24    // Find the boundary between number and JSON
25    let boundary = txt
26        .char_indices()
27        .find(|(_, c)| !c.is_ascii_digit())
28        .map(|(idx, _)| idx)
29        .unwrap_or(txt.len());
30
31    // If the index or txt.len() is 0, no number to parse
32    if boundary == 0 {
33        return None;
34    }
35
36    match u8::from_str(&txt[0..boundary]).ok()? {
37        42 => {}
38        2 => return Some(ParsedMessage::Ping),
39        _ => return None,
40    }
41
42    let mut v = serde_json::from_str::<serde_json::Value>(&txt[boundary..]).ok()?;
43    let mut body = {
44        let tmp = v.as_array_mut()?;
45        tmp.remove(1)
46    };
47    let mut message = {
48        let tmp = body.as_object_mut()?;
49        tmp.remove("message")?
50    };
51    let data = {
52        let tmp = message.as_object_mut()?;
53        tmp.remove("data")?
54    };
55    serde_json::from_value::<R>(data)
56        .ok()
57        .map(ParsedMessage::Parsed)
58}
59
60async fn do_connect<R: serde::de::DeserializeOwned>(
61    room_id: &str,
62) -> anyhow::Result<impl tokio_stream::Stream<Item = R>> {
63    use futures_util::{SinkExt, StreamExt};
64
65    let url = "wss://stream.bitbank.cc/socket.io/?EIO=4&transport=websocket";
66    let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
67    let (mut writer, reader) = ws_stream.split();
68
69    // Request connection
70    writer.send(Message::text("40")).await?;
71
72    let msg = format!("42[\"join-room\", \"{room_id}\"]");
73    writer.send(Message::Text(msg)).await?;
74
75    let rc_writer = Rc::new(RefCell::new(writer));
76    let st = reader.filter_map(move |msg| {
77        let fut_writer = Rc::clone(&rc_writer);
78        async move {
79            let msg = msg.ok()?;
80
81            use ParsedMessage::*;
82            match parse_message::<R>(msg)? {
83                Parsed(v) => Some(v),
84                Ping => {
85                    // Send Pong
86                    fut_writer
87                        .borrow_mut()
88                        .send(Message::text("3"))
89                        .await
90                        .ok()?;
91                    None
92                }
93            }
94        }
95    });
96    Ok(st)
97}