pub mod book;
pub mod config;
mod handlers;
#[cfg(feature = "defi")]
pub mod pool;
use std::{
any::{Any, type_name},
cell::{Ref, RefCell},
collections::{VecDeque, hash_map::Entry},
fmt::{Debug, Display},
num::NonZeroUsize,
rc::Rc,
};
use ahash::{AHashMap, AHashSet};
use book::{BookSnapshotInfo, BookSnapshotter, BookUpdater};
use config::DataEngineConfig;
use futures::future::join_all;
use handlers::{BarBarHandler, BarQuoteHandler, BarTradeHandler};
use indexmap::IndexMap;
use nautilus_common::{
cache::Cache,
clock::Clock,
logging::{RECV, RES},
messages::data::{
DataCommand, DataResponse, ForwardPricesResponse, RequestCommand, RequestForwardPrices,
SubscribeBars, SubscribeBookDeltas, SubscribeBookDepth10, SubscribeBookSnapshots,
SubscribeCommand, SubscribeOptionChain, UnsubscribeBars, UnsubscribeBookDeltas,
UnsubscribeBookDepth10, UnsubscribeBookSnapshots, UnsubscribeCommand,
UnsubscribeInstrumentStatus, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
UnsubscribeQuotes,
},
msgbus::{
self, MStr, ShareableMessageHandler, Topic, TypedHandler, TypedIntoHandler,
switchboard::{self, MessagingSwitchboard},
},
runner::get_data_cmd_sender,
timer::{TimeEvent, TimeEventCallback},
};
use nautilus_core::{
UUID4, WeakCell,
correctness::{
FAILED, check_key_in_map, check_key_not_in_map, check_predicate_false, check_predicate_true,
},
datetime::millis_to_nanos_unchecked,
};
#[cfg(feature = "defi")]
use nautilus_model::defi::DefiData;
use nautilus_model::{
data::{
Bar, BarType, CustomData, Data, DataType, FundingRateUpdate, IndexPriceUpdate,
InstrumentClose, InstrumentStatus, MarkPriceUpdate, OrderBookDelta, OrderBookDeltas,
OrderBookDepth10, QuoteTick, TradeTick,
option_chain::{OptionGreeks, StrikeRange},
},
enums::{
AggregationSource, BarAggregation, BookType, MarketStatusAction, PriceType, RecordFlag,
},
identifiers::{ClientId, InstrumentId, OptionSeriesId, Venue},
instruments::{Instrument, InstrumentAny, SyntheticInstrument},
orderbook::OrderBook,
types::Price,
};
#[cfg(feature = "streaming")]
use nautilus_persistence::backend::catalog::ParquetDataCatalog;
use ustr::Ustr;
#[cfg(feature = "defi")]
#[allow(unused_imports)] use crate::defi::engine as _;
#[cfg(feature = "defi")]
use crate::engine::pool::PoolUpdater;
use crate::{
aggregation::{
BarAggregator, RenkoBarAggregator, TickBarAggregator, TickImbalanceBarAggregator,
TickRunsBarAggregator, TimeBarAggregator, ValueBarAggregator, ValueImbalanceBarAggregator,
ValueRunsBarAggregator, VolumeBarAggregator, VolumeImbalanceBarAggregator,
VolumeRunsBarAggregator,
},
client::DataClientAdapter,
option_chains::OptionChainManager,
};
#[derive(Debug, Clone)]
pub(crate) enum DeferredCommand {
Subscribe(SubscribeCommand),
Unsubscribe(UnsubscribeCommand),
ExpireSeries(OptionSeriesId),
}
pub(crate) type DeferredCommandQueue = Rc<RefCell<VecDeque<DeferredCommand>>>;
#[derive(Clone)]
pub enum BarAggregatorSubscription {
Bar {
topic: MStr<Topic>,
handler: TypedHandler<Bar>,
},
Trade {
topic: MStr<Topic>,
handler: TypedHandler<TradeTick>,
},
Quote {
topic: MStr<Topic>,
handler: TypedHandler<QuoteTick>,
},
}
impl Debug for BarAggregatorSubscription {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Bar { topic, handler } => f
.debug_struct(stringify!(Bar))
.field("topic", topic)
.field("handler_id", &handler.id())
.finish(),
Self::Trade { topic, handler } => f
.debug_struct(stringify!(Trade))
.field("topic", topic)
.field("handler_id", &handler.id())
.finish(),
Self::Quote { topic, handler } => f
.debug_struct(stringify!(Quote))
.field("topic", topic)
.field("handler_id", &handler.id())
.finish(),
}
}
}
#[derive(Debug)]
pub struct DataEngine {
pub(crate) clock: Rc<RefCell<dyn Clock>>,
pub(crate) cache: Rc<RefCell<Cache>>,
pub(crate) external_clients: AHashSet<ClientId>,
clients: IndexMap<ClientId, DataClientAdapter>,
default_client: Option<DataClientAdapter>,
#[cfg(feature = "streaming")]
catalogs: AHashMap<Ustr, ParquetDataCatalog>,
routing_map: IndexMap<Venue, ClientId>,
book_intervals: AHashMap<NonZeroUsize, AHashSet<InstrumentId>>,
book_deltas_subs: AHashSet<InstrumentId>,
book_depth10_subs: AHashSet<InstrumentId>,
book_updaters: AHashMap<InstrumentId, Rc<BookUpdater>>,
book_snapshotters: AHashMap<InstrumentId, Rc<BookSnapshotter>>,
bar_aggregators: AHashMap<BarType, Rc<RefCell<Box<dyn BarAggregator>>>>,
bar_aggregator_handlers: AHashMap<BarType, Vec<BarAggregatorSubscription>>,
option_chain_managers: AHashMap<OptionSeriesId, Rc<RefCell<OptionChainManager>>>,
option_chain_instrument_index: AHashMap<InstrumentId, OptionSeriesId>,
deferred_cmd_queue: DeferredCommandQueue,
pending_option_chain_requests: AHashMap<UUID4, SubscribeOptionChain>,
_synthetic_quote_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
_synthetic_trade_feeds: AHashMap<InstrumentId, Vec<SyntheticInstrument>>,
buffered_deltas_map: AHashMap<InstrumentId, OrderBookDeltas>,
pub(crate) msgbus_priority: u8,
pub(crate) config: DataEngineConfig,
#[cfg(feature = "defi")]
pub(crate) pool_updaters: AHashMap<InstrumentId, Rc<PoolUpdater>>,
#[cfg(feature = "defi")]
pub(crate) pool_updaters_pending: AHashSet<InstrumentId>,
#[cfg(feature = "defi")]
pub(crate) pool_snapshot_pending: AHashSet<InstrumentId>,
#[cfg(feature = "defi")]
pub(crate) pool_event_buffers: AHashMap<InstrumentId, Vec<DefiData>>,
}
impl DataEngine {
#[must_use]
pub fn new(
clock: Rc<RefCell<dyn Clock>>,
cache: Rc<RefCell<Cache>>,
config: Option<DataEngineConfig>,
) -> Self {
let config = config.unwrap_or_default();
let external_clients: AHashSet<ClientId> = config
.external_clients
.clone()
.unwrap_or_default()
.into_iter()
.collect();
Self {
clock,
cache,
external_clients,
clients: IndexMap::new(),
default_client: None,
#[cfg(feature = "streaming")]
catalogs: AHashMap::new(),
routing_map: IndexMap::new(),
book_intervals: AHashMap::new(),
book_deltas_subs: AHashSet::new(),
book_depth10_subs: AHashSet::new(),
book_updaters: AHashMap::new(),
book_snapshotters: AHashMap::new(),
bar_aggregators: AHashMap::new(),
bar_aggregator_handlers: AHashMap::new(),
option_chain_managers: AHashMap::new(),
option_chain_instrument_index: AHashMap::new(),
deferred_cmd_queue: Rc::new(RefCell::new(VecDeque::new())),
pending_option_chain_requests: AHashMap::new(),
_synthetic_quote_feeds: AHashMap::new(),
_synthetic_trade_feeds: AHashMap::new(),
buffered_deltas_map: AHashMap::new(),
msgbus_priority: 10, config,
#[cfg(feature = "defi")]
pool_updaters: AHashMap::new(),
#[cfg(feature = "defi")]
pool_updaters_pending: AHashSet::new(),
#[cfg(feature = "defi")]
pool_snapshot_pending: AHashSet::new(),
#[cfg(feature = "defi")]
pool_event_buffers: AHashMap::new(),
}
}
pub fn register_msgbus_handlers(engine: &Rc<RefCell<Self>>) {
let weak = WeakCell::from(Rc::downgrade(engine));
let weak1 = weak.clone();
msgbus::register_data_command_endpoint(
MessagingSwitchboard::data_engine_execute(),
TypedIntoHandler::from(move |cmd: DataCommand| {
if let Some(rc) = weak1.upgrade() {
rc.borrow_mut().execute(cmd);
}
}),
);
msgbus::register_data_command_endpoint(
MessagingSwitchboard::data_engine_queue_execute(),
TypedIntoHandler::from(move |cmd: DataCommand| {
get_data_cmd_sender().clone().execute(cmd);
}),
);
let weak2 = weak.clone();
msgbus::register_any(
MessagingSwitchboard::data_engine_process(),
ShareableMessageHandler::from_any(move |data: &dyn Any| {
if let Some(rc) = weak2.upgrade() {
rc.borrow_mut().process(data);
}
}),
);
let weak3 = weak.clone();
msgbus::register_data_endpoint(
MessagingSwitchboard::data_engine_process_data(),
TypedIntoHandler::from(move |data: Data| {
if let Some(rc) = weak3.upgrade() {
rc.borrow_mut().process_data(data);
}
}),
);
#[cfg(feature = "defi")]
{
let weak4 = weak.clone();
msgbus::register_defi_data_endpoint(
MessagingSwitchboard::data_engine_process_defi_data(),
TypedIntoHandler::from(move |data: DefiData| {
if let Some(rc) = weak4.upgrade() {
rc.borrow_mut().process_defi_data(data);
}
}),
);
}
let weak5 = weak;
msgbus::register_data_response_endpoint(
MessagingSwitchboard::data_engine_response(),
TypedIntoHandler::from(move |resp: DataResponse| {
if let Some(rc) = weak5.upgrade() {
rc.borrow_mut().response(resp);
}
}),
);
}
#[must_use]
pub fn get_clock(&self) -> Ref<'_, dyn Clock> {
self.clock.borrow()
}
#[must_use]
pub fn get_cache(&self) -> Ref<'_, Cache> {
self.cache.borrow()
}
#[must_use]
pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
Rc::clone(&self.cache)
}
#[cfg(feature = "streaming")]
pub fn register_catalog(&mut self, catalog: ParquetDataCatalog, name: Option<&str>) {
let name = Ustr::from(name.unwrap_or("catalog_0"));
check_key_not_in_map(&name, &self.catalogs, "name", "catalogs").expect(FAILED);
self.catalogs.insert(name, catalog);
log::info!("Registered catalog <{name}>");
}
pub fn register_client(&mut self, client: DataClientAdapter, routing: Option<Venue>) {
let client_id = client.client_id();
if let Some(default_client) = &self.default_client {
check_predicate_false(
default_client.client_id() == client.client_id(),
"client_id already registered as default client",
)
.expect(FAILED);
}
check_key_not_in_map(&client_id, &self.clients, "client_id", "clients").expect(FAILED);
if let Some(routing) = routing {
self.routing_map.insert(routing, client_id);
log::debug!("Set client {client_id} routing for {routing}");
}
if client.venue.is_none() && self.default_client.is_none() {
self.default_client = Some(client);
log::debug!("Registered client {client_id} for default routing");
} else {
self.clients.insert(client_id, client);
log::debug!("Registered client {client_id}");
}
}
pub fn deregister_client(&mut self, client_id: &ClientId) {
check_key_in_map(client_id, &self.clients, "client_id", "clients").expect(FAILED);
self.clients.shift_remove(client_id);
log::info!("Deregistered client {client_id}");
}
pub fn register_default_client(&mut self, client: DataClientAdapter) {
check_predicate_true(
self.default_client.is_none(),
"default client already registered",
)
.expect(FAILED);
let client_id = client.client_id();
self.default_client = Some(client);
log::debug!("Registered default client {client_id}");
}
pub fn start(&mut self) {
for client in self.get_clients_mut() {
if let Err(e) = client.start() {
log::error!("{e}");
}
}
for aggregator in self.bar_aggregators.values() {
if aggregator.borrow().bar_type().spec().is_time_aggregated() {
aggregator
.borrow_mut()
.start_timer(Some(aggregator.clone()));
}
}
}
pub fn stop(&mut self) {
for client in self.get_clients_mut() {
if let Err(e) = client.stop() {
log::error!("{e}");
}
}
for aggregator in self.bar_aggregators.values() {
aggregator.borrow_mut().stop();
}
}
pub fn reset(&mut self) {
for client in self.get_clients_mut() {
if let Err(e) = client.reset() {
log::error!("{e}");
}
}
let bar_types: Vec<BarType> = self.bar_aggregators.keys().copied().collect();
for bar_type in bar_types {
if let Err(e) = self.stop_bar_aggregator(bar_type) {
log::error!("Error stopping bar aggregator during reset for {bar_type}: {e}");
}
}
}
pub fn dispose(&mut self) {
for client in self.get_clients_mut() {
if let Err(e) = client.dispose() {
log::error!("{e}");
}
}
self.clock.borrow_mut().cancel_timers();
}
pub async fn connect(&mut self) {
let futures: Vec<_> = self
.get_clients_mut()
.into_iter()
.map(|client| client.connect())
.collect();
let results = join_all(futures).await;
for error in results.into_iter().filter_map(Result::err) {
log::error!("Failed to connect data client: {error}");
}
}
pub async fn disconnect(&mut self) -> anyhow::Result<()> {
let futures: Vec<_> = self
.get_clients_mut()
.into_iter()
.map(|client| client.disconnect())
.collect();
let results = join_all(futures).await;
let errors: Vec<_> = results.into_iter().filter_map(Result::err).collect();
if errors.is_empty() {
Ok(())
} else {
let error_msgs: Vec<_> = errors.iter().map(|e| e.to_string()).collect();
anyhow::bail!(
"Failed to disconnect data clients: {}",
error_msgs.join("; ")
)
}
}
#[must_use]
pub fn check_connected(&self) -> bool {
self.get_clients()
.iter()
.all(|client| client.is_connected())
}
#[must_use]
pub fn check_disconnected(&self) -> bool {
self.get_clients()
.iter()
.all(|client| !client.is_connected())
}
#[must_use]
pub fn client_connection_status(&self) -> Vec<(ClientId, bool)> {
self.get_clients()
.into_iter()
.map(|client| (client.client_id(), client.is_connected()))
.collect()
}
#[must_use]
pub fn registered_clients(&self) -> Vec<ClientId> {
self.get_clients()
.into_iter()
.map(|client| client.client_id())
.collect()
}
pub(crate) fn collect_subscriptions<F, T>(&self, get_subs: F) -> Vec<T>
where
F: Fn(&DataClientAdapter) -> &AHashSet<T>,
T: Clone,
{
self.get_clients()
.into_iter()
.flat_map(get_subs)
.cloned()
.collect()
}
#[must_use]
pub fn get_clients(&self) -> Vec<&DataClientAdapter> {
let (default_opt, clients_map) = (&self.default_client, &self.clients);
let mut clients: Vec<&DataClientAdapter> = clients_map.values().collect();
if let Some(default) = default_opt {
clients.push(default);
}
clients
}
#[must_use]
pub fn get_clients_mut(&mut self) -> Vec<&mut DataClientAdapter> {
let (default_opt, clients_map) = (&mut self.default_client, &mut self.clients);
let mut clients: Vec<&mut DataClientAdapter> = clients_map.values_mut().collect();
if let Some(default) = default_opt {
clients.push(default);
}
clients
}
pub fn get_client(
&mut self,
client_id: Option<&ClientId>,
venue: Option<&Venue>,
) -> Option<&mut DataClientAdapter> {
if let Some(client_id) = client_id {
if let Some(client) = self.clients.get_mut(client_id) {
return Some(client);
}
if let Some(default) = self.default_client.as_mut()
&& default.client_id() == *client_id
{
return Some(default);
}
return None;
}
if let Some(v) = venue {
if let Some(client_id) = self.routing_map.get(v) {
return self.clients.get_mut(client_id);
}
}
self.get_default_client()
}
const fn get_default_client(&mut self) -> Option<&mut DataClientAdapter> {
self.default_client.as_mut()
}
#[must_use]
pub fn subscribed_custom_data(&self) -> Vec<DataType> {
self.collect_subscriptions(|client| &client.subscriptions_custom)
}
#[must_use]
pub fn subscribed_instruments(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_instrument)
}
#[must_use]
pub fn subscribed_book_deltas(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_book_deltas)
}
#[must_use]
pub fn subscribed_book_depth10(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_book_depth10)
}
#[must_use]
pub fn subscribed_book_snapshots(&self) -> Vec<InstrumentId> {
self.book_intervals
.values()
.flat_map(|set| set.iter().copied())
.collect()
}
#[must_use]
pub fn subscribed_quotes(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_quotes)
}
#[must_use]
pub fn subscribed_trades(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_trades)
}
#[must_use]
pub fn subscribed_bars(&self) -> Vec<BarType> {
self.collect_subscriptions(|client| &client.subscriptions_bars)
}
#[must_use]
pub fn subscribed_mark_prices(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_mark_prices)
}
#[must_use]
pub fn subscribed_index_prices(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_index_prices)
}
#[must_use]
pub fn subscribed_funding_rates(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_funding_rates)
}
#[must_use]
pub fn subscribed_instrument_status(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_instrument_status)
}
#[must_use]
pub fn subscribed_instrument_close(&self) -> Vec<InstrumentId> {
self.collect_subscriptions(|client| &client.subscriptions_instrument_close)
}
pub fn execute(&mut self, cmd: DataCommand) {
if let Err(e) = match cmd {
DataCommand::Subscribe(c) => self.execute_subscribe(&c),
DataCommand::Unsubscribe(c) => self.execute_unsubscribe(&c),
DataCommand::Request(c) => self.execute_request(c),
#[cfg(feature = "defi")]
DataCommand::DefiRequest(c) => self.execute_defi_request(c),
#[cfg(feature = "defi")]
DataCommand::DefiSubscribe(c) => self.execute_defi_subscribe(&c),
#[cfg(feature = "defi")]
DataCommand::DefiUnsubscribe(c) => self.execute_defi_unsubscribe(&c),
_ => {
log::warn!("Unhandled DataCommand variant");
Ok(())
}
} {
log::error!("{e}");
}
}
pub fn execute_subscribe(&mut self, cmd: &SubscribeCommand) -> anyhow::Result<()> {
match &cmd {
SubscribeCommand::BookDeltas(cmd) => self.subscribe_book_deltas(cmd)?,
SubscribeCommand::BookDepth10(cmd) => self.subscribe_book_depth10(cmd)?,
SubscribeCommand::BookSnapshots(cmd) => {
return self.subscribe_book_snapshots(cmd);
}
SubscribeCommand::Bars(cmd) => self.subscribe_bars(cmd)?,
SubscribeCommand::OptionChain(cmd) => {
self.subscribe_option_chain(cmd);
return Ok(());
}
_ => {} }
if let Some(client_id) = cmd.client_id()
&& self.external_clients.contains(client_id)
{
if self.config.debug {
log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
}
return Ok(());
}
if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
client.execute_subscribe(cmd);
} else {
log::error!(
"Cannot handle command: no client found for client_id={:?}, venue={:?}",
cmd.client_id(),
cmd.venue(),
);
}
Ok(())
}
pub fn execute_unsubscribe(&mut self, cmd: &UnsubscribeCommand) -> anyhow::Result<()> {
match &cmd {
UnsubscribeCommand::BookDeltas(cmd) => self.unsubscribe_book_deltas(cmd),
UnsubscribeCommand::BookDepth10(cmd) => self.unsubscribe_book_depth10(cmd),
UnsubscribeCommand::BookSnapshots(cmd) => {
self.unsubscribe_book_snapshots(cmd);
return Ok(());
}
UnsubscribeCommand::Bars(cmd) => self.unsubscribe_bars(cmd),
UnsubscribeCommand::OptionChain(cmd) => {
self.unsubscribe_option_chain(cmd);
return Ok(());
}
_ => {} }
if let Some(client_id) = cmd.client_id()
&& self.external_clients.contains(client_id)
{
if self.config.debug {
log::debug!(
"Skipping unsubscribe command for external client {client_id}: {cmd:?}",
);
}
return Ok(());
}
if let Some(client) = self.get_client(cmd.client_id(), cmd.venue()) {
client.execute_unsubscribe(cmd);
} else {
log::error!(
"Cannot handle command: no client found for client_id={:?}, venue={:?}",
cmd.client_id(),
cmd.venue(),
);
}
Ok(())
}
pub fn execute_request(&mut self, req: RequestCommand) -> anyhow::Result<()> {
if let Some(cid) = req.client_id()
&& self.external_clients.contains(cid)
{
if self.config.debug {
log::debug!("Skipping data request for external client {cid}: {req:?}");
}
return Ok(());
}
if let Some(client) = self.get_client(req.client_id(), req.venue()) {
match req {
RequestCommand::Data(req) => client.request_data(req),
RequestCommand::Instrument(req) => client.request_instrument(req),
RequestCommand::Instruments(req) => client.request_instruments(req),
RequestCommand::BookSnapshot(req) => client.request_book_snapshot(req),
RequestCommand::BookDepth(req) => client.request_book_depth(req),
RequestCommand::Quotes(req) => client.request_quotes(req),
RequestCommand::Trades(req) => client.request_trades(req),
RequestCommand::FundingRates(req) => client.request_funding_rates(req),
RequestCommand::ForwardPrices(req) => client.request_forward_prices(req),
RequestCommand::Bars(req) => client.request_bars(req),
}
} else {
anyhow::bail!(
"Cannot handle request: no client found for {:?} {:?}",
req.client_id(),
req.venue()
);
}
}
pub fn process(&mut self, data: &dyn Any) {
if let Some(instrument) = data.downcast_ref::<InstrumentAny>() {
self.handle_instrument(instrument);
} else if let Some(funding_rate) = data.downcast_ref::<FundingRateUpdate>() {
self.handle_funding_rate(*funding_rate);
} else if let Some(status) = data.downcast_ref::<InstrumentStatus>() {
self.handle_instrument_status(*status);
} else if let Some(option_greeks) = data.downcast_ref::<OptionGreeks>() {
self.cache.borrow_mut().add_option_greeks(*option_greeks);
let topic = switchboard::get_option_greeks_topic(option_greeks.instrument_id);
msgbus::publish_option_greeks(topic, option_greeks);
self.drain_deferred_commands();
} else {
log::error!("Cannot process data {data:?}, type is unrecognized");
}
}
pub fn process_data(&mut self, data: Data) {
match data {
Data::Delta(delta) => self.handle_delta(delta),
Data::Deltas(deltas) => self.handle_deltas(deltas.into_inner()),
Data::Depth10(depth) => self.handle_depth10(*depth),
Data::Quote(quote) => {
self.handle_quote(quote);
self.drain_deferred_commands();
}
Data::Trade(trade) => self.handle_trade(trade),
Data::Bar(bar) => self.handle_bar(bar),
Data::MarkPriceUpdate(mark_price) => {
self.handle_mark_price(mark_price);
self.drain_deferred_commands();
}
Data::IndexPriceUpdate(index_price) => {
self.handle_index_price(index_price);
self.drain_deferred_commands();
}
Data::InstrumentClose(close) => self.handle_instrument_close(close),
Data::Custom(custom) => self.handle_custom_data(&custom),
}
}
#[allow(clippy::needless_pass_by_value)] pub fn response(&mut self, resp: DataResponse) {
log::debug!("{RECV}{RES} {resp:?}");
let correlation_id = *resp.correlation_id();
match &resp {
DataResponse::Instrument(r) => {
self.handle_instrument_response(r.data.clone());
}
DataResponse::Instruments(r) => {
self.handle_instruments(&r.data);
}
DataResponse::Quotes(r) => {
if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
self.handle_quotes(&r.data);
}
}
DataResponse::Trades(r) => {
if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
self.handle_trades(&r.data);
}
}
DataResponse::FundingRates(r) => {
if !log_if_empty_response(&r.data, &r.instrument_id, &correlation_id) {
self.handle_funding_rates(&r.data);
}
}
DataResponse::Bars(r) => {
if !log_if_empty_response(&r.data, &r.bar_type, &correlation_id) {
self.handle_bars(&r.data);
}
}
DataResponse::Book(r) => self.handle_book_response(&r.data),
DataResponse::ForwardPrices(r) => {
return self.handle_forward_prices_response(&correlation_id, r);
}
_ => todo!("Handle other response types"),
}
msgbus::send_response(&correlation_id, &resp);
}
fn handle_instrument(&mut self, instrument: &InstrumentAny) {
log::debug!("Handling instrument: {}", instrument.id());
if let Err(e) = self
.cache
.as_ref()
.borrow_mut()
.add_instrument(instrument.clone())
{
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_instrument_topic(instrument.id());
log::debug!("Publishing instrument to topic: {topic}");
msgbus::publish_any(topic, instrument);
self.update_option_chains(instrument);
}
fn update_option_chains(&mut self, instrument: &InstrumentAny) {
let Some(underlying) = instrument.underlying() else {
return;
};
let Some(expiration_ns) = instrument.expiration_ns() else {
return;
};
let Some(strike) = instrument.strike_price() else {
return;
};
let Some(kind) = instrument.option_kind() else {
return;
};
let venue = instrument.id().venue;
let settlement = instrument.settlement_currency().code;
let series_id = OptionSeriesId::new(venue, underlying, settlement, expiration_ns);
let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
return;
};
let clock = self.clock.clone();
let client = self.get_client(None, Some(&venue));
if manager_rc
.borrow_mut()
.add_instrument(instrument.id(), strike, kind, client, &clock)
{
self.option_chain_instrument_index
.insert(instrument.id(), series_id);
}
}
fn handle_delta(&mut self, delta: OrderBookDelta) {
let deltas = if self.config.buffer_deltas {
if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&delta.instrument_id) {
buffered_deltas.deltas.push(delta);
buffered_deltas.flags = delta.flags;
buffered_deltas.sequence = delta.sequence;
buffered_deltas.ts_event = delta.ts_event;
buffered_deltas.ts_init = delta.ts_init;
} else {
let buffered_deltas = OrderBookDeltas::new(delta.instrument_id, vec![delta]);
self.buffered_deltas_map
.insert(delta.instrument_id, buffered_deltas);
}
if !RecordFlag::F_LAST.matches(delta.flags) {
return; }
self.buffered_deltas_map
.remove(&delta.instrument_id)
.expect("buffered deltas exist")
} else {
OrderBookDeltas::new(delta.instrument_id, vec![delta])
};
let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
msgbus::publish_deltas(topic, &deltas);
}
fn handle_deltas(&mut self, deltas: OrderBookDeltas) {
if self.config.buffer_deltas {
let instrument_id = deltas.instrument_id;
for delta in deltas.deltas {
if let Some(buffered_deltas) = self.buffered_deltas_map.get_mut(&instrument_id) {
buffered_deltas.deltas.push(delta);
buffered_deltas.flags = delta.flags;
buffered_deltas.sequence = delta.sequence;
buffered_deltas.ts_event = delta.ts_event;
buffered_deltas.ts_init = delta.ts_init;
} else {
let buffered_deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
self.buffered_deltas_map
.insert(instrument_id, buffered_deltas);
}
if RecordFlag::F_LAST.matches(delta.flags) {
let deltas_to_publish = self
.buffered_deltas_map
.remove(&instrument_id)
.expect("buffered deltas exist");
let topic = switchboard::get_book_deltas_topic(instrument_id);
msgbus::publish_deltas(topic, &deltas_to_publish);
}
}
} else {
let topic = switchboard::get_book_deltas_topic(deltas.instrument_id);
msgbus::publish_deltas(topic, &deltas);
}
}
fn handle_depth10(&self, depth: OrderBookDepth10) {
let topic = switchboard::get_book_depth10_topic(depth.instrument_id);
msgbus::publish_depth10(topic, &depth);
}
fn handle_quote(&self, quote: QuoteTick) {
if let Err(e) = self.cache.as_ref().borrow_mut().add_quote(quote) {
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_quotes_topic(quote.instrument_id);
msgbus::publish_quote(topic, "e);
}
fn handle_trade(&self, trade: TradeTick) {
if let Err(e) = self.cache.as_ref().borrow_mut().add_trade(trade) {
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_trades_topic(trade.instrument_id);
msgbus::publish_trade(topic, &trade);
}
fn handle_bar(&self, bar: Bar) {
if self.config.validate_data_sequence
&& let Some(last_bar) = self.cache.as_ref().borrow().bar(&bar.bar_type)
{
if bar.ts_event < last_bar.ts_event {
log::warn!(
"Bar {bar} was prior to last bar `ts_event` {}",
last_bar.ts_event
);
return; }
if bar.ts_init < last_bar.ts_init {
log::warn!(
"Bar {bar} was prior to last bar `ts_init` {}",
last_bar.ts_init
);
return; }
}
if let Err(e) = self.cache.as_ref().borrow_mut().add_bar(bar) {
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_bars_topic(bar.bar_type);
msgbus::publish_bar(topic, &bar);
}
fn handle_mark_price(&self, mark_price: MarkPriceUpdate) {
if let Err(e) = self.cache.as_ref().borrow_mut().add_mark_price(mark_price) {
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_mark_price_topic(mark_price.instrument_id);
msgbus::publish_mark_price(topic, &mark_price);
}
fn handle_index_price(&self, index_price: IndexPriceUpdate) {
if let Err(e) = self
.cache
.as_ref()
.borrow_mut()
.add_index_price(index_price)
{
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_index_price_topic(index_price.instrument_id);
msgbus::publish_index_price(topic, &index_price);
}
pub fn handle_funding_rate(&mut self, funding_rate: FundingRateUpdate) {
if let Err(e) = self
.cache
.as_ref()
.borrow_mut()
.add_funding_rate(funding_rate)
{
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_funding_rate_topic(funding_rate.instrument_id);
msgbus::publish_funding_rate(topic, &funding_rate);
}
fn handle_instrument_status(&mut self, status: InstrumentStatus) {
let topic = switchboard::get_instrument_status_topic(status.instrument_id);
msgbus::publish_any(topic, &status);
if self
.option_chain_instrument_index
.contains_key(&status.instrument_id)
&& matches!(
status.action,
MarketStatusAction::Close | MarketStatusAction::NotAvailableForTrading
)
{
self.expire_option_chain_instrument(status.instrument_id);
}
}
fn expire_option_chain_instrument(&mut self, instrument_id: InstrumentId) {
let Some(series_id) = self.option_chain_instrument_index.remove(&instrument_id) else {
return;
};
let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
return;
};
let series_empty = manager_rc
.borrow_mut()
.handle_instrument_expired(&instrument_id);
self.drain_deferred_commands();
log::info!(
"Expired instrument {instrument_id} from option chain {series_id} (series_empty={series_empty})",
);
if series_empty {
manager_rc.borrow_mut().teardown(&self.clock);
self.option_chain_managers.remove(&series_id);
log::info!("Torn down empty option chain manager for {series_id}");
}
}
fn handle_instrument_close(&self, close: InstrumentClose) {
let topic = switchboard::get_instrument_close_topic(close.instrument_id);
msgbus::publish_any(topic, &close);
}
fn handle_custom_data(&self, custom: &CustomData) {
log::debug!("Processing custom data: {}", custom.data.type_name());
let topic = switchboard::get_custom_topic(&custom.data_type);
msgbus::publish_any(topic, custom);
}
fn drain_deferred_commands(&mut self) {
loop {
let commands: VecDeque<DeferredCommand> =
std::mem::take(&mut *self.deferred_cmd_queue.borrow_mut());
if commands.is_empty() {
break;
}
for cmd in commands {
match cmd {
DeferredCommand::Subscribe(sub) => {
let client = self.get_client(sub.client_id(), sub.venue());
if let Some(client) = client {
client.execute_subscribe(&sub);
}
}
DeferredCommand::Unsubscribe(unsub) => {
let client = self.get_client(unsub.client_id(), unsub.venue());
if let Some(client) = client {
client.execute_unsubscribe(&unsub);
}
}
DeferredCommand::ExpireSeries(series_id) => {
self.expire_series(series_id);
}
}
}
}
}
fn expire_series(&mut self, series_id: OptionSeriesId) {
let Some(manager_rc) = self.option_chain_managers.get(&series_id).cloned() else {
return;
};
let instrument_ids: Vec<InstrumentId> = self
.option_chain_instrument_index
.iter()
.filter(|(_, sid)| **sid == series_id)
.map(|(id, _)| *id)
.collect();
for id in &instrument_ids {
self.option_chain_instrument_index.remove(id);
manager_rc.borrow_mut().handle_instrument_expired(id);
}
manager_rc.borrow_mut().teardown(&self.clock);
self.option_chain_managers.remove(&series_id);
log::info!("Proactively torn down expired option chain {series_id}");
}
fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
if cmd.instrument_id.is_synthetic() {
anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
}
self.book_deltas_subs.insert(cmd.instrument_id);
self.setup_book_updater(&cmd.instrument_id, cmd.book_type, true, cmd.managed)?;
Ok(())
}
fn subscribe_book_depth10(&mut self, cmd: &SubscribeBookDepth10) -> anyhow::Result<()> {
if cmd.instrument_id.is_synthetic() {
anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDepth10` data");
}
self.book_depth10_subs.insert(cmd.instrument_id);
self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, cmd.managed)?;
Ok(())
}
fn subscribe_book_snapshots(&mut self, cmd: &SubscribeBookSnapshots) -> anyhow::Result<()> {
if cmd.instrument_id.is_synthetic() {
anyhow::bail!("Cannot subscribe for synthetic instrument `OrderBookDelta` data");
}
let first_for_interval = match self.book_intervals.entry(cmd.interval_ms) {
Entry::Vacant(e) => {
let mut set = AHashSet::new();
set.insert(cmd.instrument_id);
e.insert(set);
true
}
Entry::Occupied(mut e) => {
e.get_mut().insert(cmd.instrument_id);
false
}
};
if first_for_interval {
let interval_ns = millis_to_nanos_unchecked(cmd.interval_ms.get() as f64);
let topic = switchboard::get_book_snapshots_topic(cmd.instrument_id, cmd.interval_ms);
let snap_info = BookSnapshotInfo {
instrument_id: cmd.instrument_id,
venue: cmd.instrument_id.venue,
is_composite: cmd.instrument_id.symbol.is_composite(),
root: Ustr::from(cmd.instrument_id.symbol.root()),
topic,
interval_ms: cmd.interval_ms,
};
let now_ns = self.clock.borrow().timestamp_ns().as_u64();
let start_time_ns = now_ns - (now_ns % interval_ns) + interval_ns;
let snapshotter = Rc::new(BookSnapshotter::new(snap_info, self.cache.clone()));
self.book_snapshotters
.insert(cmd.instrument_id, snapshotter.clone());
let timer_name = snapshotter.timer_name;
let callback_fn: Rc<dyn Fn(TimeEvent)> =
Rc::new(move |event| snapshotter.snapshot(event));
let callback = TimeEventCallback::from(callback_fn);
self.clock
.borrow_mut()
.set_timer_ns(
&timer_name,
interval_ns,
Some(start_time_ns.into()),
None,
Some(callback),
None,
None,
)
.expect(FAILED);
}
if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
self.setup_book_updater(&cmd.instrument_id, cmd.book_type, false, true)?;
}
if let Some(client_id) = cmd.client_id.as_ref()
&& self.external_clients.contains(client_id)
{
if self.config.debug {
log::debug!("Skipping subscribe command for external client {client_id}: {cmd:?}",);
}
return Ok(());
}
log::debug!(
"Forwarding BookSnapshots as BookDeltas for {}, client_id={:?}, venue={:?}",
cmd.instrument_id,
cmd.client_id,
cmd.venue,
);
if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
let deltas_cmd = SubscribeBookDeltas::new(
cmd.instrument_id,
cmd.book_type,
cmd.client_id,
cmd.venue,
UUID4::new(),
cmd.ts_init,
cmd.depth,
true, Some(cmd.command_id),
cmd.params.clone(),
);
log::debug!(
"Calling client.execute_subscribe for BookDeltas: {}",
cmd.instrument_id
);
client.execute_subscribe(&SubscribeCommand::BookDeltas(deltas_cmd));
} else {
log::error!(
"Cannot handle command: no client found for client_id={:?}, venue={:?}",
cmd.client_id,
cmd.venue,
);
}
Ok(())
}
fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
match cmd.bar_type.aggregation_source() {
AggregationSource::Internal => {
if !self.bar_aggregators.contains_key(&cmd.bar_type.standard()) {
self.start_bar_aggregator(cmd.bar_type)?;
}
}
AggregationSource::External => {
if cmd.bar_type.instrument_id().is_synthetic() {
anyhow::bail!(
"Cannot subscribe for externally aggregated synthetic instrument bar data"
);
}
}
}
Ok(())
}
fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) {
if !self.subscribed_book_deltas().contains(&cmd.instrument_id) {
log::warn!("Cannot unsubscribe from `OrderBookDeltas` data: not subscribed");
return;
}
self.book_deltas_subs.remove(&cmd.instrument_id);
let topics = vec![
switchboard::get_book_deltas_topic(cmd.instrument_id),
switchboard::get_book_depth10_topic(cmd.instrument_id),
];
self.maintain_book_updater(&cmd.instrument_id, &topics);
self.maintain_book_snapshotter(&cmd.instrument_id);
}
fn unsubscribe_book_depth10(&mut self, cmd: &UnsubscribeBookDepth10) {
if !self.book_depth10_subs.contains(&cmd.instrument_id) {
log::warn!("Cannot unsubscribe from `OrderBookDepth10` data: not subscribed");
return;
}
self.book_depth10_subs.remove(&cmd.instrument_id);
let topics = vec![
switchboard::get_book_deltas_topic(cmd.instrument_id),
switchboard::get_book_depth10_topic(cmd.instrument_id),
];
self.maintain_book_updater(&cmd.instrument_id, &topics);
self.maintain_book_snapshotter(&cmd.instrument_id);
}
fn unsubscribe_book_snapshots(&mut self, cmd: &UnsubscribeBookSnapshots) {
let is_subscribed = self
.book_intervals
.values()
.any(|set| set.contains(&cmd.instrument_id));
if !is_subscribed {
log::warn!("Cannot unsubscribe from `OrderBook` snapshots: not subscribed");
return;
}
let mut to_remove = Vec::new();
for (interval, set) in &mut self.book_intervals {
if set.remove(&cmd.instrument_id) && set.is_empty() {
to_remove.push(*interval);
}
}
for interval in to_remove {
self.book_intervals.remove(&interval);
}
let topics = vec![
switchboard::get_book_deltas_topic(cmd.instrument_id),
switchboard::get_book_depth10_topic(cmd.instrument_id),
];
self.maintain_book_updater(&cmd.instrument_id, &topics);
self.maintain_book_snapshotter(&cmd.instrument_id);
let still_in_intervals = self
.book_intervals
.values()
.any(|set| set.contains(&cmd.instrument_id));
if !still_in_intervals && !self.book_deltas_subs.contains(&cmd.instrument_id) {
if let Some(client_id) = cmd.client_id.as_ref()
&& self.external_clients.contains(client_id)
{
return;
}
if let Some(client) = self.get_client(cmd.client_id.as_ref(), cmd.venue.as_ref()) {
let deltas_cmd = UnsubscribeBookDeltas::new(
cmd.instrument_id,
cmd.client_id,
cmd.venue,
UUID4::new(),
cmd.ts_init,
Some(cmd.command_id),
cmd.params.clone(),
);
client.execute_unsubscribe(&UnsubscribeCommand::BookDeltas(deltas_cmd));
}
}
}
fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) {
let bar_type = cmd.bar_type;
let topic = switchboard::get_bars_topic(bar_type.standard());
if msgbus::exact_subscriber_count_bars(topic) > 0 {
return;
}
if self.bar_aggregators.contains_key(&bar_type.standard())
&& let Err(e) = self.stop_bar_aggregator(bar_type)
{
log::error!("Error stopping bar aggregator for {bar_type}: {e}");
}
if bar_type.is_composite() {
let source_type = bar_type.composite();
let source_topic = switchboard::get_bars_topic(source_type);
if msgbus::exact_subscriber_count_bars(source_topic) == 0
&& self.bar_aggregators.contains_key(&source_type)
&& let Err(e) = self.stop_bar_aggregator(source_type)
{
log::error!("Error stopping source bar aggregator for {source_type}: {e}");
}
}
}
fn subscribe_option_chain(&mut self, cmd: &SubscribeOptionChain) {
let series_id = cmd.series_id;
if let Some(old) = self.option_chain_managers.remove(&series_id) {
log::info!("Re-subscribing option chain for {series_id}, tearing down previous");
let all_ids = old.borrow().all_instrument_ids();
let old_venue = old.borrow().venue();
old.borrow_mut().teardown(&self.clock);
self.forward_option_chain_unsubscribes(&all_ids, old_venue, cmd.client_id);
}
self.pending_option_chain_requests
.retain(|_, pending_cmd| pending_cmd.series_id != series_id);
if !matches!(cmd.strike_range, StrikeRange::Fixed(_)) {
let resolved_client_id = self
.get_client(cmd.client_id.as_ref(), Some(&series_id.venue))
.map(|c| c.client_id);
if let Some(client_id) = resolved_client_id {
let request_id = UUID4::new();
let ts_init = self.clock.borrow().timestamp_ns();
let sample_instrument_id = {
let cache = self.cache.borrow();
cache
.instruments(&series_id.venue, Some(&series_id.underlying))
.iter()
.find(|i| {
i.expiration_ns() == Some(series_id.expiration_ns)
&& i.settlement_currency().code == series_id.settlement_currency
})
.map(|i| i.id())
};
let request = RequestForwardPrices::new(
series_id.venue,
series_id.underlying,
sample_instrument_id,
Some(client_id),
request_id,
ts_init,
None,
);
self.pending_option_chain_requests
.insert(request_id, cmd.clone());
let req_cmd = RequestCommand::ForwardPrices(request);
if let Err(e) = self.execute_request(req_cmd) {
log::warn!("Failed to request forward prices for {series_id}: {e}");
let cmd = self
.pending_option_chain_requests
.remove(&request_id)
.expect("just inserted");
self.create_option_chain_manager(&cmd, None);
}
return;
}
}
self.create_option_chain_manager(cmd, None);
}
fn create_option_chain_manager(
&mut self,
cmd: &SubscribeOptionChain,
initial_atm_price: Option<Price>,
) {
let series_id = cmd.series_id;
let cache = self.cache.clone();
let clock = self.clock.clone();
let priority = self.msgbus_priority;
let deferred_cmd_queue = self.deferred_cmd_queue.clone();
let manager_rc = {
let client = self.get_client(cmd.client_id.as_ref(), Some(&series_id.venue));
OptionChainManager::create_and_setup(
series_id,
&cache,
cmd,
&clock,
priority,
client,
initial_atm_price,
deferred_cmd_queue,
)
};
for id in manager_rc.borrow().all_instrument_ids() {
self.option_chain_instrument_index.insert(id, series_id);
}
self.option_chain_managers.insert(series_id, manager_rc);
}
fn unsubscribe_option_chain(&mut self, cmd: &UnsubscribeOptionChain) {
let series_id = cmd.series_id;
let Some(manager_rc) = self.option_chain_managers.remove(&series_id) else {
log::warn!("Cannot unsubscribe option chain for {series_id}: not subscribed");
return;
};
let all_ids = manager_rc.borrow().all_instrument_ids();
let venue = manager_rc.borrow().venue();
for id in &all_ids {
self.option_chain_instrument_index.remove(id);
}
manager_rc.borrow_mut().teardown(&self.clock);
self.forward_option_chain_unsubscribes(&all_ids, venue, cmd.client_id);
log::info!("Unsubscribed option chain for {series_id}");
}
fn forward_option_chain_unsubscribes(
&mut self,
instrument_ids: &[InstrumentId],
venue: Venue,
client_id: Option<ClientId>,
) {
let ts_init = self.clock.borrow().timestamp_ns();
let Some(client) = self.get_client(client_id.as_ref(), Some(&venue)) else {
log::error!(
"Cannot forward option chain unsubscribes: no client found for venue={venue}",
);
return;
};
for instrument_id in instrument_ids {
client.execute_unsubscribe(&UnsubscribeCommand::Quotes(UnsubscribeQuotes::new(
*instrument_id,
client_id,
Some(venue),
UUID4::new(),
ts_init,
None,
None,
)));
client.execute_unsubscribe(&UnsubscribeCommand::OptionGreeks(
UnsubscribeOptionGreeks::new(
*instrument_id,
client_id,
Some(venue),
UUID4::new(),
ts_init,
None,
None,
),
));
client.execute_unsubscribe(&UnsubscribeCommand::InstrumentStatus(
UnsubscribeInstrumentStatus::new(
*instrument_id,
client_id,
Some(venue),
UUID4::new(),
ts_init,
None,
None,
),
));
}
}
fn maintain_book_updater(&mut self, instrument_id: &InstrumentId, _topics: &[MStr<Topic>]) {
let Some(updater) = self.book_updaters.get(instrument_id) else {
return;
};
let has_deltas = self.book_deltas_subs.contains(instrument_id);
let has_depth10 = self.book_depth10_subs.contains(instrument_id);
let deltas_topic = switchboard::get_book_deltas_topic(*instrument_id);
let depth_topic = switchboard::get_book_depth10_topic(*instrument_id);
let deltas_handler: TypedHandler<OrderBookDeltas> = TypedHandler::new(updater.clone());
let depth_handler: TypedHandler<OrderBookDepth10> = TypedHandler::new(updater.clone());
if !has_deltas {
msgbus::unsubscribe_book_deltas(deltas_topic.into(), &deltas_handler);
}
if !has_depth10 {
msgbus::unsubscribe_book_depth10(depth_topic.into(), &depth_handler);
}
if !has_deltas && !has_depth10 {
self.book_updaters.remove(instrument_id);
log::debug!("Removed BookUpdater for instrument ID {instrument_id}");
}
}
fn maintain_book_snapshotter(&mut self, instrument_id: &InstrumentId) {
if let Some(snapshotter) = self.book_snapshotters.get(instrument_id) {
let topic = switchboard::get_book_snapshots_topic(
*instrument_id,
snapshotter.snap_info.interval_ms,
);
if msgbus::subscriber_count_book_snapshots(topic) == 0 {
let timer_name = snapshotter.timer_name;
self.book_snapshotters.remove(instrument_id);
let mut clock = self.clock.borrow_mut();
if clock.timer_exists(&timer_name) {
clock.cancel_timer(&timer_name);
}
log::debug!("Removed BookSnapshotter for instrument ID {instrument_id}");
}
}
}
fn handle_instrument_response(&self, instrument: InstrumentAny) {
let mut cache = self.cache.as_ref().borrow_mut();
if let Err(e) = cache.add_instrument(instrument) {
log_error_on_cache_insert(&e);
}
}
fn handle_instruments(&self, instruments: &[InstrumentAny]) {
let mut cache = self.cache.as_ref().borrow_mut();
for instrument in instruments {
if let Err(e) = cache.add_instrument(instrument.clone()) {
log_error_on_cache_insert(&e);
}
}
}
fn handle_quotes(&self, quotes: &[QuoteTick]) {
if let Err(e) = self.cache.as_ref().borrow_mut().add_quotes(quotes) {
log_error_on_cache_insert(&e);
}
}
fn handle_trades(&self, trades: &[TradeTick]) {
if let Err(e) = self.cache.as_ref().borrow_mut().add_trades(trades) {
log_error_on_cache_insert(&e);
}
}
fn handle_funding_rates(&self, funding_rates: &[FundingRateUpdate]) {
if let Err(e) = self
.cache
.as_ref()
.borrow_mut()
.add_funding_rates(funding_rates)
{
log_error_on_cache_insert(&e);
}
}
fn handle_bars(&self, bars: &[Bar]) {
if let Err(e) = self.cache.as_ref().borrow_mut().add_bars(bars) {
log_error_on_cache_insert(&e);
}
}
fn handle_book_response(&self, book: &OrderBook) {
log::debug!("Adding order book {} to cache", book.instrument_id);
if let Err(e) = self
.cache
.as_ref()
.borrow_mut()
.add_order_book(book.clone())
{
log_error_on_cache_insert(&e);
}
}
fn handle_forward_prices_response(
&mut self,
correlation_id: &UUID4,
resp: &ForwardPricesResponse,
) {
let Some(cmd) = self.pending_option_chain_requests.remove(correlation_id) else {
log::debug!(
"No pending option chain request for correlation_id={correlation_id}, ignoring"
);
return;
};
let series_id = cmd.series_id;
let cache = self.cache.borrow();
let mut best_price: Option<Price> = None;
for fp in &resp.data {
if let Some(instrument) = cache.instrument(&fp.instrument_id)
&& let Some(expiration) = instrument.expiration_ns()
&& expiration == series_id.expiration_ns
&& instrument.settlement_currency().code == series_id.settlement_currency
{
match Price::from_decimal(fp.forward_price) {
Ok(price) => best_price = Some(price),
Err(e) => log::warn!("Invalid forward price for {}: {e}", fp.instrument_id),
}
break;
}
}
drop(cache);
if let Some(price) = best_price {
log::info!("Forward price for {series_id}: {price} (instant bootstrap)",);
} else {
log::info!(
"No matching forward price found for {series_id}, will bootstrap from live data",
);
}
self.create_option_chain_manager(&cmd, best_price);
}
#[allow(clippy::too_many_arguments)]
fn setup_book_updater(
&mut self,
instrument_id: &InstrumentId,
book_type: BookType,
only_deltas: bool,
managed: bool,
) -> anyhow::Result<()> {
let mut cache = self.cache.borrow_mut();
if managed && !cache.has_order_book(instrument_id) {
let book = OrderBook::new(*instrument_id, book_type);
log::debug!("Created {book}");
cache.add_order_book(book)?;
}
let updater = self
.book_updaters
.entry(*instrument_id)
.or_insert_with(|| Rc::new(BookUpdater::new(instrument_id, self.cache.clone())))
.clone();
let topic = switchboard::get_book_deltas_topic(*instrument_id);
let deltas_handler = TypedHandler::new(updater.clone());
msgbus::subscribe_book_deltas(topic.into(), deltas_handler, Some(self.msgbus_priority));
if !only_deltas {
let topic = switchboard::get_book_depth10_topic(*instrument_id);
let depth_handler = TypedHandler::new(updater);
msgbus::subscribe_book_depth10(topic.into(), depth_handler, Some(self.msgbus_priority));
}
Ok(())
}
fn create_bar_aggregator(
&self,
instrument: &InstrumentAny,
bar_type: BarType,
) -> Box<dyn BarAggregator> {
let cache = self.cache.clone();
let handler = move |bar: Bar| {
if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_bars_topic(bar.bar_type);
msgbus::publish_bar(topic, &bar);
};
let clock = self.clock.clone();
let config = self.config.clone();
let price_precision = instrument.price_precision();
let size_precision = instrument.size_precision();
if bar_type.spec().is_time_aggregated() {
let time_bars_origin_offset = config
.time_bars_origins
.get(&bar_type.spec().aggregation)
.map(|duration| chrono::TimeDelta::from_std(*duration).unwrap_or_default());
Box::new(TimeBarAggregator::new(
bar_type,
price_precision,
size_precision,
clock,
handler,
config.time_bars_build_with_no_updates,
config.time_bars_timestamp_on_close,
config.time_bars_interval_type,
time_bars_origin_offset,
config.time_bars_build_delay,
config.time_bars_skip_first_non_full_bar,
))
} else {
match bar_type.spec().aggregation {
BarAggregation::Tick => Box::new(TickBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::TickImbalance => Box::new(TickImbalanceBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::TickRuns => Box::new(TickRunsBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::Volume => Box::new(VolumeBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::VolumeImbalance => Box::new(VolumeImbalanceBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::VolumeRuns => Box::new(VolumeRunsBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::Value => Box::new(ValueBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::ValueImbalance => Box::new(ValueImbalanceBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::ValueRuns => Box::new(ValueRunsBarAggregator::new(
bar_type,
price_precision,
size_precision,
handler,
)) as Box<dyn BarAggregator>,
BarAggregation::Renko => Box::new(RenkoBarAggregator::new(
bar_type,
price_precision,
size_precision,
instrument.price_increment(),
handler,
)) as Box<dyn BarAggregator>,
_ => panic!(
"BarAggregation {:?} is not currently implemented. Supported aggregations: MILLISECOND, SECOND, MINUTE, HOUR, DAY, WEEK, MONTH, YEAR, TICK, TICK_IMBALANCE, TICK_RUNS, VOLUME, VOLUME_IMBALANCE, VOLUME_RUNS, VALUE, VALUE_IMBALANCE, VALUE_RUNS, RENKO",
bar_type.spec().aggregation
),
}
}
}
fn start_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
let instrument = {
let cache = self.cache.borrow();
cache
.instrument(&bar_type.instrument_id())
.ok_or_else(|| {
anyhow::anyhow!(
"Cannot start bar aggregation: no instrument found for {}",
bar_type.instrument_id(),
)
})?
.clone()
};
let bar_key = bar_type.standard();
let aggregator = if let Some(rc) = self.bar_aggregators.get(&bar_key) {
rc.clone()
} else {
let agg = self.create_bar_aggregator(&instrument, bar_type);
let rc = Rc::new(RefCell::new(agg));
self.bar_aggregators.insert(bar_key, rc.clone());
rc
};
let mut subscriptions = Vec::new();
if bar_type.is_composite() {
let topic = switchboard::get_bars_topic(bar_type.composite());
let handler = TypedHandler::new(BarBarHandler::new(&aggregator, bar_key));
msgbus::subscribe_bars(topic.into(), handler.clone(), Some(self.msgbus_priority));
subscriptions.push(BarAggregatorSubscription::Bar { topic, handler });
} else if bar_type.spec().price_type == PriceType::Last {
let topic = switchboard::get_trades_topic(bar_type.instrument_id());
let handler = TypedHandler::new(BarTradeHandler::new(&aggregator, bar_key));
msgbus::subscribe_trades(topic.into(), handler.clone(), Some(self.msgbus_priority));
subscriptions.push(BarAggregatorSubscription::Trade { topic, handler });
} else {
if matches!(
bar_type.spec().aggregation,
BarAggregation::TickImbalance
| BarAggregation::VolumeImbalance
| BarAggregation::ValueImbalance
| BarAggregation::TickRuns
| BarAggregation::VolumeRuns
| BarAggregation::ValueRuns
) {
log::warn!(
"Bar type {bar_type} uses imbalance/runs aggregation which requires trade \
data with `aggressor_side`, but `price_type` is not LAST so it will receive \
quote data: bars will not emit correctly",
);
}
let topic = switchboard::get_quotes_topic(bar_type.instrument_id());
let handler = TypedHandler::new(BarQuoteHandler::new(&aggregator, bar_key));
msgbus::subscribe_quotes(topic.into(), handler.clone(), Some(self.msgbus_priority));
subscriptions.push(BarAggregatorSubscription::Quote { topic, handler });
}
self.bar_aggregator_handlers.insert(bar_key, subscriptions);
self.setup_bar_aggregator(bar_type, false)?;
aggregator.borrow_mut().set_is_running(true);
Ok(())
}
fn setup_bar_aggregator(&self, bar_type: BarType, historical: bool) -> anyhow::Result<()> {
let bar_key = bar_type.standard();
let aggregator = self.bar_aggregators.get(&bar_key).ok_or_else(|| {
anyhow::anyhow!("Cannot setup bar aggregator: no aggregator found for {bar_type}")
})?;
let handler: Box<dyn FnMut(Bar)> = if historical {
let cache = self.cache.clone();
Box::new(move |bar: Bar| {
if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
log_error_on_cache_insert(&e);
}
})
} else {
let cache = self.cache.clone();
Box::new(move |bar: Bar| {
if let Err(e) = cache.as_ref().borrow_mut().add_bar(bar) {
log_error_on_cache_insert(&e);
}
let topic = switchboard::get_bars_topic(bar.bar_type);
msgbus::publish_bar(topic, &bar);
})
};
aggregator
.borrow_mut()
.set_historical_mode(historical, handler);
if bar_type.spec().is_time_aggregated() {
use nautilus_common::clock::TestClock;
if historical {
let test_clock = Rc::new(RefCell::new(TestClock::new()));
aggregator.borrow_mut().set_clock(test_clock);
let aggregator_weak = Rc::downgrade(aggregator);
aggregator.borrow_mut().set_aggregator_weak(aggregator_weak);
} else {
aggregator.borrow_mut().set_clock(self.clock.clone());
aggregator
.borrow_mut()
.start_timer(Some(aggregator.clone()));
}
}
Ok(())
}
fn stop_bar_aggregator(&mut self, bar_type: BarType) -> anyhow::Result<()> {
let aggregator = self
.bar_aggregators
.remove(&bar_type.standard())
.ok_or_else(|| {
anyhow::anyhow!("Cannot stop bar aggregator: no aggregator to stop for {bar_type}")
})?;
aggregator.borrow_mut().stop();
let bar_key = bar_type.standard();
if let Some(subs) = self.bar_aggregator_handlers.remove(&bar_key) {
for sub in subs {
match sub {
BarAggregatorSubscription::Bar { topic, handler } => {
msgbus::unsubscribe_bars(topic.into(), &handler);
}
BarAggregatorSubscription::Trade { topic, handler } => {
msgbus::unsubscribe_trades(topic.into(), &handler);
}
BarAggregatorSubscription::Quote { topic, handler } => {
msgbus::unsubscribe_quotes(topic.into(), &handler);
}
}
}
}
Ok(())
}
}
#[inline(always)]
fn log_error_on_cache_insert<T: Display>(e: &T) {
log::error!("Error on cache insert: {e}");
}
#[inline(always)]
fn log_if_empty_response<T, I: Display>(data: &[T], id: &I, correlation_id: &UUID4) -> bool {
if data.is_empty() {
let name = type_name::<T>();
let short_name = name.rsplit("::").next().unwrap_or(name);
log::warn!("Received empty {short_name} response for {id} {correlation_id}");
return true;
}
false
}