Skip to main content

barter_data/
lib.rs

1#![forbid(unsafe_code)]
2#![warn(
3    unused,
4    clippy::cognitive_complexity,
5    unused_crate_dependencies,
6    unused_extern_crates,
7    clippy::unused_self,
8    clippy::useless_let_if_seq,
9    missing_debug_implementations,
10    rust_2018_idioms,
11    rust_2024_compatibility
12)]
13#![allow(clippy::type_complexity, clippy::too_many_arguments, type_alias_bounds)]
14
15//! # Barter-Data
16//! A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency
17//! exchanges - batteries included. It is:
18//! * **Easy**: Barter-Data's simple [`StreamBuilder`](streams::builder::StreamBuilder) and [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) interface allows for easy & quick setup (see example below and /examples!).
19//! * **Normalised**: Barter-Data's unified interface for consuming public WebSocket data means every Exchange returns a normalised data model.
20//! * **Real-Time**: Barter-Data utilises real-time WebSocket integrations enabling the consumption of normalised tick-by-tick data.
21
//! * **Extensible**: Barter-Data is highly extensible, making it easy to contribute new exchange integrations!
23//!
24//! ## User API
25//! - [`StreamBuilder`](streams::builder::StreamBuilder) for initialising [`MarketStream`]s of specific data kinds.
26//! - [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) for initialising [`MarketStream`]s of every supported data kind at once.
27//! - Define what exchange market data you want to stream using the [`Subscription`] type.
28//! - Pass [`Subscription`]s to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) or [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init) methods.
29//! - Each call to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) (or each batch passed to the [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init))
30//!   method opens a new WebSocket connection to the exchange - giving you full control.
31//!
32//! ## Examples
33//! For a comprehensive collection of examples, see the /examples directory.
34//!
35//! ### Multi Exchange Public Trades
36//! ```rust,no_run
37//! use barter_data::{
38//!     exchange::{
39//!         gateio::spot::GateioSpot,
40//!         binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
41//!         coinbase::Coinbase,
42//!         okx::Okx,
43//!     },
44//!     streams::{Streams, reconnect::stream::ReconnectingStream},
45//!     subscription::trade::PublicTrades,
46//! };
47//! use barter_instrument::instrument::market_data::kind::MarketDataInstrumentKind;
48//! use futures::StreamExt;
49//! use tracing::warn;
50//!
51//! #[tokio::main]
52//! async fn main() {
53//!     // Initialise PublicTrades Streams for various exchanges
54//!     // '--> each call to StreamBuilder::subscribe() initialises a separate WebSocket connection
55//!
56//!     let streams = Streams::<PublicTrades>::builder()
57//!         .subscribe([
58//!             (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
59//!             (BinanceSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
60//!         ])
61//!         .subscribe([
62//!             (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
63//!             (BinanceFuturesUsd::default(), "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
64//!         ])
65//!         .subscribe([
66//!             (Coinbase, "btc", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
67//!             (Coinbase, "eth", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
68//!         ])
69//!         .subscribe([
70//!             (GateioSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
71//!             (GateioSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
72//!         ])
73//!         .subscribe([
74//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
75//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
76//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
77//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
//!         ])
79//!         .init()
80//!         .await
81//!         .unwrap();
82//!
83//!     // Select and merge every exchange Stream using futures_util::stream::select_all
84//!     // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams!
85//!     let mut joined_stream = streams
86//!         .select_all()
87//!         .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
88//!
89//!     while let Some(event) = joined_stream.next().await {
90//!         println!("{event:?}");
91//!     }
92//! }
93//! ```
94use crate::{
95    error::DataError,
96    event::MarketEvent,
97    exchange::{Connector, PingInterval},
98    instrument::InstrumentData,
99    subscriber::{Subscribed, Subscriber},
100    subscription::{Subscription, SubscriptionKind},
101    transformer::ExchangeTransformer,
102};
103use async_trait::async_trait;
104use barter_instrument::exchange::ExchangeId;
105use barter_integration::{
106    Transformer,
107    error::SocketError,
108    protocol::{
109        StreamParser,
110        websocket::{WsError, WsMessage, WsSink, WsStream},
111    },
112};
113use futures::{SinkExt, Stream, StreamExt};
114
115use barter_integration::stream::ExchangeStream;
116use std::{collections::VecDeque, future::Future};
117use tokio::sync::mpsc;
118use tracing::{debug, error, warn};
119
120/// All [`Error`](std::error::Error)s generated in Barter-Data.
121pub mod error;
122
123/// Defines the generic [`MarketEvent<T>`](MarketEvent) used in every [`MarketStream`].
124pub mod event;
125
126/// [`Connector`] implementations for each exchange.
127pub mod exchange;
128
129/// High-level API types used for building [`MarketStream`]s from collections
130/// of Barter [`Subscription`]s.
131pub mod streams;
132
133/// [`Subscriber`], [`SubscriptionMapper`](subscriber::mapper::SubscriptionMapper) and
/// [`SubscriptionValidator`](subscriber::validator::SubscriptionValidator) traits that define how a
135/// [`Connector`] will subscribe to exchange [`MarketStream`]s.
136///
137/// Standard implementations for subscribing to WebSocket [`MarketStream`]s are included.
138pub mod subscriber;
139
140/// Types that communicate the type of each [`MarketStream`] to initialise, and what normalised
141/// Barter output type the exchange will be transformed into.
142pub mod subscription;
143
144/// [`InstrumentData`] trait for instrument describing data.
145pub mod instrument;
146
147/// [`OrderBook`](books::OrderBook) related types, and utilities for initialising and maintaining
148/// a collection of sorted local Instrument [`OrderBook`](books::OrderBook)s
149pub mod books;
150
151/// Generic [`ExchangeTransformer`] implementations used by [`MarketStream`]s to translate exchange
152/// specific types to normalised Barter types.
153///
154/// A standard [`StatelessTransformer`](transformer::stateless::StatelessTransformer) implementation
155/// that works for most `Exchange`-`SubscriptionKind` combinations is included.
156///
157/// Cases that need custom logic, such as fetching initial [`OrderBooksL2`](subscription::book::OrderBooksL2)
158/// and [`OrderBooksL3`](subscription::book::OrderBooksL3) snapshots on startup, may require custom
159/// [`ExchangeTransformer`] implementations.
160/// For examples, see [`Binance`](exchange::binance::Binance) [`OrderBooksL2`](subscription::book::OrderBooksL2)
161/// [`ExchangeTransformer`] implementations for
162/// [`spot`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2Transformer) and
163/// [`futures_usd`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2Transformer).
164pub mod transformer;
165
166/// Convenient type alias for an [`ExchangeStream`] utilizing a tungstenite
167/// [`WebSocket`](barter_integration::protocol::websocket::WebSocket).
168pub type ExchangeWsStream<Parser, Transformer> = ExchangeStream<Parser, WsStream, Transformer>;
169
/// Generic identification trait - the implementor is able to produce an identifier of type `T`.
///
/// A single type may implement this trait for several `T`s. For example, [`MarketStream::init`]
/// requires a [`Subscription`] to identify both an `Exchange::Channel` and an `Exchange::Market`.
pub trait Identifier<T> {
    /// Returns the `T` identifier associated with `self`.
    fn id(&self) -> T;
}
174
175/// [`Stream`] that yields [`Market<Kind>`](MarketEvent) events. The type of [`Market<Kind>`](MarketEvent)
176/// depends on the provided [`SubscriptionKind`] of the passed [`Subscription`]s.
177#[async_trait]
178pub trait MarketStream<Exchange, Instrument, Kind>
179where
180    Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
181        + Send
182        + Sized
183        + Unpin,
184    Exchange: Connector,
185    Instrument: InstrumentData,
186    Kind: SubscriptionKind,
187{
188    async fn init<SnapFetcher>(
189        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
190    ) -> Result<Self, DataError>
191    where
192        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
193        Subscription<Exchange, Instrument, Kind>:
194            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
195}
196
197/// Defines how to fetch market data snapshots for a collection of [`Subscription`]s.
198///
199/// Useful when a [`MarketStream`] requires an initial snapshot on start-up.
200///
201/// See examples such as Binance OrderBooksL2: <br>
202/// - [`BinanceSpotOrderBooksL2SnapshotFetcher`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2SnapshotFetcher)
203/// - [`BinanceFuturesUsdOrderBooksL2SnapshotFetcher`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2SnapshotFetcher)
204pub trait SnapshotFetcher<Exchange, Kind> {
205    fn fetch_snapshots<Instrument>(
206        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
207    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
208    where
209        Exchange: Connector,
210        Instrument: InstrumentData,
211        Kind: SubscriptionKind,
212        Kind::Event: Send,
213        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
214}
215
216#[async_trait]
217impl<Exchange, Instrument, Kind, Transformer, Parser> MarketStream<Exchange, Instrument, Kind>
218    for ExchangeWsStream<Parser, Transformer>
219where
220    Exchange: Connector + Send + Sync,
221    Instrument: InstrumentData,
222    Kind: SubscriptionKind + Send + Sync,
223    Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
224    Kind::Event: Send,
225    Parser: StreamParser<Transformer::Input, Message = WsMessage, Error = WsError> + Send,
226{
227    async fn init<SnapFetcher>(
228        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
229    ) -> Result<Self, DataError>
230    where
231        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
232        Subscription<Exchange, Instrument, Kind>:
233            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
234    {
235        // Connect & subscribe
236        let Subscribed {
237            websocket,
238            map: instrument_map,
239            buffered_websocket_events,
240        } = Exchange::Subscriber::subscribe(subscriptions).await?;
241
242        // Fetch any required initial MarketEvent snapshots
243        let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
244
245        // Split WebSocket into WsStream & WsSink components
246        let (ws_sink, ws_stream) = websocket.split();
247
248        // Spawn task to distribute Transformer messages (eg/ custom pongs) to the exchange
249        let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
250        tokio::spawn(distribute_messages_to_exchange(
251            Exchange::ID,
252            ws_sink,
253            ws_sink_rx,
254        ));
255
256        // Spawn optional task to distribute custom application-level pings to the exchange
257        if let Some(ping_interval) = Exchange::ping_interval() {
258            tokio::spawn(schedule_pings_to_exchange(
259                Exchange::ID,
260                ws_sink_tx.clone(),
261                ping_interval,
262            ));
263        }
264
265        // Initialise Transformer associated with this Exchange and SubscriptionKind
266        let mut transformer =
267            Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
268
269        // Process any buffered active subscription events received during Subscription validation
270        let mut processed = process_buffered_events::<Parser, Transformer>(
271            &mut transformer,
272            buffered_websocket_events,
273        );
274
275        // Extend buffered events with any initial snapshot events
276        processed.extend(initial_snapshots.into_iter().map(Ok));
277
278        Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
279    }
280}
281
/// [`SnapshotFetcher`] that never fetches any initial market data snapshots.
///
/// Often used for stateless [`MarketStream`]s, such as public trades.
#[derive(Debug)]
pub struct NoInitialSnapshots;
286
287impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
288    fn fetch_snapshots<Instrument>(
289        _: &[Subscription<Exchange, Instrument, Kind>],
290    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
291    where
292        Exchange: Connector,
293        Instrument: InstrumentData,
294        Kind: SubscriptionKind,
295        Kind::Event: Send,
296        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
297    {
298        std::future::ready(Ok(vec![]))
299    }
300}
301
302pub fn process_buffered_events<Parser, StreamTransformer>(
303    transformer: &mut StreamTransformer,
304    events: Vec<Parser::Message>,
305) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
306where
307    Parser: StreamParser<StreamTransformer::Input>,
308    StreamTransformer: Transformer,
309{
310    events
311        .into_iter()
312        .filter_map(|event| {
313            Parser::parse(Ok(event))?
314                .inspect_err(|error| {
315                    warn!(
316                        ?error,
317                        "failed to parse message buffered during Subscription validation"
318                    )
319                })
320                .ok()
321        })
322        .flat_map(|parsed| transformer.transform(parsed))
323        .collect()
324}
325
326/// Transmit [`WsMessage`]s sent from the [`ExchangeTransformer`] to the exchange via
327/// the [`WsSink`].
328///
329/// **Note:**
330/// ExchangeTransformer is operating in a synchronous trait context so we use this separate task
331/// to avoid adding `#[\async_trait\]` to the transformer - this avoids allocations.
332pub async fn distribute_messages_to_exchange(
333    exchange: ExchangeId,
334    mut ws_sink: WsSink,
335    mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
336) {
337    while let Some(message) = ws_sink_rx.recv().await {
338        if let Err(error) = ws_sink.send(message).await {
339            if barter_integration::protocol::websocket::is_websocket_disconnected(&error) {
340                break;
341            }
342
343            // Log error only if WsMessage failed to send over a connected WebSocket
344            error!(
345                %exchange,
346                %error,
347                "failed to send output message to the exchange via WsSink"
348            );
349        }
350    }
351}
352
353/// Schedule the sending of custom application-level ping [`WsMessage`]s to the exchange using
354/// the provided [`PingInterval`].
355///
356/// **Notes:**
357///  - This is only used for those exchanges that require custom application-level pings.
358///  - This is additional to the protocol-level pings already handled by `tokio_tungstenite`.
359pub async fn schedule_pings_to_exchange(
360    exchange: ExchangeId,
361    ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
362    PingInterval { mut interval, ping }: PingInterval,
363) {
364    loop {
365        // Wait for next scheduled ping
366        interval.tick().await;
367
368        // Construct exchange custom application-level ping payload
369        let payload = ping();
370        debug!(%exchange, %payload, "sending custom application-level ping to exchange");
371
372        if ws_sink_tx.send(payload).is_err() {
373            break;
374        }
375    }
376}
377
378pub mod test_utils {
379    use crate::{
380        event::{DataKind, MarketEvent},
381        subscription::trade::PublicTrade,
382    };
383    use barter_instrument::{Side, exchange::ExchangeId};
384    use chrono::{DateTime, Utc};
385
386    pub fn market_event_trade_buy<InstrumentKey>(
387        time_exchange: DateTime<Utc>,
388        time_received: DateTime<Utc>,
389        instrument: InstrumentKey,
390        price: f64,
391        quantity: f64,
392    ) -> MarketEvent<InstrumentKey, DataKind> {
393        MarketEvent {
394            time_exchange,
395            time_received,
396            exchange: ExchangeId::BinanceSpot,
397            instrument,
398            kind: DataKind::Trade(PublicTrade {
399                id: "trade_id".to_string(),
400                price,
401                amount: quantity,
402                side: Side::Buy,
403            }),
404        }
405    }
406}