barter_data_sniper/
lib.rs

1#![forbid(unsafe_code)]
2#![warn(clippy::all)]
3#![allow(clippy::pedantic, clippy::type_complexity)]
4#![warn(
5    missing_debug_implementations,
6    rust_2018_idioms,
7    rust_2024_compatibility
8)]
9
10//! # Barter-Data
11//! A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency
12//! exchanges - batteries included. It is:
13//! * **Easy**: Barter-Data's simple [`StreamBuilder`](streams::builder::StreamBuilder) and [`DynamicStreams`](streams::builder::DynamicStreams) interface allows for easy & quick setup (see example below and /examples!).
14//! * **Normalised**: Barter-Data's unified interface for consuming public WebSocket data means every Exchange returns a normalised data model.
15//! * **Real-Time**: Barter-Data utilises real-time WebSocket integrations enabling the consumption of normalised tick-by-tick data.
16//! * **Extensible**: Barter-Data is highly extensible, and therefore easy to contribute to with coding new integrations!
17//!
18//! ## User API
19//! - [`StreamBuilder`](streams::builder::StreamBuilder) for initialising [`MarketStream`]s of specific data kinds.
20//! - [`DynamicStreams`](streams::builder::DynamicStreams) for initialising [`MarketStream`]s of every supported data kind at once.
21//! - Define what exchange market data you want to stream using the [`Subscription`] type.
22//! - Pass [`Subscription`]s to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) or [`DynamicStreams::init`](streams::builder::DynamicStreams::init) methods.
23//! - Each call to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) (or each batch passed to the [`DynamicStreams::init`](streams::builder::DynamicStreams::init))
24//!   method opens a new WebSocket connection to the exchange - giving you full control.
25//!
26//! ## Examples
27//! For a comprehensive collection of examples, see the /examples directory.
28//!
29//! ### Multi Exchange Public Trades
30//! ```rust,no_run
31//! use barter_data_sniper::exchange::gateio::spot::GateioSpot;
32//! use barter_data_sniper::{
33//!     exchange::{
34//!         binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
35//!         coinbase::Coinbase,
36//!         okx::Okx,
37//!     },
38//!     streams::{Streams, reconnect::stream::ReconnectingStream},
39//!     subscription::trade::PublicTrades,
40//! };
41//! use barter_instrument_copy::instrument::market_data::kind::MarketDataInstrumentKind;
42//! use futures::StreamExt;
43//! use tracing::warn;
44//!
45//! #[tokio::main]
46//! async fn main() {
47//!     // Initialise PublicTrades Streams for various exchanges
48//!     // '--> each call to StreamBuilder::subscribe() initialises a separate WebSocket connection
49//!
50//!     let streams = Streams::<PublicTrades>::builder()
51//!         .subscribe([
52//!             (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
53//!             (BinanceSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
54//!         ])
55//!         .subscribe([
56//!             (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
57//!             (BinanceFuturesUsd::default(), "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
58//!         ])
59//!         .subscribe([
60//!             (Coinbase, "btc", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
61//!             (Coinbase, "eth", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
62//!         ])
63//!         .subscribe([
64//!             (GateioSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
65//!             (GateioSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
66//!         ])
67//!         .subscribe([
68//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
69//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
70//!             (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
71//!             (Okx, "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
72//!        ])
73//!         .init()
74//!         .await
75//!         .unwrap();
76//!
77//!     // Select and merge every exchange Stream using futures_util::stream::select_all
78//!     // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams!
79//!     let mut joined_stream = streams
80//!         .select_all()
81//!         .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
82//!
83//!     while let Some(event) = joined_stream.next().await {
84//!         println!("{event:?}");
85//!     }
86//! }
87//! ```
88
89use crate::{
90    error::DataError,
91    event::MarketEvent,
92    exchange::{Connector, PingInterval},
93    instrument::InstrumentData,
94    subscriber::{Subscribed, Subscriber},
95    subscription::{Subscription, SubscriptionKind},
96    transformer::ExchangeTransformer,
97};
98use async_trait::async_trait;
99use barter_instrument_copy::exchange::ExchangeId;
100use barter_integration_copy::{
101    error::SocketError,
102    protocol::{
103        websocket::{WebSocketParser, WsMessage, WsSink, WsStream},
104        StreamParser,
105    },
106    ExchangeStream, Transformer,
107};
108use futures::{SinkExt, Stream, StreamExt};
109use std::{collections::VecDeque, future::Future};
110use tokio::sync::mpsc;
111use tracing::{debug, error, warn};
112
113/// All [`Error`](std::error::Error)s generated in Barter-Data.
114pub mod error;
115
116/// Defines the generic [`MarketEvent<T>`](MarketEvent) used in every [`MarketStream`].
117pub mod event;
118
119/// [`Connector`] implementations for each exchange.
120pub mod exchange;
121
122/// High-level API types used for building [`MarketStream`]s from collections
123/// of Barter [`Subscription`]s.
124pub mod streams;
125
126/// [`Subscriber`], [`SubscriptionMapper`](subscriber::mapper::SubscriptionMapper) and
127/// [`SubscriptionValidator`](subscriber::validator::SubscriptionValidator)  traits that define how a
128/// [`Connector`] will subscribe to exchange [`MarketStream`]s.
129///
130/// Standard implementations for subscribing to WebSocket [`MarketStream`]s are included.
131pub mod subscriber;
132
133/// Types that communicate the type of each [`MarketStream`] to initialise, and what normalised
134/// Barter output type the exchange will be transformed into.
135pub mod subscription;
136
137/// [`InstrumentData`] trait for instrument describing data.
138pub mod instrument;
139
140/// [`OrderBook`](books::OrderBook) related types, and utilities for initialising and maintaining
141/// a collection of sorted local Instrument [`OrderBook`](books::OrderBook)s
142pub mod books;
143
144/// Generic [`ExchangeTransformer`] implementations used by [`MarketStream`]s to translate exchange
145/// specific types to normalised Barter types.
146///
147/// A standard [`StatelessTransformer`](transformer::stateless::StatelessTransformer) implementation
148/// that works for most `Exchange`-`SubscriptionKind` combinations is included.
149///
150/// Cases that need custom logic, such as fetching initial [`OrderBooksL2`](subscription::book::OrderBooksL2)
151/// and [`OrderBooksL3`](subscription::book::OrderBooksL3) snapshots on startup, may require custom
152/// [`ExchangeTransformer`] implementations.
153/// For examples, see [`Binance`](exchange::binance::Binance) [`OrderBooksL2`](subscription::book::OrderBooksL2)
154/// [`ExchangeTransformer`] implementations for
155/// [`spot`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2Transformer) and
156/// [`futures_usd`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2Transformer).
157pub mod transformer;
158
159/// Convenient type alias for an [`ExchangeStream`] utilising a tungstenite
160/// [`WebSocket`](barter_integration_copy::protocol::websocket::WebSocket).
161pub type ExchangeWsStream<Transformer> = ExchangeStream<WebSocketParser, WsStream, Transformer>;
162
163/// Defines a generic identification type for the implementor.
164pub trait Identifier<T> {
165    fn id(&self) -> T;
166}
167
168/// [`Stream`] that yields [`Market<Kind>`](MarketEvent) events. The type of [`Market<Kind>`](MarketEvent)
169/// depends on the provided [`SubscriptionKind`] of the passed [`Subscription`]s.
170#[async_trait]
171pub trait MarketStream<Exchange, Instrument, Kind>
172where
173    Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
174        + Send
175        + Sized
176        + Unpin,
177    Exchange: Connector,
178    Instrument: InstrumentData,
179    Kind: SubscriptionKind,
180{
181    async fn init<SnapFetcher>(
182        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
183    ) -> Result<Self, DataError>
184    where
185        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
186        Subscription<Exchange, Instrument, Kind>:
187            Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
188}
189
190/// Defines how to fetch market data snapshots for a collection of [`Subscription`]s.
191///
192/// Useful when a [`MarketStream`] requires an initial snapshot on start-up.
193///
194/// See examples such as Binance OrderBooksL2: <br>
195/// - [`BinanceSpotOrderBooksL2SnapshotFetcher`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2SnapshotFetcher)
196/// - [`BinanceFuturesUsdOrderBooksL2SnapshotFetcher`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2SnapshotFetcher)
197pub trait SnapshotFetcher<Exchange, Kind> {
198    fn fetch_snapshots<Instrument>(
199        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
200    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
201    where
202        Exchange: Connector,
203        Instrument: InstrumentData,
204        Kind: SubscriptionKind,
205        Kind::Event: Send,
206        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
207}
208
209#[async_trait]
210impl<Exchange, Instrument, Kind, Transformer> MarketStream<Exchange, Instrument, Kind>
211    for ExchangeWsStream<Transformer>
212where
213    Exchange: Connector + Send + Sync,
214    Instrument: InstrumentData,
215    Kind: SubscriptionKind + Send + Sync,
216    Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
217    Kind::Event: Send,
218{
219    async fn init<SnapFetcher>(
220        subscriptions: &[Subscription<Exchange, Instrument, Kind>],
221    ) -> Result<Self, DataError>
222    where
223        SnapFetcher: SnapshotFetcher<Exchange, Kind>,
224        Subscription<Exchange, Instrument, Kind>:
225            Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
226    {
227        // Connect & subscribe
228        let Subscribed {
229            websocket,
230            map: instrument_map,
231            buffered_websocket_events,
232        } = Exchange::Subscriber::subscribe(subscriptions).await?;
233
234        // Fetch any required initial MarketEvent snapshots
235        let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
236
237        // Split WebSocket into WsStream & WsSink components
238        let (ws_sink, ws_stream) = websocket.split();
239
240        // Spawn task to distribute Transformer messages (eg/ custom pongs) to the exchange
241        let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
242        tokio::spawn(distribute_messages_to_exchange(
243            Exchange::ID,
244            ws_sink,
245            ws_sink_rx,
246        ));
247
248        // Spawn optional task to distribute custom application-level pings to the exchange
249        if let Some(ping_interval) = Exchange::ping_interval() {
250            tokio::spawn(schedule_pings_to_exchange(
251                Exchange::ID,
252                ws_sink_tx.clone(),
253                ping_interval,
254            ));
255        }
256
257        // Initialise Transformer associated with this Exchange and SubscriptionKind
258        let mut transformer =
259            Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
260
261        // Process any buffered active subscription events received during Subscription validation
262        let mut processed = process_buffered_events::<WebSocketParser, _>(
263            &mut transformer,
264            buffered_websocket_events,
265        );
266
267        // Extend buffered events with any initial snapshot events
268        processed.extend(initial_snapshots.into_iter().map(Ok));
269
270        Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
271    }
272}
273
274/// Implementation of [`SnapshotFetcher`] that does not fetch any initial market data snapshots.
275/// Often used for stateless [`MarketStream`]s, such as public trades.
276#[derive(Debug)]
277pub struct NoInitialSnapshots;
278
279impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
280    fn fetch_snapshots<Instrument>(
281        _: &[Subscription<Exchange, Instrument, Kind>],
282    ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
283    where
284        Exchange: Connector,
285        Instrument: InstrumentData,
286        Kind: SubscriptionKind,
287        Kind::Event: Send,
288        Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
289    {
290        std::future::ready(Ok(vec![]))
291    }
292}
293
294pub fn process_buffered_events<Protocol, StreamTransformer>(
295    transformer: &mut StreamTransformer,
296    events: Vec<Protocol::Message>,
297) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
298where
299    Protocol: StreamParser,
300    StreamTransformer: Transformer,
301{
302    events
303        .into_iter()
304        .filter_map(|event| {
305            Protocol::parse::<StreamTransformer::Input>(Ok(event))?
306                .inspect_err(|error| {
307                    warn!(
308                        ?error,
309                        "failed to parse message buffered during Subscription validation"
310                    )
311                })
312                .ok()
313        })
314        .flat_map(|parsed| transformer.transform(parsed))
315        .collect()
316}
317
318/// Transmit [`WsMessage`]s sent from the [`ExchangeTransformer`] to the exchange via
319/// the [`WsSink`].
320///
321/// **Note:**
322/// ExchangeTransformer is operating in a synchronous trait context so we use this separate task
323/// to avoid adding `#[\async_trait\]` to the transformer - this avoids allocations.
324pub async fn distribute_messages_to_exchange(
325    exchange: ExchangeId,
326    mut ws_sink: WsSink,
327    mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
328) {
329    while let Some(message) = ws_sink_rx.recv().await {
330        if let Err(error) = ws_sink.send(message).await {
331            if barter_integration_copy::protocol::websocket::is_websocket_disconnected(&error) {
332                break;
333            }
334
335            // Log error only if WsMessage failed to send over a connected WebSocket
336            error!(
337                %exchange,
338                %error,
339                "failed to send  output message to the exchange via WsSink"
340            );
341        }
342    }
343}
344
345/// Schedule the sending of custom application-level ping [`WsMessage`]s to the exchange using
346/// the provided [`PingInterval`].
347///
348/// **Notes:**
349///  - This is only used for those exchanges that require custom application-level pings.
350///  - This is additional to the protocol-level pings already handled by `tokio_tungstenite`.
351pub async fn schedule_pings_to_exchange(
352    exchange: ExchangeId,
353    ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
354    PingInterval { mut interval, ping }: PingInterval,
355) {
356    loop {
357        // Wait for next scheduled ping
358        interval.tick().await;
359
360        // Construct exchange custom application-level ping payload
361        let payload = ping();
362        debug!(%exchange, %payload, "sending custom application-level ping to exchange");
363
364        if ws_sink_tx.send(payload).is_err() {
365            break;
366        }
367    }
368}