krakenrs/ws/mod.rs
1//! An interface for getting data from Kraken websockets API, while another thread manages
2//! the updates from the websockets connection.
3//!
4//! This follows the pattern of bridging async code into a sync interface
5//! See also: <https://tokio.rs/tokio/topics/bridging>
6//! and the `reqwest::blocking` module
7
8use crate::{LimitOrder, MarketOrder};
9use futures::stream::StreamExt;
10use std::sync::{Arc, atomic::Ordering};
11use std::{
12 collections::{BTreeMap, HashMap},
13 thread,
14 time::{Duration, Instant},
15};
16use tokio::{
17 runtime,
18 sync::{mpsc, oneshot},
19 time,
20};
21
22mod config;
23pub use config::{KrakenWsConfig, KrakenWsConfigBuilder};
24
25mod conn;
26pub use conn::{Error, KrakenWsClient, WsAPIResults};
27
28mod types;
29pub use types::{BookData, BookEntry, Candle, PublicTrade};
30
31mod messages;
32pub use messages::*;
33
34/// A handle to Kraken websockets API feeds
35///
36/// This is a sync API, but under the hood it contains a thread driving a small
37/// tokio runtime
38pub struct KrakenWsAPI {
39 // The worker thread that is consuming kraken api messages
40 worker_thread: Option<thread::JoinHandle<()>>,
41 // Sender object to send messages to the worker thread
42 sender: mpsc::UnboundedSender<LocalRequest>,
43 // Handle to the output of the worker thread
44 output: Arc<WsAPIResults>,
45}
46
47impl KrakenWsAPI {
48 /// Create a new web sockets connection to Kraken and subscribe to
49 /// specified channels
50 ///
51 /// Note: This is the same as using `TryFrom::try_from` to construct an instance
52 ///
53 /// Note: This call attempts to fail fast if a websockets connection cannot be established,
54 /// so it will block the current thread on that and return an error if connection fails.
55 /// If you are using the tokio multi-threaded runtime, you must call this from a blocking thread,
56 /// or the runtime will detect this and panic. You may wrap it in `task::spawn_blocking` or similar.
57 pub fn new(src: KrakenWsConfig) -> Result<Self, Error> {
58 // Build the runtime for the new thread.
59 //
60 // The runtime is created before spawning the thread
61 // to more cleanly forward errors if the `unwrap()`
62 // panics.
63 let rt = runtime::Builder::new_current_thread().enable_all().build().unwrap();
64
65 let (mut client, mut stream, output) = rt.block_on(KrakenWsClient::new(src))?;
66 let (sender, mut receiver) = mpsc::unbounded_channel();
67
68 let worker_thread = Some(thread::Builder::new().name("kraken-ws-internal-runtime".into()).spawn(
69 move || {
70 rt.block_on(async move {
71 // Every second, confirm that we got a heart beat, or send a ping / expect a pong
72 let mut interval = time::interval(Duration::from_secs(1));
73 loop {
74 tokio::select! {
75 stream_result = stream.next() => {
76 match stream_result {
77 Some(result) => {
78 match client.update(result) {
79 Ok(()) => {
80 // Maybe adjust subscriptions, closing corrupted subscriptions,
81 // and resubscribing to any subscriptions that are missing for a while
82 // to any subscriptions that were canceled
83 client.check_subscriptions().await;
84 }
85 Err(err) => {
86 log::error!("error, closing stream: {}", err);
87 drop(client.close().await);
88 return;
89 }
90 }
91 }
92 None => {
93 log::warn!("stream closed by kraken");
94 drop(client.close().await);
95 return;
96 }
97 }
98 }
99 msg = receiver.recv() => {
100 match msg {
101 None | Some(LocalRequest::Stop) => {
102 drop(client.close().await);
103 return;
104 }
105 Some(LocalRequest::AddOrder{request, result_sender}) => {
106 if let Err(err) = client.add_order(request, result_sender).await {
107 log::error!("error submitting an order, closing stream: {}", err);
108 drop(client.close().await);
109 return;
110 }
111 }
112 Some(LocalRequest::CancelOrder{tx_id, result_sender}) => {
113 if let Err(err) = client.cancel_order(tx_id, result_sender).await {
114 log::error!("error canceling an order, closing stream: {}", err);
115 drop(client.close().await);
116 return;
117 }
118 }
119 Some(LocalRequest::CancelAllOrders{result_sender}) => {
120 if let Err(err) = client.cancel_all_orders(result_sender).await {
121 log::error!("error canceling all orders, closing stream: {}", err);
122 drop(client.close().await);
123 return;
124 }
125 }
126 }
127 }
128 _ = interval.tick() => {
129 if let Some(time) = client.get_last_message_time() {
130 // If we haven't heard anything in a while that's bad
131 // Kraken says they send a heartbeat about every second
132 let now = Instant::now();
133 if time + Duration::from_secs(2) < now {
134 // Check if we earlier sent a ping
135 if let Some(ping_time) = client.get_last_outstanding_ping_time() {
136 if ping_time + Duration::from_secs(1) < now {
137 log::error!("Kraken did not respond to ping, closing stream");
138 drop(client.close().await);
139 return;
140 }
141 } else {
142 // There is no outstanding ping, let's send a ping
143 if let Err(err) = client.ping().await {
144 log::error!("error sending ping, closing stream: {}", err);
145 drop(client.close().await);
146 return;
147 }
148 }
149 }
150 }
151 }
152 }
153 }
154 })
155 },
156 )?);
157 Ok(Self {
158 worker_thread,
159 sender,
160 output,
161 })
162 }
163
164 /// Get the system status
165 pub fn system_status(&self) -> Option<SystemStatus> {
166 self.output.system_status.lock().expect("mutex poisoned").clone()
167 }
168
169 /// Get all latest book data that we have subscribed to
170 pub fn get_all_books(&self) -> BTreeMap<String, BookData> {
171 self.output
172 .book
173 .iter()
174 .map(|(asset_pair, lock)| (asset_pair.clone(), lock.lock().expect("mutex poisoned").clone()))
175 .collect()
176 }
177
178 /// Get latest book data that we have subscribed to, for an individual book
179 pub fn get_book(&self, asset_pair: &str) -> Option<BookData> {
180 self.output
181 .book
182 .get(asset_pair)
183 .map(|lock| lock.lock().expect("mutex poisoned").clone())
184 }
185
186 /// Get the most recent trades that we have seen, for an individual asset pair
187 /// Note that these can only be retrieved once and are not delivered to the next consumer.
188 ///
189 /// Returns None only if the asset pair is unknown, which is usually a logic error.
190 pub fn get_ohlc(&self, asset_pair: &str) -> Option<Vec<Candle>> {
191 self.output.ohlc.get(asset_pair).map(|lock| {
192 let mut lk = lock.lock().expect("mutex poisoned");
193 let result = lk.clone();
194 lk.clear(); // note, this doesn't reduce the capacity
195 result
196 })
197 }
198
199 /// Get the most recent trades that we have seen, for an individual asset pair
200 /// Note that these can only be retrieved once and are not delivered to the next consumer.
201 ///
202 /// Returns None only if the asset pair is unknown, which is usually a logic error.
203 pub fn get_trades(&self, asset_pair: &str) -> Option<Vec<PublicTrade>> {
204 self.output.trades.get(asset_pair).map(|lock| {
205 let mut lk = lock.lock().expect("mutex poisoned");
206 let result = lk.clone();
207 lk.clear(); // note, this doesn't reduce the capacity
208 result
209 })
210 }
211
212 /// Get latest openOrder data
213 pub fn get_open_orders(&self) -> HashMap<String, OrderInfo> {
214 self.output.open_orders.lock().expect("mutex poisoned").clone()
215 }
216
217 /// Get latest ownTrades data
218 /// Note that each trade can only be retrieved once and is not delivered to the next consumer.
219 pub fn get_own_trades(&self) -> Vec<OwnTrade> {
220 let mut lk = self.output.own_trades.lock().expect("mutex poisoned");
221 let result = lk.clone();
222 lk.clear(); // note, this doesn't reduce the capacity
223 result
224 }
225
226 /// Check if the stream is closed. If so then we should abandon this
227 /// instance of KrakenWsAPI and create a new one in order to reconnect.
228 ///
229 /// Note Kraken's advisory:
230 /// Cloudflare imposes a connection/re-connection rate limit (per IP address) of approximately 150 attempts per rolling 10 minutes. If this is exceeded, the IP is banned for 10 minutes.
231 /// Recommended reconnection behaviour is to (1) attempt reconnection instantly up to a handful of times if the websocket is dropped randomly during normal operation but (2) after maintenance or extended downtime, attempt to reconnect no more quickly than once every 5 seconds. There is no advantage to reconnecting more rapidly after maintenance during cancel_only mode.
232 pub fn stream_closed(&self) -> bool {
233 self.output.stream_closed.load(Ordering::SeqCst)
234 }
235
236 /// Submit a market order over the websockets connection.
237 /// This must be a private connection configured with the auth token.
238 ///
239 /// Arguments:
240 /// market_order: The market order to place
241 /// user_ref_id: The user-ref-id to associate to this order. Orders may be filtered or canceled by user-ref-id.
242 /// validate: If true, we just validate that the order was well formed and the order doesn't actually hit the books.
243 ///
244 /// Returns:
245 /// A oneshot::Reciever which yields either the TxID for the placed order, or an error message from kraken.
246 /// The Receiver produces no value if the order could not be successfully placed, and this will be logged.
247 /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
248 /// The return value will be None if the stream is already closed.
249 pub fn add_market_order(
250 &self,
251 market_order: MarketOrder,
252 user_ref_id: Option<i32>,
253 validate: bool,
254 ) -> Option<oneshot::Receiver<Result<String, String>>> {
255 let (result_sender, result_receiver) = oneshot::channel();
256 let request = AddOrderRequest {
257 ordertype: OrderType::Market,
258 bs_type: market_order.bs_type.into(),
259 volume: market_order.volume,
260 pair: market_order.pair,
261 price: Default::default(),
262 oflags: market_order.oflags.into_iter().map(OrderFlag::from).collect(),
263 userref: user_ref_id,
264 validate,
265 ..Default::default()
266 };
267 if self
268 .sender
269 .send(LocalRequest::AddOrder { request, result_sender })
270 .is_ok()
271 {
272 Some(result_receiver)
273 } else {
274 None
275 }
276 }
277
278 /// Submit a limit order over the websockets connection.
279 /// This must be a private connection configured with the auth token.
280 ///
281 /// Arguments:
282 /// limit_order: The order order to place
283 /// user_ref_id: The user-ref-id to associate to this order. Orders may be filtered or canceled by user-ref-id.
284 /// validate: If true, we just validate that the order was well formed and the order doesn't actually hit the books.
285 ///
286 /// Returns:
287 /// A oneshot::Reciever which yields either the TxID for the placed order, or an error message from kraken.
288 /// The Receiver produces no value if the order could not be successfully placed, and this will be logged.
289 /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
290 /// The return value will be None if the stream is already closed.
291 pub fn add_limit_order(
292 &self,
293 limit_order: LimitOrder,
294 user_ref_id: Option<i32>,
295 validate: bool,
296 ) -> Option<oneshot::Receiver<Result<String, String>>> {
297 let (result_sender, result_receiver) = oneshot::channel();
298 let request = AddOrderRequest {
299 ordertype: OrderType::Limit,
300 bs_type: limit_order.bs_type.into(),
301 volume: limit_order.volume,
302 pair: limit_order.pair,
303 price: limit_order.price,
304 oflags: limit_order.oflags.into_iter().map(OrderFlag::from).collect(),
305 userref: user_ref_id,
306 validate,
307 ..Default::default()
308 };
309 if self
310 .sender
311 .send(LocalRequest::AddOrder { request, result_sender })
312 .is_ok()
313 {
314 Some(result_receiver)
315 } else {
316 None
317 }
318 }
319
320 /// Submit a request to cancel an order over the websockets connection.
321 /// This must be a private connection configured with the auth token.
322 ///
323 /// Arguments:
324 /// tx_id: The TxId associated to an order, or, a user-ref-id
325 ///
326 /// Returns:
327 /// A oneshot::Reciever which yields either Ok on success canceling, or an error message from kraken.
328 /// The Receiver produces no value if the request could not be successfully placed, and this will be logged.
329 /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
330 /// The return value will be None if the stream is already closed.
331 pub fn cancel_order(&self, tx_id: String) -> Option<oneshot::Receiver<Result<(), String>>> {
332 let (result_sender, result_receiver) = oneshot::channel();
333 if self
334 .sender
335 .send(LocalRequest::CancelOrder { tx_id, result_sender })
336 .is_ok()
337 {
338 Some(result_receiver)
339 } else {
340 None
341 }
342 }
343
344 /// Submit a request to cancel all orders over the websockets connection.
345 /// This must be a private connection configured with the auth token.
346 ///
347 /// Returns:
348 /// A oneshot::Reciever which yields either Ok and a count of canceled orders, or an error message from kraken.
349 /// The Receiver produces no value if the request could not be successfully placed, and this will be logged.
350 /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
351 /// The return value will be None if the stream is already closed.
352 pub fn cancel_all_orders(&self) -> Option<oneshot::Receiver<Result<u64, String>>> {
353 let (result_sender, result_receiver) = oneshot::channel();
354 if self
355 .sender
356 .send(LocalRequest::CancelAllOrders { result_sender })
357 .is_ok()
358 {
359 Some(result_receiver)
360 } else {
361 None
362 }
363 }
364}
365
366impl Drop for KrakenWsAPI {
367 fn drop(&mut self) {
368 if let Some(worker_thread) = self.worker_thread.take() {
369 drop(self.sender.send(LocalRequest::Stop));
370 worker_thread.join().expect("Could not join thread");
371 }
372 }
373}
374
375impl std::convert::TryFrom<KrakenWsConfig> for KrakenWsAPI {
376 type Error = Error;
377 fn try_from(src: KrakenWsConfig) -> Result<KrakenWsAPI, Error> {
378 KrakenWsAPI::new(src)
379 }
380}
381
382/// A request made from the local handle (KrakenWsAPI) to
383/// the thread perfoming the websockets operations.
384enum LocalRequest {
385 /// Requests to stop the worker thread and close the connection gracefully
386 Stop,
387 /// Requests to add an order to the order book
388 AddOrder {
389 request: AddOrderRequest,
390 result_sender: oneshot::Sender<Result<String, String>>,
391 },
392 /// Requests to cancel one of our orders
393 CancelOrder {
394 tx_id: String,
395 result_sender: oneshot::Sender<Result<(), String>>,
396 },
397 /// Requests to cancel all of our orders
398 CancelAllOrders {
399 result_sender: oneshot::Sender<Result<u64, String>>,
400 },
401}