pub mod book_initializer;
use tokio::sync::{mpsc, watch};
use crate::config::markets::market_config::DataTypesSection;
use crate::exchanges::Exchange;
use crate::sources::binance::rest::BinanceRestClient;
use super::topic_publisher::TopicMessage;
use book_initializer::BookInitializer;
#[async_trait::async_trait]
pub trait EventPipeline: Send + 'static {
async fn run(
self: Box<Self>,
input: mpsc::Receiver<TopicMessage>,
output: mpsc::Sender<TopicMessage>,
shutdown: watch::Receiver<bool>,
);
}
pub struct PassthroughPipeline;
#[async_trait::async_trait]
impl EventPipeline for PassthroughPipeline {
async fn run(
self: Box<Self>,
mut input: mpsc::Receiver<TopicMessage>,
output: mpsc::Sender<TopicMessage>,
_shutdown: watch::Receiver<bool>,
) {
while let Some(msg) = input.recv().await {
if output.send(msg).await.is_err() {
break;
}
}
}
}
pub fn build_pipeline(
exchange: Exchange,
symbol: &str,
datatypes: &DataTypesSection,
) -> Box<dyn EventPipeline> {
match exchange {
Exchange::Binance if datatypes.orderbook.enabled => {
let rest = BinanceRestClient::new("https://api.binance.com");
let ob_topic = format!(
"orderbook.{}.{}",
datatypes.orderbook.depth, symbol
);
Box::new(BookInitializer::new(rest, symbol.to_string(), ob_topic))
}
_ => Box::new(PassthroughPipeline),
}
}