crypto_com_exchange/
client.rs

1use futures::future::Future;
2use futures::stream::SplitSink;
3use futures::{StreamExt, SinkExt};
4use thiserror::Error;
5use tokio::task::JoinHandle;
6use tokio_tungstenite::{connect_async, WebSocketStream, MaybeTlsStream};
7use tokio_tungstenite::tungstenite::protocol::{CloseFrame, Message};
8use log::{error, debug, info};
9use tokio::net::TcpStream;
10use std::time::{SystemTime, UNIX_EPOCH};
11use sha2::Sha256;
12use hmac::{Hmac, Mac};
13use tokio::sync::Mutex;
14use std::sync::Arc;
15
16use crate::{message, SubscribeResult};
17use crate::subscription;
18
19type HmacSha256 = Hmac<Sha256>;
20
21#[derive(Error, Debug)]
22pub enum CryptoError {
23    #[error("Cannot join to a task")]
24    JoinError(#[from] tokio::task::JoinError),
25
26
27    #[error("Tungstenite error")]
28    TungsteniteError(#[from] tokio_tungstenite::tungstenite::Error),
29
30    #[error("Tungstenite error")]
31    TungsteniteErrorString(String),
32
33
34    #[error("Error \"{}\" ({code}) when subscribing to {} (msgid:{id})", message.as_ref().unwrap_or(&"unknown".to_owned()), channel.as_ref().unwrap_or(&"unknown".to_owned()))]
35    SubscriptionError {
36        id: i64,
37        code: u64,
38        message: Option<String>,
39        channel: Option<String>
40    },
41    
42    #[error("Serde error")]
43    SerdeError(#[from] serde_json::error::Error),
44
45    #[error("Server closed de communication")]
46    CloseError {
47        frame: Option<CloseFrame<'static>>
48    },
49
50    #[error("Unexpected message")]
51    UnexpectedMessageError {
52        message: Message
53    },
54    
55
56    #[error("Not connected")]
57    NotConnectedError,
58
59    #[error("Invalid sha length")]
60    ShaInvalidLength(#[from] hmac::digest::InvalidLength),
61
62
63}
64
65type EventType<T, Fut> = Arc<Mutex<dyn Fn(Result<message::SubscribeResult, CryptoError>, T)-> Fut + Send + Sync>>;
66type WriterType = Option<Arc<Mutex<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>>>;
67
68pub struct CryptoClient<Fut: Future<Output = ()> + Send + Sync + 'static, T> {
69    //events: Arc<Mutex<dyn Fn(Result<message::SubscribeResult>, std::sync::Arc<flume::Sender<T>>)-> Fut + Send + Sync>>,
70    events: EventType<T, Fut>,
71    reader_join: Option<JoinHandle<Result<(), CryptoError>>>,
72    writer: WriterType,
73    message_id: u64,
74    //sender: std::sync::Arc<flume::Sender<T>>
75    container: T
76}
77
78fn nonce() -> u128 {
79    match SystemTime::now().duration_since(UNIX_EPOCH) {
80        Ok(n) => n.as_millis(),
81        Err(_) => 0,
82    }
83}
84
85impl<Fut: Future<Output = ()>  + Send + Sync + 'static, T: Send + 'static> CryptoClient<Fut, T> 
86   where T: Clone {
87
88    //pub fn new(f: impl Fn(Result<message::SubscribeResult>, std::sync::Arc<flume::Sender<T>>)->Fut + Send + Sync + 'static, sender: std::sync::Arc<flume::Sender<T>>) -> CryptoTransport<Fut, T> {
89    pub fn new(f: impl Fn(Result<message::SubscribeResult, CryptoError>, T)->Fut + Send + Sync + 'static, container: T) -> CryptoClient<Fut, T> {
90        CryptoClient {
91            events: Arc::new(Mutex::new(f)),
92            reader_join: None,
93            writer: None,
94            message_id: 1,
95            container
96        }
97
98    }
99
100    pub async fn wait(&mut self) -> Result<(), CryptoError> {
101        if let Some(join) = self.reader_join.as_mut() {
102            if join.is_finished() {
103                Ok(())
104            } else {
105                join.await?
106            }
107            
108        } else {
109            Ok(())
110        }
111        
112    }
113
114    pub async fn disconnect(&mut self) -> Result<(), CryptoError> {
115        info!("Disconnecting");
116        if let Some(writer) = self.writer.as_mut() {
117            debug!("Closing connection");
118            writer.lock().await.close().await?;
119            debug!("Connection closed");
120        }
121
122        if let Some(reader) = self.reader_join.as_mut() {
123            debug!("Closing reader");
124            reader.abort();
125            reader.await.ok();
126            debug!("Reader closed");
127        }
128        info!("Disconnected");
129        Ok(())
130    }
131
132    pub async fn connect_market(&mut self) -> Result<(), CryptoError> {
133        self.connect("wss://stream.crypto.com/v2/market").await?;
134        Ok(())
135    }
136
137    pub async fn connect_user(&mut self) -> Result<(), CryptoError> {
138        self.connect("wss://stream.crypto.com/v2/user").await?;
139        Ok(())
140    }
141
142    pub async fn connect(&mut self, uri: &str) -> Result<(), CryptoError> {
143        info!("Connecting");
144        let connection = connect_async(uri).await?;
145        let (ws_stream, _) = connection;
146        
147        let (write, mut read) = ws_stream.split();
148        let writer = Arc::new(Mutex::new(write));
149        let inner_writer = writer.clone();
150        
151        let events = Arc::clone(&self.events);
152        
153        //let cosa = self.sender.clone();
154        let cosa = self.container.clone();
155        let join = tokio::spawn(async move {
156            let top_inner_cosa = cosa.clone();
157            let mut join_result: Result<(), CryptoError> = Ok(());
158            
159            info!("Listener ready");
160            while let Some(next) = read.next().await {
161                let inner_cosa = top_inner_cosa.clone();
162                match next {
163                    Ok(message) => {
164                        let e = events.lock().await;
165                        match message {
166                            Message::Text(text) => {
167                                debug!("Text received {text}");
168                                // Json parse
169                                match serde_json::from_str::<message::Message>(&text) {
170                                    Ok(msg) => {
171                                        match msg {
172                                            message::Message::HeartbeatRequest{id} => {                   
173                                                debug!("heartbeat received");
174                                                let message = subscription::Request::HeartbeatResponse{id};
175                                                match serde_json::to_string(&message) {
176                                                    Ok(text) => {
177                                                        if let Err(error) = inner_writer.lock().await.send(Message::text(text)).await{
178                                                            error!("Cannot send heartbeat");
179                                                            e(Err(CryptoError::TungsteniteError(error)), inner_cosa);
180                                                        } else {
181                                                            debug!("heartbeat sent");
182                                                        }
183                                                    },
184                                                    Err(error) => {
185                                                        error!("Cannot serialize heartbeat");
186                                                        e(Err(CryptoError::SerdeError(error)), inner_cosa);
187                                                    }
188                                                }
189                                                
190                                                
191                                            },
192                                            message::Message::SubscriptionResponse{result, id, code, channel, message} => {
193                                                if let Some(result) = result {
194                                                    debug!("Message received: {:?}", result);
195                                                    e(Ok(result), inner_cosa).await;
196                                                } else if code != 0 {
197                                                    
198                                                    e(Err(CryptoError::SubscriptionError {
199                                                        id,
200                                                        code,
201                                                        message,
202                                                        channel
203                                                    }), inner_cosa);
204                                                }
205                                            },
206                                            message::Message::UnsubscriptionResponse{id, code} => {
207                                                debug!("Unsubscription: {id} {code}");
208                                                e(Ok(SubscribeResult::UnsubscriptionResult{success: code == 0}), inner_cosa).await;
209                                                
210                                            },
211                                            message::Message::AuthResponse{id, code} => {
212                                                debug!("Notify auth response: {id} {code}");
213                                                e(Ok(SubscribeResult::AuthResult{success: code == 0}), inner_cosa).await;
214                                            }
215                                        }
216                                    }
217                                    Err(err) => {
218                                        error!("Error when parsing JSON:\n{}\n{}", text, err);
219                                        e(Err(CryptoError::SerdeError(err)), inner_cosa).await;
220                                    }
221                                }
222                            },
223                            Message::Ping(message) => {
224                                debug!("Ping received {:?}", message);
225                                if let Err(error) = inner_writer.lock().await.send(Message::Pong(message)).await {
226                                    error!("Cannot send pong");
227                                    e(Err(CryptoError::TungsteniteError(error)), inner_cosa).await;
228                                } else {
229                                    debug!("Pong sent");
230                                }
231                                
232                            },
233                            Message::Pong(message) => {
234                                debug!("PONG RECEIVED {:?}", message);
235                            },
236                            Message::Close(frame) => {
237                                e(Err(CryptoError::CloseError { frame: frame.clone() }), inner_cosa).await;
238                                return Err(CryptoError::CloseError { frame });
239                            },
240                            message => {
241                                error!("Unexpected message {:?}", message);
242                                e(Err(CryptoError::UnexpectedMessageError{message}), inner_cosa).await;
243                            }
244                        }
245                    },
246                    Err(error) => {
247                        let e = events.lock().await;
248                        error!("Websocket read error: {:?}", error);   
249                        e(Err(CryptoError::TungsteniteErrorString(error.to_string())), inner_cosa).await;
250                        join_result = Err(CryptoError::TungsteniteError(error));
251                    }
252                }
253            }
254            join_result
255        });
256        
257        self.reader_join = Some(join);
258        self.writer = Some(writer);
259        info!("Connected");
260        Ok(())
261    }
262
263    pub async fn subscribe(&mut self, channels: Vec<String>) ->Result<(), CryptoError> {
264        debug!("Subscribing to {:?} channels", channels.len());
265        if let Some(writer) = self.writer.as_mut() {
266            let message = subscription::Request::Subscribe{
267                id: self.message_id,
268                params: subscription::SubscribeParams{channels},
269                nonce: nonce()
270            };
271
272            let text = serde_json::to_string(&message)?;
273            writer.lock().await.send(Message::text(text)).await?;
274            // Increase message_id only if the message was actually sent
275            self.message_id += 1;
276            debug!("New message id {:?}", self.message_id);
277            Ok(())
278        } else {
279            Err(CryptoError::NotConnectedError)
280        }
281        
282    }
283
284    pub async fn unsubscribe(&mut self, channels: Vec<String>) ->Result<(), CryptoError> {
285        debug!("Unsubscribing to {:?} channels", channels.len());
286        if let Some(writer) = self.writer.as_mut() {
287            let message = subscription::Request::Unsubscribe{
288                id: self.message_id,
289                params: subscription::UnsubscribeParams{channels},
290                nonce: nonce()
291            };
292
293            let text = serde_json::to_string(&message)?;
294            writer.lock().await.send(Message::text(text)).await?;
295            // Increase message_id only if the message was actually sent
296            self.message_id += 1;
297            debug!("New message id {:?}", self.message_id);
298            Ok(())
299        } else {
300            Err(CryptoError::NotConnectedError)
301        }
302        
303    }
304
305
306    pub async fn auth(&mut self, api_key: &str, api_secret: &str) ->Result<(), CryptoError> {
307        if let Some(writer) = self.writer.as_mut() {
308            let n = nonce();
309            let message_to_sig = ["public/auth".into(), self.message_id.to_string(), api_key.to_owned(), n.to_string()].concat();
310            let mut mac = HmacSha256::new_from_slice(api_secret.as_bytes())?;
311            mac.update(message_to_sig.as_bytes());
312            let result = mac.finalize();
313            let f = result.into_bytes();
314
315            let message = subscription::Request::Auth{
316                id: self.message_id,
317                api_key: api_key.to_owned(),
318                sig: hex::encode(f),
319                nonce: n
320            };
321
322            let text = serde_json::to_string(&message)?;
323            writer.lock().await.send(Message::text(text)).await?;
324            // Increase message_id only if the message was actually sent
325            self.message_id += 1;
326            Ok(())
327        } else {
328            Err(CryptoError::NotConnectedError)
329        }
330
331        
332
333    }
334
335}