use crate::{
error::DataError,
event::{MarketEvent, MarketIter},
exchange::Connector,
subscription::{book::OrderBook, Map, SubscriptionKind},
transformer::ExchangeTransformer,
Identifier,
};
use async_trait::async_trait;
use barter_integration::{
model::{instrument::Instrument, SubscriptionId},
protocol::websocket::WsMessage,
Transformer,
};
use serde::{Deserialize, Serialize};
use std::marker::PhantomData;
use tokio::sync::mpsc;
#[async_trait]
pub trait OrderBookUpdater
where
Self: Sized,
{
type OrderBook;
type Update;
async fn init<Exchange, Kind>(
ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
instrument: Instrument,
) -> Result<InstrumentOrderBook<Instrument, Self>, DataError>
where
Exchange: Send,
Kind: Send;
fn update(
&mut self,
book: &mut Self::OrderBook,
update: Self::Update,
) -> Result<Option<Self::OrderBook>, DataError>;
}
#[derive(Clone, PartialEq, Eq, PartialOrd, Ord, Debug, Deserialize, Serialize)]
pub struct InstrumentOrderBook<InstrumentId, Updater> {
pub instrument: InstrumentId,
pub updater: Updater,
pub book: OrderBook,
}
#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize)]
pub struct MultiBookTransformer<Exchange, InstrumentId, Kind, Updater> {
pub book_map: Map<InstrumentOrderBook<InstrumentId, Updater>>,
phantom: PhantomData<(Exchange, Kind)>,
}
#[async_trait]
impl<Exchange, Kind, Updater> ExchangeTransformer<Exchange, Instrument, Kind>
for MultiBookTransformer<Exchange, Instrument, Kind, Updater>
where
Exchange: Connector + Send,
Kind: SubscriptionKind<Event = OrderBook> + Send,
Updater: OrderBookUpdater<OrderBook = Kind::Event> + Send,
Updater::Update: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
{
async fn new(
ws_sink_tx: mpsc::UnboundedSender<WsMessage>,
map: Map<Instrument>,
) -> Result<Self, DataError> {
let (sub_ids, init_book_requests): (Vec<_>, Vec<_>) = map
.0
.into_iter()
.map(|(sub_id, instrument)| {
(
sub_id,
Updater::init::<Exchange, Kind>(ws_sink_tx.clone(), instrument),
)
})
.unzip();
let init_order_books = futures::future::join_all(init_book_requests)
.await
.into_iter()
.collect::<Result<Vec<InstrumentOrderBook<Instrument, Updater>>, DataError>>()?;
let book_map = sub_ids
.into_iter()
.zip(init_order_books.into_iter())
.collect::<Map<InstrumentOrderBook<Instrument, Updater>>>();
Ok(Self {
book_map,
phantom: PhantomData,
})
}
}
impl<Exchange, InstrumentId, Kind, Updater> Transformer
for MultiBookTransformer<Exchange, InstrumentId, Kind, Updater>
where
Exchange: Connector,
InstrumentId: Clone,
Kind: SubscriptionKind<Event = OrderBook>,
Updater: OrderBookUpdater<OrderBook = Kind::Event>,
Updater::Update: Identifier<Option<SubscriptionId>> + for<'de> Deserialize<'de>,
{
type Error = DataError;
type Input = Updater::Update;
type Output = MarketEvent<InstrumentId, Kind::Event>;
type OutputIter = Vec<Result<Self::Output, Self::Error>>;
fn transform(&mut self, update: Self::Input) -> Self::OutputIter {
let subscription_id = match update.id() {
Some(subscription_id) => subscription_id,
None => return vec![],
};
let book = match self.book_map.find_mut(&subscription_id) {
Ok(book) => book,
Err(unidentifiable) => return vec![Err(DataError::Socket(unidentifiable))],
};
let InstrumentOrderBook {
instrument,
book,
updater,
} = book;
match updater.update(book, update) {
Ok(Some(book)) => {
MarketIter::<InstrumentId, OrderBook>::from((
Exchange::ID,
instrument.clone(),
book,
))
.0
}
Ok(None) => vec![],
Err(error) => vec![Err(error)],
}
}
}