use std::{
any::Any,
cell::{Ref, RefCell, RefMut},
collections::HashMap,
fmt::Debug,
num::NonZeroUsize,
ops::{Deref, DerefMut},
rc::Rc,
sync::Arc,
};
use ahash::{AHashMap, AHashSet};
use chrono::{DateTime, Utc};
use indexmap::IndexMap;
use nautilus_core::{Params, UUID4, UnixNanos, correctness::check_predicate_true};
#[cfg(feature = "defi")]
use nautilus_model::defi::{
Block, Blockchain, Pool, PoolLiquidityUpdate, PoolSwap, data::PoolFeeCollect, data::PoolFlash,
};
use nautilus_model::{
data::{
Bar, BarType, CustomData, DataType, FundingRateUpdate, IndexPriceUpdate, InstrumentStatus,
MarkPriceUpdate, OrderBookDeltas, OrderBookDepth10, QuoteTick, TradeTick,
close::InstrumentClose,
option_chain::{OptionChainSlice, OptionGreeks, StrikeRange},
},
enums::BookType,
events::order::{any::OrderEventAny, canceled::OrderCanceled, filled::OrderFilled},
identifiers::{ActorId, ClientId, ComponentId, InstrumentId, OptionSeriesId, TraderId, Venue},
instruments::{InstrumentAny, SyntheticInstrument},
orderbook::OrderBook,
};
use serde::{Deserialize, Serialize};
use ustr::Ustr;
#[cfg(feature = "indicators")]
use super::indicators::Indicators;
use super::{
Actor,
registry::{get_actor_unchecked, try_get_actor_unchecked},
};
#[cfg(feature = "defi")]
use crate::defi;
#[cfg(feature = "defi")]
#[allow(unused_imports)]
use crate::defi::data_actor as _; use crate::{
cache::Cache,
clock::Clock,
component::Component,
enums::{ComponentState, ComponentTrigger},
logging::{CMD, RECV, REQ, SEND},
messages::{
data::{
BarsResponse, BookResponse, CustomDataResponse, DataCommand, FundingRatesResponse,
InstrumentResponse, InstrumentsResponse, QuotesResponse, RequestBars,
RequestBookSnapshot, RequestCommand, RequestCustomData, RequestFundingRates,
RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades, SubscribeBars,
SubscribeBookDeltas, SubscribeBookSnapshots, SubscribeCommand, SubscribeCustomData,
SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
SubscribeInstrumentClose, SubscribeInstrumentStatus, SubscribeInstruments,
SubscribeMarkPrices, SubscribeOptionChain, SubscribeOptionGreeks, SubscribeQuotes,
SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
UnsubscribeBookSnapshots, UnsubscribeCommand, UnsubscribeCustomData,
UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrument,
UnsubscribeInstrumentClose, UnsubscribeInstrumentStatus, UnsubscribeInstruments,
UnsubscribeMarkPrices, UnsubscribeOptionChain, UnsubscribeOptionGreeks,
UnsubscribeQuotes, UnsubscribeTrades,
},
system::ShutdownSystem,
},
msgbus::{
self, MStr, Pattern, ShareableMessageHandler, Topic, TypedHandler, get_message_bus,
switchboard::{
MessagingSwitchboard, get_bars_topic, get_book_deltas_topic, get_book_snapshots_topic,
get_custom_topic, get_funding_rate_topic, get_index_price_topic,
get_instrument_close_topic, get_instrument_status_topic, get_instrument_topic,
get_instruments_pattern, get_mark_price_topic, get_option_chain_topic,
get_option_greeks_topic, get_order_cancels_topic, get_order_fills_topic,
get_quotes_topic, get_signal_pattern, get_trades_topic,
},
},
signal::Signal,
timer::{TimeEvent, TimeEventCallback},
};
/// Configuration for a data actor.
///
/// Deserialization uses `#[serde(default)]`, so fields missing from the input
/// take the values produced by this type's `Default` impl, and
/// `deny_unknown_fields` rejects input containing unrecognized keys.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(default, deny_unknown_fields)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(
module = "nautilus_trader.core.nautilus_pyo3.common",
subclass,
from_py_object
)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct DataActorConfig {
/// Optional explicit identifier for the actor.
// NOTE(review): presumably an ID is generated elsewhere when this is `None` — confirm.
pub actor_id: Option<ActorId>,
/// Event-logging flag.
// NOTE(review): presumably toggles logging of received events — confirm against its readers.
pub log_events: bool,
/// Command-logging flag.
// NOTE(review): presumably toggles logging of sent commands — confirm against its readers.
pub log_commands: bool,
}
impl Default for DataActorConfig {
    /// Builds the default configuration: no explicit actor ID and both
    /// logging flags switched on.
    fn default() -> Self {
        let log_events = true;
        let log_commands = true;
        Self {
            actor_id: None,
            log_events,
            log_commands,
        }
    }
}
/// Serializable description of an actor given by path strings plus a free-form
/// JSON configuration map.
///
/// `deny_unknown_fields` rejects input containing unrecognized keys; there is
/// no `serde(default)`, so all three fields must be present when deserializing.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
#[cfg_attr(
feature = "python",
pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.common", from_py_object)
)]
#[cfg_attr(
feature = "python",
pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.common")
)]
pub struct ImportableActorConfig {
/// Path identifying the actor.
// NOTE(review): presumably a fully qualified import path — confirm with the loader.
pub actor_path: String,
/// Path identifying the actor's configuration type.
// NOTE(review): presumably a fully qualified import path — confirm with the loader.
pub config_path: String,
/// Configuration values keyed by name, held as raw JSON values.
pub config: HashMap<String, serde_json::Value>,
}
/// Shared, thread-safe (`Send + Sync`) callback that receives a `UUID4`.
// NOTE(review): the `UUID4` is presumably the request ID — confirm at the call sites.
type RequestCallback = Arc<dyn Fn(UUID4) + Send + Sync>;
pub trait DataActor:
Component + Deref<Target = DataActorCore> + DerefMut<Target = DataActorCore>
{
/// Hook returning named byte blobs.
///
/// The default implementation returns an empty map.
// NOTE(review): presumably the actor state to persist — confirm with the caller.
fn on_save(&self) -> anyhow::Result<IndexMap<String, Vec<u8>>> {
Ok(IndexMap::new())
}
/// Hook receiving named byte blobs in `state`.
///
/// The default implementation ignores `state` and returns `Ok(())`.
// NOTE(review): presumably the counterpart of `on_save` — confirm with the caller.
#[allow(unused_variables)]
fn on_load(&mut self, state: IndexMap<String, Vec<u8>>) -> anyhow::Result<()> {
Ok(())
}
/// Start hook.
///
/// The default implementation only logs a warning that the hook was not
/// overridden and returns `Ok(())`.
fn on_start(&mut self) -> anyhow::Result<()> {
log::warn!(
"The `on_start` handler was called when not overridden, \
it's expected that any actions required when starting the actor \
occur here, such as subscribing/requesting data"
);
Ok(())
}
/// Stop hook.
///
/// The default implementation only logs a warning that the hook was not
/// overridden and returns `Ok(())`.
fn on_stop(&mut self) -> anyhow::Result<()> {
log::warn!(
"The `on_stop` handler was called when not overridden, \
it's expected that any actions required when stopping the actor \
occur here, such as unsubscribing from data",
);
Ok(())
}
/// Resume hook.
///
/// The default implementation only logs a warning that the hook was not
/// overridden and returns `Ok(())`.
fn on_resume(&mut self) -> anyhow::Result<()> {
log::warn!(
"The `on_resume` handler was called when not overridden, \
it's expected that any actions required when resuming the actor \
following a stop occur here"
);
Ok(())
}
/// Reset hook.
///
/// The default implementation only logs a warning that the hook was not
/// overridden and returns `Ok(())`.
fn on_reset(&mut self) -> anyhow::Result<()> {
log::warn!(
"The `on_reset` handler was called when not overridden, \
it's expected that any actions required when resetting the actor \
occur here, such as resetting indicators and other state"
);
Ok(())
}
/// Dispose hook; the default implementation does nothing and returns `Ok(())`.
fn on_dispose(&mut self) -> anyhow::Result<()> {
Ok(())
}
/// Degrade hook; the default implementation does nothing and returns `Ok(())`.
fn on_degrade(&mut self) -> anyhow::Result<()> {
Ok(())
}
/// Fault hook; the default implementation does nothing and returns `Ok(())`.
fn on_fault(&mut self) -> anyhow::Result<()> {
Ok(())
}
/// Hook for time events, invoked by `handle_time_event` while the actor is running.
///
/// The default implementation ignores `event` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_time_event(&mut self, event: &TimeEvent) -> anyhow::Result<()> {
Ok(())
}
/// Hook for custom data, invoked by `handle_data` while the actor is running.
///
/// The default implementation ignores `data` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_data(&mut self, data: &CustomData) -> anyhow::Result<()> {
Ok(())
}
/// Hook for signals, invoked by `handle_signal` while the actor is running.
///
/// The default implementation ignores `signal` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_signal(&mut self, signal: &Signal) -> anyhow::Result<()> {
Ok(())
}
/// Hook for instruments.
///
/// Invoked by `handle_instrument` while the actor is running, and by
/// `handle_instrument_response` / `handle_instruments_response` without a
/// running-state check. The default implementation ignores `instrument` and
/// returns `Ok(())`.
#[allow(unused_variables)]
fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
Ok(())
}
/// Hook for order book deltas, invoked by `handle_book_deltas` while the actor is running.
///
/// The default implementation ignores `deltas` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
Ok(())
}
/// Hook for order books.
///
/// Invoked by `handle_book` while the actor is running, and by
/// `handle_book_response` without a running-state check. The default
/// implementation ignores `order_book` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_book(&mut self, order_book: &OrderBook) -> anyhow::Result<()> {
Ok(())
}
/// Hook for quotes, invoked by `handle_quote` while the actor is running.
///
/// The default implementation ignores `quote` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
Ok(())
}
/// Hook for trades, invoked by `handle_trade` while the actor is running.
///
/// The default implementation ignores `tick` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_trade(&mut self, tick: &TradeTick) -> anyhow::Result<()> {
Ok(())
}
/// Hook for bars, invoked by `handle_bar` while the actor is running.
///
/// The default implementation ignores `bar` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
Ok(())
}
/// Hook for mark price updates, invoked by `handle_mark_price` while the actor is running.
///
/// The default implementation ignores `mark_price` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
Ok(())
}
/// Hook for index price updates, invoked by `handle_index_price` while the actor is running.
///
/// The default implementation ignores `index_price` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
Ok(())
}
/// Hook for funding rate updates, invoked by `handle_funding_rate` while the actor is running.
///
/// The default implementation ignores `funding_rate` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
Ok(())
}
/// Hook for option greeks, invoked by `handle_option_greeks` while the actor is running.
///
/// The default implementation ignores `greeks` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
Ok(())
}
/// Hook for option chain slices, invoked by `handle_option_chain` while the actor is running.
///
/// The default implementation ignores `slice` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_option_chain(&mut self, slice: &OptionChainSlice) -> anyhow::Result<()> {
Ok(())
}
/// Hook for instrument status updates, invoked by `handle_instrument_status`
/// while the actor is running.
///
/// The default implementation ignores `data` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
Ok(())
}
/// Hook for instrument close updates, invoked by `handle_instrument_close`
/// while the actor is running.
///
/// The default implementation ignores `update` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
Ok(())
}
/// Hook for order filled events, invoked by `handle_order_filled` while the
/// actor is running and the event's `strategy_id` differs from this actor's ID.
///
/// The default implementation ignores `event` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_order_filled(&mut self, event: &OrderFilled) -> anyhow::Result<()> {
Ok(())
}
/// Hook for order canceled events, invoked by `handle_order_canceled` while the
/// actor is running and the event's `strategy_id` differs from this actor's ID.
///
/// The default implementation ignores `event` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_order_canceled(&mut self, event: &OrderCanceled) -> anyhow::Result<()> {
Ok(())
}
/// Hook for blocks (only with the `defi` feature), invoked by `handle_block`
/// while the actor is running.
///
/// The default implementation ignores `block` and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_block(&mut self, block: &Block) -> anyhow::Result<()> {
Ok(())
}
/// Hook for pools (only with the `defi` feature), invoked by `handle_pool`
/// while the actor is running.
///
/// The default implementation ignores `pool` and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool(&mut self, pool: &Pool) -> anyhow::Result<()> {
Ok(())
}
/// Hook for pool swaps (only with the `defi` feature), invoked by
/// `handle_pool_swap` while the actor is running.
///
/// The default implementation ignores `swap` and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_swap(&mut self, swap: &PoolSwap) -> anyhow::Result<()> {
Ok(())
}
/// Hook for pool liquidity updates (only with the `defi` feature), invoked by
/// `handle_pool_liquidity_update` while the actor is running.
///
/// The default implementation ignores `update` and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) -> anyhow::Result<()> {
Ok(())
}
/// Hook for pool fee collect events (only with the `defi` feature), invoked by
/// `handle_pool_fee_collect` while the actor is running.
///
/// The default implementation ignores `collect` and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_fee_collect(&mut self, collect: &PoolFeeCollect) -> anyhow::Result<()> {
Ok(())
}
/// Hook for pool flash events (only with the `defi` feature), invoked by
/// `handle_pool_flash` while the actor is running.
///
/// The default implementation ignores `flash` and returns `Ok(())`.
#[cfg(feature = "defi")]
#[allow(unused_variables)]
fn on_pool_flash(&mut self, flash: &PoolFlash) -> anyhow::Result<()> {
Ok(())
}
/// Hook for type-erased historical data, invoked by `handle_historical_data`
/// and `handle_data_response` (neither checks the running state).
///
/// The default implementation ignores `data` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_historical_data(&mut self, data: &dyn Any) -> anyhow::Result<()> {
Ok(())
}
/// Hook for a slice of historical quotes, invoked by `handle_quotes_response`.
///
/// The default implementation ignores `quotes` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
Ok(())
}
/// Hook for a slice of historical trades, invoked by `handle_trades_response`.
///
/// The default implementation ignores `trades` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
Ok(())
}
/// Hook for a slice of historical funding rates, invoked by
/// `handle_funding_rates_response`.
///
/// The default implementation ignores `funding_rates` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_historical_funding_rates(
&mut self,
funding_rates: &[FundingRateUpdate],
) -> anyhow::Result<()> {
Ok(())
}
/// Hook for a slice of historical bars, invoked by `handle_bars_response`.
///
/// The default implementation ignores `bars` and returns `Ok(())`.
#[allow(unused_variables)]
fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
Ok(())
}
/// Hook for a slice of historical mark price updates.
///
/// The default implementation ignores `mark_prices` and returns `Ok(())`.
// NOTE(review): no caller of this hook is visible in this part of the file.
#[allow(unused_variables)]
fn on_historical_mark_prices(&mut self, mark_prices: &[MarkPriceUpdate]) -> anyhow::Result<()> {
Ok(())
}
/// Hook for a slice of historical index price updates.
///
/// The default implementation ignores `index_prices` and returns `Ok(())`.
// NOTE(review): no caller of this hook is visible in this part of the file.
#[allow(unused_variables)]
fn on_historical_index_prices(
&mut self,
index_prices: &[IndexPriceUpdate],
) -> anyhow::Result<()> {
Ok(())
}
/// Entry point for a time event.
///
/// Reports the event via `log_received`; when the actor is not running the
/// event is passed to `log_not_running` and dropped, otherwise it is
/// dispatched to `DataActor::on_time_event` and any error goes to `log_error`.
fn handle_time_event(&mut self, event: &TimeEvent) {
    log_received(&event);
    if self.not_running() {
        log_not_running(&event);
    } else if let Err(err) = DataActor::on_time_event(self, event) {
        log_error(&err);
    }
}
/// Entry point for custom data.
///
/// Reports the data via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_data` and any error goes to `log_error`.
fn handle_data(&mut self, data: &CustomData) {
    log_received(&data);
    if self.not_running() {
        log_not_running(&data);
    } else if let Err(err) = self.on_data(data) {
        log_error(&err);
    }
}
/// Entry point for a signal.
///
/// Reports the signal via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_signal` and any error goes to `log_error`.
fn handle_signal(&mut self, signal: &Signal) {
    log_received(&signal);
    if self.not_running() {
        log_not_running(&signal);
    } else if let Err(err) = self.on_signal(signal) {
        log_error(&err);
    }
}
/// Entry point for an instrument.
///
/// Reports the instrument via `log_received`; when the actor is not running it
/// is passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_instrument` and any error goes to `log_error`.
fn handle_instrument(&mut self, instrument: &InstrumentAny) {
    log_received(&instrument);
    if self.not_running() {
        log_not_running(&instrument);
    } else if let Err(err) = self.on_instrument(instrument) {
        log_error(&err);
    }
}
/// Entry point for order book deltas.
///
/// Reports the deltas via `log_received`; when the actor is not running they
/// are passed to `log_not_running` and dropped, otherwise they are dispatched
/// to `on_book_deltas` and any error goes to `log_error`.
fn handle_book_deltas(&mut self, deltas: &OrderBookDeltas) {
    log_received(&deltas);
    if self.not_running() {
        log_not_running(&deltas);
    } else if let Err(err) = self.on_book_deltas(deltas) {
        log_error(&err);
    }
}
/// Entry point for an order book.
///
/// Reports the book via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_book` and any error goes to `log_error`.
fn handle_book(&mut self, book: &OrderBook) {
    log_received(&book);
    if self.not_running() {
        log_not_running(&book);
    } else if let Err(err) = self.on_book(book) {
        log_error(&err);
    }
}
fn handle_quote(&mut self, quote: &QuoteTick) {
log_received("e);
if self.not_running() {
log_not_running("e);
return;
}
if let Err(e) = self.on_quote(quote) {
log_error(&e);
}
}
/// Entry point for a trade.
///
/// Reports the trade via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_trade` and any error goes to `log_error`.
fn handle_trade(&mut self, trade: &TradeTick) {
    log_received(&trade);
    if self.not_running() {
        log_not_running(&trade);
    } else if let Err(err) = self.on_trade(trade) {
        log_error(&err);
    }
}
/// Entry point for a bar.
///
/// Reports the bar via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_bar` and any error goes to `log_error`.
fn handle_bar(&mut self, bar: &Bar) {
    log_received(&bar);
    if self.not_running() {
        log_not_running(&bar);
    } else if let Err(err) = self.on_bar(bar) {
        log_error(&err);
    }
}
/// Entry point for a mark price update.
///
/// Reports the update via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_mark_price` and any error goes to `log_error`.
fn handle_mark_price(&mut self, mark_price: &MarkPriceUpdate) {
    log_received(&mark_price);
    if self.not_running() {
        log_not_running(&mark_price);
    } else if let Err(err) = self.on_mark_price(mark_price) {
        log_error(&err);
    }
}
/// Entry point for an index price update.
///
/// Reports the update via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_index_price` and any error goes to `log_error`.
fn handle_index_price(&mut self, index_price: &IndexPriceUpdate) {
    log_received(&index_price);
    if self.not_running() {
        log_not_running(&index_price);
    } else if let Err(err) = self.on_index_price(index_price) {
        log_error(&err);
    }
}
/// Entry point for a funding rate update.
///
/// Reports the update via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_funding_rate` and any error goes to `log_error`.
fn handle_funding_rate(&mut self, funding_rate: &FundingRateUpdate) {
    log_received(&funding_rate);
    if self.not_running() {
        log_not_running(&funding_rate);
    } else if let Err(err) = self.on_funding_rate(funding_rate) {
        log_error(&err);
    }
}
/// Entry point for option greeks.
///
/// Reports the greeks via `log_received`; when the actor is not running they
/// are passed to `log_not_running` and dropped, otherwise they are dispatched
/// to `on_option_greeks` and any error goes to `log_error`.
fn handle_option_greeks(&mut self, greeks: &OptionGreeks) {
    log_received(&greeks);
    if self.not_running() {
        log_not_running(&greeks);
    } else if let Err(err) = self.on_option_greeks(greeks) {
        log_error(&err);
    }
}
/// Entry point for an option chain slice.
///
/// Reports the slice via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_option_chain` and any error goes to `log_error`.
fn handle_option_chain(&mut self, slice: &OptionChainSlice) {
    log_received(&slice);
    if self.not_running() {
        log_not_running(&slice);
    } else if let Err(err) = self.on_option_chain(slice) {
        log_error(&err);
    }
}
/// Entry point for an instrument status update.
///
/// Reports the status via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_instrument_status` and any error goes to `log_error`.
fn handle_instrument_status(&mut self, status: &InstrumentStatus) {
    log_received(&status);
    if self.not_running() {
        log_not_running(&status);
    } else if let Err(err) = self.on_instrument_status(status) {
        log_error(&err);
    }
}
/// Entry point for an instrument close update.
///
/// Reports the close via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_instrument_close` and any error goes to `log_error`.
fn handle_instrument_close(&mut self, close: &InstrumentClose) {
    log_received(&close);
    if self.not_running() {
        log_not_running(&close);
    } else if let Err(err) = self.on_instrument_close(close) {
        log_error(&err);
    }
}
/// Entry point for an order filled event.
///
/// After reporting the event via `log_received`, events whose `strategy_id`
/// equals this actor's own ID are skipped without further logging. Otherwise
/// the event is passed to `log_not_running` when the actor is not running, or
/// dispatched to `on_order_filled` with any error going to `log_error`.
fn handle_order_filled(&mut self, event: &OrderFilled) {
    log_received(&event);
    let is_own_event = event.strategy_id.inner() == self.actor_id().inner();
    if is_own_event {
        return;
    }
    if self.not_running() {
        log_not_running(&event);
    } else if let Err(err) = self.on_order_filled(event) {
        log_error(&err);
    }
}
/// Entry point for an order canceled event.
///
/// After reporting the event via `log_received`, events whose `strategy_id`
/// equals this actor's own ID are skipped without further logging. Otherwise
/// the event is passed to `log_not_running` when the actor is not running, or
/// dispatched to `on_order_canceled` with any error going to `log_error`.
fn handle_order_canceled(&mut self, event: &OrderCanceled) {
    log_received(&event);
    let is_own_event = event.strategy_id.inner() == self.actor_id().inner();
    if is_own_event {
        return;
    }
    if self.not_running() {
        log_not_running(&event);
    } else if let Err(err) = self.on_order_canceled(event) {
        log_error(&err);
    }
}
/// Entry point for a block (only with the `defi` feature).
///
/// Reports the block via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_block` and any error goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_block(&mut self, block: &Block) {
    log_received(&block);
    if self.not_running() {
        log_not_running(&block);
    } else if let Err(err) = self.on_block(block) {
        log_error(&err);
    }
}
/// Entry point for a pool (only with the `defi` feature).
///
/// Reports the pool via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_pool` and any error goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool(&mut self, pool: &Pool) {
    log_received(&pool);
    if self.not_running() {
        log_not_running(&pool);
    } else if let Err(err) = self.on_pool(pool) {
        log_error(&err);
    }
}
/// Entry point for a pool swap (only with the `defi` feature).
///
/// Reports the swap via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_pool_swap` and any error goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_swap(&mut self, swap: &PoolSwap) {
    log_received(&swap);
    if self.not_running() {
        log_not_running(&swap);
    } else if let Err(err) = self.on_pool_swap(swap) {
        log_error(&err);
    }
}
/// Entry point for a pool liquidity update (only with the `defi` feature).
///
/// Reports the update via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_pool_liquidity_update` and any error goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_liquidity_update(&mut self, update: &PoolLiquidityUpdate) {
    log_received(&update);
    if self.not_running() {
        log_not_running(&update);
    } else if let Err(err) = self.on_pool_liquidity_update(update) {
        log_error(&err);
    }
}
/// Entry point for a pool fee collect event (only with the `defi` feature).
///
/// Reports the event via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_pool_fee_collect` and any error goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_fee_collect(&mut self, collect: &PoolFeeCollect) {
    log_received(&collect);
    if self.not_running() {
        log_not_running(&collect);
    } else if let Err(err) = self.on_pool_fee_collect(collect) {
        log_error(&err);
    }
}
/// Entry point for a pool flash event (only with the `defi` feature).
///
/// Reports the event via `log_received`; when the actor is not running it is
/// passed to `log_not_running` and dropped, otherwise it is dispatched to
/// `on_pool_flash` and any error goes to `log_error`.
#[cfg(feature = "defi")]
fn handle_pool_flash(&mut self, flash: &PoolFlash) {
    log_received(&flash);
    if self.not_running() {
        log_not_running(&flash);
    } else if let Err(err) = self.on_pool_flash(flash) {
        log_error(&err);
    }
}
/// Entry point for type-erased historical data.
///
/// Reports the data via `log_received` and forwards it to
/// `on_historical_data`; an error from the hook goes to `log_error`. No
/// running-state check is made here.
fn handle_historical_data(&mut self, data: &dyn Any) {
    log_received(&data);
    let outcome = self.on_historical_data(data);
    if let Err(err) = outcome {
        log_error(&err);
    }
}
/// Entry point for a custom data response.
///
/// Reports the response via `log_received` and forwards `resp.data` (through
/// `as_ref()`) to `on_historical_data`; an error from the hook goes to
/// `log_error`. No running-state check is made here.
fn handle_data_response(&mut self, resp: &CustomDataResponse) {
    log_received(&resp);
    let outcome = self.on_historical_data(resp.data.as_ref());
    if let Err(err) = outcome {
        log_error(&err);
    }
}
/// Entry point for a single-instrument response.
///
/// Reports the response via `log_received` and forwards `resp.data` to
/// `on_instrument`; an error from the hook goes to `log_error`. No
/// running-state check is made here.
fn handle_instrument_response(&mut self, resp: &InstrumentResponse) {
    log_received(&resp);
    let outcome = self.on_instrument(&resp.data);
    if let Err(err) = outcome {
        log_error(&err);
    }
}
/// Entry point for a multi-instrument response.
///
/// Reports the response via `log_received`, then calls `on_instrument` once per
/// entry of `resp.data` in order. A failing entry is passed to `log_error` and
/// iteration continues. No running-state check is made here.
fn handle_instruments_response(&mut self, resp: &InstrumentsResponse) {
    log_received(&resp);
    for instrument in &resp.data {
        let outcome = self.on_instrument(instrument);
        if let Err(err) = outcome {
            log_error(&err);
        }
    }
}
/// Entry point for an order book response.
///
/// Reports the response via `log_received` and forwards `resp.data` to
/// `on_book`; an error from the hook goes to `log_error`. No running-state
/// check is made here.
fn handle_book_response(&mut self, resp: &BookResponse) {
    log_received(&resp);
    let outcome = self.on_book(&resp.data);
    if let Err(err) = outcome {
        log_error(&err);
    }
}
/// Entry point for a quotes response.
///
/// Reports the response via `log_received` and forwards `resp.data` to
/// `on_historical_quotes`; an error from the hook goes to `log_error`. No
/// running-state check is made here.
fn handle_quotes_response(&mut self, resp: &QuotesResponse) {
    log_received(&resp);
    let outcome = self.on_historical_quotes(&resp.data);
    if let Err(err) = outcome {
        log_error(&err);
    }
}
/// Entry point for a trades response.
///
/// Reports the response via `log_received` and forwards `resp.data` to
/// `on_historical_trades`; an error from the hook goes to `log_error`. No
/// running-state check is made here.
fn handle_trades_response(&mut self, resp: &TradesResponse) {
    log_received(&resp);
    let outcome = self.on_historical_trades(&resp.data);
    if let Err(err) = outcome {
        log_error(&err);
    }
}
/// Entry point for a funding rates response.
///
/// Reports the response via `log_received` and forwards `resp.data` to
/// `on_historical_funding_rates`; an error from the hook goes to `log_error`.
/// No running-state check is made here.
fn handle_funding_rates_response(&mut self, resp: &FundingRatesResponse) {
    log_received(&resp);
    let outcome = self.on_historical_funding_rates(&resp.data);
    if let Err(err) = outcome {
        log_error(&err);
    }
}
/// Entry point for a bars response.
///
/// Reports the response via `log_received` and forwards `resp.data` to
/// `on_historical_bars`; an error from the hook goes to `log_error`. No
/// running-state check is made here.
fn handle_bars_response(&mut self, resp: &BarsResponse) {
    log_received(&resp);
    let outcome = self.on_historical_bars(&resp.data);
    if let Err(err) = outcome {
        log_error(&err);
    }
}
fn subscribe_data(
&mut self,
data_type: DataType,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
get_actor_unchecked::<Self>(&actor_id).handle_data(data);
});
DataActorCore::subscribe_data(self, handler, data_type, client_id, params);
}
fn subscribe_signal(&mut self, name: &str)
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |data: &CustomData| {
if let Some(signal) = data.data.as_any().downcast_ref::<Signal>() {
if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
actor.handle_signal(signal);
} else {
log::error!("Actor {actor_id} not found for signal handling");
}
}
});
DataActorCore::subscribe_signal(self, handler, name);
}
fn subscribe_quotes(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_quotes_topic(instrument_id);
let handler = TypedHandler::from(move |quote: &QuoteTick| {
if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
actor.handle_quote(quote);
} else {
log::error!("Actor {actor_id} not found for quote handling");
}
});
DataActorCore::subscribe_quotes(self, topic, handler, instrument_id, client_id, params);
}
fn subscribe_instruments(
&mut self,
venue: Venue,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let pattern = get_instruments_pattern(venue);
let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
actor.handle_instrument(instrument);
} else {
log::error!("Actor {actor_id} not found for instruments handling");
}
});
DataActorCore::subscribe_instruments(self, pattern, handler, venue, client_id, params);
}
fn subscribe_instrument(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_instrument_topic(instrument_id);
let handler = ShareableMessageHandler::from_typed(move |instrument: &InstrumentAny| {
if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
actor.handle_instrument(instrument);
} else {
log::error!("Actor {actor_id} not found for instrument handling");
}
});
DataActorCore::subscribe_instrument(self, topic, handler, instrument_id, client_id, params);
}
fn subscribe_book_deltas(
&mut self,
instrument_id: InstrumentId,
book_type: BookType,
depth: Option<NonZeroUsize>,
client_id: Option<ClientId>,
managed: bool,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_book_deltas_topic(instrument_id);
let handler = TypedHandler::from(move |deltas: &OrderBookDeltas| {
get_actor_unchecked::<Self>(&actor_id).handle_book_deltas(deltas);
});
DataActorCore::subscribe_book_deltas(
self,
topic,
handler,
instrument_id,
book_type,
depth,
client_id,
managed,
params,
);
}
fn subscribe_book_at_interval(
&mut self,
instrument_id: InstrumentId,
book_type: BookType,
depth: Option<NonZeroUsize>,
interval_ms: NonZeroUsize,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_book_snapshots_topic(instrument_id, interval_ms);
let handler = TypedHandler::from(move |book: &OrderBook| {
get_actor_unchecked::<Self>(&actor_id).handle_book(book);
});
DataActorCore::subscribe_book_at_interval(
self,
topic,
handler,
instrument_id,
book_type,
depth,
interval_ms,
client_id,
params,
);
}
fn subscribe_trades(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_trades_topic(instrument_id);
let handler = TypedHandler::from(move |trade: &TradeTick| {
get_actor_unchecked::<Self>(&actor_id).handle_trade(trade);
});
DataActorCore::subscribe_trades(self, topic, handler, instrument_id, client_id, params);
}
fn subscribe_bars(
&mut self,
bar_type: BarType,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_bars_topic(bar_type);
let handler = TypedHandler::from(move |bar: &Bar| {
get_actor_unchecked::<Self>(&actor_id).handle_bar(bar);
});
DataActorCore::subscribe_bars(self, topic, handler, bar_type, client_id, params);
}
fn subscribe_mark_prices(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_mark_price_topic(instrument_id);
let handler = TypedHandler::from(move |mark_price: &MarkPriceUpdate| {
get_actor_unchecked::<Self>(&actor_id).handle_mark_price(mark_price);
});
DataActorCore::subscribe_mark_prices(
self,
topic,
handler,
instrument_id,
client_id,
params,
);
}
fn subscribe_index_prices(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_index_price_topic(instrument_id);
let handler = TypedHandler::from(move |index_price: &IndexPriceUpdate| {
get_actor_unchecked::<Self>(&actor_id).handle_index_price(index_price);
});
DataActorCore::subscribe_index_prices(
self,
topic,
handler,
instrument_id,
client_id,
params,
);
}
fn subscribe_funding_rates(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_funding_rate_topic(instrument_id);
let handler = TypedHandler::from(move |funding_rate: &FundingRateUpdate| {
get_actor_unchecked::<Self>(&actor_id).handle_funding_rate(funding_rate);
});
DataActorCore::subscribe_funding_rates(
self,
topic,
handler,
instrument_id,
client_id,
params,
);
}
fn subscribe_option_greeks(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_option_greeks_topic(instrument_id);
let handler = TypedHandler::from(move |option_greeks: &OptionGreeks| {
if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
actor.handle_option_greeks(option_greeks);
} else {
log::error!("Actor {actor_id} not found for option greeks handling");
}
});
DataActorCore::subscribe_option_greeks(
self,
topic,
handler,
instrument_id,
client_id,
params,
);
}
fn subscribe_instrument_status(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_instrument_status_topic(instrument_id);
let handler = ShareableMessageHandler::from_typed(move |status: &InstrumentStatus| {
get_actor_unchecked::<Self>(&actor_id).handle_instrument_status(status);
});
DataActorCore::subscribe_instrument_status(
self,
topic,
handler,
instrument_id,
client_id,
params,
);
}
fn subscribe_instrument_close(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_instrument_close_topic(instrument_id);
let handler = ShareableMessageHandler::from_typed(move |close: &InstrumentClose| {
get_actor_unchecked::<Self>(&actor_id).handle_instrument_close(close);
});
DataActorCore::subscribe_instrument_close(
self,
topic,
handler,
instrument_id,
client_id,
params,
);
}
fn subscribe_option_chain(
&mut self,
series_id: OptionSeriesId,
strike_range: StrikeRange,
snapshot_interval_ms: Option<u64>,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_option_chain_topic(series_id);
let handler = TypedHandler::from(move |slice: &OptionChainSlice| {
if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
actor.handle_option_chain(slice);
} else {
log::error!("Actor {actor_id} not found for option chain handling");
}
});
DataActorCore::subscribe_option_chain(
self,
topic,
handler,
series_id,
strike_range,
snapshot_interval_ms,
client_id,
params,
);
}
fn subscribe_order_fills(&mut self, instrument_id: InstrumentId)
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_order_fills_topic(instrument_id);
let handler = TypedHandler::from(move |event: &OrderEventAny| {
if let OrderEventAny::Filled(filled) = event {
get_actor_unchecked::<Self>(&actor_id).handle_order_filled(filled);
}
});
DataActorCore::subscribe_order_fills(self, topic, handler);
}
fn subscribe_order_cancels(&mut self, instrument_id: InstrumentId)
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let topic = get_order_cancels_topic(instrument_id);
let handler = TypedHandler::from(move |event: &OrderEventAny| {
if let OrderEventAny::Canceled(canceled) = event {
get_actor_unchecked::<Self>(&actor_id).handle_order_canceled(canceled);
}
});
DataActorCore::subscribe_order_cancels(self, topic, handler);
}
/// Subscribes to blocks for `chain` (only with the `defi` feature).
///
/// The handler forwards each `Block` to `handle_block` on this actor (fetched
/// by ID). The topic comes from `defi::switchboard::get_defi_blocks_topic` and
/// registration is delegated to `DataActorCore::subscribe_blocks`.
///
/// If the actor cannot be found when a message arrives, the handler reports it
/// with `log::error!`, the same way `subscribe_quotes` does.
#[cfg(feature = "defi")]
fn subscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_blocks_topic(chain);
    let handler = TypedHandler::from(move |block: &Block| {
        if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
            actor.handle_block(block);
        } else {
            log::error!("Actor {actor_id} not found for block handling");
        }
    });
    DataActorCore::subscribe_blocks(self, topic, handler, chain, client_id, params);
}
/// Subscribes to pool updates for `instrument_id` (only with the `defi` feature).
///
/// The handler forwards each `Pool` to `handle_pool` on this actor (fetched by
/// ID). The topic comes from `defi::switchboard::get_defi_pool_topic` and
/// registration is delegated to `DataActorCore::subscribe_pool`.
///
/// If the actor cannot be found when a message arrives, the handler reports it
/// with `log::error!`, the same way `subscribe_quotes` does.
#[cfg(feature = "defi")]
fn subscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_pool_topic(instrument_id);
    let handler = TypedHandler::from(move |pool: &Pool| {
        if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
            actor.handle_pool(pool);
        } else {
            log::error!("Actor {actor_id} not found for pool handling");
        }
    });
    DataActorCore::subscribe_pool(self, topic, handler, instrument_id, client_id, params);
}
/// Subscribes to pool swaps for `instrument_id` (only with the `defi` feature).
///
/// The handler forwards each `PoolSwap` to `handle_pool_swap` on this actor
/// (fetched by ID). The topic comes from
/// `defi::switchboard::get_defi_pool_swaps_topic` and registration is
/// delegated to `DataActorCore::subscribe_pool_swaps`.
///
/// If the actor cannot be found when a message arrives, the handler reports it
/// with `log::error!`, the same way `subscribe_quotes` does.
#[cfg(feature = "defi")]
fn subscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_pool_swaps_topic(instrument_id);
    let handler = TypedHandler::from(move |swap: &PoolSwap| {
        if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
            actor.handle_pool_swap(swap);
        } else {
            log::error!("Actor {actor_id} not found for pool swap handling");
        }
    });
    DataActorCore::subscribe_pool_swaps(self, topic, handler, instrument_id, client_id, params);
}
/// Subscribes to pool liquidity updates for `instrument_id` (only with the
/// `defi` feature).
///
/// The handler forwards each `PoolLiquidityUpdate` to
/// `handle_pool_liquidity_update` on this actor (fetched by ID). The topic
/// comes from `defi::switchboard::get_defi_liquidity_topic` and registration
/// is delegated to `DataActorCore::subscribe_pool_liquidity_updates`.
///
/// If the actor cannot be found when a message arrives, the handler reports it
/// with `log::error!`, the same way `subscribe_quotes` does.
#[cfg(feature = "defi")]
fn subscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_liquidity_topic(instrument_id);
    let handler = TypedHandler::from(move |update: &PoolLiquidityUpdate| {
        if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
            actor.handle_pool_liquidity_update(update);
        } else {
            log::error!("Actor {actor_id} not found for pool liquidity update handling");
        }
    });
    DataActorCore::subscribe_pool_liquidity_updates(
        self,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
/// Subscribes to pool fee collect events for `instrument_id` (only with the
/// `defi` feature).
///
/// The handler forwards each `PoolFeeCollect` to `handle_pool_fee_collect` on
/// this actor (fetched by ID). The topic comes from
/// `defi::switchboard::get_defi_collect_topic` and registration is delegated
/// to `DataActorCore::subscribe_pool_fee_collects`.
///
/// If the actor cannot be found when a message arrives, the handler reports it
/// with `log::error!`, the same way `subscribe_quotes` does.
#[cfg(feature = "defi")]
fn subscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_collect_topic(instrument_id);
    let handler = TypedHandler::from(move |collect: &PoolFeeCollect| {
        if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
            actor.handle_pool_fee_collect(collect);
        } else {
            log::error!("Actor {actor_id} not found for pool fee collect handling");
        }
    });
    DataActorCore::subscribe_pool_fee_collects(
        self,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
/// Subscribes to pool flash events for `instrument_id` (only with the `defi`
/// feature).
///
/// The handler forwards each `PoolFlash` to `handle_pool_flash` on this actor
/// (fetched by ID). The topic comes from
/// `defi::switchboard::get_defi_flash_topic` and registration is delegated to
/// `DataActorCore::subscribe_pool_flash_events`.
///
/// If the actor cannot be found when a message arrives, the handler reports it
/// with `log::error!`, the same way `subscribe_quotes` does.
#[cfg(feature = "defi")]
fn subscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    let actor_id = self.actor_id().inner();
    let topic = defi::switchboard::get_defi_flash_topic(instrument_id);
    let handler = TypedHandler::from(move |flash: &PoolFlash| {
        if let Some(mut actor) = try_get_actor_unchecked::<Self>(&actor_id) {
            actor.handle_pool_flash(flash);
        } else {
            log::error!("Actor {actor_id} not found for pool flash handling");
        }
    });
    DataActorCore::subscribe_pool_flash_events(
        self,
        topic,
        handler,
        instrument_id,
        client_id,
        params,
    );
}
/// Unsubscribes from custom data of `data_type` by delegating to
/// `DataActorCore::unsubscribe_data` with the arguments unchanged.
fn unsubscribe_data(
&mut self,
data_type: DataType,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_data(self, data_type, client_id, params);
}
/// Unsubscribes from the signal called `name` by delegating to
/// `DataActorCore::unsubscribe_signal`.
fn unsubscribe_signal(&mut self, name: &str)
where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_signal(self, name);
}
/// Unsubscribes from instruments for `venue` by delegating to
/// `DataActorCore::unsubscribe_instruments` with the arguments unchanged.
fn unsubscribe_instruments(
&mut self,
venue: Venue,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_instruments(self, venue, client_id, params);
}
/// Unsubscribes from the instrument `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_instrument` with the arguments unchanged.
fn unsubscribe_instrument(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_instrument(self, instrument_id, client_id, params);
}
/// Unsubscribes from order book deltas for `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_book_deltas` with the arguments unchanged.
fn unsubscribe_book_deltas(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_book_deltas(self, instrument_id, client_id, params);
}
/// Unsubscribes from order book snapshots for `instrument_id` at `interval_ms`
/// by delegating to `DataActorCore::unsubscribe_book_at_interval` with the
/// arguments unchanged.
fn unsubscribe_book_at_interval(
&mut self,
instrument_id: InstrumentId,
interval_ms: NonZeroUsize,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_book_at_interval(
self,
instrument_id,
interval_ms,
client_id,
params,
);
}
/// Unsubscribes from quotes for `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_quotes` with the arguments unchanged.
fn unsubscribe_quotes(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_quotes(self, instrument_id, client_id, params);
}
/// Unsubscribes from trades for `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_trades` with the arguments unchanged.
fn unsubscribe_trades(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_trades(self, instrument_id, client_id, params);
}
/// Unsubscribes from bars of `bar_type` by delegating to
/// `DataActorCore::unsubscribe_bars` with the arguments unchanged.
fn unsubscribe_bars(
&mut self,
bar_type: BarType,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_bars(self, bar_type, client_id, params);
}
/// Unsubscribes from mark price updates for `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_mark_prices` with the arguments unchanged.
fn unsubscribe_mark_prices(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_mark_prices(self, instrument_id, client_id, params);
}
/// Unsubscribes from index price updates for `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_index_prices` with the arguments unchanged.
fn unsubscribe_index_prices(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_index_prices(self, instrument_id, client_id, params);
}
/// Unsubscribes from funding rate updates for `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_funding_rates` with the arguments unchanged.
fn unsubscribe_funding_rates(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_funding_rates(self, instrument_id, client_id, params);
}
/// Unsubscribes from option greeks for `instrument_id` by delegating to
/// `DataActorCore::unsubscribe_option_greeks` with the arguments unchanged.
fn unsubscribe_option_greeks(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_option_greeks(self, instrument_id, client_id, params);
}
/// Unsubscribes from instrument status updates for `instrument_id` by
/// delegating to `DataActorCore::unsubscribe_instrument_status` with the
/// arguments unchanged.
fn unsubscribe_instrument_status(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_instrument_status(self, instrument_id, client_id, params);
}
/// Unsubscribes from instrument close updates for `instrument_id` by
/// delegating to `DataActorCore::unsubscribe_instrument_close` with the
/// arguments unchanged.
fn unsubscribe_instrument_close(
&mut self,
instrument_id: InstrumentId,
client_id: Option<ClientId>,
params: Option<Params>,
) where
Self: 'static + Debug + Sized,
{
// Fully qualified call on the core type; `self` is passed as the receiver.
DataActorCore::unsubscribe_instrument_close(self, instrument_id, client_id, params);
}
fn unsubscribe_option_chain(&mut self, series_id: OptionSeriesId, client_id: Option<ClientId>)
where
Self: 'static + Debug + Sized,
{
DataActorCore::unsubscribe_option_chain(self, series_id, client_id);
}
fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId)
where
Self: 'static + Debug + Sized,
{
DataActorCore::unsubscribe_order_fills(self, instrument_id);
}
fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId)
where
Self: 'static + Debug + Sized,
{
DataActorCore::unsubscribe_order_cancels(self, instrument_id);
}
/// Forwards a block unsubscription for `chain` to [`DataActorCore::unsubscribe_blocks`],
/// passing `client_id` and `params` through unchanged.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
fn unsubscribe_blocks(
    &mut self,
    chain: Blockchain,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    // Name the core explicitly so the call below targets the core's method.
    let core: &mut DataActorCore = self;
    core.unsubscribe_blocks(chain, client_id, params);
}
/// Forwards a pool unsubscription for `instrument_id` to [`DataActorCore::unsubscribe_pool`],
/// passing `client_id` and `params` through unchanged.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
fn unsubscribe_pool(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    // Name the core explicitly so the call below targets the core's method.
    let core: &mut DataActorCore = self;
    core.unsubscribe_pool(instrument_id, client_id, params);
}
/// Forwards a pool-swap unsubscription for `instrument_id` to
/// [`DataActorCore::unsubscribe_pool_swaps`], passing `client_id` and `params` through
/// unchanged.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
fn unsubscribe_pool_swaps(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    // Name the core explicitly so the call below targets the core's method.
    let core: &mut DataActorCore = self;
    core.unsubscribe_pool_swaps(instrument_id, client_id, params);
}
/// Forwards a pool-liquidity-update unsubscription for `instrument_id` to
/// [`DataActorCore::unsubscribe_pool_liquidity_updates`], passing `client_id` and `params`
/// through unchanged.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
fn unsubscribe_pool_liquidity_updates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    // Name the core explicitly so the call below targets the core's method.
    let core: &mut DataActorCore = self;
    core.unsubscribe_pool_liquidity_updates(instrument_id, client_id, params);
}
/// Forwards a pool-fee-collect unsubscription for `instrument_id` to
/// [`DataActorCore::unsubscribe_pool_fee_collects`], passing `client_id` and `params` through
/// unchanged.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
fn unsubscribe_pool_fee_collects(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    // Name the core explicitly so the call below targets the core's method.
    let core: &mut DataActorCore = self;
    core.unsubscribe_pool_fee_collects(instrument_id, client_id, params);
}
/// Forwards a pool-flash-event unsubscription for `instrument_id` to
/// [`DataActorCore::unsubscribe_pool_flash_events`], passing `client_id` and `params` through
/// unchanged.
///
/// Only compiled with the `defi` feature.
#[cfg(feature = "defi")]
fn unsubscribe_pool_flash_events(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) where
    Self: 'static + Debug + Sized,
{
    // Name the core explicitly so the call below targets the core's method.
    let core: &mut DataActorCore = self;
    core.unsubscribe_pool_flash_events(instrument_id, client_id, params);
}
fn request_data(
&mut self,
data_type: DataType,
client_id: ClientId,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<NonZeroUsize>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &CustomDataResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_data_response(resp);
});
DataActorCore::request_data(
self, data_type, client_id, start, end, limit, params, handler,
)
}
fn request_instrument(
&mut self,
instrument_id: InstrumentId,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
client_id: Option<ClientId>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_instrument_response(resp);
});
DataActorCore::request_instrument(
self,
instrument_id,
start,
end,
client_id,
params,
handler,
)
}
fn request_instruments(
&mut self,
venue: Option<Venue>,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
client_id: Option<ClientId>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &InstrumentsResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_instruments_response(resp);
});
DataActorCore::request_instruments(self, venue, start, end, client_id, params, handler)
}
fn request_book_snapshot(
&mut self,
instrument_id: InstrumentId,
depth: Option<NonZeroUsize>,
client_id: Option<ClientId>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &BookResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_book_response(resp);
});
DataActorCore::request_book_snapshot(self, instrument_id, depth, client_id, params, handler)
}
fn request_quotes(
&mut self,
instrument_id: InstrumentId,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<NonZeroUsize>,
client_id: Option<ClientId>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &QuotesResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_quotes_response(resp);
});
DataActorCore::request_quotes(
self,
instrument_id,
start,
end,
limit,
client_id,
params,
handler,
)
}
fn request_trades(
&mut self,
instrument_id: InstrumentId,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<NonZeroUsize>,
client_id: Option<ClientId>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &TradesResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_trades_response(resp);
});
DataActorCore::request_trades(
self,
instrument_id,
start,
end,
limit,
client_id,
params,
handler,
)
}
fn request_funding_rates(
&mut self,
instrument_id: InstrumentId,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<NonZeroUsize>,
client_id: Option<ClientId>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &FundingRatesResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_funding_rates_response(resp);
});
DataActorCore::request_funding_rates(
self,
instrument_id,
start,
end,
limit,
client_id,
params,
handler,
)
}
fn request_bars(
&mut self,
bar_type: BarType,
start: Option<DateTime<Utc>>,
end: Option<DateTime<Utc>>,
limit: Option<NonZeroUsize>,
client_id: Option<ClientId>,
params: Option<Params>,
) -> anyhow::Result<UUID4>
where
Self: 'static + Debug + Sized,
{
let actor_id = self.actor_id().inner();
let handler = ShareableMessageHandler::from_typed(move |resp: &BarsResponse| {
get_actor_unchecked::<Self>(&actor_id).handle_bars_response(resp);
});
DataActorCore::request_bars(
self, bar_type, start, end, limit, client_id, params, handler,
)
}
}
/// Blanket [`Actor`] implementation for every `'static`, `Debug` type implementing
/// [`DataActor`].
impl<T> Actor for T
where
    T: DataActor + Debug + 'static,
{
    /// Returns the inner value of the actor's `actor_id`.
    fn id(&self) -> Ustr {
        let actor_id = &self.actor_id;
        actor_id.inner()
    }

    /// Does nothing with the message; the body is intentionally empty.
    fn handle(&mut self, _msg: &dyn Any) {}

    /// Exposes the actor as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
}
/// Blanket [`Component`] implementation for every `'static`, `Debug` type implementing
/// [`DataActor`].
///
/// Lifecycle hooks are forwarded to the same-named [`DataActor`] methods. Those calls use
/// fully qualified syntax so they do not resolve back to this trait's own methods.
impl<T> Component for T
where
    T: DataActor + Debug + 'static,
{
    /// Builds the component ID from the string form of the actor ID.
    fn component_id(&self) -> ComponentId {
        let id = self.actor_id.inner();
        ComponentId::new(id.as_str())
    }

    /// Returns the current component state.
    fn state(&self) -> ComponentState {
        self.state
    }

    /// Applies `trigger` to the current state, stores the result and logs its variant name.
    ///
    /// # Errors
    ///
    /// Returns the error from `ComponentState::transition`; the stored state is then left
    /// unchanged.
    fn transition_state(&mut self, trigger: ComponentTrigger) -> anyhow::Result<()> {
        let next = self.state.transition(&trigger)?;
        self.state = next;
        log::info!("{}", self.state.variant_name());
        Ok(())
    }

    /// Registers the actor with the core, installs a default time-event handler on `clock`,
    /// and finishes by calling `initialize`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`DataActorCore::register`] or from `initialize`.
    fn register(
        &mut self,
        trader_id: TraderId,
        clock: Rc<RefCell<dyn Clock>>,
        cache: Rc<RefCell<Cache>>,
    ) -> anyhow::Result<()> {
        // Qualified call: `self.register(..)` would resolve to this method again.
        DataActorCore::register(self, trader_id, Rc::clone(&clock), cache)?;

        // The callback looks the actor up by ID on each event instead of capturing `self`.
        let actor_id = self.actor_id().inner();
        let callback = TimeEventCallback::from(move |event: TimeEvent| {
            match try_get_actor_unchecked::<Self>(&actor_id) {
                Some(mut actor) => {
                    actor.handle_time_event(&event);
                }
                None => {
                    log::error!("Actor {actor_id} not found for time event handling");
                }
            }
        });
        clock.borrow_mut().register_default_handler(callback);

        self.initialize()
    }

    /// Forwards to [`DataActor::on_start`].
    fn on_start(&mut self) -> anyhow::Result<()> {
        <Self as DataActor>::on_start(self)
    }

    /// Forwards to [`DataActor::on_stop`].
    fn on_stop(&mut self) -> anyhow::Result<()> {
        <Self as DataActor>::on_stop(self)
    }

    /// Forwards to [`DataActor::on_resume`].
    fn on_resume(&mut self) -> anyhow::Result<()> {
        <Self as DataActor>::on_resume(self)
    }

    /// Forwards to [`DataActor::on_degrade`].
    fn on_degrade(&mut self) -> anyhow::Result<()> {
        <Self as DataActor>::on_degrade(self)
    }

    /// Forwards to [`DataActor::on_fault`].
    fn on_fault(&mut self) -> anyhow::Result<()> {
        <Self as DataActor>::on_fault(self)
    }

    /// Forwards to [`DataActor::on_reset`].
    fn on_reset(&mut self) -> anyhow::Result<()> {
        <Self as DataActor>::on_reset(self)
    }

    /// Forwards to [`DataActor::on_dispose`].
    fn on_dispose(&mut self) -> anyhow::Result<()> {
        <Self as DataActor>::on_dispose(self)
    }
}
#[derive(Clone)]
#[allow(
dead_code,
reason = "TODO: Under development (pending_requests, signal_classes)"
)]
pub struct DataActorCore {
pub actor_id: ActorId,
pub config: DataActorConfig,
trader_id: Option<TraderId>,
clock: Option<Rc<RefCell<dyn Clock>>>, cache: Option<Rc<RefCell<Cache>>>, state: ComponentState,
topic_handlers: AHashMap<MStr<Pattern>, ShareableMessageHandler>,
deltas_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDeltas>>,
depth10_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBookDepth10>>,
book_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderBook>>,
quote_handlers: AHashMap<MStr<Topic>, TypedHandler<QuoteTick>>,
trade_handlers: AHashMap<MStr<Topic>, TypedHandler<TradeTick>>,
bar_handlers: AHashMap<MStr<Topic>, TypedHandler<Bar>>,
mark_price_handlers: AHashMap<MStr<Topic>, TypedHandler<MarkPriceUpdate>>,
index_price_handlers: AHashMap<MStr<Topic>, TypedHandler<IndexPriceUpdate>>,
funding_rate_handlers: AHashMap<MStr<Topic>, TypedHandler<FundingRateUpdate>>,
option_greeks_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionGreeks>>,
option_chain_handlers: AHashMap<MStr<Topic>, TypedHandler<OptionChainSlice>>,
order_event_handlers: AHashMap<MStr<Topic>, TypedHandler<OrderEventAny>>,
#[cfg(feature = "defi")]
block_handlers: AHashMap<MStr<Topic>, TypedHandler<Block>>,
#[cfg(feature = "defi")]
pool_handlers: AHashMap<MStr<Topic>, TypedHandler<Pool>>,
#[cfg(feature = "defi")]
pool_swap_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolSwap>>,
#[cfg(feature = "defi")]
pool_liquidity_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolLiquidityUpdate>>,
#[cfg(feature = "defi")]
pool_collect_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFeeCollect>>,
#[cfg(feature = "defi")]
pool_flash_handlers: AHashMap<MStr<Topic>, TypedHandler<PoolFlash>>,
warning_events: AHashSet<String>, pending_requests: AHashMap<UUID4, Option<RequestCallback>>,
signal_classes: AHashMap<String, String>,
#[cfg(feature = "indicators")]
indicators: Indicators,
}
/// Hand-written `Debug` that prints only `actor_id`, `config`, `state` and `trader_id`;
/// every other field is left out of the output.
impl Debug for DataActorCore {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("DataActorCore");
        out.field("actor_id", &self.actor_id);
        out.field("config", &self.config);
        out.field("state", &self.state);
        out.field("trader_id", &self.trader_id);
        out.finish()
    }
}
impl DataActorCore {
/// Stores `handler` under the pattern derived from `topic` and subscribes it through
/// `msgbus::subscribe_any`.
///
/// If a handler is already stored for that pattern, a warning is logged and nothing else
/// happens.
pub(crate) fn add_subscription_any(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
) {
    let pattern: MStr<Pattern> = topic.into();
    let is_new = !self.topic_handlers.contains_key(&pattern);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.topic_handlers.insert(pattern, handler.clone());
        msgbus::subscribe_any(pattern, handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate subscription to topic '{topic}'",
            self.actor_id,
        );
    }
}
/// Removes the handler stored under the pattern derived from `topic` and unsubscribes it
/// through `msgbus::unsubscribe_any`.
///
/// Logs a warning when no handler is stored for that pattern.
pub(crate) fn remove_subscription_any(&mut self, topic: MStr<Topic>) {
    let pattern: MStr<Pattern> = topic.into();
    match self.topic_handlers.remove(&pattern) {
        Some(stored) => {
            msgbus::unsubscribe_any(pattern, &stored);
        }
        None => {
            log::warn!(
                "Actor {} attempted to unsubscribe from topic '{topic}' when not subscribed",
                self.actor_id,
            );
        }
    }
}
/// Stores the quote `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_quotes`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_quote_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<QuoteTick>,
) {
    let is_new = !self.quote_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.quote_handlers.insert(topic, handler.clone());
        msgbus::subscribe_quotes(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate quote subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the quote handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_quotes`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_quote_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.quote_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_quotes(topic.into(), &stored);
}
/// Stores the trade `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_trades`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_trade_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<TradeTick>,
) {
    let is_new = !self.trade_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.trade_handlers.insert(topic, handler.clone());
        msgbus::subscribe_trades(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate trade subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the trade handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_trades`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_trade_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.trade_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_trades(topic.into(), &stored);
}
/// Stores the bar `handler` under `topic` and subscribes it through `msgbus::subscribe_bars`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_bar_subscription(&mut self, topic: MStr<Topic>, handler: TypedHandler<Bar>) {
    let is_new = !self.bar_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.bar_handlers.insert(topic, handler.clone());
        msgbus::subscribe_bars(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate bar subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the bar handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_bars`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_bar_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.bar_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_bars(topic.into(), &stored);
}
/// Stores the order event `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_order_events`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_order_event_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderEventAny>,
) {
    let is_new = !self.order_event_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.order_event_handlers.insert(topic, handler.clone());
        msgbus::subscribe_order_events(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate order event subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the order event handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_order_events`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_order_event_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.order_event_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_order_events(topic.into(), &stored);
}
/// Stores the book deltas `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_book_deltas`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_deltas_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderBookDeltas>,
) {
    let is_new = !self.deltas_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.deltas_handlers.insert(topic, handler.clone());
        msgbus::subscribe_book_deltas(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate deltas subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the book deltas handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_book_deltas`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_deltas_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.deltas_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_book_deltas(topic.into(), &stored);
}
/// Stores the depth-10 `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_book_depth10`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
#[allow(dead_code)]
pub(crate) fn add_depth10_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderBookDepth10>,
) {
    let is_new = !self.depth10_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.depth10_handlers.insert(topic, handler.clone());
        msgbus::subscribe_book_depth10(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate depth10 subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the depth-10 handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_book_depth10`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_depth10_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.depth10_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_book_depth10(topic.into(), &stored);
}
/// Stores the instrument `handler` under `pattern` in `topic_handlers` and subscribes it
/// through `msgbus::subscribe_any`.
///
/// If a handler is already stored for `pattern`, a warning is logged and nothing else
/// happens.
pub(crate) fn add_instrument_subscription(
    &mut self,
    pattern: MStr<Pattern>,
    handler: ShareableMessageHandler,
) {
    let is_new = !self.topic_handlers.contains_key(&pattern);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.topic_handlers.insert(pattern, handler.clone());
        msgbus::subscribe_any(pattern, handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate instrument subscription to '{pattern}'",
            self.actor_id
        );
    }
}
/// Removes the handler stored under `pattern` in `topic_handlers` and unsubscribes it
/// through `msgbus::unsubscribe_any`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_instrument_subscription(&mut self, pattern: MStr<Pattern>) {
    let Some(stored) = self.topic_handlers.remove(&pattern) else {
        return;
    };
    msgbus::unsubscribe_any(pattern, &stored);
}
/// Stores the instrument close `handler` under the pattern derived from `topic` and
/// subscribes it through `msgbus::subscribe_any`.
///
/// If a handler is already stored for that pattern, a warning is logged and nothing else
/// happens.
pub(crate) fn add_instrument_close_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
) {
    let pattern: MStr<Pattern> = topic.into();
    let is_new = !self.topic_handlers.contains_key(&pattern);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.topic_handlers.insert(pattern, handler.clone());
        msgbus::subscribe_any(pattern, handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate instrument close subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the handler stored under the pattern derived from `topic` and unsubscribes it
/// through `msgbus::unsubscribe_any`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_instrument_close_subscription(&mut self, topic: MStr<Topic>) {
    let pattern: MStr<Pattern> = topic.into();
    let Some(stored) = self.topic_handlers.remove(&pattern) else {
        return;
    };
    msgbus::unsubscribe_any(pattern, &stored);
}
/// Stores the book snapshot `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_book_snapshots`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_book_snapshot_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderBook>,
) {
    let is_new = !self.book_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.book_handlers.insert(topic, handler.clone());
        msgbus::subscribe_book_snapshots(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate book snapshot subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the book snapshot handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_book_snapshots`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_book_snapshot_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.book_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_book_snapshots(topic.into(), &stored);
}
/// Stores the mark price `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_mark_prices`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_mark_price_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<MarkPriceUpdate>,
) {
    let is_new = !self.mark_price_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.mark_price_handlers.insert(topic, handler.clone());
        msgbus::subscribe_mark_prices(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate mark price subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the mark price handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_mark_prices`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_mark_price_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.mark_price_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_mark_prices(topic.into(), &stored);
}
/// Stores the index price `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_index_prices`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_index_price_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<IndexPriceUpdate>,
) {
    let is_new = !self.index_price_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.index_price_handlers.insert(topic, handler.clone());
        msgbus::subscribe_index_prices(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate index price subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the index price handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_index_prices`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_index_price_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.index_price_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_index_prices(topic.into(), &stored);
}
/// Stores the funding rate `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_funding_rates`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_funding_rate_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<FundingRateUpdate>,
) {
    let is_new = !self.funding_rate_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.funding_rate_handlers.insert(topic, handler.clone());
        msgbus::subscribe_funding_rates(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate funding rate subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the funding rate handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_funding_rates`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_funding_rate_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.funding_rate_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_funding_rates(topic.into(), &stored);
}
/// Stores the option greeks `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_option_greeks`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_option_greeks_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OptionGreeks>,
) {
    let is_new = !self.option_greeks_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.option_greeks_handlers.insert(topic, handler.clone());
        msgbus::subscribe_option_greeks(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate option greeks subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the option greeks handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_option_greeks`. Does nothing when no handler is stored.
#[allow(dead_code)]
pub(crate) fn remove_option_greeks_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.option_greeks_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_option_greeks(topic.into(), &stored);
}
/// Stores the option chain `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_option_chain`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
pub(crate) fn add_option_chain_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OptionChainSlice>,
) {
    let is_new = !self.option_chain_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.option_chain_handlers.insert(topic, handler.clone());
        msgbus::subscribe_option_chain(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate option chain subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the option chain handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_option_chain`. Does nothing when no handler is stored.
pub(crate) fn remove_option_chain_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.option_chain_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_option_chain(topic.into(), &stored);
}
/// Stores the block `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_defi_blocks`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
#[cfg(feature = "defi")]
pub(crate) fn add_block_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Block>,
) {
    let is_new = !self.block_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.block_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_blocks(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate block subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the block handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_defi_blocks`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_block_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.block_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_blocks(topic.into(), &stored);
}
/// Stores the pool `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_defi_pools`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Pool>,
) {
    let is_new = !self.pool_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.pool_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_pools(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate pool subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the pool handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_defi_pools`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.pool_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_pools(topic.into(), &stored);
}
/// Stores the pool swap `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_defi_swaps`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_swap_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolSwap>,
) {
    let is_new = !self.pool_swap_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.pool_swap_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_swaps(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate pool swap subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the pool swap handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_defi_swaps`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_swap_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.pool_swap_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_swaps(topic.into(), &stored);
}
/// Stores the pool liquidity `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_defi_liquidity`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_liquidity_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolLiquidityUpdate>,
) {
    let is_new = !self.pool_liquidity_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.pool_liquidity_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_liquidity(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate pool liquidity subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the pool liquidity handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_defi_liquidity`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_liquidity_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.pool_liquidity_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_liquidity(topic.into(), &stored);
}
/// Stores the pool fee collect `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_defi_collects`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_collect_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFeeCollect>,
) {
    let is_new = !self.pool_collect_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.pool_collect_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_collects(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate pool collect subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the pool fee collect handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_defi_collects`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_collect_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.pool_collect_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_collects(topic.into(), &stored);
}
/// Stores the pool flash `handler` under `topic` and subscribes it through
/// `msgbus::subscribe_defi_flash`.
///
/// If a handler is already stored for `topic`, a warning is logged and nothing else happens.
#[cfg(feature = "defi")]
pub(crate) fn add_pool_flash_subscription(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<PoolFlash>,
) {
    let is_new = !self.pool_flash_handlers.contains_key(&topic);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.pool_flash_handlers.insert(topic, handler.clone());
        msgbus::subscribe_defi_flash(topic.into(), handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate pool flash subscription to '{topic}'",
            self.actor_id
        );
    }
}
/// Removes the pool flash handler stored under `topic` and unsubscribes it through
/// `msgbus::unsubscribe_defi_flash`. Does nothing when no handler is stored.
#[cfg(feature = "defi")]
#[allow(dead_code)]
pub(crate) fn remove_pool_flash_subscription(&mut self, topic: MStr<Topic>) {
    let Some(stored) = self.pool_flash_handlers.remove(&topic) else {
        return;
    };
    msgbus::unsubscribe_defi_flash(topic.into(), &stored);
}
/// Creates an unregistered core from `config`.
///
/// The actor ID comes from `config.actor_id` when it is set; otherwise one is generated by
/// `default_actor_id`. `trader_id`, `clock` and `cache` start as `None`, the state is the
/// `ComponentState` default, and every handler collection starts empty.
pub fn new(config: DataActorConfig) -> Self {
    let actor_id = match config.actor_id {
        Some(configured) => configured,
        None => Self::default_actor_id(&config),
    };

    Self {
        actor_id,
        config,
        trader_id: None,
        clock: None,
        cache: None,
        state: ComponentState::default(),
        topic_handlers: AHashMap::new(),
        deltas_handlers: AHashMap::new(),
        depth10_handlers: AHashMap::new(),
        book_handlers: AHashMap::new(),
        quote_handlers: AHashMap::new(),
        trade_handlers: AHashMap::new(),
        bar_handlers: AHashMap::new(),
        mark_price_handlers: AHashMap::new(),
        index_price_handlers: AHashMap::new(),
        funding_rate_handlers: AHashMap::new(),
        option_greeks_handlers: AHashMap::new(),
        option_chain_handlers: AHashMap::new(),
        order_event_handlers: AHashMap::new(),
        #[cfg(feature = "defi")]
        block_handlers: AHashMap::new(),
        #[cfg(feature = "defi")]
        pool_handlers: AHashMap::new(),
        #[cfg(feature = "defi")]
        pool_swap_handlers: AHashMap::new(),
        #[cfg(feature = "defi")]
        pool_liquidity_handlers: AHashMap::new(),
        #[cfg(feature = "defi")]
        pool_collect_handlers: AHashMap::new(),
        #[cfg(feature = "defi")]
        pool_flash_handlers: AHashMap::new(),
        warning_events: AHashSet::new(),
        pending_requests: AHashMap::new(),
        signal_classes: AHashMap::new(),
        #[cfg(feature = "indicators")]
        indicators: Indicators::default(),
    }
}
/// Returns the address of this core, rendered with the `{:p}` pointer format.
#[must_use]
pub fn mem_address(&self) -> String {
    format!("{:p}", self)
}
pub fn state(&self) -> ComponentState {
self.state
}
pub fn trader_id(&self) -> Option<TraderId> {
self.trader_id
}
pub fn actor_id(&self) -> ActorId {
self.actor_id
}
/// Builds a fallback actor ID of the form `DataActor-<address>`, where `<address>` is the
/// address of `config` printed as a decimal integer.
fn default_actor_id(config: &DataActorConfig) -> ActorId {
    let memory_address = (config as *const DataActorConfig) as usize;
    let value = format!("DataActor-{memory_address}");
    ActorId::from(value)
}
/// Returns the registered clock's current timestamp in nanoseconds.
///
/// # Panics
///
/// Panics (inside `clock_ref`) if no clock has been set.
pub fn timestamp_ns(&self) -> UnixNanos {
    let clock = self.clock_ref();
    clock.timestamp_ns()
}
/// Mutably borrows the registered clock.
///
/// # Panics
///
/// Panics if no clock has been set. `RefCell::borrow_mut` also panics if the clock is
/// already borrowed.
pub fn clock(&mut self) -> RefMut<'_, dyn Clock> {
    let Some(clock) = self.clock.as_ref() else {
        panic!(
            "DataActor {} must be registered before calling `clock()` - trader_id: {:?}",
            self.actor_id, self.trader_id
        );
    };
    clock.borrow_mut()
}
/// Returns a new `Rc` handle to the registered clock.
///
/// # Panics
///
/// Panics if no clock has been set.
pub fn clock_rc(&self) -> Rc<RefCell<dyn Clock>> {
    let clock = self
        .clock
        .as_ref()
        .expect("DataActor must be registered before accessing clock");
    Rc::clone(clock)
}
/// Immutably borrows the registered clock.
///
/// # Panics
///
/// Panics if no clock has been set. `RefCell::borrow` also panics if the clock is
/// currently mutably borrowed.
fn clock_ref(&self) -> Ref<'_, dyn Clock> {
    let Some(clock) = self.clock.as_ref() else {
        panic!(
            "DataActor {} must be registered before calling `clock_ref()` - trader_id: {:?}",
            self.actor_id, self.trader_id
        );
    };
    clock.borrow()
}
/// Immutably borrows the registered cache.
///
/// # Panics
///
/// Panics if no cache has been set. `RefCell::borrow` also panics if the cache is
/// currently mutably borrowed.
pub fn cache(&self) -> Ref<'_, Cache> {
    let cache = self
        .cache
        .as_ref()
        .expect("DataActor must be registered before accessing cache");
    cache.borrow()
}
/// Returns a new `Rc` handle to the registered cache.
///
/// # Panics
///
/// Panics if no cache has been set.
pub fn cache_rc(&self) -> Rc<RefCell<Cache>> {
    let cache = self
        .cache
        .as_ref()
        .expect("DataActor must be registered before accessing cache");
    Rc::clone(cache)
}
/// Stores `trader_id`, `clock` and `cache` on the core.
///
/// Before storing anything, the clock and the cache are each borrowed once and released
/// immediately; a conflicting outstanding borrow makes `RefCell::borrow` panic here.
///
/// # Errors
///
/// Returns an error if a trader ID is already stored, or if the post-assignment
/// `is_properly_registered` check fails.
pub fn register(
    &mut self,
    trader_id: TraderId,
    clock: Rc<RefCell<dyn Clock>>,
    cache: Rc<RefCell<Cache>>,
) -> anyhow::Result<()> {
    match self.trader_id {
        Some(existing_trader_id) => {
            anyhow::bail!(
                "DataActor {} already registered with trader {existing_trader_id}",
                self.actor_id
            );
        }
        None => {}
    }

    // Probe borrows: results are discarded and both borrows end on their own statement.
    let _ = clock.borrow().timestamp_ns();
    drop(cache.borrow());

    self.trader_id = Some(trader_id);
    self.clock = Some(clock);
    self.cache = Some(cache);

    let complete = self.is_properly_registered();
    if !complete {
        anyhow::bail!(
            "DataActor {} registration incomplete - validation failed",
            self.actor_id
        );
    }

    log::debug!("Registered {} with trader {trader_id}", self.actor_id);
    Ok(())
}
/// Adds `event_type` to the `warning_events` set and logs the registration at debug level.
pub fn register_warning_event(&mut self, event_type: &str) {
    let owned = event_type.to_owned();
    self.warning_events.insert(owned);
    log::debug!("Registered event type '{event_type}' for warning logs");
}
/// Removes `event_type` from the `warning_events` set and logs the deregistration at debug
/// level. The log line is written whether or not the entry was present.
pub fn deregister_warning_event(&mut self, event_type: &str) {
    let _was_present = self.warning_events.remove(event_type);
    log::debug!("Deregistered event type '{event_type}' from warning logs");
}
pub fn is_registered(&self) -> bool {
self.trader_id.is_some()
}
/// Asserts that the actor has been registered.
///
/// # Panics
///
/// Panics if `is_registered` returns `false`.
pub(crate) fn check_registered(&self) {
    if !self.is_registered() {
        panic!("Actor has not been registered with a Trader");
    }
}
/// Returns `true` only when the trader ID, the clock and the cache are all set.
fn is_properly_registered(&self) -> bool {
    let anything_missing =
        self.trader_id.is_none() || self.clock.is_none() || self.cache.is_none();
    !anything_missing
}
/// Sends `command` to the endpoint returned by
/// `MessagingSwitchboard::data_engine_queue_execute`.
///
/// The command is logged at info level first when `config.log_commands` is set.
pub(crate) fn send_data_cmd(&self, command: DataCommand) {
    let should_log = self.config.log_commands;
    if should_log {
        log::info!("{CMD}{SEND} {command:?}");
    }
    msgbus::send_data_command(MessagingSwitchboard::data_engine_queue_execute(), command);
}
/// Sends `request` as `&dyn Any` to the endpoint returned by
/// `MessagingSwitchboard::data_engine_queue_execute`.
///
/// The request is logged at info level first when `config.log_commands` is set.
#[allow(dead_code)]
fn send_data_req(&self, request: &RequestCommand) {
    let should_log = self.config.log_commands;
    if should_log {
        log::info!("{REQ}{SEND} {request:?}");
    }
    msgbus::send_any(MessagingSwitchboard::data_engine_queue_execute(), request.as_any());
}
/// Publishes a `ShutdownSystem` command, carrying the optional `reason`, on the topic
/// returned by `MessagingSwitchboard::shutdown_system_topic`.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn shutdown_system(&self, reason: Option<String>) {
    self.check_registered();

    // `check_registered` has just confirmed the trader ID is set.
    let trader = self.trader_id().unwrap();
    let sender = self.actor_id.inner();
    let id = UUID4::new();
    let now = self.timestamp_ns();
    let command = ShutdownSystem::new(trader, sender, reason, id, now);

    msgbus::publish_any(
        MessagingSwitchboard::shutdown_system_topic(),
        command.as_any(),
    );
}
/// Publishes `data` on the custom topic derived from `data_type`.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn publish_data(&self, data_type: &DataType, data: &CustomData) {
    self.check_registered();
    msgbus::publish_any(get_custom_topic(data_type), data);
}
/// Wraps `value` in a `Signal` named `name` and publishes it as `CustomData`.
///
/// The data type name is `Signal` followed by the title-cased `name`. A `ts_event` of zero
/// is replaced by the current clock timestamp, which is also passed as the signal's last
/// constructor argument.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn publish_signal(&self, name: &str, value: String, ts_event: UnixNanos) {
    self.check_registered();

    let now = self.timestamp_ns();
    let ts_event = match ts_event.as_u64() {
        0 => now,
        _ => ts_event,
    };
    let signal = Signal::new(Ustr::from(name), value, ts_event, now);

    let type_name = format!(
        "Signal{}",
        nautilus_core::string::conversions::title_case(name)
    );
    let data_type = DataType::new(&type_name, None, None);
    let data = CustomData::new(Arc::new(signal), data_type);

    msgbus::publish_any(get_custom_topic(&data.data_type), &data);
}
/// Adds `synthetic` to the cache.
///
/// # Errors
///
/// Returns an error if the cache already holds a synthetic with the same ID, or if
/// `Cache::add_synthetic` fails.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn add_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
    self.check_registered();

    let cache = self.cache_rc();
    // The read borrow ends with this statement, before the mutable borrow below.
    let already_present = cache.borrow().synthetic(&synthetic.id).is_some();
    if already_present {
        anyhow::bail!("`synthetic` {} already exists", synthetic.id);
    }
    cache.borrow_mut().add_synthetic(synthetic)
}
/// Passes `synthetic` to `Cache::add_synthetic`, but only when the cache already holds a
/// synthetic with the same ID.
///
/// # Errors
///
/// Returns an error if the cache holds no synthetic with that ID, or if
/// `Cache::add_synthetic` fails.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn update_synthetic(&self, synthetic: SyntheticInstrument) -> anyhow::Result<()> {
    self.check_registered();

    let cache = self.cache_rc();
    // The read borrow ends with this statement, before the mutable borrow below.
    let missing = cache.borrow().synthetic(&synthetic.id).is_none();
    if missing {
        anyhow::bail!("`synthetic` {} does not exist", synthetic.id);
    }
    cache.borrow_mut().add_synthetic(synthetic)
}
/// Subscribes `handler` to the custom topic for `data_type`.
///
/// The handler is always registered through `add_subscription_any`. A `SubscribeCustomData`
/// command is sent to the data engine only when `client_id` is `Some`.
///
/// # Panics
///
/// Panics if the trader ID, the clock or the cache is not set.
pub fn subscribe_data(
    &mut self,
    handler: ShareableMessageHandler,
    data_type: DataType,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    if !self.is_properly_registered() {
        panic!(
            "DataActor {} is not properly registered - trader_id: {:?}, clock: {}, cache: {}",
            self.actor_id,
            self.trader_id,
            self.clock.is_some(),
            self.cache.is_some()
        );
    }

    let topic = get_custom_topic(&data_type);
    self.add_subscription_any(topic, handler);

    // No command is sent when no client ID is given.
    if client_id.is_some() {
        let request = SubscribeCustomData {
            data_type,
            client_id,
            venue: None,
            command_id: UUID4::new(),
            ts_init: self.timestamp_ns(),
            correlation_id: None,
            params,
        };
        self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::Data(request)));
    }
}
/// Subscribes `handler` to the pattern returned by `get_signal_pattern(name)`.
///
/// If a handler is already stored for that pattern, a warning is logged and nothing else
/// happens. No command is sent to the data engine.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn subscribe_signal(&mut self, handler: ShareableMessageHandler, name: &str) {
    self.check_registered();

    let pattern = get_signal_pattern(name);
    let is_new = !self.topic_handlers.contains_key(&pattern);
    if is_new {
        // A clone is kept locally; the original goes to the bus.
        self.topic_handlers.insert(pattern, handler.clone());
        msgbus::subscribe_any(pattern, handler, None);
    } else {
        log::warn!(
            "Actor {} attempted duplicate signal subscription to '{pattern}'",
            self.actor_id,
        );
    }
}
/// Registers the quote `handler` for `topic`, then sends a `SubscribeQuotes` command for
/// `instrument_id` to the data engine.
///
/// The command's venue is taken from `instrument_id`.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn subscribe_quotes(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<QuoteTick>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_quote_subscription(topic, handler);

    let request = SubscribeQuotes {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::Quotes(request)));
}
/// Registers the instrument `handler` for `pattern`, then sends a `SubscribeInstruments`
/// command for `venue` to the data engine.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn subscribe_instruments(
    &mut self,
    pattern: MStr<Pattern>,
    handler: ShareableMessageHandler,
    venue: Venue,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_instrument_subscription(pattern, handler);

    let request = SubscribeInstruments {
        client_id,
        venue,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::Instruments(request)));
}
/// Registers the instrument `handler` under the pattern derived from `topic`, then sends a
/// `SubscribeInstrument` command for `instrument_id` to the data engine.
///
/// The command's venue is taken from `instrument_id`.
///
/// # Panics
///
/// Panics if the actor has not been registered.
pub fn subscribe_instrument(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_instrument_subscription(topic.into(), handler);

    let request = SubscribeInstrument {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::Instrument(request)));
}
/// Subscribes to order book deltas for `instrument_id`.
///
/// Registers `handler` for `topic` via `add_deltas_subscription`, then sends a
/// `SubscribeBookDeltas` command carrying `book_type`, `depth` and `managed`
/// unchanged, with the venue taken from `instrument_id`.
#[expect(clippy::too_many_arguments)]
pub fn subscribe_book_deltas(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderBookDeltas>,
    instrument_id: InstrumentId,
    book_type: BookType,
    depth: Option<NonZeroUsize>,
    client_id: Option<ClientId>,
    managed: bool,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_deltas_subscription(topic, handler);

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let subscribe = SubscribeBookDeltas {
        instrument_id,
        book_type,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        depth,
        managed,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::BookDeltas(
        subscribe,
    )));
}
/// Subscribes to order book snapshots for `instrument_id` at `interval_ms`.
///
/// Registers `handler` for `topic` via `add_book_snapshot_subscription`, then
/// sends a `SubscribeBookSnapshots` command carrying `book_type`, `depth` and
/// `interval_ms`, with the venue taken from `instrument_id`.
#[expect(clippy::too_many_arguments)]
pub fn subscribe_book_at_interval(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderBook>,
    instrument_id: InstrumentId,
    book_type: BookType,
    depth: Option<NonZeroUsize>,
    interval_ms: NonZeroUsize,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_book_snapshot_subscription(topic, handler);

    let subscribe = SubscribeBookSnapshots {
        instrument_id,
        book_type,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        depth,
        interval_ms,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::BookSnapshots(
        subscribe,
    )));
}
/// Subscribes to trade ticks for `instrument_id`.
///
/// Registers `handler` for `topic` via `add_trade_subscription`, then sends a
/// `SubscribeTrades` command whose venue is taken from `instrument_id`.
pub fn subscribe_trades(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<TradeTick>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_trade_subscription(topic, handler);

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let subscribe = SubscribeTrades {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::Trades(subscribe)));
}
/// Subscribes to bars of `bar_type`.
///
/// Registers `handler` for `topic` via `add_bar_subscription`, then sends a
/// `SubscribeBars` command whose venue is that of the bar type's instrument.
pub fn subscribe_bars(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<Bar>,
    bar_type: BarType,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_bar_subscription(topic, handler);

    let subscribe = SubscribeBars {
        bar_type,
        client_id,
        venue: Some(bar_type.instrument_id().venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::Bars(subscribe)));
}
/// Subscribes to mark price updates for `instrument_id`.
///
/// Registers `handler` for `topic` via `add_mark_price_subscription`, then
/// sends a `SubscribeMarkPrices` command whose venue is taken from
/// `instrument_id`.
pub fn subscribe_mark_prices(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<MarkPriceUpdate>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_mark_price_subscription(topic, handler);

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let subscribe = SubscribeMarkPrices {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::MarkPrices(
        subscribe,
    )));
}
/// Subscribes to index price updates for `instrument_id`.
///
/// Registers `handler` for `topic` via `add_index_price_subscription`, then
/// sends a `SubscribeIndexPrices` command whose venue is taken from
/// `instrument_id`.
pub fn subscribe_index_prices(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<IndexPriceUpdate>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_index_price_subscription(topic, handler);

    let subscribe = SubscribeIndexPrices {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::IndexPrices(
        subscribe,
    )));
}
/// Subscribes to funding rate updates for `instrument_id`.
///
/// Registers `handler` for `topic` via `add_funding_rate_subscription`, then
/// sends a `SubscribeFundingRates` command whose venue is taken from
/// `instrument_id`.
pub fn subscribe_funding_rates(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<FundingRateUpdate>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_funding_rate_subscription(topic, handler);

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let subscribe = SubscribeFundingRates {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::FundingRates(
        subscribe,
    )));
}
/// Subscribes to option greeks for `instrument_id`.
///
/// Registers `handler` for `topic` via `add_option_greeks_subscription`, then
/// sends a `SubscribeOptionGreeks` command whose venue is taken from
/// `instrument_id`.
pub fn subscribe_option_greeks(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OptionGreeks>,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_option_greeks_subscription(topic, handler);

    let subscribe = SubscribeOptionGreeks {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(SubscribeCommand::OptionGreeks(
        subscribe,
    )));
}
/// Subscribes to instrument status updates for `instrument_id`.
///
/// Registers `handler` for `topic` via the untyped `add_subscription_any`, then
/// sends a `SubscribeInstrumentStatus` command whose venue is taken from
/// `instrument_id`.
pub fn subscribe_instrument_status(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_subscription_any(topic, handler);

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let subscribe = SubscribeInstrumentStatus {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(
        SubscribeCommand::InstrumentStatus(subscribe),
    ));
}
/// Subscribes to instrument close updates for `instrument_id`.
///
/// Registers `handler` for `topic` via `add_instrument_close_subscription`,
/// then sends a `SubscribeInstrumentClose` command whose venue is taken from
/// `instrument_id`.
pub fn subscribe_instrument_close(
    &mut self,
    topic: MStr<Topic>,
    handler: ShareableMessageHandler,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_instrument_close_subscription(topic, handler);

    let subscribe = SubscribeInstrumentClose {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Subscribe(
        SubscribeCommand::InstrumentClose(subscribe),
    ));
}
/// Subscribes to option chain slices for `series_id`.
///
/// Registers `handler` for `topic` via `add_option_chain_subscription`, then
/// sends a `SubscribeOptionChain` command built from `strike_range` and
/// `snapshot_interval_ms`, with the venue taken from `series_id`.
// `expect` rather than `allow`, matching the other wide-signature methods in
// this impl, so the attribute is reported if the lint ever stops firing.
#[expect(clippy::too_many_arguments)]
pub fn subscribe_option_chain(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OptionChainSlice>,
    series_id: OptionSeriesId,
    strike_range: StrikeRange,
    snapshot_interval_ms: Option<u64>,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.add_option_chain_subscription(topic, handler);

    let command = SubscribeCommand::OptionChain(SubscribeOptionChain::new(
        series_id,
        strike_range,
        snapshot_interval_ms,
        UUID4::new(),
        self.timestamp_ns(),
        client_id,
        Some(series_id.venue),
        params,
    ));
    self.send_data_cmd(DataCommand::Subscribe(command));
}
/// Registers `handler` for order fill events published on `topic`.
///
/// Only a local subscription is added via `add_order_event_subscription`; no
/// data command is sent.
pub fn subscribe_order_fills(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderEventAny>,
) {
    self.check_registered();

    let (fills_topic, fills_handler) = (topic, handler);
    self.add_order_event_subscription(fills_topic, fills_handler);
}
/// Registers `handler` for order cancel events published on `topic`.
///
/// Only a local subscription is added via `add_order_event_subscription`; no
/// data command is sent.
pub fn subscribe_order_cancels(
    &mut self,
    topic: MStr<Topic>,
    handler: TypedHandler<OrderEventAny>,
) {
    self.check_registered();

    let (cancels_topic, cancels_handler) = (topic, handler);
    self.add_order_event_subscription(cancels_topic, cancels_handler);
}
/// Unsubscribes from custom data of `data_type`.
///
/// The local subscription for the custom topic is always removed. An
/// `UnsubscribeCustomData` command is sent only when `client_id` is provided.
pub fn unsubscribe_data(
    &mut self,
    data_type: DataType,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_subscription_any(get_custom_topic(&data_type));

    // Without a client there is nothing to notify beyond the local removal.
    if client_id.is_some() {
        let unsubscribe = UnsubscribeCustomData {
            data_type,
            client_id,
            venue: None,
            command_id: UUID4::new(),
            ts_init: self.timestamp_ns(),
            correlation_id: None,
            params,
        };
        self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::Data(
            unsubscribe,
        )));
    }
}
/// Unsubscribes from the signal pattern derived from `name`.
///
/// If a handler is tracked for the pattern, it is removed from
/// `topic_handlers` and passed to `msgbus::unsubscribe_any`; otherwise a
/// warning is logged.
pub fn unsubscribe_signal(&mut self, name: &str) {
    self.check_registered();

    let pattern = get_signal_pattern(name);
    let Some(handler) = self.topic_handlers.remove(&pattern) else {
        log::warn!(
            "Actor {} attempted to unsubscribe from signal pattern '{pattern}' when not subscribed",
            self.actor_id,
        );
        return;
    };

    msgbus::unsubscribe_any(pattern, &handler);
}
/// Unsubscribes from instrument updates for a whole `venue`.
///
/// Removes the local subscription for the venue's instruments pattern, then
/// sends an `UnsubscribeInstruments` command.
pub fn unsubscribe_instruments(
    &mut self,
    venue: Venue,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_instrument_subscription(get_instruments_pattern(venue));

    let unsubscribe = UnsubscribeInstruments {
        client_id,
        venue,
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::Instruments(
        unsubscribe,
    )));
}
/// Unsubscribes from updates for a single instrument.
///
/// The instrument topic is converted into a pattern and removed via
/// `remove_instrument_subscription`; an `UnsubscribeInstrument` command (venue
/// taken from `instrument_id`) is then sent.
pub fn unsubscribe_instrument(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_instrument_subscription(get_instrument_topic(instrument_id).into());

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let unsubscribe = UnsubscribeInstrument {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::Instrument(
        unsubscribe,
    )));
}
/// Unsubscribes from order book deltas for `instrument_id`.
///
/// Removes the local deltas subscription for the instrument's topic, then
/// sends an `UnsubscribeBookDeltas` command whose venue is taken from
/// `instrument_id`.
pub fn unsubscribe_book_deltas(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_deltas_subscription(get_book_deltas_topic(instrument_id));

    let unsubscribe = UnsubscribeBookDeltas {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::BookDeltas(
        unsubscribe,
    )));
}
/// Unsubscribes from order book snapshots for `instrument_id` at `interval_ms`.
///
/// Removes the local snapshot subscription for the topic built from the
/// instrument and interval, then sends an `UnsubscribeBookSnapshots` command
/// whose venue is taken from `instrument_id`.
pub fn unsubscribe_book_at_interval(
    &mut self,
    instrument_id: InstrumentId,
    interval_ms: NonZeroUsize,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_book_snapshot_subscription(get_book_snapshots_topic(
        instrument_id,
        interval_ms,
    ));

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let unsubscribe = UnsubscribeBookSnapshots {
        instrument_id,
        interval_ms,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(
        UnsubscribeCommand::BookSnapshots(unsubscribe),
    ));
}
/// Unsubscribes from quote ticks for `instrument_id`.
///
/// Removes the local quote subscription for the instrument's topic, then sends
/// an `UnsubscribeQuotes` command whose venue is taken from `instrument_id`.
pub fn unsubscribe_quotes(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_quote_subscription(get_quotes_topic(instrument_id));

    let unsubscribe = UnsubscribeQuotes {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::Quotes(
        unsubscribe,
    )));
}
/// Unsubscribes from trade ticks for `instrument_id`.
///
/// Removes the local trade subscription for the instrument's topic, then sends
/// an `UnsubscribeTrades` command whose venue is taken from `instrument_id`.
pub fn unsubscribe_trades(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_trade_subscription(get_trades_topic(instrument_id));

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let unsubscribe = UnsubscribeTrades {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::Trades(
        unsubscribe,
    )));
}
/// Unsubscribes from bars of `bar_type`.
///
/// Removes the local bar subscription for the bar type's topic, then sends an
/// `UnsubscribeBars` command whose venue is that of the bar type's instrument.
pub fn unsubscribe_bars(
    &mut self,
    bar_type: BarType,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_bar_subscription(get_bars_topic(bar_type));

    let unsubscribe = UnsubscribeBars {
        bar_type,
        client_id,
        venue: Some(bar_type.instrument_id().venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::Bars(
        unsubscribe,
    )));
}
/// Unsubscribes from mark price updates for `instrument_id`.
///
/// Removes the local mark price subscription for the instrument's topic, then
/// sends an `UnsubscribeMarkPrices` command whose venue is taken from
/// `instrument_id`.
pub fn unsubscribe_mark_prices(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_mark_price_subscription(get_mark_price_topic(instrument_id));

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let unsubscribe = UnsubscribeMarkPrices {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::MarkPrices(
        unsubscribe,
    )));
}
/// Unsubscribes from index price updates for `instrument_id`.
///
/// Removes the local index price subscription for the instrument's topic, then
/// sends an `UnsubscribeIndexPrices` command whose venue is taken from
/// `instrument_id`.
pub fn unsubscribe_index_prices(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_index_price_subscription(get_index_price_topic(instrument_id));

    let unsubscribe = UnsubscribeIndexPrices {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::IndexPrices(
        unsubscribe,
    )));
}
/// Unsubscribes from funding rate updates for `instrument_id`.
///
/// Removes the local funding rate subscription for the instrument's topic,
/// then sends an `UnsubscribeFundingRates` command whose venue is taken from
/// `instrument_id`.
pub fn unsubscribe_funding_rates(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_funding_rate_subscription(get_funding_rate_topic(instrument_id));

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let unsubscribe = UnsubscribeFundingRates {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(
        UnsubscribeCommand::FundingRates(unsubscribe),
    ));
}
/// Unsubscribes from option greeks for `instrument_id`.
///
/// Removes the local option greeks subscription for the instrument's topic,
/// then sends an `UnsubscribeOptionGreeks` command whose venue is taken from
/// `instrument_id`.
pub fn unsubscribe_option_greeks(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_option_greeks_subscription(get_option_greeks_topic(instrument_id));

    let unsubscribe = UnsubscribeOptionGreeks {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(
        UnsubscribeCommand::OptionGreeks(unsubscribe),
    ));
}
/// Unsubscribes from instrument status updates for `instrument_id`.
///
/// Removes the local subscription for the instrument status topic via
/// `remove_subscription_any`, then sends an `UnsubscribeInstrumentStatus`
/// command whose venue is taken from `instrument_id`.
pub fn unsubscribe_instrument_status(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_subscription_any(get_instrument_status_topic(instrument_id));

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let unsubscribe = UnsubscribeInstrumentStatus {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id,
        ts_init,
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(
        UnsubscribeCommand::InstrumentStatus(unsubscribe),
    ));
}
/// Unsubscribes from instrument close updates for `instrument_id`.
///
/// Removes the local instrument close subscription for the instrument's topic,
/// then sends an `UnsubscribeInstrumentClose` command whose venue is taken
/// from `instrument_id`.
pub fn unsubscribe_instrument_close(
    &mut self,
    instrument_id: InstrumentId,
    client_id: Option<ClientId>,
    params: Option<Params>,
) {
    self.check_registered();
    self.remove_instrument_close_subscription(get_instrument_close_topic(instrument_id));

    let unsubscribe = UnsubscribeInstrumentClose {
        instrument_id,
        client_id,
        venue: Some(instrument_id.venue),
        command_id: UUID4::new(),
        ts_init: self.timestamp_ns(),
        correlation_id: None,
        params,
    };
    self.send_data_cmd(DataCommand::Unsubscribe(
        UnsubscribeCommand::InstrumentClose(unsubscribe),
    ));
}
/// Unsubscribes from option chain slices for `series_id`.
///
/// Removes the local option chain subscription for the series topic, then
/// sends an `UnsubscribeOptionChain` command whose venue is taken from
/// `series_id`.
pub fn unsubscribe_option_chain(
    &mut self,
    series_id: OptionSeriesId,
    client_id: Option<ClientId>,
) {
    self.check_registered();
    self.remove_option_chain_subscription(get_option_chain_topic(series_id));

    let command_id = UUID4::new();
    let ts_init = self.timestamp_ns();
    let venue = Some(series_id.venue);
    let unsubscribe =
        UnsubscribeOptionChain::new(series_id, command_id, ts_init, client_id, venue);
    self.send_data_cmd(DataCommand::Unsubscribe(UnsubscribeCommand::OptionChain(
        unsubscribe,
    )));
}
/// Removes the local order fills subscription for `instrument_id`.
///
/// No data command is sent.
pub fn unsubscribe_order_fills(&mut self, instrument_id: InstrumentId) {
    self.check_registered();
    self.remove_order_event_subscription(get_order_fills_topic(instrument_id));
}
/// Removes the local order cancels subscription for `instrument_id`.
///
/// No data command is sent.
pub fn unsubscribe_order_cancels(&mut self, instrument_id: InstrumentId) {
    self.check_registered();
    self.remove_order_event_subscription(get_order_cancels_topic(instrument_id));
}
/// Requests custom data of `data_type` from `client_id`.
///
/// Validates `start`/`end` against the current clock time, registers `handler`
/// as the response handler for a freshly generated request ID, sends the
/// request, and returns that ID.
///
/// # Errors
///
/// Returns an error if the timestamp checks fail or if registering the
/// response handler fails.
#[expect(clippy::too_many_arguments)]
pub fn request_data(
    &self,
    data_type: DataType,
    client_id: ClientId,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<NonZeroUsize>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let now = self.clock_ref().utc_now();
    check_timestamps(now, start, end)?;

    let request_id = UUID4::new();
    let command = RequestCommand::Data(RequestCustomData {
        client_id,
        data_type,
        start,
        end,
        limit,
        request_id,
        // Reuse the instant that was just validated instead of reading the
        // clock a second time, as the sibling request methods do.
        ts_init: now.into(),
        params,
    });

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Requests the instrument identified by `instrument_id`.
///
/// Validates `start`/`end` against the current clock time, registers `handler`
/// as the response handler for a freshly generated request ID, sends the
/// request, and returns that ID.
///
/// # Errors
///
/// Returns an error if the timestamp checks fail or if registering the
/// response handler fails.
pub fn request_instrument(
    &self,
    instrument_id: InstrumentId,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    client_id: Option<ClientId>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let now = self.clock_ref().utc_now();
    check_timestamps(now, start, end)?;

    let request_id = UUID4::new();
    let request = RequestInstrument {
        instrument_id,
        start,
        end,
        client_id,
        request_id,
        ts_init: now.into(),
        params,
    };
    let command = RequestCommand::Instrument(request);

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Requests instruments, optionally restricted to `venue`.
///
/// Validates `start`/`end` against the current clock time, registers `handler`
/// as the response handler for a freshly generated request ID, sends the
/// request, and returns that ID.
///
/// # Errors
///
/// Returns an error if the timestamp checks fail or if registering the
/// response handler fails.
pub fn request_instruments(
    &self,
    venue: Option<Venue>,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    client_id: Option<ClientId>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let now = self.clock_ref().utc_now();
    check_timestamps(now, start, end)?;

    let request_id = UUID4::new();
    let request = RequestInstruments {
        venue,
        start,
        end,
        client_id,
        request_id,
        ts_init: now.into(),
        params,
    };
    let command = RequestCommand::Instruments(request);

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Requests an order book snapshot for `instrument_id`.
///
/// Registers `handler` as the response handler for a freshly generated request
/// ID, sends the request, and returns that ID. No time range is involved, so
/// no timestamp validation is performed.
///
/// # Errors
///
/// Returns an error if registering the response handler fails.
pub fn request_book_snapshot(
    &self,
    instrument_id: InstrumentId,
    depth: Option<NonZeroUsize>,
    client_id: Option<ClientId>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let request_id = UUID4::new();
    let request = RequestBookSnapshot {
        instrument_id,
        depth,
        client_id,
        request_id,
        ts_init: self.timestamp_ns(),
        params,
    };
    let command = RequestCommand::BookSnapshot(request);

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Requests quotes for `instrument_id`.
///
/// Validates `start`/`end` against the current clock time, registers `handler`
/// as the response handler for a freshly generated request ID, sends the
/// request, and returns that ID.
///
/// # Errors
///
/// Returns an error if the timestamp checks fail or if registering the
/// response handler fails.
#[expect(clippy::too_many_arguments)]
pub fn request_quotes(
    &self,
    instrument_id: InstrumentId,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<NonZeroUsize>,
    client_id: Option<ClientId>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let now = self.clock_ref().utc_now();
    check_timestamps(now, start, end)?;

    let request_id = UUID4::new();
    let request = RequestQuotes {
        instrument_id,
        start,
        end,
        limit,
        client_id,
        request_id,
        ts_init: now.into(),
        params,
    };
    let command = RequestCommand::Quotes(request);

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Requests trades for `instrument_id`.
///
/// Validates `start`/`end` against the current clock time, registers `handler`
/// as the response handler for a freshly generated request ID, sends the
/// request, and returns that ID.
///
/// # Errors
///
/// Returns an error if the timestamp checks fail or if registering the
/// response handler fails.
#[expect(clippy::too_many_arguments)]
pub fn request_trades(
    &self,
    instrument_id: InstrumentId,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<NonZeroUsize>,
    client_id: Option<ClientId>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let now = self.clock_ref().utc_now();
    check_timestamps(now, start, end)?;

    let request_id = UUID4::new();
    let request = RequestTrades {
        instrument_id,
        start,
        end,
        limit,
        client_id,
        request_id,
        ts_init: now.into(),
        params,
    };
    let command = RequestCommand::Trades(request);

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Requests funding rates for `instrument_id`.
///
/// Validates `start`/`end` against the current clock time, registers `handler`
/// as the response handler for a freshly generated request ID, sends the
/// request, and returns that ID.
///
/// # Errors
///
/// Returns an error if the timestamp checks fail or if registering the
/// response handler fails.
#[expect(clippy::too_many_arguments)]
pub fn request_funding_rates(
    &self,
    instrument_id: InstrumentId,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<NonZeroUsize>,
    client_id: Option<ClientId>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let now = self.clock_ref().utc_now();
    check_timestamps(now, start, end)?;

    let request_id = UUID4::new();
    let request = RequestFundingRates {
        instrument_id,
        start,
        end,
        limit,
        client_id,
        request_id,
        ts_init: now.into(),
        params,
    };
    let command = RequestCommand::FundingRates(request);

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Requests bars of `bar_type`.
///
/// Validates `start`/`end` against the current clock time, registers `handler`
/// as the response handler for a freshly generated request ID, sends the
/// request, and returns that ID.
///
/// # Errors
///
/// Returns an error if the timestamp checks fail or if registering the
/// response handler fails.
#[expect(clippy::too_many_arguments)]
pub fn request_bars(
    &self,
    bar_type: BarType,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
    limit: Option<NonZeroUsize>,
    client_id: Option<ClientId>,
    params: Option<Params>,
    handler: ShareableMessageHandler,
) -> anyhow::Result<UUID4> {
    self.check_registered();

    let now = self.clock_ref().utc_now();
    check_timestamps(now, start, end)?;

    let request_id = UUID4::new();
    let request = RequestBars {
        bar_type,
        start,
        end,
        limit,
        client_id,
        request_id,
        ts_init: now.into(),
        params,
    };
    let command = RequestCommand::Bars(request);

    // Register the response handler before dispatching the request.
    get_message_bus()
        .borrow_mut()
        .register_response_handler(command.request_id(), handler)?;
    self.send_data_cmd(DataCommand::Request(command));

    Ok(request_id)
}
/// Test-only: number of entries currently held in `quote_handlers`.
#[cfg(test)]
pub fn quote_handler_count(&self) -> usize {
    let handlers = &self.quote_handlers;
    handlers.len()
}
/// Test-only: number of entries currently held in `trade_handlers`.
#[cfg(test)]
pub fn trade_handler_count(&self) -> usize {
    let handlers = &self.trade_handlers;
    handlers.len()
}
/// Test-only: number of entries currently held in `bar_handlers`.
#[cfg(test)]
pub fn bar_handler_count(&self) -> usize {
    let handlers = &self.bar_handlers;
    handlers.len()
}
/// Test-only: number of entries currently held in `deltas_handlers`.
#[cfg(test)]
pub fn deltas_handler_count(&self) -> usize {
    let handlers = &self.deltas_handlers;
    handlers.len()
}
/// Test-only: whether `quote_handlers` contains an entry keyed by `topic`.
#[cfg(test)]
pub fn has_quote_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.quote_handlers.contains_key(&key)
}
/// Test-only: whether `trade_handlers` contains an entry keyed by `topic`.
#[cfg(test)]
pub fn has_trade_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.trade_handlers.contains_key(&key)
}
/// Test-only: whether `bar_handlers` contains an entry keyed by `topic`.
#[cfg(test)]
pub fn has_bar_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.bar_handlers.contains_key(&key)
}
/// Test-only: whether `deltas_handlers` contains an entry keyed by `topic`.
#[cfg(test)]
pub fn has_deltas_handler(&self, topic: &str) -> bool {
    let key = MStr::<Topic>::from(topic);
    self.deltas_handlers.contains_key(&key)
}
}
/// Validates an optional `start`/`end` range against `now`.
///
/// Checks, in order: `start <= now`, `end <= now`, and (when both are given)
/// `start < end`. Absent bounds are skipped.
///
/// # Errors
///
/// Returns the error produced by `check_predicate_true` for the first check
/// that fails.
fn check_timestamps(
    now: DateTime<Utc>,
    start: Option<DateTime<Utc>>,
    end: Option<DateTime<Utc>>,
) -> anyhow::Result<()> {
    // Neither bound may lie after `now`; `start` is checked before `end`.
    let upper_bounds = [(start, "start was > now"), (end, "end was > now")];
    for (bound, fail_msg) in upper_bounds {
        if let Some(ts) = bound {
            check_predicate_true(ts <= now, fail_msg)?;
        }
    }

    // The range itself must be strictly increasing when fully specified.
    if let Some((range_start, range_end)) = start.zip(end) {
        check_predicate_true(range_start < range_end, "start was >= end")?;
    }

    Ok(())
}
/// Logs `e` at error level using its `Display` representation.
fn log_error(e: &anyhow::Error) {
log::error!("{e}");
}
/// Logs at trace level that `msg` was received while not running and is being
/// skipped, using the message's `Debug` representation.
fn log_not_running<T: Debug>(msg: &T) {
    log::trace!("Received message when not running - skipping {msg:?}");
}
/// Logs a received message at debug level, prefixed with the `RECV` marker and
/// formatted with the message's `Debug` representation.
fn log_received<T: Debug>(msg: &T) {
    log::debug!("{RECV} {msg:?}");
}