1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
use barter_data::{
event::DataKind,
streams::{
builder::dynamic::DynamicStreams, consumer::MarketStreamResult,
reconnect::stream::ReconnectingStream,
},
subscription::SubKind,
};
use barter_instrument::{
exchange::ExchangeId,
instrument::market_data::{MarketDataInstrument, kind::MarketDataInstrumentKind},
};
use futures::StreamExt;
use tracing::{info, warn};
#[rustfmt::skip]
#[tokio::main]
async fn main() {
// Initialise INFO Tracing log subscriber
init_logging();
use ExchangeId::*;
use MarketDataInstrumentKind::*;
use SubKind::*;
// Notes:
// - DynamicStream::init requires an IntoIterator<Item = "subscription batch">.
// - Each "subscription batch" is an IntoIterator<Item = Subscription>.
// - Every "subscription batch" will initialise at-least-one WebSocket stream under the hood.
// - If the "subscription batch" contains more-than-one ExchangeId and/or SubKind, the batch
// will be further split under the hood for compile-time reasons.
// Initialise market reconnect::Event streams for various ExchangeIds and SubscriptionKinds
let streams = DynamicStreams::init([
// Batch notes:
// Since batch contains 1 ExchangeId and 1 SubscriptionKind, so only 1 (1x1) WebSockets
// will be spawned for this batch.
vec![
(BinanceSpot, "btc", "usdt", Spot, PublicTrades),
(BinanceSpot, "eth", "usdt", Spot, PublicTrades),
],
// Batch notes:
// Since batch contains 1 ExchangeId and 3 SubscriptionKinds, 3 (1x3) WebSocket connections
// will be spawned for this batch (back-end requires to further split).
vec![
(BinanceFuturesUsd, "btc", "usdt", Perpetual, PublicTrades),
(BinanceFuturesUsd, "btc", "usdt", Perpetual, OrderBooksL1),
(BinanceFuturesUsd, "btc", "usdt", Perpetual, Liquidations),
],
// Batch notes:
// Since batch contains 2 ExchangeIds and 1 SubscriptionKind, 2 (2x1) WebSocket connections
// will be spawned for this batch (back-end requires to further split).
vec![
(Okx, "btc", "usdt", Spot, PublicTrades),
(Okx, "btc", "usdt", Perpetual, PublicTrades),
(Bitmex, "btc", "usdt", Perpetual, PublicTrades),
(Okx, "eth", "usdt", Spot, PublicTrades),
(Okx, "eth", "usdt", Perpetual, PublicTrades),
(Bitmex, "eth", "usdt", Perpetual, PublicTrades),
],
]).await.unwrap();
// Select all streams, mapping each SubscriptionKind `MarketStreamResult<T>` into a unified
// `Output` (eg/ `MarketStreamResult<_, DataKind>`), where MarketStreamResult<T>: Into<Output>
// Notes on other DynamicStreams methods:
// - Use `streams.select_trades(ExchangeId)` to return a stream of trades from a given exchange.
// - Use `streams.select_<T>(ExchangeId)` to return a stream of T from a given exchange.
// - Use `streams.select_all_trades(ExchangeId)` to return a stream of trades from all exchanges
let mut merged = streams
.select_all::<MarketStreamResult<MarketDataInstrument, DataKind>>()
.with_error_handler(|error| warn!(?error, "MarketStream generated error"));
while let Some(event) = merged.next().await {
info!("{event:?}");
}
}
// Initialise an INFO `Subscriber` for `Tracing` Json logs and install it as the global default.
fn init_logging() {
tracing_subscriber::fmt()
// Filter messages based on the INFO
.with_env_filter(
tracing_subscriber::filter::EnvFilter::builder()
.with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
.from_env_lossy(),
)
// Disable colours on release builds
.with_ansi(cfg!(debug_assertions))
// Enable Json formatting
.json()
// Install this Tracing subscriber as global default
.init()
}