use std::{
cell::{Ref, RefCell},
num::NonZeroUsize,
rc::Rc,
};
use indexmap::IndexMap;
use nautilus_common::{
cache::Cache,
msgbus::{self, Handler, MStr, Topic, switchboard},
timer::TimeEvent,
};
use nautilus_model::{
data::{OrderBookDeltas, OrderBookDepth10, QuoteTick},
enums::InstrumentClass,
identifiers::{InstrumentId, Venue},
instruments::Instrument,
orderbook::OrderBook,
};
use ustr::Ustr;
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
pub instrument_id: InstrumentId,
pub venue: Venue,
pub parent: Option<(Ustr, InstrumentClass)>,
pub topic: MStr<Topic>,
pub interval_ms: NonZeroUsize,
}
pub(crate) type BookSnapshotInfos = Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>;
pub(crate) type BookSnapshotKey = (InstrumentId, NonZeroUsize);
pub(crate) enum BookSnapshotUnsubscribeResult {
NotSubscribed,
Decremented,
Removed,
}
#[derive(Debug)]
pub struct BookUpdater {
pub id: Ustr,
pub instrument_id: InstrumentId,
pub cache: Rc<RefCell<Cache>>,
pub emit_quotes_from_book: bool,
}
impl BookUpdater {
pub fn new(
instrument_id: &InstrumentId,
cache: Rc<RefCell<Cache>>,
emit_quotes_from_book: bool,
) -> Self {
Self {
id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
instrument_id: *instrument_id,
cache,
emit_quotes_from_book,
}
}
}
impl Handler<OrderBookDeltas> for BookUpdater {
fn id(&self) -> Ustr {
self.id
}
fn handle(&self, deltas: &OrderBookDeltas) {
let mut emit: Option<QuoteTick> = None;
{
let mut cache = self.cache.borrow_mut();
if let Some(book) = cache.order_book_mut(&deltas.instrument_id) {
if let Err(e) = book.apply_deltas(deltas) {
log::error!("Failed to apply deltas: {e}");
return;
}
if self.emit_quotes_from_book {
emit = derive_quote_from_book(book);
}
}
}
if let Some(quote) = emit {
publish_quote_if_changed(&self.cache, quote);
}
}
}
impl Handler<OrderBookDepth10> for BookUpdater {
fn id(&self) -> Ustr {
self.id
}
fn handle(&self, depth: &OrderBookDepth10) {
let mut emit: Option<QuoteTick> = None;
{
let mut cache = self.cache.borrow_mut();
if let Some(book) = cache.order_book_mut(&depth.instrument_id) {
if let Err(e) = book.apply_depth(depth) {
log::error!("Failed to apply depth: {e}");
return;
}
if self.emit_quotes_from_book {
emit = derive_quote_from_book(book);
}
}
}
if let Some(quote) = emit {
publish_quote_if_changed(&self.cache, quote);
}
}
}
fn derive_quote_from_book(book: &OrderBook) -> Option<QuoteTick> {
let bid_price = book.best_bid_price()?;
let ask_price = book.best_ask_price()?;
let bid_size = book.best_bid_size()?;
let ask_size = book.best_ask_size()?;
if bid_size.raw == 0 || ask_size.raw == 0 {
return None;
}
Some(QuoteTick::new(
book.instrument_id,
bid_price,
ask_price,
bid_size,
ask_size,
book.ts_last,
book.ts_last,
))
}
pub(crate) fn publish_quote_if_changed(cache: &Rc<RefCell<Cache>>, quote: QuoteTick) {
let publish = {
let cache_ref = cache.borrow();
match cache_ref.quote("e.instrument_id) {
None => true,
Some(last) => {
last.bid_price != quote.bid_price
|| last.ask_price != quote.ask_price
|| last.bid_size != quote.bid_size
|| last.ask_size != quote.ask_size
}
}
};
if !publish {
return;
}
if let Err(e) = cache.borrow_mut().add_quote(quote) {
log::error!("Error on cache insert: {e}");
}
let topic = switchboard::get_quotes_topic(quote.instrument_id);
msgbus::publish_quote(topic, "e);
}
#[derive(Debug)]
pub struct BookSnapshotter {
pub timer_name: Ustr,
pub interval_ms: NonZeroUsize,
pub snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
pub cache: Rc<RefCell<Cache>>,
}
impl BookSnapshotter {
pub fn new(
interval_ms: NonZeroUsize,
snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
cache: Rc<RefCell<Cache>>,
) -> Self {
let timer_name = format!("OrderBookSnapshots|{interval_ms}");
Self {
timer_name: Ustr::from(&timer_name),
interval_ms,
snapshot_infos,
cache,
}
}
pub fn snapshot(&self, _event: TimeEvent) {
let snapshot_infos: Vec<BookSnapshotInfo> =
self.snapshot_infos.borrow().values().cloned().collect();
log::debug!(
"BookSnapshotter.snapshot called for {} subscriptions at {}ms",
snapshot_infos.len(),
self.interval_ms,
);
let cache = self.cache.borrow();
for snap_info in snapshot_infos {
self.publish_snapshot(&snap_info, &cache);
}
}
fn publish_snapshot(&self, snap_info: &BookSnapshotInfo, cache: &Ref<Cache>) {
if let Some((root, class)) = snap_info.parent {
let topic = snap_info.topic;
for instrument in cache.instruments_by_parent(&snap_info.venue, &root, class) {
self.publish_order_book(&instrument.id(), topic, cache);
}
} else {
self.publish_order_book(&snap_info.instrument_id, snap_info.topic, cache);
}
}
fn publish_order_book(
&self,
instrument_id: &InstrumentId,
topic: MStr<Topic>,
cache: &Ref<Cache>,
) {
let book = cache
.order_book(instrument_id)
.unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
if book.update_count == 0 {
log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
return;
}
log::debug!(
"Publishing OrderBook snapshot for {instrument_id} (update_count={})",
book.update_count
);
msgbus::publish_book(topic, book);
}
}