barter_data_sniper/lib.rs
1#![forbid(unsafe_code)]
2#![warn(clippy::all)]
3#![allow(clippy::pedantic, clippy::type_complexity)]
4#![warn(
5 missing_debug_implementations,
6 rust_2018_idioms,
7 rust_2024_compatibility
8)]
9
10//! # Barter-Data
11//! A high-performance WebSocket integration library for streaming public market data from leading cryptocurrency
12//! exchanges - batteries included. It is:
13//! * **Easy**: Barter-Data's simple [`StreamBuilder`](streams::builder::StreamBuilder) and [`DynamicStreams`](streams::builder::DynamicStreams) interface allows for easy & quick setup (see example below and /examples!).
14//! * **Normalised**: Barter-Data's unified interface for consuming public WebSocket data means every Exchange returns a normalised data model.
15//! * **Real-Time**: Barter-Data utilises real-time WebSocket integrations enabling the consumption of normalised tick-by-tick data.
//! * **Extensible**: Barter-Data is highly extensible, making it easy to contribute new integrations!
17//!
18//! ## User API
19//! - [`StreamBuilder`](streams::builder::StreamBuilder) for initialising [`MarketStream`]s of specific data kinds.
20//! - [`DynamicStreams`](streams::builder::DynamicStreams) for initialising [`MarketStream`]s of every supported data kind at once.
21//! - Define what exchange market data you want to stream using the [`Subscription`] type.
22//! - Pass [`Subscription`]s to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) or [`DynamicStreams::init`](streams::builder::DynamicStreams::init) methods.
23//! - Each call to the [`StreamBuilder::subscribe`](streams::builder::StreamBuilder::subscribe) (or each batch passed to the [`DynamicStreams::init`](streams::builder::DynamicStreams::init))
24//! method opens a new WebSocket connection to the exchange - giving you full control.
25//!
26//! ## Examples
27//! For a comprehensive collection of examples, see the /examples directory.
28//!
29//! ### Multi Exchange Public Trades
30//! ```rust,no_run
31//! use barter_data_sniper::exchange::gateio::spot::GateioSpot;
32//! use barter_data_sniper::{
33//! exchange::{
34//! binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
35//! coinbase::Coinbase,
36//! okx::Okx,
37//! },
38//! streams::{Streams, reconnect::stream::ReconnectingStream},
39//! subscription::trade::PublicTrades,
40//! };
41//! use barter_instrument_copy::instrument::market_data::kind::MarketDataInstrumentKind;
42//! use futures::StreamExt;
43//! use tracing::warn;
44//!
45//! #[tokio::main]
46//! async fn main() {
47//! // Initialise PublicTrades Streams for various exchanges
48//! // '--> each call to StreamBuilder::subscribe() initialises a separate WebSocket connection
49//!
50//! let streams = Streams::<PublicTrades>::builder()
51//! .subscribe([
52//! (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
53//! (BinanceSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
54//! ])
55//! .subscribe([
56//! (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
57//! (BinanceFuturesUsd::default(), "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
58//! ])
59//! .subscribe([
60//! (Coinbase, "btc", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
61//! (Coinbase, "eth", "usd", MarketDataInstrumentKind::Spot, PublicTrades),
62//! ])
63//! .subscribe([
64//! (GateioSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
65//! (GateioSpot::default(), "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
66//! ])
67//! .subscribe([
68//! (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
69//! (Okx, "eth", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
70//! (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
71//! (Okx, "eth", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
72//! ])
73//! .init()
74//! .await
75//! .unwrap();
76//!
77//! // Select and merge every exchange Stream using futures_util::stream::select_all
78//! // Note: use `Streams.select(ExchangeId)` to interact with individual exchange streams!
79//! let mut joined_stream = streams
80//! .select_all()
81//! .with_error_handler(|error| warn!(?error, "MarketStream generated error"));
82//!
83//! while let Some(event) = joined_stream.next().await {
84//! println!("{event:?}");
85//! }
86//! }
87//! ```
88
89use crate::{
90 error::DataError,
91 event::MarketEvent,
92 exchange::{Connector, PingInterval},
93 instrument::InstrumentData,
94 subscriber::{Subscribed, Subscriber},
95 subscription::{Subscription, SubscriptionKind},
96 transformer::ExchangeTransformer,
97};
98use async_trait::async_trait;
99use barter_instrument_copy::exchange::ExchangeId;
100use barter_integration_copy::{
101 error::SocketError,
102 protocol::{
103 websocket::{WebSocketParser, WsMessage, WsSink, WsStream},
104 StreamParser,
105 },
106 ExchangeStream, Transformer,
107};
108use futures::{SinkExt, Stream, StreamExt};
109use std::{collections::VecDeque, future::Future};
110use tokio::sync::mpsc;
111use tracing::{debug, error, warn};
112
113/// All [`Error`](std::error::Error)s generated in Barter-Data.
114pub mod error;
115
116/// Defines the generic [`MarketEvent<T>`](MarketEvent) used in every [`MarketStream`].
117pub mod event;
118
119/// [`Connector`] implementations for each exchange.
120pub mod exchange;
121
122/// High-level API types used for building [`MarketStream`]s from collections
123/// of Barter [`Subscription`]s.
124pub mod streams;
125
126/// [`Subscriber`], [`SubscriptionMapper`](subscriber::mapper::SubscriptionMapper) and
127/// [`SubscriptionValidator`](subscriber::validator::SubscriptionValidator) traits that define how a
128/// [`Connector`] will subscribe to exchange [`MarketStream`]s.
129///
130/// Standard implementations for subscribing to WebSocket [`MarketStream`]s are included.
131pub mod subscriber;
132
133/// Types that communicate the type of each [`MarketStream`] to initialise, and what normalised
134/// Barter output type the exchange will be transformed into.
135pub mod subscription;
136
137/// [`InstrumentData`] trait for instrument describing data.
138pub mod instrument;
139
140/// [`OrderBook`](books::OrderBook) related types, and utilities for initialising and maintaining
141/// a collection of sorted local Instrument [`OrderBook`](books::OrderBook)s
142pub mod books;
143
144/// Generic [`ExchangeTransformer`] implementations used by [`MarketStream`]s to translate exchange
145/// specific types to normalised Barter types.
146///
147/// A standard [`StatelessTransformer`](transformer::stateless::StatelessTransformer) implementation
148/// that works for most `Exchange`-`SubscriptionKind` combinations is included.
149///
150/// Cases that need custom logic, such as fetching initial [`OrderBooksL2`](subscription::book::OrderBooksL2)
151/// and [`OrderBooksL3`](subscription::book::OrderBooksL3) snapshots on startup, may require custom
152/// [`ExchangeTransformer`] implementations.
153/// For examples, see [`Binance`](exchange::binance::Binance) [`OrderBooksL2`](subscription::book::OrderBooksL2)
154/// [`ExchangeTransformer`] implementations for
155/// [`spot`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2Transformer) and
156/// [`futures_usd`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2Transformer).
157pub mod transformer;
158
159/// Convenient type alias for an [`ExchangeStream`] utilising a tungstenite
160/// [`WebSocket`](barter_integration_copy::protocol::websocket::WebSocket).
161pub type ExchangeWsStream<Transformer> = ExchangeStream<WebSocketParser, WsStream, Transformer>;
162
/// Implemented by types that can provide an identifier of type `T` for themselves.
pub trait Identifier<T> {
    /// Returns the `T` identifier associated with `self`.
    fn id(&self) -> T;
}
167
168/// [`Stream`] that yields [`Market<Kind>`](MarketEvent) events. The type of [`Market<Kind>`](MarketEvent)
169/// depends on the provided [`SubscriptionKind`] of the passed [`Subscription`]s.
170#[async_trait]
171pub trait MarketStream<Exchange, Instrument, Kind>
172where
173 Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
174 + Send
175 + Sized
176 + Unpin,
177 Exchange: Connector,
178 Instrument: InstrumentData,
179 Kind: SubscriptionKind,
180{
181 async fn init<SnapFetcher>(
182 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
183 ) -> Result<Self, DataError>
184 where
185 SnapFetcher: SnapshotFetcher<Exchange, Kind>,
186 Subscription<Exchange, Instrument, Kind>:
187 Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
188}
189
190/// Defines how to fetch market data snapshots for a collection of [`Subscription`]s.
191///
192/// Useful when a [`MarketStream`] requires an initial snapshot on start-up.
193///
194/// See examples such as Binance OrderBooksL2: <br>
195/// - [`BinanceSpotOrderBooksL2SnapshotFetcher`](exchange::binance::spot::l2::BinanceSpotOrderBooksL2SnapshotFetcher)
196/// - [`BinanceFuturesUsdOrderBooksL2SnapshotFetcher`](exchange::binance::futures::l2::BinanceFuturesUsdOrderBooksL2SnapshotFetcher)
197pub trait SnapshotFetcher<Exchange, Kind> {
198 fn fetch_snapshots<Instrument>(
199 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
200 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
201 where
202 Exchange: Connector,
203 Instrument: InstrumentData,
204 Kind: SubscriptionKind,
205 Kind::Event: Send,
206 Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
207}
208
209#[async_trait]
210impl<Exchange, Instrument, Kind, Transformer> MarketStream<Exchange, Instrument, Kind>
211 for ExchangeWsStream<Transformer>
212where
213 Exchange: Connector + Send + Sync,
214 Instrument: InstrumentData,
215 Kind: SubscriptionKind + Send + Sync,
216 Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
217 Kind::Event: Send,
218{
219 async fn init<SnapFetcher>(
220 subscriptions: &[Subscription<Exchange, Instrument, Kind>],
221 ) -> Result<Self, DataError>
222 where
223 SnapFetcher: SnapshotFetcher<Exchange, Kind>,
224 Subscription<Exchange, Instrument, Kind>:
225 Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
226 {
227 // Connect & subscribe
228 let Subscribed {
229 websocket,
230 map: instrument_map,
231 buffered_websocket_events,
232 } = Exchange::Subscriber::subscribe(subscriptions).await?;
233
234 // Fetch any required initial MarketEvent snapshots
235 let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
236
237 // Split WebSocket into WsStream & WsSink components
238 let (ws_sink, ws_stream) = websocket.split();
239
240 // Spawn task to distribute Transformer messages (eg/ custom pongs) to the exchange
241 let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
242 tokio::spawn(distribute_messages_to_exchange(
243 Exchange::ID,
244 ws_sink,
245 ws_sink_rx,
246 ));
247
248 // Spawn optional task to distribute custom application-level pings to the exchange
249 if let Some(ping_interval) = Exchange::ping_interval() {
250 tokio::spawn(schedule_pings_to_exchange(
251 Exchange::ID,
252 ws_sink_tx.clone(),
253 ping_interval,
254 ));
255 }
256
257 // Initialise Transformer associated with this Exchange and SubscriptionKind
258 let mut transformer =
259 Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
260
261 // Process any buffered active subscription events received during Subscription validation
262 let mut processed = process_buffered_events::<WebSocketParser, _>(
263 &mut transformer,
264 buffered_websocket_events,
265 );
266
267 // Extend buffered events with any initial snapshot events
268 processed.extend(initial_snapshots.into_iter().map(Ok));
269
270 Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
271 }
272}
273
/// [`SnapshotFetcher`] that fetches nothing: it always yields an empty set of initial market
/// data snapshots.
///
/// Often used for stateless [`MarketStream`]s, such as public trades.
#[derive(Debug)]
pub struct NoInitialSnapshots;
278
279impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
280 fn fetch_snapshots<Instrument>(
281 _: &[Subscription<Exchange, Instrument, Kind>],
282 ) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
283 where
284 Exchange: Connector,
285 Instrument: InstrumentData,
286 Kind: SubscriptionKind,
287 Kind::Event: Send,
288 Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
289 {
290 std::future::ready(Ok(vec![]))
291 }
292}
293
294pub fn process_buffered_events<Protocol, StreamTransformer>(
295 transformer: &mut StreamTransformer,
296 events: Vec<Protocol::Message>,
297) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
298where
299 Protocol: StreamParser,
300 StreamTransformer: Transformer,
301{
302 events
303 .into_iter()
304 .filter_map(|event| {
305 Protocol::parse::<StreamTransformer::Input>(Ok(event))?
306 .inspect_err(|error| {
307 warn!(
308 ?error,
309 "failed to parse message buffered during Subscription validation"
310 )
311 })
312 .ok()
313 })
314 .flat_map(|parsed| transformer.transform(parsed))
315 .collect()
316}
317
318/// Transmit [`WsMessage`]s sent from the [`ExchangeTransformer`] to the exchange via
319/// the [`WsSink`].
320///
321/// **Note:**
322/// ExchangeTransformer is operating in a synchronous trait context so we use this separate task
323/// to avoid adding `#[\async_trait\]` to the transformer - this avoids allocations.
324pub async fn distribute_messages_to_exchange(
325 exchange: ExchangeId,
326 mut ws_sink: WsSink,
327 mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
328) {
329 while let Some(message) = ws_sink_rx.recv().await {
330 if let Err(error) = ws_sink.send(message).await {
331 if barter_integration_copy::protocol::websocket::is_websocket_disconnected(&error) {
332 break;
333 }
334
335 // Log error only if WsMessage failed to send over a connected WebSocket
336 error!(
337 %exchange,
338 %error,
339 "failed to send output message to the exchange via WsSink"
340 );
341 }
342 }
343}
344
345/// Schedule the sending of custom application-level ping [`WsMessage`]s to the exchange using
346/// the provided [`PingInterval`].
347///
348/// **Notes:**
349/// - This is only used for those exchanges that require custom application-level pings.
350/// - This is additional to the protocol-level pings already handled by `tokio_tungstenite`.
351pub async fn schedule_pings_to_exchange(
352 exchange: ExchangeId,
353 ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
354 PingInterval { mut interval, ping }: PingInterval,
355) {
356 loop {
357 // Wait for next scheduled ping
358 interval.tick().await;
359
360 // Construct exchange custom application-level ping payload
361 let payload = ping();
362 debug!(%exchange, %payload, "sending custom application-level ping to exchange");
363
364 if ws_sink_tx.send(payload).is_err() {
365 break;
366 }
367 }
368}