barter_data/
lib.rs

1#![forbid(unsafe_code)]
2#![warn(
3    unused,
4    clippy::cognitive_complexity,
5    unused_crate_dependencies,
6    unused_extern_crates,
7    clippy::unused_self,
8    clippy::useless_let_if_seq,
9    missing_debug_implementations,
10    rust_2018_idioms,
11    rust_2024_compatibility
12)]
13#![allow(clippy::type_complexity, clippy::too_many_arguments, type_alias_bounds)]
14
15//! # Barter-Data
16//! A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency
17//! exchanges - batteries included. It is:
18//! * **Easy**: Barter-Data's simple [`StreamBuilder`](streams::builder::StreamBuilder) and [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) interface allows for easy & quick setup (see example below and /examples!).
19//! * **Normalised**: Barter-Data's unified interface for consuming public WebSocket data means every Exchange returns a normalised data model.
20//! * **Real-Time**: Barter-Data utilises real-time WebSocket integrations enabling the consumption of normalised tick-by-tick data.
21//! * **Extensible**: Barter-Data is highly extensible, and therefore easy to contribute to with coding new integrations!
22//!
23//! ## User API
24//! - [`StreamBuilder`](streams::builder::StreamBuilder) for initialising [`MarketStream`]s of specific data kinds.
25//! - [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) for initialising [`MarketStream`]s of every supported data kind at once.
26//! - Define what exchange market data you want to stream using the [`Subscription`] type.
27//! - Pass [`Subscription`]s to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) or [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init) methods.
28//! - Each call to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) (or each batch passed to the [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init))
29//!   method opens a new WebSocket connection to the exchange - giving you full control.
30//!
31//! ## Examples
32//! For a comprehensive collection of examples, see the /examples directory.
33//!
34//! ### Multi Exchange Public Trades
35//! ```rust,no_run
36//! use barter_data::{
37//!     exchange::{
38//!         gateio::spot::GateioSpot,
39//!         binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
40//!         coinbase::Coinbase,
41//!         okx::Okx,
42//!     },
43//!     streams::{Streams, reconnect::stream::ReconnectingStream},
44//!     subscription::trade::PublicTrades,
45//! };
46//! use barter_instrument::instrument::market_data::kind::MarketDataInstrumentKind;
47//! use futures::StreamExt;
48//! use tracing::warn;
49//!
50//! #[tokio::main]
51//! async fn main() {
52//!     // Initialise PublicTrades Streams for various exchanges
53//!     // '--> each call to StreamBuilder::subscribe() initialises a separate WebSocket connection
54//!
55//!     let streams = Streams::<PublicTrades>::builder()
56//!         .subscribe([
57//!             (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
58//!             (BinanceSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
59//!         ])
60//!         .subscribe([
61//!             (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
62//!             (BinanceFuturesUsd::default(), "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
63//!         ])
64//!         .subscribe([
65//!             (Coinbase, "btc", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
66//!             (Coinbase, "eth", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
67//!         ])
68//!         .subscribe([
69//!             (GateioSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
70//!             (GateioSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
71//!         ])
72//!         .subscribe([
73//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
74//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
75//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
76//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
77//!        ])
78//!         .init()
79//!         .await
80//!         .unwrap();
81//!
82//!     // Select and merge every exchange Stream using futures_util::stream::select_all
83//!     // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams!
84//!     let mut joined_stream = streams
85//!         .select_all()
86//!         .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
87//!
88//!     while let Some(event) = joined_stream.next().await {
89//!         println!("{event:?}");
90//!     }
91//! }
92//! ```
93use crate::{
94    error::DataError,
95    event::MarketEvent,
96    exchange::{Connector, PingInterval},
97    instrument::InstrumentData,
98    subscriber::{Subscribed, Subscriber},
99    subscription::{Subscription, SubscriptionKind},
100    transformer::ExchangeTransformer,
101};
102use async_trait::async_trait;
103use barter_instrument::exchange::ExchangeId;
104use barter_integration::{
105    Transformer,
106    error::SocketError,
107    protocol::{
108        StreamParser,
109        websocket::{WebSocketParser, WsMessage, WsSink, WsStream},
110    },
111    stream::ExchangeStream,
112};
113use futures::{SinkExt, Stream, StreamExt};
114use std::{collections::VecDeque, future::Future};
115use tokio::sync::mpsc;
116use tracing::{debug, error, warn};
117
118/// All [`Error`](std::error::Error)s generated in Barter-Data.
119pub mod error;
120
121/// Defines the generic [`MarketEvent<T>`](MarketEvent) used in every [`MarketStream`].
122pub mod event;
123
124/// [`Connector`] implementations for each exchange.
125pub mod exchange;
126
127/// High-level API types used for building [`MarketStream`]s from collections
128/// of Barter [`Subscription`]s.
129pub mod streams;
130
131/// [`Subscriber`], [`SubscriptionMapper`](subscriber::mapper::SubscriptionMapper) and
132/// [`SubscriptionValidator`](subscriber::validator::SubscriptionValidator)  traits that define how a
133/// [`Connector`] will subscribe to exchange [`MarketStream`]s.
134///
135/// Standard implementations for subscribing to WebSocket [`MarketStream`]s are included.
136pub mod subscriber;
137
138/// Types that communicate the type of each [`MarketStream`] to initialise, and what normalised
139/// Barter output type the exchange will be transformed into.
140pub mod subscription;
141
142/// [`InstrumentData`] trait for instrument describing data.
143pub mod instrument;
144
145/// [`OrderBook`](books::OrderBook) related types, and utilities for initialising and maintaining
146/// a collection of sorted local Instrument [`OrderBook`](books::OrderBook)s
147pub mod books;
148
149/// Generic [`ExchangeTransformer`] implementations used by [`MarketStream`]s to translate exchange
150/// specific types to normalised Barter types.
151///
152/// A standard [`StatelessTransformer`](transformer::stateless::StatelessTransformer) implementation
153/// that works for most `Exchange`-`SubscriptionKind` combinations is included.
154///
155/// Cases that need custom logic, such as fetching initial [`OrderBooksL2`](subscription::book::OrderBooksL2)
156/// and [`OrderBooksL3`](subscription::book::OrderBooksL3) snapshots on startup, may require custom
157/// [`ExchangeTransformer`] implementations.
158/// For examples, see [`Binance`](exchange::binance::Binance) [`OrderBooksL2`](subscription::book::OrderBooksL2)
159/// [`ExchangeTransformer`] implementations for
160/// [`spot`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2Transformer) and
161/// [`futures_usd`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2Transformer).
162pub mod transformer;
163
164/// Convenient type alias for an [`ExchangeStream`] utilising a tungstenite
165/// [`WebSocket`](barter_integration::protocol::websocket::WebSocket).
166pub type ExchangeWsStream<Transformer> = ExchangeStream<WebSocketParser, WsStream, Transformer>;
167
168/// Defines a generic identification type for the implementor.
169pub trait Identifier<T> {
170    fn id(&self) -> T;
171}
172
173/// [`Stream`] that yields [`Market<Kind>`](MarketEvent) events. The type of [`Market<Kind>`](MarketEvent)
174/// depends on the provided [`SubscriptionKind`] of the passed [`Subscription`]s.
175#[async_trait]
176pub trait MarketStream<Exchange, Instrument, Kind>
177where
178    Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
179        + Send
180        + Sized
181        + Unpin,
182    Exchange: Connector,
183    Instrument: InstrumentData,
184    Kind: SubscriptionKind,
185{
186    async fn init<SnapFetcher>(
187        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
188    ) -> Result<Self, DataError>
189    where
190        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
191        Subscription<Exchange, Instrument, Kind>:
192            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
193}
194
195/// Defines how to fetch market data snapshots for a collection of [`Subscription`]s.
196///
197/// Useful when a [`MarketStream`] requires an initial snapshot on start-up.
198///
199/// See examples such as Binance OrderBooksL2: <br>
200/// - [`BinanceSpotOrderBooksL2SnapshotFetcher`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2SnapshotFetcher)
201/// - [`BinanceFuturesUsdOrderBooksL2SnapshotFetcher`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2SnapshotFetcher)
202pub trait SnapshotFetcher<Exchange, Kind> {
203    fn fetch_snapshots<Instrument>(
204        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
205    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
206    where
207        Exchange: Connector,
208        Instrument: InstrumentData,
209        Kind: SubscriptionKind,
210        Kind::Event: Send,
211        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
212}
213
214#[async_trait]
215impl<Exchange, Instrument, Kind, Transformer> MarketStream<Exchange, Instrument, Kind>
216    for ExchangeWsStream<Transformer>
217where
218    Exchange: Connector + Send + Sync,
219    Instrument: InstrumentData,
220    Kind: SubscriptionKind + Send + Sync,
221    Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
222    Kind::Event: Send,
223{
224    async fn init<SnapFetcher>(
225        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
226    ) -> Result<Self, DataError>
227    where
228        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
229        Subscription<Exchange, Instrument, Kind>:
230            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
231    {
232        // Connect & subscribe
233        let Subscribed {
234            websocket,
235            map: instrument_map,
236            buffered_websocket_events,
237        } = Exchange::Subscriber::subscribe(subscriptions).await?;
238
239        // Fetch any required initial MarketEvent snapshots
240        let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
241
242        // Split WebSocket into WsStream & WsSink components
243        let (ws_sink, ws_stream) = websocket.split();
244
245        // Spawn task to distribute Transformer messages (eg/ custom pongs) to the exchange
246        let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
247        tokio::spawn(distribute_messages_to_exchange(
248            Exchange::ID,
249            ws_sink,
250            ws_sink_rx,
251        ));
252
253        // Spawn optional task to distribute custom application-level pings to the exchange
254        if let Some(ping_interval) = Exchange::ping_interval() {
255            tokio::spawn(schedule_pings_to_exchange(
256                Exchange::ID,
257                ws_sink_tx.clone(),
258                ping_interval,
259            ));
260        }
261
262        // Initialise Transformer associated with this Exchange and SubscriptionKind
263        let mut transformer =
264            Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
265
266        // Process any buffered active subscription events received during Subscription validation
267        let mut processed = process_buffered_events::<WebSocketParser, _>(
268            &mut transformer,
269            buffered_websocket_events,
270        );
271
272        // Extend buffered events with any initial snapshot events
273        processed.extend(initial_snapshots.into_iter().map(Ok));
274
275        Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
276    }
277}
278
279/// Implementation of [`SnapshotFetcher`] that does not fetch any initial market data snapshots.
280/// Often used for stateless [`MarketStream`]s, such as public trades.
281#[derive(Debug)]
282pub struct NoInitialSnapshots;
283
284impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
285    fn fetch_snapshots<Instrument>(
286        _: &[Subscription<Exchange, Instrument, Kind>],
287    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
288    where
289        Exchange: Connector,
290        Instrument: InstrumentData,
291        Kind: SubscriptionKind,
292        Kind::Event: Send,
293        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
294    {
295        std::future::ready(Ok(vec![]))
296    }
297}
298
299pub fn process_buffered_events<Protocol, StreamTransformer>(
300    transformer: &mut StreamTransformer,
301    events: Vec<Protocol::Message>,
302) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
303where
304    Protocol: StreamParser,
305    StreamTransformer: Transformer,
306{
307    events
308        .into_iter()
309        .filter_map(|event| {
310            Protocol::parse::<StreamTransformer::Input>(Ok(event))?
311                .inspect_err(|error| {
312                    warn!(
313                        ?error,
314                        "failed to parse message buffered during Subscription validation"
315                    )
316                })
317                .ok()
318        })
319        .flat_map(|parsed| transformer.transform(parsed))
320        .collect()
321}
322
323/// Transmit [`WsMessage`]s sent from the [`ExchangeTransformer`] to the exchange via
324/// the [`WsSink`].
325///
326/// **Note:**
327/// ExchangeTransformer is operating in a synchronous trait context so we use this separate task
328/// to avoid adding `#[\async_trait\]` to the transformer - this avoids allocations.
329pub async fn distribute_messages_to_exchange(
330    exchange: ExchangeId,
331    mut ws_sink: WsSink,
332    mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
333) {
334    while let Some(message) = ws_sink_rx.recv().await {
335        if let Err(error) = ws_sink.send(message).await {
336            if barter_integration::protocol::websocket::is_websocket_disconnected(&error) {
337                break;
338            }
339
340            // Log error only if WsMessage failed to send over a connected WebSocket
341            error!(
342                %exchange,
343                %error,
344                "failed to send output message to the exchange via WsSink"
345            );
346        }
347    }
348}
349
350/// Schedule the sending of custom application-level ping [`WsMessage`]s to the exchange using
351/// the provided [`PingInterval`].
352///
353/// **Notes:**
354///  - This is only used for those exchanges that require custom application-level pings.
355///  - This is additional to the protocol-level pings already handled by `tokio_tungstenite`.
356pub async fn schedule_pings_to_exchange(
357    exchange: ExchangeId,
358    ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
359    PingInterval { mut interval, ping }: PingInterval,
360) {
361    loop {
362        // Wait for next scheduled ping
363        interval.tick().await;
364
365        // Construct exchange custom application-level ping payload
366        let payload = ping();
367        debug!(%exchange, %payload, "sending custom application-level ping to exchange");
368
369        if ws_sink_tx.send(payload).is_err() {
370            break;
371        }
372    }
373}
374
375pub mod test_utils {
376    use crate::{
377        event::{DataKind, MarketEvent},
378        subscription::trade::PublicTrade,
379    };
380    use barter_instrument::{Side, exchange::ExchangeId};
381    use chrono::{DateTime, Utc};
382
383    pub fn market_event_trade_buy<InstrumentKey>(
384        time_exchange: DateTime<Utc>,
385        time_received: DateTime<Utc>,
386        instrument: InstrumentKey,
387        price: f64,
388        quantity: f64,
389    ) -> MarketEvent<InstrumentKey, DataKind> {
390        MarketEvent {
391            time_exchange,
392            time_received,
393            exchange: ExchangeId::BinanceSpot,
394            instrument,
395            kind: DataKind::Trade(PublicTrade {
396                id: "trade_id".to_string(),
397                price,
398                amount: quantity,
399                side: Side::Buy,
400            }),
401        }
402    }
403}