1use futures::future::Future;
2use futures::stream::SplitSink;
3use futures::{StreamExt, SinkExt};
4use thiserror::Error;
5use tokio::task::JoinHandle;
6use tokio_tungstenite::{connect_async, WebSocketStream, MaybeTlsStream};
7use tokio_tungstenite::tungstenite::protocol::{CloseFrame, Message};
8use log::{error, debug, info};
9use tokio::net::TcpStream;
10use std::time::{SystemTime, UNIX_EPOCH};
11use sha2::Sha256;
12use hmac::{Hmac, Mac};
13use tokio::sync::Mutex;
14use std::sync::Arc;
15
16use crate::{message, SubscribeResult};
17use crate::subscription;
18
/// HMAC keyed with SHA-256; used by `auth` to sign the authentication request.
type HmacSha256 = Hmac<Sha256>;
20
21#[derive(Error, Debug)]
22pub enum CryptoError {
23 #[error("Cannot join to a task")]
24 JoinError(#[from] tokio::task::JoinError),
25
26
27 #[error("Tungstenite error")]
28 TungsteniteError(#[from] tokio_tungstenite::tungstenite::Error),
29
30 #[error("Tungstenite error")]
31 TungsteniteErrorString(String),
32
33
34 #[error("Error \"{}\" ({code}) when subscribing to {} (msgid:{id})", message.as_ref().unwrap_or(&"unknown".to_owned()), channel.as_ref().unwrap_or(&"unknown".to_owned()))]
35 SubscriptionError {
36 id: i64,
37 code: u64,
38 message: Option<String>,
39 channel: Option<String>
40 },
41
42 #[error("Serde error")]
43 SerdeError(#[from] serde_json::error::Error),
44
45 #[error("Server closed de communication")]
46 CloseError {
47 frame: Option<CloseFrame<'static>>
48 },
49
50 #[error("Unexpected message")]
51 UnexpectedMessageError {
52 message: Message
53 },
54
55
56 #[error("Not connected")]
57 NotConnectedError,
58
59 #[error("Invalid sha length")]
60 ShaInvalidLength(#[from] hmac::digest::InvalidLength),
61
62
63}
64
/// Shared, mutex-guarded user callback: it receives a subscription result (or an
/// error) together with a value of type `T`, and returns the future `Fut`.
type EventType<T, Fut> = Arc<Mutex<dyn Fn(Result<message::SubscribeResult, CryptoError>, T)-> Fut + Send + Sync>>;
/// Optional shared handle to the write half of the split WebSocket stream;
/// `None` means no connection has been stored.
type WriterType = Option<Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>;
67
68pub struct CryptoClient<Fut: Future<Output = ()> + Send + Sync + 'static, T> {
69 events: EventType<T, Fut>,
71 reader_join: Option<JoinHandle<Result<(), CryptoError>>>,
72 writer: WriterType,
73 message_id: u64,
74 container: T
76}
77
/// Returns the current time as milliseconds since the Unix epoch,
/// or `0` when the system clock reports a time before the epoch.
fn nonce() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis())
}
84
85impl<Fut: Future<Output = ()> + Send + Sync + 'static, T: Send + 'static> CryptoClient<Fut, T>
86 where T: Clone {
87
88 pub fn new(f: impl Fn(Result<message::SubscribeResult, CryptoError>, T)->Fut + Send + Sync + 'static, container: T) -> CryptoClient<Fut, T> {
90 CryptoClient {
91 events: Arc::new(Mutex::new(f)),
92 reader_join: None,
93 writer: None,
94 message_id: 1,
95 container
96 }
97
98 }
99
100 pub async fn wait(&mut self) -> Result<(), CryptoError> {
101 if let Some(join) = self.reader_join.as_mut() {
102 if join.is_finished() {
103 Ok(())
104 } else {
105 join.await?
106 }
107
108 } else {
109 Ok(())
110 }
111
112 }
113
114 pub async fn disconnect(&mut self) -> Result<(), CryptoError> {
115 info!("Disconnecting");
116 if let Some(writer) = self.writer.as_mut() {
117 debug!("Closing connection");
118 writer.lock().await.close().await?;
119 debug!("Connection closed");
120 }
121
122 if let Some(reader) = self.reader_join.as_mut() {
123 debug!("Closing reader");
124 reader.abort();
125 reader.await.ok();
126 debug!("Reader closed");
127 }
128 info!("Disconnected");
129 Ok(())
130 }
131
132 pub async fn connect_market(&mut self) -> Result<(), CryptoError> {
133 self.connect("wss://stream.crypto.com/v2/market").await?;
134 Ok(())
135 }
136
137 pub async fn connect_user(&mut self) -> Result<(), CryptoError> {
138 self.connect("wss://stream.crypto.com/v2/user").await?;
139 Ok(())
140 }
141
142 pub async fn connect(&mut self, uri: &str) -> Result<(), CryptoError> {
143 info!("Connecting");
144 let connection = connect_async(uri).await?;
145 let (ws_stream, _) = connection;
146
147 let (write, mut read) = ws_stream.split();
148 let writer = Arc::new(Mutex::new(write));
149 let inner_writer = writer.clone();
150
151 let events = Arc::clone(&self.events);
152
153 let cosa = self.container.clone();
155 let join = tokio::spawn(async move {
156 let top_inner_cosa = cosa.clone();
157 let mut join_result: Result<(), CryptoError> = Ok(());
158
159 info!("Listener ready");
160 while let Some(next) = read.next().await {
161 let inner_cosa = top_inner_cosa.clone();
162 match next {
163 Ok(message) => {
164 let e = events.lock().await;
165 match message {
166 Message::Text(text) => {
167 debug!("Text received {text}");
168 match serde_json::from_str::<message::Message>(&text) {
170 Ok(msg) => {
171 match msg {
172 message::Message::HeartbeatRequest{id} => {
173 debug!("heartbeat received");
174 let message = subscription::Request::HeartbeatResponse{id};
175 match serde_json::to_string(&message) {
176 Ok(text) => {
177 if let Err(error) = inner_writer.lock().await.send(Message::text(text)).await{
178 error!("Cannot send heartbeat");
179 e(Err(CryptoError::TungsteniteError(error)), inner_cosa);
180 } else {
181 debug!("heartbeat sent");
182 }
183 },
184 Err(error) => {
185 error!("Cannot serialize heartbeat");
186 e(Err(CryptoError::SerdeError(error)), inner_cosa);
187 }
188 }
189
190
191 },
192 message::Message::SubscriptionResponse{result, id, code, channel, message} => {
193 if let Some(result) = result {
194 debug!("Message received: {:?}", result);
195 e(Ok(result), inner_cosa).await;
196 } else if code != 0 {
197
198 e(Err(CryptoError::SubscriptionError {
199 id,
200 code,
201 message,
202 channel
203 }), inner_cosa);
204 }
205 },
206 message::Message::UnsubscriptionResponse{id, code} => {
207 debug!("Unsubscription: {id} {code}");
208 e(Ok(SubscribeResult::UnsubscriptionResult{success: code == 0}), inner_cosa).await;
209
210 },
211 message::Message::AuthResponse{id, code} => {
212 debug!("Notify auth response: {id} {code}");
213 e(Ok(SubscribeResult::AuthResult{success: code == 0}), inner_cosa).await;
214 }
215 }
216 }
217 Err(err) => {
218 error!("Error when parsing JSON:\n{}\n{}", text, err);
219 e(Err(CryptoError::SerdeError(err)), inner_cosa).await;
220 }
221 }
222 },
223 Message::Ping(message) => {
224 debug!("Ping received {:?}", message);
225 if let Err(error) = inner_writer.lock().await.send(Message::Pong(message)).await {
226 error!("Cannot send pong");
227 e(Err(CryptoError::TungsteniteError(error)), inner_cosa).await;
228 } else {
229 debug!("Pong sent");
230 }
231
232 },
233 Message::Pong(message) => {
234 debug!("PONG RECEIVED {:?}", message);
235 },
236 Message::Close(frame) => {
237 e(Err(CryptoError::CloseError { frame: frame.clone() }), inner_cosa).await;
238 return Err(CryptoError::CloseError { frame });
239 },
240 message => {
241 error!("Unexpected message {:?}", message);
242 e(Err(CryptoError::UnexpectedMessageError{message}), inner_cosa).await;
243 }
244 }
245 },
246 Err(error) => {
247 let e = events.lock().await;
248 error!("Websocket read error: {:?}", error);
249 e(Err(CryptoError::TungsteniteErrorString(error.to_string())), inner_cosa).await;
250 join_result = Err(CryptoError::TungsteniteError(error));
251 }
252 }
253 }
254 join_result
255 });
256
257 self.reader_join = Some(join);
258 self.writer = Some(writer);
259 info!("Connected");
260 Ok(())
261 }
262
263 pub async fn subscribe(&mut self, channels: Vec<String>) ->Result<(), CryptoError> {
264 debug!("Subscribing to {:?} channels", channels.len());
265 if let Some(writer) = self.writer.as_mut() {
266 let message = subscription::Request::Subscribe{
267 id: self.message_id,
268 params: subscription::SubscribeParams{channels},
269 nonce: nonce()
270 };
271
272 let text = serde_json::to_string(&message)?;
273 writer.lock().await.send(Message::text(text)).await?;
274 self.message_id += 1;
276 debug!("New message id {:?}", self.message_id);
277 Ok(())
278 } else {
279 Err(CryptoError::NotConnectedError)
280 }
281
282 }
283
284 pub async fn unsubscribe(&mut self, channels: Vec<String>) ->Result<(), CryptoError> {
285 debug!("Unsubscribing to {:?} channels", channels.len());
286 if let Some(writer) = self.writer.as_mut() {
287 let message = subscription::Request::Unsubscribe{
288 id: self.message_id,
289 params: subscription::UnsubscribeParams{channels},
290 nonce: nonce()
291 };
292
293 let text = serde_json::to_string(&message)?;
294 writer.lock().await.send(Message::text(text)).await?;
295 self.message_id += 1;
297 debug!("New message id {:?}", self.message_id);
298 Ok(())
299 } else {
300 Err(CryptoError::NotConnectedError)
301 }
302
303 }
304
305
306 pub async fn auth(&mut self, api_key: &str, api_secret: &str) ->Result<(), CryptoError> {
307 if let Some(writer) = self.writer.as_mut() {
308 let n = nonce();
309 let message_to_sig = ["public/auth".into(), self.message_id.to_string(), api_key.to_owned(), n.to_string()].concat();
310 let mut mac = HmacSha256::new_from_slice(api_secret.as_bytes())?;
311 mac.update(message_to_sig.as_bytes());
312 let result = mac.finalize();
313 let f = result.into_bytes();
314
315 let message = subscription::Request::Auth{
316 id: self.message_id,
317 api_key: api_key.to_owned(),
318 sig: hex::encode(f),
319 nonce: n
320 };
321
322 let text = serde_json::to_string(&message)?;
323 writer.lock().await.send(Message::text(text)).await?;
324 self.message_id += 1;
326 Ok(())
327 } else {
328 Err(CryptoError::NotConnectedError)
329 }
330
331
332
333 }
334
335}