barter_data/lib.rs
1#![forbid(unsafe_code)]
2#![warn(
3 unused,
4 clippy::cognitive_complexity,
5 unused_crate_dependencies,
6 unused_extern_crates,
7 clippy::unused_self,
8 clippy::useless_let_if_seq,
9 missing_debug_implementations,
10 rust_2018_idioms,
11 rust_2024_compatibility
12)]
13#![allow(clippy::type_complexity, clippy::too_many_arguments, type_alias_bounds)]
14
15//! # Barter-Data
16//! A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency
17//! exchanges - batteries included. It is:
18//! * **Easy**: Barter-Data's simple [`StreamBuilder`](streams::builder::StreamBuilder) and [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) interface allows for easy & quick setup (see example below and /examples!).
19//! * **Normalised**: Barter-Data's unified interface for consuming public WebSocket data means every Exchange returns a normalised data model.
20//! * **Real-Time**: Barter-Data utilises real-time WebSocket integrations enabling the consumption of normalised tick-by-tick data.
//! * **Extensible**: Barter-Data is highly extensible, making it easy to contribute new exchange integrations!
22//!
23//! ## User API
24//! - [`StreamBuilder`](streams::builder::StreamBuilder) for initialising [`MarketStream`]s of specific data kinds.
25//! - [`DynamicStreams`](streams::builder::dynamic::DynamicStreams) for initialising [`MarketStream`]s of every supported data kind at once.
26//! - Define what exchange market data you want to stream using the [`Subscription`] type.
27//! - Pass [`Subscription`]s to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) or [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init) methods.
28//! - Each call to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) (or each batch passed to the [`DynamicStreams::init`](streams::builder::dynamic::DynamicStreams::init))
29//! method opens a new WebSocket connection to the exchange - giving you full control.
30//!
31//! ## Examples
32//! For a comprehensive collection of examples, see the /examples directory.
33//!
34//! ### Multi Exchange Public Trades
35//! ```rust,no_run
36//! use barter_data::{
37//! exchange::{
38//! gateio::spot::GateioSpot,
39//! binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
40//! coinbase::Coinbase,
41//! okx::Okx,
42//! },
43//! streams::{Streams, reconnect::stream::ReconnectingStream},
44//! subscription::trade::PublicTrades,
45//! };
46//! use barter_instrument::instrument::market_data::kind::MarketDataInstrumentKind;
47//! use futures::StreamExt;
48//! use tracing::warn;
49//!
50//! #[tokio::main]
51//! async fn main() {
52//! // Initialise PublicTrades Streams for various exchanges
53//! // '--> each call to StreamBuilder::subscribe() initialises a separate WebSocket connection
54//!
55//! let streams = Streams::<PublicTrades>::builder()
56//! .subscribe([
57//! (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
58//! (BinanceSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
59//! ])
60//! .subscribe([
61//! (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
62//! (BinanceFuturesUsd::default(), "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
63//! ])
64//! .subscribe([
65//! (Coinbase, "btc", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
66//! (Coinbase, "eth", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
67//! ])
68//! .subscribe([
69//! (GateioSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
70//! (GateioSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
71//! ])
72//! .subscribe([
73//! (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
74//! (Okx, "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
75//! (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
76//! (Okx, "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
77//! ])
78//! .init()
79//! .await
80//! .unwrap();
81//!
82//! // Select and merge every exchange Stream using futures_util::stream::select_all
83//! // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams!
84//! let mut joined_stream = streams
85//! .select_all()
86//! .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
87//!
88//! while let Some(event) = joined_stream.next().await {
89//! println!("{event:?}");
90//! }
91//! }
92//! ```
93use crate::{
94 error::DataError,
95 event::MarketEvent,
96 exchange::{Connector, PingInterval},
97 instrument::InstrumentData,
98 subscriber::{Subscribed, Subscriber},
99 subscription::{Subscription, SubscriptionKind},
100 transformer::ExchangeTransformer,
101};
102use async_trait::async_trait;
103use barter_instrument::exchange::ExchangeId;
104use barter_integration::{
105 Transformer,
106 error::SocketError,
107 protocol::{
108 StreamParser,
109 websocket::{WebSocketParser, WsMessage, WsSink, WsStream},
110 },
111 stream::ExchangeStream,
112};
113use futures::{SinkExt, Stream, StreamExt};
114use std::{collections::VecDeque, future::Future};
115use tokio::sync::mpsc;
116use tracing::{debug, error, warn};
117
118/// All [`Error`](std::error::Error)s generated in Barter-Data.
119pub mod error;
120
121/// Defines the generic [`MarketEvent<T>`](MarketEvent) used in every [`MarketStream`].
122pub mod event;
123
124/// [`Connector`] implementations for each exchange.
125pub mod exchange;
126
127/// High-level API types used for building [`MarketStream`]s from collections
128/// of Barter [`Subscription`]s.
129pub mod streams;
130
131/// [`Subscriber`], [`SubscriptionMapper`](subscriber::mapper::SubscriptionMapper) and
132/// [`SubscriptionValidator`](subscriber::validator::SubscriptionValidator) traits that define how a
133/// [`Connector`] will subscribe to exchange [`MarketStream`]s.
134///
135/// Standard implementations for subscribing to WebSocket [`MarketStream`]s are included.
136pub mod subscriber;
137
138/// Types that communicate the type of each [`MarketStream`] to initialise, and what normalised
139/// Barter output type the exchange will be transformed into.
140pub mod subscription;
141
/// [`InstrumentData`] trait for data that describes an instrument.
143pub mod instrument;
144
/// [`OrderBook`](books::OrderBook) related types, and utilities for initialising and maintaining
/// a collection of sorted local Instrument [`OrderBook`](books::OrderBook)s.
147pub mod books;
148
149/// Generic [`ExchangeTransformer`] implementations used by [`MarketStream`]s to translate exchange
150/// specific types to normalised Barter types.
151///
152/// A standard [`StatelessTransformer`](transformer::stateless::StatelessTransformer) implementation
153/// that works for most `Exchange`-`SubscriptionKind` combinations is included.
154///
155/// Cases that need custom logic, such as fetching initial [`OrderBooksL2`](subscription::book::OrderBooksL2)
156/// and [`OrderBooksL3`](subscription::book::OrderBooksL3) snapshots on startup, may require custom
157/// [`ExchangeTransformer`] implementations.
158/// For examples, see [`Binance`](exchange::binance::Binance) [`OrderBooksL2`](subscription::book::OrderBooksL2)
159/// [`ExchangeTransformer`] implementations for
160/// [`spot`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2Transformer) and
161/// [`futures_usd`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2Transformer).
162pub mod transformer;
163
164/// Convenient type alias for an [`ExchangeStream`] utilising a tungstenite
165/// [`WebSocket`](barter_integration::protocol::websocket::WebSocket).
166pub type ExchangeWsStream<Transformer> = ExchangeStream<WebSocketParser, WsStream, Transformer>;
167
/// Implemented by types that can report an identifier of type `T` for themselves.
///
/// The trait is generic over `T`, so one type may expose several kinds of identifier. For
/// example, [`MarketStream::init`] requires a [`Subscription`] to identify both an exchange
/// channel and an exchange market.
pub trait Identifier<T> {
    /// Returns the identifier of type `T` associated with `self`.
    fn id(&self) -> T;
}
172
173/// [`Stream`] that yields [`Market<Kind>`](MarketEvent) events. The type of [`Market<Kind>`](MarketEvent)
174/// depends on the provided [`SubscriptionKind`] of the passed [`Subscription`]s.
175#[async_trait]
176pub trait MarketStream<Exchange, Instrument, Kind>
177where
178 Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
179 + Send
180 + Sized
181 + Unpin,
182 Exchange: Connector,
183 Instrument: InstrumentData,
184 Kind: SubscriptionKind,
185{
186 async fn init<SnapFetcher>(
187 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
188 ) -> Result<Self, DataError>
189 where
190 SnapFetcher: SnapshotFetcher<Exchange, Kind>,
191 Subscription<Exchange, Instrument, Kind>:
192 Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
193}
194
195/// Defines how to fetch market data snapshots for a collection of [`Subscription`]s.
196///
197/// Useful when a [`MarketStream`] requires an initial snapshot on start-up.
198///
199/// See examples such as Binance OrderBooksL2: <br>
200/// - [`BinanceSpotOrderBooksL2SnapshotFetcher`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2SnapshotFetcher)
201/// - [`BinanceFuturesUsdOrderBooksL2SnapshotFetcher`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2SnapshotFetcher)
202pub trait SnapshotFetcher<Exchange, Kind> {
203 fn fetch_snapshots<Instrument>(
204 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
205 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
206 where
207 Exchange: Connector,
208 Instrument: InstrumentData,
209 Kind: SubscriptionKind,
210 Kind::Event: Send,
211 Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
212}
213
214#[async_trait]
215impl<Exchange, Instrument, Kind, Transformer> MarketStream<Exchange, Instrument, Kind>
216 for ExchangeWsStream<Transformer>
217where
218 Exchange: Connector + Send + Sync,
219 Instrument: InstrumentData,
220 Kind: SubscriptionKind + Send + Sync,
221 Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
222 Kind::Event: Send,
223{
224 async fn init<SnapFetcher>(
225 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
226 ) -> Result<Self, DataError>
227 where
228 SnapFetcher: SnapshotFetcher<Exchange, Kind>,
229 Subscription<Exchange, Instrument, Kind>:
230 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
231 {
232 // Connect & subscribe
233 let Subscribed {
234 websocket,
235 map: instrument_map,
236 buffered_websocket_events,
237 } = Exchange::Subscriber::subscribe(subscriptions).await?;
238
239 // Fetch any required initial MarketEvent snapshots
240 let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
241
242 // Split WebSocket into WsStream & WsSink components
243 let (ws_sink, ws_stream) = websocket.split();
244
245 // Spawn task to distribute Transformer messages (eg/ custom pongs) to the exchange
246 let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
247 tokio::spawn(distribute_messages_to_exchange(
248 Exchange::ID,
249 ws_sink,
250 ws_sink_rx,
251 ));
252
253 // Spawn optional task to distribute custom application-level pings to the exchange
254 if let Some(ping_interval) = Exchange::ping_interval() {
255 tokio::spawn(schedule_pings_to_exchange(
256 Exchange::ID,
257 ws_sink_tx.clone(),
258 ping_interval,
259 ));
260 }
261
262 // Initialise Transformer associated with this Exchange and SubscriptionKind
263 let mut transformer =
264 Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
265
266 // Process any buffered active subscription events received during Subscription validation
267 let mut processed = process_buffered_events::<WebSocketParser, _>(
268 &mut transformer,
269 buffered_websocket_events,
270 );
271
272 // Extend buffered events with any initial snapshot events
273 processed.extend(initial_snapshots.into_iter().map(Ok));
274
275 Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
276 }
277}
278
279/// Implementation of [`SnapshotFetcher`] that does not fetch any initial market data snapshots.
280/// Often used for stateless [`MarketStream`]s, such as public trades.
281#[derive(Debug)]
282pub struct NoInitialSnapshots;
283
284impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
285 fn fetch_snapshots<Instrument>(
286 _: &[Subscription<Exchange, Instrument, Kind>],
287 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
288 where
289 Exchange: Connector,
290 Instrument: InstrumentData,
291 Kind: SubscriptionKind,
292 Kind::Event: Send,
293 Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
294 {
295 std::future::ready(Ok(vec![]))
296 }
297}
298
299pub fn process_buffered_events<Protocol, StreamTransformer>(
300 transformer: &mut StreamTransformer,
301 events: Vec<Protocol::Message>,
302) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
303where
304 Protocol: StreamParser,
305 StreamTransformer: Transformer,
306{
307 events
308 .into_iter()
309 .filter_map(|event| {
310 Protocol::parse::<StreamTransformer::Input>(Ok(event))?
311 .inspect_err(|error| {
312 warn!(
313 ?error,
314 "failed to parse message buffered during Subscription validation"
315 )
316 })
317 .ok()
318 })
319 .flat_map(|parsed| transformer.transform(parsed))
320 .collect()
321}
322
323/// Transmit [`WsMessage`]s sent from the [`ExchangeTransformer`] to the exchange via
324/// the [`WsSink`].
325///
326/// **Note:**
327/// ExchangeTransformer is operating in a synchronous trait context so we use this separate task
328/// to avoid adding `#[\async_trait\]` to the transformer - this avoids allocations.
329pub async fn distribute_messages_to_exchange(
330 exchange: ExchangeId,
331 mut ws_sink: WsSink,
332 mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
333) {
334 while let Some(message) = ws_sink_rx.recv().await {
335 if let Err(error) = ws_sink.send(message).await {
336 if barter_integration::protocol::websocket::is_websocket_disconnected(&error) {
337 break;
338 }
339
340 // Log error only if WsMessage failed to send over a connected WebSocket
341 error!(
342 %exchange,
343 %error,
344 "failed to send output message to the exchange via WsSink"
345 );
346 }
347 }
348}
349
350/// Schedule the sending of custom application-level ping [`WsMessage`]s to the exchange using
351/// the provided [`PingInterval`].
352///
353/// **Notes:**
354/// - This is only used for those exchanges that require custom application-level pings.
355/// - This is additional to the protocol-level pings already handled by `tokio_tungstenite`.
356pub async fn schedule_pings_to_exchange(
357 exchange: ExchangeId,
358 ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
359 PingInterval { mut interval, ping }: PingInterval,
360) {
361 loop {
362 // Wait for next scheduled ping
363 interval.tick().await;
364
365 // Construct exchange custom application-level ping payload
366 let payload = ping();
367 debug!(%exchange, %payload, "sending custom application-level ping to exchange");
368
369 if ws_sink_tx.send(payload).is_err() {
370 break;
371 }
372 }
373}
374
375pub mod test_utils {
376 use crate::{
377 event::{DataKind, MarketEvent},
378 subscription::trade::PublicTrade,
379 };
380 use barter_instrument::{Side, exchange::ExchangeId};
381 use chrono::{DateTime, Utc};
382
383 pub fn market_event_trade_buy<InstrumentKey>(
384 time_exchange: DateTime<Utc>,
385 time_received: DateTime<Utc>,
386 instrument: InstrumentKey,
387 price: f64,
388 quantity: f64,
389 ) -> MarketEvent<InstrumentKey, DataKind> {
390 MarketEvent {
391 time_exchange,
392 time_received,
393 exchange: ExchangeId::BinanceSpot,
394 instrument,
395 kind: DataKind::Trade(PublicTrade {
396 id: "trade_id".to_string(),
397 price,
398 amount: quantity,
399 side: Side::Buy,
400 }),
401 }
402 }
403}