#![forbid(unsafe_code)]
#![warn(
unused,
clippy::cognitive_complexity,
unused_crate_dependencies,
unused_extern_crates,
clippy::unused_self,
clippy::useless_let_if_seq,
missing_debug_implementations,
rust_2018_idioms,
rust_2024_compatibility
)]
#![allow(clippy::type_complexity, clippy::too_many_arguments, type_alias_bounds)]
use crate::{
error::DataError,
event::MarketEvent,
exchange::{Connector, PingInterval},
instrument::InstrumentData,
subscriber::{Subscribed, Subscriber},
subscription::{Subscription, SubscriptionKind},
transformer::ExchangeTransformer,
};
use async_trait::async_trait;
use barter_instrument::exchange::ExchangeId;
use barter_integration::{
Transformer,
error::SocketError,
protocol::{
StreamParser,
websocket::{WsError, WsMessage, WsSink, WsStream},
},
};
use futures::{SinkExt, Stream, StreamExt};
use barter_integration::stream::ExchangeStream;
use std::{collections::VecDeque, future::Future};
use tokio::sync::mpsc;
use tracing::{debug, error, warn};
pub mod error;
pub mod event;
pub mod exchange;
pub mod streams;
pub mod subscriber;
pub mod subscription;
pub mod instrument;
pub mod books;
pub mod transformer;
pub type ExchangeWsStream<Parser, Transformer> = ExchangeStream<Parser, WsStream, Transformer>;
pub trait Identifier<T> {
fn id(&self) -> T;
}
#[async_trait]
pub trait MarketStream<Exchange, Instrument, Kind>
where
Self: Stream<Item = Result<MarketEvent<Instrument::Key, Kind::Event>, DataError>>
+ Send
+ Sized
+ Unpin,
Exchange: Connector,
Instrument: InstrumentData,
Kind: SubscriptionKind,
{
async fn init<SnapFetcher>(
subscriptions: &[Subscription<Exchange, Instrument, Kind>],
) -> Result<Self, DataError>
where
SnapFetcher: SnapshotFetcher<Exchange, Kind>,
Subscription<Exchange, Instrument, Kind>:
Identifier<Exchange::Channel> + Identifier<Exchange::Market>;
}
pub trait SnapshotFetcher<Exchange, Kind> {
fn fetch_snapshots<Instrument>(
subscriptions: &[Subscription<Exchange, Instrument, Kind>],
) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
where
Exchange: Connector,
Instrument: InstrumentData,
Kind: SubscriptionKind,
Kind::Event: Send,
Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>;
}
#[async_trait]
impl<Exchange, Instrument, Kind, Transformer, Parser> MarketStream<Exchange, Instrument, Kind>
for ExchangeWsStream<Parser, Transformer>
where
Exchange: Connector + Send + Sync,
Instrument: InstrumentData,
Kind: SubscriptionKind + Send + Sync,
Transformer: ExchangeTransformer<Exchange, Instrument::Key, Kind> + Send,
Kind::Event: Send,
Parser: StreamParser<Transformer::Input, Message = WsMessage, Error = WsError> + Send,
{
async fn init<SnapFetcher>(
subscriptions: &[Subscription<Exchange, Instrument, Kind>],
) -> Result<Self, DataError>
where
SnapFetcher: SnapshotFetcher<Exchange, Kind>,
Subscription<Exchange, Instrument, Kind>:
Identifier<Exchange::Channel> + Identifier<Exchange::Market>,
{
let Subscribed {
websocket,
map: instrument_map,
buffered_websocket_events,
} = Exchange::Subscriber::subscribe(subscriptions).await?;
let initial_snapshots = SnapFetcher::fetch_snapshots(subscriptions).await?;
let (ws_sink, ws_stream) = websocket.split();
let (ws_sink_tx, ws_sink_rx) = mpsc::unbounded_channel();
tokio::spawn(distribute_messages_to_exchange(
Exchange::ID,
ws_sink,
ws_sink_rx,
));
if let Some(ping_interval) = Exchange::ping_interval() {
tokio::spawn(schedule_pings_to_exchange(
Exchange::ID,
ws_sink_tx.clone(),
ping_interval,
));
}
let mut transformer =
Transformer::init(instrument_map, &initial_snapshots, ws_sink_tx).await?;
let mut processed = process_buffered_events::<Parser, Transformer>(
&mut transformer,
buffered_websocket_events,
);
processed.extend(initial_snapshots.into_iter().map(Ok));
Ok(ExchangeWsStream::new(ws_stream, transformer, processed))
}
}
#[derive(Debug)]
pub struct NoInitialSnapshots;
impl<Exchange, Kind> SnapshotFetcher<Exchange, Kind> for NoInitialSnapshots {
fn fetch_snapshots<Instrument>(
_: &[Subscription<Exchange, Instrument, Kind>],
) -> impl Future<Output = Result<Vec<MarketEvent<Instrument::Key, Kind::Event>>, SocketError>> + Send
where
Exchange: Connector,
Instrument: InstrumentData,
Kind: SubscriptionKind,
Kind::Event: Send,
Subscription<Exchange, Instrument, Kind>: Identifier<Exchange::Market>,
{
std::future::ready(Ok(vec![]))
}
}
pub fn process_buffered_events<Parser, StreamTransformer>(
transformer: &mut StreamTransformer,
events: Vec<Parser::Message>,
) -> VecDeque<Result<StreamTransformer::Output, StreamTransformer::Error>>
where
Parser: StreamParser<StreamTransformer::Input>,
StreamTransformer: Transformer,
{
events
.into_iter()
.filter_map(|event| {
Parser::parse(Ok(event))?
.inspect_err(|error| {
warn!(
?error,
"failed to parse message buffered during Subscription validation"
)
})
.ok()
})
.flat_map(|parsed| transformer.transform(parsed))
.collect()
}
pub async fn distribute_messages_to_exchange(
exchange: ExchangeId,
mut ws_sink: WsSink,
mut ws_sink_rx: mpsc::UnboundedReceiver<WsMessage>,
) {
while let Some(message) = ws_sink_rx.recv().await {
if let Err(error) = ws_sink.send(message).await {
if barter_integration::protocol::websocket::is_websocket_disconnected(&error) {
break;
}
error!(
%exchange,
%error,
"failed to send output message to the exchange via WsSink"
);
}
}
}
pub async fn schedule_pings_to_exchange(
exchange: ExchangeId,
ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
PingInterval { mut interval, ping }: PingInterval,
) {
loop {
interval.tick().await;
let payload = ping();
debug!(%exchange, %payload, "sending custom application-level ping to exchange");
if ws_sink_tx.send(payload).is_err() {
break;
}
}
}
pub mod test_utils {
use crate::{
event::{DataKind, MarketEvent},
subscription::trade::PublicTrade,
};
use barter_instrument::{Side, exchange::ExchangeId};
use chrono::{DateTime, Utc};
pub fn market_event_trade_buy<InstrumentKey>(
time_exchange: DateTime<Utc>,
time_received: DateTime<Utc>,
instrument: InstrumentKey,
price: f64,
quantity: f64,
) -> MarketEvent<InstrumentKey, DataKind> {
MarketEvent {
time_exchange,
time_received,
exchange: ExchangeId::BinanceSpot,
instrument,
kind: DataKind::Trade(PublicTrade {
id: "trade_id".to_string(),
price,
amount: quantity,
side: Side::Buy,
}),
}
}
}