1use crate::error::Result;
2use base64ct::{Base64, Encoding};
3use ed25519_dalek::Signer;
4use futures_util::{SinkExt, StreamExt};
5use serde::de::DeserializeOwned;
6use serde_json::{Value, json};
7use tokio::sync::mpsc::Sender;
8use tokio_tungstenite::tungstenite::protocol::Message;
9use tokio_tungstenite::{connect_async, tungstenite::Utf8Bytes};
10
11use crate::{BpxClient, DEFAULT_WINDOW, Error, now_millis};
12
13impl BpxClient {
14 pub async fn subscribe<T>(&self, stream: &str, tx: Sender<T>) -> Result<()>
16 where
17 T: DeserializeOwned + Send + 'static,
18 {
19 self.internal_subscribe(&[stream], tx).await
20 }
21
22 pub async fn subscribe_multiple<T>(&self, streams: &[&str], tx: Sender<T>) -> Result<()>
24 where
25 T: DeserializeOwned + Send + 'static,
26 {
27 self.internal_subscribe(streams, tx).await
28 }
29
30 async fn internal_subscribe<T>(&self, streams: &[&str], tx: Sender<T>) -> Result<()>
31 where
32 T: DeserializeOwned + Send + 'static,
33 {
34 let timestamp = now_millis();
35 let window = DEFAULT_WINDOW;
36
37 let is_private = streams.iter().any(|s| is_private_stream(s));
38 let subscribe_message = if is_private {
39 let signing_key = self.signing_key.as_ref().ok_or(Error::NotAuthenticated)?;
40
41 let message = format!("instruction=subscribe×tamp={timestamp}&window={window}");
42
43 let verifying_key = Base64::encode_string(&signing_key.verifying_key().to_bytes());
44 let signature = Base64::encode_string(&signing_key.sign(message.as_bytes()).to_bytes());
45
46 json!({
47 "method": "SUBSCRIBE",
48 "params": streams,
49 "signature": [verifying_key, signature, timestamp.to_string(), window.to_string()],
50 })
51 } else {
52 json!({
53 "method": "SUBSCRIBE",
54 "params": streams
55 })
56 };
57
58 let ws_url = self.ws_url.as_str();
59 let (mut ws_stream, _) = connect_async(ws_url)
60 .await
61 .expect("Error connecting to WebSocket");
62 ws_stream
63 .send(Message::Text(Utf8Bytes::from(
64 subscribe_message.to_string(),
65 )))
66 .await
67 .expect("Error subscribing to WebSocket");
68
69 tracing::debug!("Subscribed to {streams:#?} streams...");
70
71 while let Some(message) = ws_stream.next().await {
72 match message {
73 Ok(msg) => match msg {
74 Message::Text(text) => {
75 if let Ok(value) = serde_json::from_str::<Value>(&text) {
76 if let Some(payload) = value.get("data") {
77 match T::deserialize(payload) {
78 Ok(data) => {
79 if tx.send(data).await.is_err() {
80 tracing::warn!("Channel is closed");
81 break;
82 }
83 }
84 Err(err) => {
85 tracing::error!("Could not deserialize ws payload: {err}");
86 }
87 }
88 } else if let Some(payload) = value.get("error") {
89 tracing::error!(?payload, "Websocket Error Response");
90 }
91 }
92 }
93 Message::Close(_) => break,
94 _ => {}
95 },
96 Err(error) => tracing::error!(%error, "WebSocket error"),
97 }
98 }
99 Ok(())
100 }
101}
102
/// Reports whether `stream` names a private stream, i.e. whether its name begins
/// with the `account.` prefix.
fn is_private_stream(stream: &str) -> bool {
    const PRIVATE_PREFIX: &str = "account.";
    stream.strip_prefix(PRIVATE_PREFIX).is_some()
}