bpx_api_client/ws/
mod.rs

1use base64::Engine;
2use base64::engine::general_purpose::STANDARD;
3use ed25519_dalek::Signer;
4use futures_util::{SinkExt, StreamExt};
5use serde::de::DeserializeOwned;
6use serde_json::{Value, json};
7use tokio::sync::mpsc::Sender;
8use tokio_tungstenite::tungstenite::protocol::Message;
9use tokio_tungstenite::{connect_async, tungstenite::Utf8Bytes};
10
11use crate::{BACKPACK_WS_URL, BpxClient, DEFAULT_WINDOW, now_millis};
12
13impl BpxClient {
14    /// Subscribes to a private WebSocket stream and sends messages of type `T` through a transmitter channel.
15    pub async fn subscribe<T>(&self, stream: &str, tx: Sender<T>)
16    where
17        T: DeserializeOwned + Send + 'static,
18    {
19        self.internal_subscribe(&[stream], tx).await
20    }
21
22    /// Subscribes to multiple private WebSocket streams and sends messages of type `T` through a transmitter channel.
23    pub async fn subscribe_multiple<T>(&self, stream: &[&str], tx: Sender<T>)
24    where
25        T: DeserializeOwned + Send + 'static,
26    {
27        self.internal_subscribe(stream, tx).await
28    }
29
30    async fn internal_subscribe<T>(&self, stream: &[&str], tx: Sender<T>)
31    where
32        T: DeserializeOwned + Send + 'static,
33    {
34        let timestamp = now_millis();
35        let window = DEFAULT_WINDOW;
36        let message = format!("instruction=subscribe&timestamp={timestamp}&window={window}");
37
38        let verifying_key = STANDARD.encode(self.verifier.to_bytes());
39        let signature = STANDARD.encode(self.signer.sign(message.as_bytes()).to_bytes());
40
41        let subscribe_message = json!({
42            "method": "SUBSCRIBE",
43            "params": stream,
44            "signature": [verifying_key, signature, timestamp.to_string(), window.to_string()],
45        });
46
47        let ws_url = self.ws_url.as_deref().unwrap_or(BACKPACK_WS_URL);
48        let (mut ws_stream, _) = connect_async(ws_url).await.expect("Error connecting to WebSocket");
49        ws_stream
50            .send(Message::Text(Utf8Bytes::from(subscribe_message.to_string())))
51            .await
52            .expect("Error subscribing to WebSocket");
53
54        tracing::debug!("Subscribed to {stream:#?} streams...");
55
56        while let Some(message) = ws_stream.next().await {
57            match message {
58                Ok(msg) => match msg {
59                    Message::Text(text) => {
60                        if let Ok(value) = serde_json::from_str::<Value>(&text) {
61                            if let Some(payload) = value.get("data") {
62                                if let Ok(data) = T::deserialize(payload)
63                                    && tx.send(data).await.is_err()
64                                {
65                                    tracing::error!("Failed to send message through the channel");
66                                }
67                            } else if let Some(payload) = value.get("error") {
68                                tracing::error!("Websocket Error Response: {}", payload);
69                            }
70                        }
71                    }
72                    Message::Close(_) => break,
73                    _ => {}
74                },
75                Err(error) => tracing::error!("WebSocket error: {}", error),
76            }
77        }
78    }
79}