deribit/
lib.rs

1#![recursion_limit = "512"]
2
3mod api_client;
4pub mod errors;
5mod macros;
6pub mod models;
7mod subscription_client;
8
9pub use crate::{
10    api_client::{DeribitAPICallRawResult, DeribitAPICallResult, DeribitAPIClient},
11    errors::{DeribitError, Result},
12    subscription_client::{DeribitSubscriptionClient, DeribitSubscriptionLimitedClient},
13};
14
15use anyhow::Error;
16use derive_builder::Builder;
17use fehler::throws;
18use futures::{
19    channel::{mpsc, oneshot},
20    select, FutureExt, SinkExt, Stream, StreamExt, TryStreamExt,
21};
22use lazy_static::lazy_static;
23use log::{info, trace, warn};
24use regex::Regex;
25use std::{collections::HashMap, time::Duration};
26use tokio::{net::TcpStream, time::timeout};
27use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
28use tungstenite::Message;
29use url::Url;
30
31lazy_static! {
32    static ref RE: Regex = Regex::new(r#""jsonrpc":"2.0","id":(\d+),"#).unwrap();
33}
34
35type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
36
37pub const WS_URL: &'static str = "wss://www.deribit.com/ws/api/v2";
38pub const WS_URL_TESTNET: &'static str = "wss://test.deribit.com/ws/api/v2";
39
40#[derive(Default, Builder, Debug)]
41#[builder(setter(into))]
42pub struct Deribit {
43    #[builder(default)]
44    testnet: bool,
45    #[builder(default = "10")]
46    subscription_buffer_size: usize,
47    #[builder(setter(into, strip_option), default)]
48    timeout: Option<Duration>,
49}
50
51impl Deribit {
52    pub fn new() -> Deribit {
53        DeribitBuilder::default().build().unwrap()
54    }
55
56    pub fn builder() -> DeribitBuilder {
57        DeribitBuilder::default()
58    }
59
60    #[throws(Error)]
61    pub async fn connect(self) -> (DeribitAPIClient, DeribitSubscriptionClient) {
62        let ws_url = if self.testnet { WS_URL_TESTNET } else { WS_URL };
63        info!("Connecting");
64        let (ws, _) = connect_async(Url::parse(ws_url)?).await?;
65
66        let (wstx, wsrx) = ws.split();
67
68        let (stx, srx) = mpsc::channel(self.subscription_buffer_size);
69        let (waiter_tx, waiter_rx) = mpsc::channel(10);
70        let background = Self::servo(wsrx.err_into(), waiter_rx, stx)
71            .inspect(|r| {
72                if let Err(e) = r {
73                    warn!("[Servo] Exiting because of '{}'", e)
74                }
75            })
76            .then(|_| async { () });
77
78        tokio::spawn(background);
79
80        (
81            DeribitAPIClient::new(
82                wstx,
83                waiter_tx,
84                self.timeout.unwrap_or(Duration::from_secs(3600)), // default timeout, 1H
85            ),
86            DeribitSubscriptionClient::new(srx),
87        )
88    }
89
90    #[throws(Error)]
91    async fn servo(
92        ws: impl Stream<Item = Result<Message>> + Unpin,
93        mut waiter_rx: mpsc::Receiver<(i64, oneshot::Sender<String>)>,
94        mut stx: mpsc::Sender<String>,
95    ) {
96        let mut ws = ws.fuse();
97        let mut waiters: HashMap<i64, oneshot::Sender<String>> = HashMap::new();
98
99        let mut orphan_messages = HashMap::new();
100
101        let (mut sdropped, mut cdropped) = (false, false);
102        while !(sdropped && cdropped) {
103            select! {
104                msg = ws.next() => {
105                    trace!("[Servo] Message: {:?}", msg);
106                    if sdropped { continue; }
107                    let msg = if let Some(msg) = msg { msg } else { Err(DeribitError::WebsocketDisconnected)? };
108
109                    match msg? {
110                        Message::Text(msg) => {
111                            if let Some(cap) = RE.captures(&msg) { // TODO: If deribit returns unordered keys, then this will fail.
112                                // is a API call response
113                                let id_str = cap.get(1).expect("No captured group in a capture result, this cannot happen").as_str();
114                                let id = id_str.parse().expect("Cannot parse integer while it is deemed as integer by regex, this cannot happen");
115                                let waiter = match waiters.remove(&id) {
116                                    Some(waiter) => waiter,
117                                    None => {
118                                        orphan_messages.insert(id, msg);
119                                        continue;
120                                    }
121                                };
122
123                                if let Err(msg) = waiter.send(msg) {
124                                    info!("[Servo] Orphan response: {:?}", msg);
125                                }
126                            } else {
127                                // is a subscription messasge
128                                let fut = stx.send(msg);
129                                let fut = timeout(Duration::from_millis(1),fut, );
130                                match fut.await {
131                                    Ok(Ok(_)) => {}
132                                    Ok(Err(ref e)) if e.is_disconnected() => sdropped = true,
133                                    Ok(Err(_)) => { unreachable!("[Servo] futures::mpsc won't complain channel is full") }, // MPSC ERROR
134                                    Err(_) => { warn!("[Servo] Subscription channel is full") }, // Elapsed
135                                }
136                            }
137                        }
138                        Message::Ping(_) => {
139                            trace!("[Servo] Received Ping");
140                        }
141                        Message::Pong(_) => {
142                            trace!("[Servo] Received Ping");
143                        }
144                        Message::Binary(_) => {
145                            trace!("[Servo] Received Binary");
146                        }
147                        Message::Frame(_) => {
148                            trace!("[Servo] Received Frame");
149                        }
150                        Message::Close(_) => {
151                            trace!("[Servo] Received Close");
152                        }
153                    }
154                }
155                waiter = waiter_rx.next() => {
156                    if let Some((id, waiter)) = waiter {
157                        if orphan_messages.contains_key(&id) {
158                            info!("[Servo] Message come before waiter");
159                            let msg = orphan_messages.remove(&id).unwrap();
160                            if let Err(msg) = waiter.send(msg) {
161                                info!("[Servo] The client for request {} is dropped, response is {:?}", id, msg);
162                            }
163                        } else {
164                            waiters.insert(id, waiter);
165                        }
166                    } else {
167                        cdropped = true;
168                        info!("[Servo] API Client dropped");
169                    }
170                }
171            };
172        }
173        info!("Servo exit with all receiver dropped");
174        // Exit with all receiver dropped
175    }
176}