bitbank_api/stream/
mod.rs1use super::*;
2use std::{cell::RefCell, rc::Rc};
3use tokio_tungstenite::tungstenite::Message;
4
5pub mod depth_diff;
7pub mod depth_whole;
9pub mod ticker;
11pub mod transactions;
13
14enum ParsedMessage<R: serde::de::DeserializeOwned> {
15 Parsed(R),
16 Ping,
17}
18
19fn parse_message<R: serde::de::DeserializeOwned>(msg: Message) -> Option<ParsedMessage<R>> {
20 use std::str::FromStr;
21
22 let txt = msg.into_text().ok()?;
23
24 let boundary = txt
26 .char_indices()
27 .find(|(_, c)| !c.is_ascii_digit())
28 .map(|(idx, _)| idx)
29 .unwrap_or(txt.len());
30
31 if boundary == 0 {
33 return None;
34 }
35
36 match u8::from_str(&txt[0..boundary]).ok()? {
37 42 => {}
38 2 => return Some(ParsedMessage::Ping),
39 _ => return None,
40 }
41
42 let mut v = serde_json::from_str::<serde_json::Value>(&txt[boundary..]).ok()?;
43 let mut body = {
44 let tmp = v.as_array_mut()?;
45 tmp.remove(1)
46 };
47 let mut message = {
48 let tmp = body.as_object_mut()?;
49 tmp.remove("message")?
50 };
51 let data = {
52 let tmp = message.as_object_mut()?;
53 tmp.remove("data")?
54 };
55 serde_json::from_value::<R>(data)
56 .ok()
57 .map(ParsedMessage::Parsed)
58}
59
60async fn do_connect<R: serde::de::DeserializeOwned>(
61 room_id: &str,
62) -> anyhow::Result<impl tokio_stream::Stream<Item = R>> {
63 use futures_util::{SinkExt, StreamExt};
64
65 let url = "wss://stream.bitbank.cc/socket.io/?EIO=4&transport=websocket";
66 let (ws_stream, _) = tokio_tungstenite::connect_async(url).await?;
67 let (mut writer, reader) = ws_stream.split();
68
69 writer.send(Message::text("40")).await?;
71
72 let msg = format!("42[\"join-room\", \"{room_id}\"]");
73 writer.send(Message::Text(msg)).await?;
74
75 let rc_writer = Rc::new(RefCell::new(writer));
76 let st = reader.filter_map(move |msg| {
77 let fut_writer = Rc::clone(&rc_writer);
78 async move {
79 let msg = msg.ok()?;
80
81 use ParsedMessage::*;
82 match parse_message::<R>(msg)? {
83 Parsed(v) => Some(v),
84 Ping => {
85 fut_writer
87 .borrow_mut()
88 .send(Message::text("3"))
89 .await
90 .ok()?;
91 None
92 }
93 }
94 }
95 });
96 Ok(st)
97}