Skip to main content

rustrade_data/
lib.rs

1#![forbid(unsafe_code)]
2#![deny(
3    clippy::unwrap_used,
4    clippy::expect_used,
5    clippy::cast_possible_truncation,
6    clippy::cast_sign_loss
7)]
8#![warn(
9    unused,
10    clippy::cognitive_complexity,
11    unused_crate_dependencies,
12    unused_extern_crates,
13    clippy::unused_self,
14    clippy::useless_let_if_seq,
15    missing_debug_implementations,
16    rust_2018_idioms
17)]
18#![allow(clippy::type_complexity, clippy::too_many_arguments, type_alias_bounds)]
19
20//! # Barter-Data
21//! A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency
22//! exchanges - batteries included. It is:
23//! * **Easy**: Barter-Data's simple [`StreamBuilder`](streams::builder::StreamBuilder) and [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) interface allows for easy & quick setup (see example below and /examples!).
24//! * **Normalised**: Barter-Data's unified interface for consuming public WebSocket data means every Exchange returns a normalised data model.
25//! * **Real-Time**: Barter-Data utilises real-time WebSocket integrations enabling the consumption of normalised tick-by-tick data.
26
//! * **Extensible**: Barter-Data is highly extensible, making it easy to contribute new exchange integrations!
28//!
29//! ## User API
30//! - [`StreamBuilder`](streams::builder::StreamBuilder) for initialising [`MarketStream`]s of specific data kinds.
31//! - [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) for initialising [`MarketStream`]s of every supported data kind at once.
32//! - Define what exchange market data you want to stream using the [`Subscription`] type.
33//! - Pass [`Subscription`]s to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) or [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init) methods.
34//! - Each call to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) (or each batch passed to the [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init))
35//!   method opens a new WebSocket connection to the exchange - giving you full control.
36//!
37//! ## Examples
38//! For a comprehensive collection of examples, see the /examples directory.
39//!
40//! ### Multi Exchange Public Trades
41//! ```rust,no_run
42//! use rustrade_data::{
43//!     exchange::{
44//!         gateio::spot::GateioSpot,
45//!         binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
46//!         coinbase::Coinbase,
47//!         okx::Okx,
48//!     },
49//!     streams::{Streams, reconnect::stream::ReconnectingStream},
50//!     subscription::trade::PublicTrades,
51//! };
52//! use rustrade_instrument::instrument::market_data::kind::MarketDataInstrumentKind;
53//! use futures::StreamExt;
54//! use tracing::warn;
55//!
56//! #[tokio::main]
57//! async fn main() {
58//!     // Initialise PublicTrades Streams for various exchanges
59//!     // '--> each call to StreamBuilder::subscribe() initialises a separate WebSocket connection
60//!
61//!     let streams = Streams::<PublicTrades>::builder()
62//!         .subscribe([
63//!             (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
64//!             (BinanceSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
65//!         ])
66//!         .subscribe([
67//!             (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
68//!             (BinanceFuturesUsd::default(), "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
69//!         ])
70//!         .subscribe([
71//!             (Coinbase, "btc", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
72//!             (Coinbase, "eth", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
73//!         ])
74//!         .subscribe([
75//!             (GateioSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
76//!             (GateioSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
77//!         ])
78//!         .subscribe([
79//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
80//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
81//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
82//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
//!         ])
84//!         .init()
85//!         .await
86//!         .unwrap();
87//!
88//!     // Select and merge every exchange Stream using futures_util::stream::select_all
89//!     // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams!
90//!     let mut joined_stream = streams
91//!         .select_all()
92//!         .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
93//!
94//!     while let Some(event) = joined_stream.next().await {
95//!         println!("{event:?}");
96//!     }
97//! }
98//! ```
99
100// Silence unused_crate_dependencies for dev-dependencies used only in tests
101#[cfg(test)]
102use tracing_subscriber as _;
103
104use crate::{
105    error::DataError,
106    event::MarketEvent,
107    exchange::{Connector, PingInterval},
108    instrument::InstrumentData,
109    subscriber::{Subscribed, Subscriber},
110    subscription::{Subscription, SubscriptionKind},
111    transformer::ExchangeTransformer,
112};
113use async_trait::async_trait;
114use futures::{SinkExt, Stream, StreamExt};
115use rustrade_instrument::exchange::ExchangeId;
116use rustrade_integration::{
117    Transformer,
118    error::SocketError,
119    protocol::{
120        StreamParser,
121        websocket::{WsError, WsMessage, WsSink, WsStream},
122    },
123};
124
125use rustrade_integration::stream::ExchangeStream;
126use std::{collections::VecDeque, future::Future};
127use tokio::sync::mpsc;
128use tracing::{debug, error, warn};
129
130/// All [`Error`](std::error::Error)s generated in Barter-Data.
131pub mod error;
132
133/// Defines the generic [`MarketEvent<T>`](MarketEvent) used in every [`MarketStream`].
134pub mod event;
135
136/// [`Connector`] implementations for each exchange.
137pub mod exchange;
138
139/// High-level API types used for building [`MarketStream`]s from collections
140/// of Barter [`Subscription`]s.
141pub mod streams;
142
143/// [`Subscriber`], [`SubscriptionMapper`](subscriber::mapper::SubscriptionMapper) and
/// [`SubscriptionValidator`](subscriber::validator::SubscriptionValidator) traits that define how a
145/// [`Connector`] will subscribe to exchange [`MarketStream`]s.
146///
147/// Standard implementations for subscribing to WebSocket [`MarketStream`]s are included.
148pub mod subscriber;
149
150/// Types that communicate the type of each [`MarketStream`] to initialise, and what normalised
151/// Barter output type the exchange will be transformed into.
152pub mod subscription;
153
154/// [`InstrumentData`] trait for instrument describing data.
155pub mod instrument;
156
157/// [`OrderBook`](books::OrderBook) related types, and utilities for initialising and maintaining
/// a collection of sorted local Instrument [`OrderBook`](books::OrderBook)s.
159pub mod books;
160
161/// Generic [`ExchangeTransformer`] implementations used by [`MarketStream`]s to translate exchange
162/// specific types to normalised Barter types.
163///
164/// A standard [`StatelessTransformer`](transformer::stateless::StatelessTransformer) implementation
165/// that works for most `Exchange`-`SubscriptionKind` combinations is included.
166///
167/// Cases that need custom logic, such as fetching initial [`OrderBooksL2`](subscription::book::OrderBooksL2)
168/// and [`OrderBooksL3`](subscription::book::OrderBooksL3) snapshots on startup, may require custom
169/// [`ExchangeTransformer`] implementations.
170/// For examples, see [`Binance`](exchange::binance::Binance) [`OrderBooksL2`](subscription::book::OrderBooksL2)
171/// [`ExchangeTransformer`] implementations for
172/// [`spot`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2Transformer) and
173/// [`futures_usd`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2Transformer).
174pub mod transformer;
175
176/// Convenient type alias for an [`ExchangeStream`] utilizing a tungstenite
177/// [`WebSocket`](rustrade_integration::protocol::websocket::WebSocket).
178pub type ExchangeWsStream<Parser, Transformer> = ExchangeStream<Parser, WsStream, Transformer>;
179
/// Generic identification contract: the implementor can produce an identifier of type `T`.
///
/// A single type may implement this trait for several different `T`s, exposing a distinct
/// kind of identifier for each of them.
pub trait Identifier<T> {
    /// Returns the identifier of type `T` associated with `self`.
    fn id(&self) -> T;
}
184
185/// [`Stream`] that yields [`Market<Kind>`](MarketEvent) events. The type of [`Market<Kind>`](MarketEvent)
186/// depends on the provided [`SubscriptionKind`] of the passed [`Subscription`]s.
187#[async_trait]
188pub trait MarketStream<Exchange, Instrument, Kind>
189where
190    Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
191        + Send
192        + Sized
193        + Unpin,
194    Exchange: Connector,
195    Instrument: InstrumentData,
196    Kind: SubscriptionKind,
197{
198    async fn init<SnapFetcher>(
199        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
200    ) -> Result<Self, DataError>
201    where
202        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
203        Subscription<Exchange, Instrument, Kind>:
204            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
205}
206
207/// Defines how to fetch market data snapshots for a collection of [`Subscription`]s.
208///
209/// Useful when a [`MarketStream`] requires an initial snapshot on start-up.
210///
211/// See examples such as Binance OrderBooksL2: <br>
212/// - [`BinanceSpotOrderBooksL2SnapshotFetcher`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2SnapshotFetcher)
213/// - [`BinanceFuturesUsdOrderBooksL2SnapshotFetcher`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2SnapshotFetcher)
214pub trait SnapshotFetcher<Exchange, Kind> {
215    fn fetch_snapshots<Instrument>(
216        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
217    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
218    where
219        Exchange: Connector,
220        Instrument: InstrumentData,
221        Kind: SubscriptionKind,
222        Kind::Event: Send,
223        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
224}
225
226#[async_trait]
227impl<Exchange, Instrument, Kind, Transformer, Parser> MarketStream<Exchange, Instrument, Kind>
228    for ExchangeWsStream<Parser, Transformer>
229where
230    Exchange: Connector + Send + Sync,
231    Instrument: InstrumentData,
232    Kind: SubscriptionKind + Send + Sync,
233    Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
234    Kind::Event: Send,
235    Parser: StreamParser<Transformer::Input, Message = WsMessage, Error = WsError> + Send,
236{
237    async fn init<SnapFetcher>(
238        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
239    ) -> Result<Self, DataError>
240    where
241        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
242        Subscription<Exchange, Instrument, Kind>:
243            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
244    {
245        // Connect & subscribe
246        let Subscribed {
247            websocket,
248            map: instrument_map,
249            buffered_websocket_events,
250        } = Exchange::Subscriber::subscribe(subscriptions).await?;
251
252        // Fetch any required initial MarketEvent snapshots
253        let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
254
255        // Split WebSocket into WsStream & WsSink components
256        let (ws_sink, ws_stream) = websocket.split();
257
258        // Spawn task to distribute Transformer messages (eg/ custom pongs) to the exchange
259        let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
260        tokio::spawn(distribute_messages_to_exchange(
261            Exchange::ID,
262            ws_sink,
263            ws_sink_rx,
264        ));
265
266        // Spawn optional task to distribute custom application-level pings to the exchange
267        if let Some(ping_interval) = Exchange::ping_interval() {
268            tokio::spawn(schedule_pings_to_exchange(
269                Exchange::ID,
270                ws_sink_tx.clone(),
271                ping_interval,
272            ));
273        }
274
275        // Initialise Transformer associated with this Exchange and SubscriptionKind
276        let mut transformer =
277            Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
278
279        // Process any buffered active subscription events received during Subscription validation
280        let mut processed = process_buffered_events::<Parser, Transformer>(
281            &mut transformer,
282            buffered_websocket_events,
283        );
284
285        // Extend buffered events with any initial snapshot events
286        processed.extend(initial_snapshots.into_iter().map(Ok));
287
288        Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
289    }
290}
291
/// Implementation of [`SnapshotFetcher`] that does not fetch any initial market data snapshots.
/// Often used for stateless [`MarketStream`]s, such as public trades.
///
/// This is a zero-sized marker type, so it implements the common standard traits
/// (`Clone`, `Copy`, `Default`, equality and hashing) in addition to `Debug`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct NoInitialSnapshots;
296
297impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
298    fn fetch_snapshots<Instrument>(
299        _: &[Subscription<Exchange, Instrument, Kind>],
300    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
301    where
302        Exchange: Connector,
303        Instrument: InstrumentData,
304        Kind: SubscriptionKind,
305        Kind::Event: Send,
306        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
307    {
308        std::future::ready(Ok(vec![]))
309    }
310}
311
312pub fn process_buffered_events<Parser, StreamTransformer>(
313    transformer: &mut StreamTransformer,
314    events: Vec<Parser::Message>,
315) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
316where
317    Parser: StreamParser<StreamTransformer::Input>,
318    StreamTransformer: Transformer,
319{
320    events
321        .into_iter()
322        .filter_map(|event| {
323            Parser::parse(Ok(event))?
324                .inspect_err(|error| {
325                    warn!(
326                        ?error,
327                        "failed to parse message buffered during Subscription validation"
328                    )
329                })
330                .ok()
331        })
332        .flat_map(|parsed| transformer.transform(parsed))
333        .collect()
334}
335
336/// Transmit [`WsMessage`]s sent from the [`ExchangeTransformer`] to the exchange via
337/// the [`WsSink`].
338///
339/// **Note:**
340/// ExchangeTransformer is operating in a synchronous trait context so we use this separate task
341/// to avoid adding `#[\async_trait\]` to the transformer - this avoids allocations.
342pub async fn distribute_messages_to_exchange(
343    exchange: ExchangeId,
344    mut ws_sink: WsSink,
345    mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
346) {
347    while let Some(message) = ws_sink_rx.recv().await {
348        if let Err(error) = ws_sink.send(message).await {
349            if rustrade_integration::protocol::websocket::is_websocket_disconnected(&error) {
350                break;
351            }
352
353            // Log error only if WsMessage failed to send over a connected WebSocket
354            error!(
355                %exchange,
356                %error,
357                "failed to send output message to the exchange via WsSink"
358            );
359        }
360    }
361}
362
363/// Schedule the sending of custom application-level ping [`WsMessage`]s to the exchange using
364/// the provided [`PingInterval`].
365///
366/// **Notes:**
367///  - This is only used for those exchanges that require custom application-level pings.
368///  - This is additional to the protocol-level pings already handled by `tokio_tungstenite`.
369pub async fn schedule_pings_to_exchange(
370    exchange: ExchangeId,
371    ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
372    PingInterval { mut interval, ping }: PingInterval,
373) {
374    loop {
375        // Wait for next scheduled ping
376        interval.tick().await;
377
378        // Construct exchange custom application-level ping payload
379        let payload = ping();
380        debug!(%exchange, %payload, "sending custom application-level ping to exchange");
381
382        if ws_sink_tx.send(payload).is_err() {
383            break;
384        }
385    }
386}
387
388pub mod test_utils {
389    use crate::{
390        event::{DataKind, MarketEvent},
391        subscription::trade::PublicTrade,
392    };
393    use chrono::{DateTime, Utc};
394    use rustrade_instrument::{Side, exchange::ExchangeId};
395
396    pub fn market_event_trade_buy<InstrumentKey>(
397        time_exchange: DateTime<Utc>,
398        time_received: DateTime<Utc>,
399        instrument: InstrumentKey,
400        price: f64,
401        quantity: f64,
402    ) -> MarketEvent<InstrumentKey, DataKind> {
403        MarketEvent {
404            time_exchange,
405            time_received,
406            exchange: ExchangeId::BinanceSpot,
407            instrument,
408            kind: DataKind::Trade(PublicTrade {
409                id: "trade_id".into(),
410                price,
411                amount: quantity,
412                side: Side::Buy,
413            }),
414        }
415    }
416}