Skip to main content

rustrade_data/
lib.rs

1#![forbid(unsafe_code)]
2#![deny(
3    clippy::unwrap_used,
4    clippy::expect_used,
5    clippy::cast_possible_truncation,
6    clippy::cast_sign_loss
7)]
8#![warn(
9    unused,
10    clippy::cognitive_complexity,
11    unused_crate_dependencies,
12    unused_extern_crates,
13    clippy::unused_self,
14    clippy::useless_let_if_seq,
15    missing_debug_implementations,
16    rust_2018_idioms
17)]
18#![allow(clippy::type_complexity, clippy::too_many_arguments, type_alias_bounds)]
19
20//! # Barter-Data
21//! A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency
22//! exchanges - batteries included. It is:
23//! * **Easy**: Barter-Data's simple [`StreamBuilder`](streams::builder::StreamBuilder) and [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) interface allows for easy & quick setup (see example below and /examples!).
24//! * **Normalised**: Barter-Data's unified interface for consuming public WebSocket data means every Exchange returns a normalised data model.
25//! * **Real-Time**: Barter-Data utilises real-time WebSocket integrations enabling the consumption of normalised tick-by-tick data.
26
27//! * **Extensible**: Barter-Data is highly extensible, and therefore easy to contribute to with coding new integrations!
28//!
29//! ## User API
30//! - [`StreamBuilder`](streams::builder::StreamBuilder) for initialising [`MarketStream`]s of specific data kinds.
31//! - [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) for initialising [`MarketStream`]s of every supported data kind at once.
32//! - Define what exchange market data you want to stream using the [`Subscription`] type.
33//! - Pass [`Subscription`]s to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) or [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init) methods.
34//! - Each call to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) (or each batch passed to the [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init))
35//!   method opens a new WebSocket connection to the exchange - giving you full control.
36//!
37//! ## Examples
38//! For a comprehensive collection of examples, see the /examples directory.
39//!
40//! ### Multi Exchange Public Trades
41//! ```rust,no_run
42//! use rustrade_data::{
43//!     exchange::{
44//!         gateio::spot::GateioSpot,
45//!         binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
46//!         coinbase::Coinbase,
47//!         okx::Okx,
48//!     },
49//!     streams::{Streams, reconnect::stream::ReconnectingStream},
50//!     subscriber::WebSocketSubscriber,
51//!     subscription::trade::PublicTrades,
52//! };
53//! use rustrade_instrument::instrument::market_data::kind::MarketDataInstrumentKind;
54//! use futures::StreamExt;
55//! use tracing::warn;
56//!
57//! #[tokio::main]
58//! async fn main() {
59//!     // Initialise PublicTrades Streams for various exchanges
60//!     // '--> each call to StreamBuilder::subscribe() initialises a separate WebSocket connection
61//!
62//!     let streams = Streams::<PublicTrades>::builder()
63//!         .subscribe(WebSocketSubscriber, [
64//!             (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
65//!             (BinanceSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
66//!         ])
67//!         .subscribe(WebSocketSubscriber, [
68//!             (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
69//!             (BinanceFuturesUsd::default(), "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
70//!         ])
71//!         .subscribe(WebSocketSubscriber, [
72//!             (Coinbase, "btc", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
73//!             (Coinbase, "eth", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
74//!         ])
75//!         .subscribe(WebSocketSubscriber, [
76//!             (GateioSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
77//!             (GateioSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
78//!         ])
79//!         .subscribe(WebSocketSubscriber, [
80//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
81//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
82//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
83//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
//!         ])
85//!         .init()
86//!         .await
87//!         .unwrap();
88//!
89//!     // Select and merge every exchange Stream using futures_util::stream::select_all
90//!     // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams!
91//!     let mut joined_stream = streams
92//!         .select_all()
93//!         .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
94//!
95//!     while let Some(event) = joined_stream.next().await {
96//!         println!("{event:?}");
97//!     }
98//! }
99//! ```
100
101// Silence unused_crate_dependencies for dev-dependencies used only in tests
102#[cfg(test)]
103use tracing_subscriber as _;
104// serial_test is only referenced by integration tests under the `massive` feature
105// (tests/massive_integration.rs), which compile as separate units.
106#[cfg(test)]
107use serial_test as _;
108
109use crate::{
110    error::DataError,
111    event::MarketEvent,
112    exchange::{Connector, PingInterval},
113    instrument::InstrumentData,
114    subscriber::{Subscribed, Subscriber},
115    subscription::{Subscription, SubscriptionKind},
116    transformer::ExchangeTransformer,
117};
118use futures::{SinkExt, Stream, StreamExt};
119use rustrade_instrument::exchange::ExchangeId;
120use rustrade_integration::{
121    Transformer,
122    error::SocketError,
123    protocol::{
124        StreamParser,
125        websocket::{WsError, WsMessage, WsSink, WsStream},
126    },
127};
128
129use rustrade_integration::stream::ExchangeStream;
130use std::{collections::VecDeque, future::Future};
131use tokio::sync::mpsc;
132use tracing::{debug, error, warn};
133
134/// All [`Error`](std::error::Error)s generated in Barter-Data.
135pub mod error;
136
137/// Defines the generic [`MarketEvent<T>`](MarketEvent) used in every [`MarketStream`].
138pub mod event;
139
140/// [`Connector`] implementations for each exchange.
141pub mod exchange;
142
143/// High-level API types used for building [`MarketStream`]s from collections
144/// of Barter [`Subscription`]s.
145pub mod streams;
146
147/// [`Subscriber`], [`SubscriptionMapper`](subscriber::mapper::SubscriptionMapper) and
/// [`SubscriptionValidator`](subscriber::validator::SubscriptionValidator) traits that define how a
149/// [`Connector`] will subscribe to exchange [`MarketStream`]s.
150///
151/// Standard implementations for subscribing to WebSocket [`MarketStream`]s are included.
152pub mod subscriber;
153
154/// Types that communicate the type of each [`MarketStream`] to initialise, and what normalised
155/// Barter output type the exchange will be transformed into.
156pub mod subscription;
157
158/// [`InstrumentData`] trait for instrument describing data.
159pub mod instrument;
160
161/// [`OrderBook`](books::OrderBook) related types, and utilities for initialising and maintaining
/// a collection of sorted local Instrument [`OrderBook`](books::OrderBook)s.
163pub mod books;
164
165/// Generic [`ExchangeTransformer`] implementations used by [`MarketStream`]s to translate exchange
166/// specific types to normalised Barter types.
167///
168/// A standard [`StatelessTransformer`](transformer::stateless::StatelessTransformer) implementation
169/// that works for most `Exchange`-`SubscriptionKind` combinations is included.
170///
171/// Cases that need custom logic, such as fetching initial [`OrderBooksL2`](subscription::book::OrderBooksL2)
172/// and [`OrderBooksL3`](subscription::book::OrderBooksL3) snapshots on startup, may require custom
173/// [`ExchangeTransformer`] implementations.
174/// For examples, see [`Binance`](exchange::binance::Binance) [`OrderBooksL2`](subscription::book::OrderBooksL2)
175/// [`ExchangeTransformer`] implementations for
176/// [`spot`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2Transformer) and
177/// [`futures_usd`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2Transformer).
178pub mod transformer;
179
/// Convenient type alias for an [`ExchangeStream`] utilizing a tungstenite
/// [`WebSocket`](rustrade_integration::protocol::websocket::WebSocket).
///
/// The protocol stream parameter of [`ExchangeStream`] is fixed to [`WsStream`], leaving only
/// the `Parser` and `Transformer` to be chosen.
pub type ExchangeWsStream<Parser, Transformer> = ExchangeStream<Parser, WsStream, Transformer>;
183
/// Defines a generic identification type for the implementor.
///
/// The trait is generic over the identifier type `T`, so a single type may implement it more
/// than once - one implementation per kind of identifier it can provide. For example,
/// [`MarketStream::init`] requires a [`Subscription`] to implement both
/// `Identifier<Exchange::Channel>` and `Identifier<Exchange::Market>`.
pub trait Identifier<T> {
    /// Returns the identifier of type `T` associated with `self`.
    fn id(&self) -> T;
}
188
/// [`Stream`] that yields [`MarketEvent`]s, each wrapped in a `Result` whose error type is
/// [`DataError`]. The event payload carried by every [`MarketEvent`] is `Kind::Event`, so it
/// depends on the [`SubscriptionKind`] of the passed [`Subscription`]s.
///
/// Implementors are additionally required to be `Send + Sized + Unpin`.
pub trait MarketStream<Exchange, Instrument, Kind>
where
    Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
        + Send
        + Sized
        + Unpin,
    Exchange: Connector,
    Instrument: InstrumentData,
    Kind: SubscriptionKind,
{
    /// Initialises the [`MarketStream`] for the provided [`Subscription`]s.
    ///
    /// # Type Parameters
    /// - `SnapFetcher`: the [`SnapshotFetcher`] made available to the implementation for
    ///   obtaining any initial snapshots.
    ///
    /// # Arguments
    /// - `subscriber`: the `Exchange::Subscriber` used to subscribe to the exchange.
    /// - `subscriptions`: the [`Subscription`]s this stream should be initialised for.
    ///
    /// # Errors
    /// Resolves to a [`DataError`] if the stream could not be initialised.
    fn init<SnapFetcher>(
        subscriber: &Exchange::Subscriber,
        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
    ) -> impl Future<Output = Result<Self, DataError>> + Send
    where
        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
        Subscription<Exchange, Instrument, Kind>:
            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
}
210
/// Defines how to fetch market data snapshots for a collection of [`Subscription`]s.
///
/// Useful when a [`MarketStream`] requires an initial snapshot on start-up.
///
/// See examples such as Binance OrderBooksL2: <br>
/// - [`BinanceSpotOrderBooksL2SnapshotFetcher`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2SnapshotFetcher)
/// - [`BinanceFuturesUsdOrderBooksL2SnapshotFetcher`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2SnapshotFetcher)
pub trait SnapshotFetcher<Exchange, Kind> {
    /// Fetches the initial [`MarketEvent`] snapshots for the provided [`Subscription`]s.
    ///
    /// The returned [`Vec`] may be empty when no snapshot is required (see
    /// [`NoInitialSnapshots`]).
    ///
    /// # Errors
    /// Resolves to a [`SocketError`] if the snapshots could not be fetched.
    fn fetch_snapshots<Instrument>(
        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
    where
        Exchange: Connector,
        Instrument: InstrumentData,
        Kind: SubscriptionKind,
        Kind::Event: Send,
        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
}
229
230impl<Exchange, Instrument, Kind, Transformer, Parser> MarketStream<Exchange, Instrument, Kind>
231    for ExchangeWsStream<Parser, Transformer>
232where
233    Exchange: Connector + Send + Sync,
234    Instrument: InstrumentData,
235    Kind: SubscriptionKind + Send + Sync,
236    Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
237    Kind::Event: Send + Sync,
238    Parser: StreamParser<Transformer::Input, Message = WsMessage, Error = WsError> + Send,
239{
240    async fn init<SnapFetcher>(
241        subscriber: &Exchange::Subscriber,
242        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
243    ) -> Result<Self, DataError>
244    where
245        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
246        Subscription<Exchange, Instrument, Kind>:
247            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
248    {
249        // Connect & subscribe
250        let Subscribed {
251            websocket,
252            map: instrument_map,
253            buffered_websocket_events,
254        } = subscriber.subscribe(subscriptions).await?;
255
256        // Fetch any required initial MarketEvent snapshots
257        let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
258
259        // Split WebSocket into WsStream & WsSink components
260        let (ws_sink, ws_stream) = websocket.split();
261
262        // Spawn task to distribute Transformer messages (eg/ custom pongs) to the exchange
263        let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
264        tokio::spawn(distribute_messages_to_exchange(
265            Exchange::ID,
266            ws_sink,
267            ws_sink_rx,
268        ));
269
270        // Spawn optional task to distribute custom application-level pings to the exchange
271        if let Some(ping_interval) = Exchange::ping_interval() {
272            tokio::spawn(schedule_pings_to_exchange(
273                Exchange::ID,
274                ws_sink_tx.clone(),
275                ping_interval,
276            ));
277        }
278
279        // Initialise Transformer associated with this Exchange and SubscriptionKind
280        let mut transformer =
281            Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
282
283        // Process any buffered active subscription events received during Subscription validation
284        let mut processed = process_buffered_events::<Parser, Transformer>(
285            &mut transformer,
286            buffered_websocket_events,
287        );
288
289        // Extend buffered events with any initial snapshot events
290        processed.extend(initial_snapshots.into_iter().map(Ok));
291
292        Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
293    }
294}
295
296/// Implementation of [`SnapshotFetcher`] that does not fetch any initial market data snapshots.
297/// Often used for stateless [`MarketStream`]s, such as public trades.
298#[derive(Debug)]
299pub struct NoInitialSnapshots;
300
301impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
302    fn fetch_snapshots<Instrument>(
303        _: &[Subscription<Exchange, Instrument, Kind>],
304    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
305    where
306        Exchange: Connector,
307        Instrument: InstrumentData,
308        Kind: SubscriptionKind,
309        Kind::Event: Send,
310        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
311    {
312        std::future::ready(Ok(vec![]))
313    }
314}
315
316pub fn process_buffered_events<Parser, StreamTransformer>(
317    transformer: &mut StreamTransformer,
318    events: Vec<Parser::Message>,
319) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
320where
321    Parser: StreamParser<StreamTransformer::Input>,
322    StreamTransformer: Transformer,
323{
324    events
325        .into_iter()
326        .filter_map(|event| {
327            Parser::parse(Ok(event))?
328                .inspect_err(|error| {
329                    warn!(
330                        ?error,
331                        "failed to parse message buffered during Subscription validation"
332                    )
333                })
334                .ok()
335        })
336        .flat_map(|parsed| transformer.transform(parsed))
337        .collect()
338}
339
340/// Transmit [`WsMessage`]s sent from the [`ExchangeTransformer`] to the exchange via
341/// the [`WsSink`].
342///
343/// **Note:**
344/// [`Transformer`] is a synchronous trait, so we use this separate task to handle
345/// async WebSocket writes without requiring the transformer to be async.
346pub async fn distribute_messages_to_exchange(
347    exchange: ExchangeId,
348    mut ws_sink: WsSink,
349    mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
350) {
351    while let Some(message) = ws_sink_rx.recv().await {
352        if let Err(error) = ws_sink.send(message).await {
353            if rustrade_integration::protocol::websocket::is_websocket_disconnected(&error) {
354                break;
355            }
356
357            // Log error only if WsMessage failed to send over a connected WebSocket
358            error!(
359                %exchange,
360                %error,
361                "failed to send output message to the exchange via WsSink"
362            );
363        }
364    }
365}
366
367/// Schedule the sending of custom application-level ping [`WsMessage`]s to the exchange using
368/// the provided [`PingInterval`].
369///
370/// **Notes:**
371///  - This is only used for those exchanges that require custom application-level pings.
372///  - This is additional to the protocol-level pings already handled by `tokio_tungstenite`.
373pub async fn schedule_pings_to_exchange(
374    exchange: ExchangeId,
375    ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
376    PingInterval { mut interval, ping }: PingInterval,
377) {
378    loop {
379        // Wait for next scheduled ping
380        interval.tick().await;
381
382        // Construct exchange custom application-level ping payload
383        let payload = ping();
384        debug!(%exchange, %payload, "sending custom application-level ping to exchange");
385
386        if ws_sink_tx.send(payload).is_err() {
387            break;
388        }
389    }
390}
391
392pub mod test_utils {
393    use crate::{
394        event::{DataKind, MarketEvent},
395        subscription::trade::PublicTrade,
396    };
397    use chrono::{DateTime, Utc};
398    use rust_decimal::Decimal;
399    use rustrade_instrument::{Side, exchange::ExchangeId};
400
401    pub fn market_event_trade_buy<InstrumentKey>(
402        time_exchange: DateTime<Utc>,
403        time_received: DateTime<Utc>,
404        instrument: InstrumentKey,
405        price: Decimal,
406        quantity: Decimal,
407    ) -> MarketEvent<InstrumentKey, DataKind> {
408        MarketEvent {
409            time_exchange,
410            time_received,
411            exchange: ExchangeId::BinanceSpot,
412            instrument,
413            kind: DataKind::Trade(PublicTrade {
414                id: "trade_id".into(),
415                price,
416                amount: quantity,
417                side: Some(Side::Buy),
418            }),
419        }
420    }
421}