1#![recursion_limit = "512"]
2
3mod api_client;
4pub mod errors;
5mod macros;
6pub mod models;
7mod subscription_client;
8
9pub use crate::{
10 api_client::{DeribitAPICallRawResult, DeribitAPICallResult, DeribitAPIClient},
11 errors::{DeribitError, Result},
12 subscription_client::{DeribitSubscriptionClient, DeribitSubscriptionLimitedClient},
13};
14
15use anyhow::Error;
16use derive_builder::Builder;
17use fehler::throws;
18use futures::{
19 channel::{mpsc, oneshot},
20 select, FutureExt, SinkExt, Stream, StreamExt, TryStreamExt,
21};
22use lazy_static::lazy_static;
23use log::{info, trace, warn};
24use regex::Regex;
25use std::{collections::HashMap, time::Duration};
26use tokio::{net::TcpStream, time::timeout};
27use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream};
28use tungstenite::Message;
29use url::Url;
30
31lazy_static! {
32 static ref RE: Regex = Regex::new(r#""jsonrpc":"2.0","id":(\d+),"#).unwrap();
33}
34
35type WSStream = WebSocketStream<MaybeTlsStream<TcpStream>>;
36
37pub const WS_URL: &'static str = "wss://www.deribit.com/ws/api/v2";
38pub const WS_URL_TESTNET: &'static str = "wss://test.deribit.com/ws/api/v2";
39
40#[derive(Default, Builder, Debug)]
41#[builder(setter(into))]
42pub struct Deribit {
43 #[builder(default)]
44 testnet: bool,
45 #[builder(default = "10")]
46 subscription_buffer_size: usize,
47 #[builder(setter(into, strip_option), default)]
48 timeout: Option<Duration>,
49}
50
51impl Deribit {
52 pub fn new() -> Deribit {
53 DeribitBuilder::default().build().unwrap()
54 }
55
56 pub fn builder() -> DeribitBuilder {
57 DeribitBuilder::default()
58 }
59
60 #[throws(Error)]
61 pub async fn connect(self) -> (DeribitAPIClient, DeribitSubscriptionClient) {
62 let ws_url = if self.testnet { WS_URL_TESTNET } else { WS_URL };
63 info!("Connecting");
64 let (ws, _) = connect_async(Url::parse(ws_url)?).await?;
65
66 let (wstx, wsrx) = ws.split();
67
68 let (stx, srx) = mpsc::channel(self.subscription_buffer_size);
69 let (waiter_tx, waiter_rx) = mpsc::channel(10);
70 let background = Self::servo(wsrx.err_into(), waiter_rx, stx)
71 .inspect(|r| {
72 if let Err(e) = r {
73 warn!("[Servo] Exiting because of '{}'", e)
74 }
75 })
76 .then(|_| async { () });
77
78 tokio::spawn(background);
79
80 (
81 DeribitAPIClient::new(
82 wstx,
83 waiter_tx,
84 self.timeout.unwrap_or(Duration::from_secs(3600)), ),
86 DeribitSubscriptionClient::new(srx),
87 )
88 }
89
90 #[throws(Error)]
91 async fn servo(
92 ws: impl Stream<Item = Result<Message>> + Unpin,
93 mut waiter_rx: mpsc::Receiver<(i64, oneshot::Sender<String>)>,
94 mut stx: mpsc::Sender<String>,
95 ) {
96 let mut ws = ws.fuse();
97 let mut waiters: HashMap<i64, oneshot::Sender<String>> = HashMap::new();
98
99 let mut orphan_messages = HashMap::new();
100
101 let (mut sdropped, mut cdropped) = (false, false);
102 while !(sdropped && cdropped) {
103 select! {
104 msg = ws.next() => {
105 trace!("[Servo] Message: {:?}", msg);
106 if sdropped { continue; }
107 let msg = if let Some(msg) = msg { msg } else { Err(DeribitError::WebsocketDisconnected)? };
108
109 match msg? {
110 Message::Text(msg) => {
111 if let Some(cap) = RE.captures(&msg) { let id_str = cap.get(1).expect("No captured group in a capture result, this cannot happen").as_str();
114 let id = id_str.parse().expect("Cannot parse integer while it is deemed as integer by regex, this cannot happen");
115 let waiter = match waiters.remove(&id) {
116 Some(waiter) => waiter,
117 None => {
118 orphan_messages.insert(id, msg);
119 continue;
120 }
121 };
122
123 if let Err(msg) = waiter.send(msg) {
124 info!("[Servo] Orphan response: {:?}", msg);
125 }
126 } else {
127 let fut = stx.send(msg);
129 let fut = timeout(Duration::from_millis(1),fut, );
130 match fut.await {
131 Ok(Ok(_)) => {}
132 Ok(Err(ref e)) if e.is_disconnected() => sdropped = true,
133 Ok(Err(_)) => { unreachable!("[Servo] futures::mpsc won't complain channel is full") }, Err(_) => { warn!("[Servo] Subscription channel is full") }, }
136 }
137 }
138 Message::Ping(_) => {
139 trace!("[Servo] Received Ping");
140 }
141 Message::Pong(_) => {
142 trace!("[Servo] Received Ping");
143 }
144 Message::Binary(_) => {
145 trace!("[Servo] Received Binary");
146 }
147 Message::Frame(_) => {
148 trace!("[Servo] Received Frame");
149 }
150 Message::Close(_) => {
151 trace!("[Servo] Received Close");
152 }
153 }
154 }
155 waiter = waiter_rx.next() => {
156 if let Some((id, waiter)) = waiter {
157 if orphan_messages.contains_key(&id) {
158 info!("[Servo] Message come before waiter");
159 let msg = orphan_messages.remove(&id).unwrap();
160 if let Err(msg) = waiter.send(msg) {
161 info!("[Servo] The client for request {} is dropped, response is {:?}", id, msg);
162 }
163 } else {
164 waiters.insert(id, waiter);
165 }
166 } else {
167 cdropped = true;
168 info!("[Servo] API Client dropped");
169 }
170 }
171 };
172 }
173 info!("Servo exit with all receiver dropped");
174 }
176}