1use crate::error::Result;
2use base64::Engine;
3use base64::engine::general_purpose::STANDARD;
4use ed25519_dalek::Signer;
5use futures_util::{SinkExt, StreamExt};
6use serde::de::DeserializeOwned;
7use serde_json::{Value, json};
8use tokio::sync::mpsc::Sender;
9use tokio_tungstenite::tungstenite::protocol::Message;
10use tokio_tungstenite::{connect_async, tungstenite::Utf8Bytes};
11
12use crate::{BpxClient, DEFAULT_WINDOW, Error, now_millis};
13
14impl BpxClient {
15 pub async fn subscribe<T>(&self, stream: &str, tx: Sender<T>) -> Result<()>
17 where
18 T: DeserializeOwned + Send + 'static,
19 {
20 self.internal_subscribe(&[stream], tx).await
21 }
22
23 pub async fn subscribe_multiple<T>(&self, stream: &[&str], tx: Sender<T>) -> Result<()>
25 where
26 T: DeserializeOwned + Send + 'static,
27 {
28 self.internal_subscribe(stream, tx).await
29 }
30
31 async fn internal_subscribe<T>(&self, stream: &[&str], tx: Sender<T>) -> Result<()>
32 where
33 T: DeserializeOwned + Send + 'static,
34 {
35 let timestamp = now_millis();
36 let window = DEFAULT_WINDOW;
37 let message = format!("instruction=subscribe×tamp={timestamp}&window={window}");
38 let Some(signing_key) = &self.signing_key else {
39 return Err(Error::NotAuthenticated);
40 };
41
42 let verifying_key = STANDARD.encode(signing_key.verifying_key().to_bytes());
43 let signature = STANDARD.encode(signing_key.sign(message.as_bytes()).to_bytes());
44
45 let subscribe_message = json!({
46 "method": "SUBSCRIBE",
47 "params": stream,
48 "signature": [verifying_key, signature, timestamp.to_string(), window.to_string()],
49 });
50
51 let ws_url = self.ws_url.to_string();
52 let (mut ws_stream, _) = connect_async(ws_url).await.expect("Error connecting to WebSocket");
53 ws_stream
54 .send(Message::Text(Utf8Bytes::from(subscribe_message.to_string())))
55 .await
56 .expect("Error subscribing to WebSocket");
57
58 tracing::debug!("Subscribed to {stream:#?} streams...");
59
60 while let Some(message) = ws_stream.next().await {
61 match message {
62 Ok(msg) => match msg {
63 Message::Text(text) => {
64 if let Ok(value) = serde_json::from_str::<Value>(&text) {
65 if let Some(payload) = value.get("data") {
66 if let Ok(data) = T::deserialize(payload)
67 && tx.send(data).await.is_err()
68 {
69 tracing::error!("Failed to send message through the channel");
70 }
71 } else if let Some(payload) = value.get("error") {
72 tracing::error!(?payload, "Websocket Error Response");
73 }
74 }
75 }
76 Message::Close(_) => break,
77 _ => {}
78 },
79 Err(error) => tracing::error!(%error, "WebSocket error"),
80 }
81 }
82
83 Ok(())
84 }
85}