krakenrs/ws/
mod.rs

1//! An interface for getting data from Kraken websockets API, while another thread manages
2//! the updates from the websockets connection.
3//!
4//! This follows the pattern of bridging async code into a sync interface
5//! See also: <https://tokio.rs/tokio/topics/bridging>
6//! and the `reqwest::blocking` module
7
8use crate::{LimitOrder, MarketOrder};
9use futures::stream::StreamExt;
10use std::sync::{Arc, atomic::Ordering};
11use std::{
12    collections::{BTreeMap, HashMap},
13    thread,
14    time::{Duration, Instant},
15};
16use tokio::{
17    runtime,
18    sync::{mpsc, oneshot},
19    time,
20};
21
22mod config;
23pub use config::{KrakenWsConfig, KrakenWsConfigBuilder};
24
25mod conn;
26pub use conn::{Error, KrakenWsClient, WsAPIResults};
27
28mod types;
29pub use types::{BookData, BookEntry, Candle, PublicTrade};
30
31mod messages;
32pub use messages::*;
33
34/// A handle to Kraken websockets API feeds
35///
36/// This is a sync API, but under the hood it contains a thread driving a small
37/// tokio runtime
38pub struct KrakenWsAPI {
39    // The worker thread that is consuming kraken api messages
40    worker_thread: Option<thread::JoinHandle<()>>,
41    // Sender object to send messages to the worker thread
42    sender: mpsc::UnboundedSender<LocalRequest>,
43    // Handle to the output of the worker thread
44    output: Arc<WsAPIResults>,
45}
46
47impl KrakenWsAPI {
48    /// Create a new web sockets connection to Kraken and subscribe to
49    /// specified channels
50    ///
51    /// Note: This is the same as using `TryFrom::try_from` to construct an instance
52    ///
53    /// Note: This call attempts to fail fast if a websockets connection cannot be established,
54    /// so it will block the current thread on that and return an error if connection fails.
55    /// If you are using the tokio multi-threaded runtime, you must call this from a blocking thread,
56    /// or the runtime will detect this and panic. You may wrap it in `task::spawn_blocking` or similar.
57    pub fn new(src: KrakenWsConfig) -> Result<Self, Error> {
58        // Build the runtime for the new thread.
59        //
60        // The runtime is created before spawning the thread
61        // to more cleanly forward errors if the `unwrap()`
62        // panics.
63        let rt = runtime::Builder::new_current_thread().enable_all().build().unwrap();
64
65        let (mut client, mut stream, output) = rt.block_on(KrakenWsClient::new(src))?;
66        let (sender, mut receiver) = mpsc::unbounded_channel();
67
68        let worker_thread = Some(thread::Builder::new().name("kraken-ws-internal-runtime".into()).spawn(
69            move || {
70                rt.block_on(async move {
71                    // Every second, confirm that we got a heart beat, or send a ping / expect a pong
72                    let mut interval = time::interval(Duration::from_secs(1));
73                    loop {
74                        tokio::select! {
75                            stream_result = stream.next() => {
76                                match stream_result {
77                                    Some(result) => {
78                                        match client.update(result) {
79                                            Ok(()) => {
80                                                // Maybe adjust subscriptions, closing corrupted subscriptions,
81                                                // and resubscribing to any subscriptions that are missing for a while
82                                                // to any subscriptions that were canceled
83                                                client.check_subscriptions().await;
84                                            }
85                                            Err(err) => {
86                                                log::error!("error, closing stream: {}", err);
87                                                drop(client.close().await);
88                                                return;
89                                            }
90                                        }
91                                    }
92                                    None => {
93                                        log::warn!("stream closed by kraken");
94                                        drop(client.close().await);
95                                        return;
96                                    }
97                                }
98                            }
99                            msg = receiver.recv() => {
100                                match msg {
101                                    None | Some(LocalRequest::Stop) => {
102                                        drop(client.close().await);
103                                        return;
104                                    }
105                                    Some(LocalRequest::AddOrder{request, result_sender}) => {
106                                        if let Err(err) = client.add_order(request, result_sender).await {
107                                            log::error!("error submitting an order, closing stream: {}", err);
108                                            drop(client.close().await);
109                                            return;
110                                        }
111                                    }
112                                    Some(LocalRequest::CancelOrder{tx_id, result_sender}) => {
113                                        if let Err(err) = client.cancel_order(tx_id, result_sender).await {
114                                            log::error!("error canceling an order, closing stream: {}", err);
115                                            drop(client.close().await);
116                                            return;
117                                        }
118                                    }
119                                    Some(LocalRequest::CancelAllOrders{result_sender}) => {
120                                        if let Err(err) = client.cancel_all_orders(result_sender).await {
121                                            log::error!("error canceling all orders, closing stream: {}", err);
122                                            drop(client.close().await);
123                                            return;
124                                        }
125                                    }
126                                }
127                            }
128                            _ = interval.tick() => {
129                                if let Some(time) = client.get_last_message_time() {
130                                    // If we haven't heard anything in a while that's bad
131                                    // Kraken says they send a heartbeat about every second
132                                    let now = Instant::now();
133                                    if time + Duration::from_secs(2) < now {
134                                        // Check if we earlier sent a ping
135                                        if let Some(ping_time) = client.get_last_outstanding_ping_time() {
136                                            if ping_time + Duration::from_secs(1) < now {
137                                                log::error!("Kraken did not respond to ping, closing stream");
138                                                drop(client.close().await);
139                                                return;
140                                            }
141                                        } else {
142                                            // There is no outstanding ping, let's send a ping
143                                            if let Err(err) = client.ping().await {
144                                                log::error!("error sending ping, closing stream: {}", err);
145                                                drop(client.close().await);
146                                                return;
147                                            }
148                                        }
149                                    }
150                                }
151                            }
152                        }
153                    }
154                })
155            },
156        )?);
157        Ok(Self {
158            worker_thread,
159            sender,
160            output,
161        })
162    }
163
164    /// Get the system status
165    pub fn system_status(&self) -> Option<SystemStatus> {
166        self.output.system_status.lock().expect("mutex poisoned").clone()
167    }
168
169    /// Get all latest book data that we have subscribed to
170    pub fn get_all_books(&self) -> BTreeMap<String, BookData> {
171        self.output
172            .book
173            .iter()
174            .map(|(asset_pair, lock)| (asset_pair.clone(), lock.lock().expect("mutex poisoned").clone()))
175            .collect()
176    }
177
178    /// Get latest book data that we have subscribed to, for an individual book
179    pub fn get_book(&self, asset_pair: &str) -> Option<BookData> {
180        self.output
181            .book
182            .get(asset_pair)
183            .map(|lock| lock.lock().expect("mutex poisoned").clone())
184    }
185
186    /// Get the most recent trades that we have seen, for an individual asset pair
187    /// Note that these can only be retrieved once and are not delivered to the next consumer.
188    ///
189    /// Returns None only if the asset pair is unknown, which is usually a logic error.
190    pub fn get_ohlc(&self, asset_pair: &str) -> Option<Vec<Candle>> {
191        self.output.ohlc.get(asset_pair).map(|lock| {
192            let mut lk = lock.lock().expect("mutex poisoned");
193            let result = lk.clone();
194            lk.clear(); // note, this doesn't reduce the capacity
195            result
196        })
197    }
198
199    /// Get the most recent trades that we have seen, for an individual asset pair
200    /// Note that these can only be retrieved once and are not delivered to the next consumer.
201    ///
202    /// Returns None only if the asset pair is unknown, which is usually a logic error.
203    pub fn get_trades(&self, asset_pair: &str) -> Option<Vec<PublicTrade>> {
204        self.output.trades.get(asset_pair).map(|lock| {
205            let mut lk = lock.lock().expect("mutex poisoned");
206            let result = lk.clone();
207            lk.clear(); // note, this doesn't reduce the capacity
208            result
209        })
210    }
211
212    /// Get latest openOrder data
213    pub fn get_open_orders(&self) -> HashMap<String, OrderInfo> {
214        self.output.open_orders.lock().expect("mutex poisoned").clone()
215    }
216
217    /// Get latest ownTrades data
218    /// Note that each trade can only be retrieved once and is not delivered to the next consumer.
219    pub fn get_own_trades(&self) -> Vec<OwnTrade> {
220        let mut lk = self.output.own_trades.lock().expect("mutex poisoned");
221        let result = lk.clone();
222        lk.clear(); // note, this doesn't reduce the capacity
223        result
224    }
225
226    /// Check if the stream is closed. If so then we should abandon this
227    /// instance of KrakenWsAPI and create a new one in order to reconnect.
228    ///
229    /// Note Kraken's advisory:
230    /// Cloudflare imposes a connection/re-connection rate limit (per IP address) of approximately 150 attempts per rolling 10 minutes. If this is exceeded, the IP is banned for 10 minutes.
231    /// Recommended reconnection behaviour is to (1) attempt reconnection instantly up to a handful of times if the websocket is dropped randomly during normal operation but (2) after maintenance or extended downtime, attempt to reconnect no more quickly than once every 5 seconds. There is no advantage to reconnecting more rapidly after maintenance during cancel_only mode.
232    pub fn stream_closed(&self) -> bool {
233        self.output.stream_closed.load(Ordering::SeqCst)
234    }
235
236    /// Submit a market order over the websockets connection.
237    /// This must be a private connection configured with the auth token.
238    ///
239    /// Arguments:
240    /// market_order: The market order to place
241    /// user_ref_id: The user-ref-id to associate to this order. Orders may be filtered or canceled by user-ref-id.
242    /// validate: If true, we just validate that the order was well formed and the order doesn't actually hit the books.
243    ///
244    /// Returns:
245    /// A oneshot::Reciever which yields either the TxID for the placed order, or an error message from kraken.
246    /// The Receiver produces no value if the order could not be successfully placed, and this will be logged.
247    /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
248    /// The return value will be None if the stream is already closed.
249    pub fn add_market_order(
250        &self,
251        market_order: MarketOrder,
252        user_ref_id: Option<i32>,
253        validate: bool,
254    ) -> Option<oneshot::Receiver<Result<String, String>>> {
255        let (result_sender, result_receiver) = oneshot::channel();
256        let request = AddOrderRequest {
257            ordertype: OrderType::Market,
258            bs_type: market_order.bs_type.into(),
259            volume: market_order.volume,
260            pair: market_order.pair,
261            price: Default::default(),
262            oflags: market_order.oflags.into_iter().map(OrderFlag::from).collect(),
263            userref: user_ref_id,
264            validate,
265            ..Default::default()
266        };
267        if self
268            .sender
269            .send(LocalRequest::AddOrder { request, result_sender })
270            .is_ok()
271        {
272            Some(result_receiver)
273        } else {
274            None
275        }
276    }
277
278    /// Submit a limit order over the websockets connection.
279    /// This must be a private connection configured with the auth token.
280    ///
281    /// Arguments:
282    /// limit_order: The order order to place
283    /// user_ref_id: The user-ref-id to associate to this order. Orders may be filtered or canceled by user-ref-id.
284    /// validate: If true, we just validate that the order was well formed and the order doesn't actually hit the books.
285    ///
286    /// Returns:
287    /// A oneshot::Reciever which yields either the TxID for the placed order, or an error message from kraken.
288    /// The Receiver produces no value if the order could not be successfully placed, and this will be logged.
289    /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
290    /// The return value will be None if the stream is already closed.
291    pub fn add_limit_order(
292        &self,
293        limit_order: LimitOrder,
294        user_ref_id: Option<i32>,
295        validate: bool,
296    ) -> Option<oneshot::Receiver<Result<String, String>>> {
297        let (result_sender, result_receiver) = oneshot::channel();
298        let request = AddOrderRequest {
299            ordertype: OrderType::Limit,
300            bs_type: limit_order.bs_type.into(),
301            volume: limit_order.volume,
302            pair: limit_order.pair,
303            price: limit_order.price,
304            oflags: limit_order.oflags.into_iter().map(OrderFlag::from).collect(),
305            userref: user_ref_id,
306            validate,
307            ..Default::default()
308        };
309        if self
310            .sender
311            .send(LocalRequest::AddOrder { request, result_sender })
312            .is_ok()
313        {
314            Some(result_receiver)
315        } else {
316            None
317        }
318    }
319
320    /// Submit a request to cancel an order over the websockets connection.
321    /// This must be a private connection configured with the auth token.
322    ///
323    /// Arguments:
324    /// tx_id: The TxId associated to an order, or, a user-ref-id
325    ///
326    /// Returns:
327    /// A oneshot::Reciever which yields either Ok on success canceling, or an error message from kraken.
328    /// The Receiver produces no value if the request could not be successfully placed, and this will be logged.
329    /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
330    /// The return value will be None if the stream is already closed.
331    pub fn cancel_order(&self, tx_id: String) -> Option<oneshot::Receiver<Result<(), String>>> {
332        let (result_sender, result_receiver) = oneshot::channel();
333        if self
334            .sender
335            .send(LocalRequest::CancelOrder { tx_id, result_sender })
336            .is_ok()
337        {
338            Some(result_receiver)
339        } else {
340            None
341        }
342    }
343
344    /// Submit a request to cancel all orders over the websockets connection.
345    /// This must be a private connection configured with the auth token.
346    ///
347    /// Returns:
348    /// A oneshot::Reciever which yields either Ok and a count of canceled orders, or an error message from kraken.
349    /// The Receiver produces no value if the request could not be successfully placed, and this will be logged.
350    /// The Receiver may be dropped if you don't care about the errors -- these error messages will be logged regardless.
351    /// The return value will be None if the stream is already closed.
352    pub fn cancel_all_orders(&self) -> Option<oneshot::Receiver<Result<u64, String>>> {
353        let (result_sender, result_receiver) = oneshot::channel();
354        if self
355            .sender
356            .send(LocalRequest::CancelAllOrders { result_sender })
357            .is_ok()
358        {
359            Some(result_receiver)
360        } else {
361            None
362        }
363    }
364}
365
366impl Drop for KrakenWsAPI {
367    fn drop(&mut self) {
368        if let Some(worker_thread) = self.worker_thread.take() {
369            drop(self.sender.send(LocalRequest::Stop));
370            worker_thread.join().expect("Could not join thread");
371        }
372    }
373}
374
375impl std::convert::TryFrom<KrakenWsConfig> for KrakenWsAPI {
376    type Error = Error;
377    fn try_from(src: KrakenWsConfig) -> Result<KrakenWsAPI, Error> {
378        KrakenWsAPI::new(src)
379    }
380}
381
382/// A request made from the local handle (KrakenWsAPI) to
383/// the thread perfoming the websockets operations.
384enum LocalRequest {
385    /// Requests to stop the worker thread and close the connection gracefully
386    Stop,
387    /// Requests to add an order to the order book
388    AddOrder {
389        request: AddOrderRequest,
390        result_sender: oneshot::Sender<Result<String, String>>,
391    },
392    /// Requests to cancel one of our orders
393    CancelOrder {
394        tx_id: String,
395        result_sender: oneshot::Sender<Result<(), String>>,
396    },
397    /// Requests to cancel all of our orders
398    CancelAllOrders {
399        result_sender: oneshot::Sender<Result<u64, String>>,
400    },
401}