bpx_api_client/ws/
mod.rs

1use crate::error::Result;
2use base64::Engine;
3use base64::engine::general_purpose::STANDARD;
4use ed25519_dalek::Signer;
5use futures_util::{SinkExt, StreamExt};
6use serde::de::DeserializeOwned;
7use serde_json::{Value, json};
8use tokio::sync::mpsc::Sender;
9use tokio_tungstenite::tungstenite::protocol::Message;
10use tokio_tungstenite::{connect_async, tungstenite::Utf8Bytes};
11
12use crate::{BpxClient, DEFAULT_WINDOW, Error, now_millis};
13
14impl BpxClient {
15    /// Subscribes to a private WebSocket stream and sends messages of type `T` through a transmitter channel.
16    pub async fn subscribe<T>(&self, stream: &str, tx: Sender<T>) -> Result<()>
17    where
18        T: DeserializeOwned + Send + 'static,
19    {
20        self.internal_subscribe(&[stream], tx).await
21    }
22
23    /// Subscribes to multiple private WebSocket streams and sends messages of type `T` through a transmitter channel.
24    pub async fn subscribe_multiple<T>(&self, streams: &[&str], tx: Sender<T>) -> Result<()>
25    where
26        T: DeserializeOwned + Send + 'static,
27    {
28        self.internal_subscribe(streams, tx).await
29    }
30
31    async fn internal_subscribe<T>(&self, streams: &[&str], tx: Sender<T>) -> Result<()>
32    where
33        T: DeserializeOwned + Send + 'static,
34    {
35        let timestamp = now_millis();
36        let window = DEFAULT_WINDOW;
37
38        let is_private = streams.iter().any(|s| is_private_stream(s));
39        let subscribe_message = if is_private {
40            let signing_key = self.signing_key.as_ref().ok_or(Error::NotAuthenticated)?;
41
42            let message = format!("instruction=subscribe&timestamp={timestamp}&window={window}");
43
44            let verifying_key = STANDARD.encode(signing_key.verifying_key().to_bytes());
45            let signature = STANDARD.encode(signing_key.sign(message.as_bytes()).to_bytes());
46
47            json!({
48                "method": "SUBSCRIBE",
49                "params": streams,
50                "signature": [verifying_key, signature, timestamp.to_string(), window.to_string()],
51            })
52        } else {
53            json!({
54                "method": "SUBSCRIBE",
55                "params": streams
56            })
57        };
58
59        let ws_url = self.ws_url.as_str();
60        let (mut ws_stream, _) = connect_async(ws_url)
61            .await
62            .expect("Error connecting to WebSocket");
63        ws_stream
64            .send(Message::Text(Utf8Bytes::from(
65                subscribe_message.to_string(),
66            )))
67            .await
68            .expect("Error subscribing to WebSocket");
69
70        tracing::debug!("Subscribed to {streams:#?} streams...");
71
72        while let Some(message) = ws_stream.next().await {
73            match message {
74                Ok(msg) => match msg {
75                    Message::Text(text) => {
76                        if let Ok(value) = serde_json::from_str::<Value>(&text) {
77                            if let Some(payload) = value.get("data") {
78                                match T::deserialize(payload) {
79                                    Ok(data) => {
80                                        if tx.send(data).await.is_err() {
81                                            tracing::warn!("Channel is closed");
82                                            break;
83                                        }
84                                    }
85                                    Err(err) => {
86                                        tracing::error!("Could not deserialize ws payload: {err}");
87                                    }
88                                }
89                            } else if let Some(payload) = value.get("error") {
90                                tracing::error!(?payload, "Websocket Error Response");
91                            }
92                        }
93                    }
94                    Message::Close(_) => break,
95                    _ => {}
96                },
97                Err(error) => tracing::error!(%error, "WebSocket error"),
98            }
99        }
100        Ok(())
101    }
102}
103
/// Reports whether `stream` names a private stream, i.e. one whose name begins with
/// the `account.` prefix.
fn is_private_stream(stream: &str) -> bool {
    stream.strip_prefix("account.").is_some()
}