use barter_data::{
event::DataKind,
exchange::{
binance::{futures::BinanceFuturesUsd, spot::BinanceSpot},
okx::Okx,
},
streams::{Streams, consumer::MarketStreamResult, reconnect::stream::ReconnectingStream},
subscription::{
book::{OrderBooksL1, OrderBooksL2},
trade::PublicTrades,
},
};
use barter_instrument::instrument::market_data::{
MarketDataInstrument, kind::MarketDataInstrumentKind,
};
use tokio_stream::StreamExt;
use tracing::{info, warn};
/// Subscribes to BTC/USDT market data on several exchanges and logs every event.
///
/// Three kinds of stream are built and combined into one multi-stream:
/// - `PublicTrades`: Binance spot, Binance USD futures (perpetual), OKX spot and perpetual.
/// - `OrderBooksL1`: Binance spot and Binance USD futures (perpetual).
/// - `OrderBooksL2`: Binance spot and Binance USD futures (perpetual).
///
/// Every event yielded by the combined stream is logged at INFO level. Errors
/// passed to the stream's error handler are logged at WARN level.
///
/// # Panics
///
/// Panics if initialising the streams returns an error.
#[rustfmt::skip]
#[tokio::main]
async fn main() {
    // Install the logging subscriber before any stream can emit an event.
    init_logging();

    // The output type normalises every subscription kind into `DataKind`,
    // keyed by `MarketDataInstrument`.
    let streams: Streams<MarketStreamResult<MarketDataInstrument, DataKind>> = Streams::builder_multi()

        // PublicTrades streams
        .add(Streams::<PublicTrades>::builder()
            .subscribe([
                (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
            ])
            .subscribe([
                (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
            ])
            .subscribe([
                (Okx, "btc", "usdt", MarketDataInstrumentKind::Spot, PublicTrades),
                (Okx, "btc", "usdt", MarketDataInstrumentKind::Perpetual, PublicTrades),
            ])
        )

        // OrderBooksL1 streams
        .add(Streams::<OrderBooksL1>::builder()
            .subscribe([
                (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, OrderBooksL1),
            ])
            .subscribe([
                (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, OrderBooksL1),
            ])
        )

        // OrderBooksL2 streams
        .add(Streams::<OrderBooksL2>::builder()
            .subscribe([
                (BinanceSpot::default(), "btc", "usdt", MarketDataInstrumentKind::Spot, OrderBooksL2),
            ])
            .subscribe([
                (BinanceFuturesUsd::default(), "btc", "usdt", MarketDataInstrumentKind::Perpetual, OrderBooksL2),
            ])
        )
        .init()
        .await
        // Nothing useful can be done without the streams, so fail fast with
        // a message that says which step failed.
        .expect("failed to initialise market data streams");

    // Combine every exchange stream into one, routing errors to the WARN log
    // rather than to the consumer loop below.
    let mut joined_stream = streams
        .select_all()
        .with_error_handler(|error| warn!(?error, "MarketStream generated error"));

    // Log each event until the combined stream ends.
    while let Some(event) = joined_stream.next().await {
        info!("{event:?}");
    }
}
/// Builds a JSON-formatted `tracing` subscriber and installs it via `init()`.
///
/// Log filtering comes from an `EnvFilter` built with `from_env_lossy`, using
/// INFO as the default directive. ANSI output is enabled only when the binary
/// is compiled with debug assertions.
fn init_logging() {
    // Filter built from the environment, with INFO as the default directive.
    let level_filter = tracing_subscriber::filter::EnvFilter::builder()
        .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
        .from_env_lossy();

    // `cfg!(debug_assertions)` is resolved at compile time: true when debug
    // assertions are enabled, false otherwise.
    let ansi_enabled = cfg!(debug_assertions);

    tracing_subscriber::fmt()
        .with_env_filter(level_filter)
        .with_ansi(ansi_enabled)
        .json()
        .init();
}