use std::{
cell::{Ref, RefCell},
num::NonZeroUsize,
rc::Rc,
};
use indexmap::IndexMap;
use nautilus_common::{
cache::Cache,
msgbus::{self, Handler, MStr, Topic, switchboard},
timer::TimeEvent,
};
use nautilus_model::{
data::{OrderBookDeltas, OrderBookDepth10, QuoteTick},
enums::InstrumentClass,
identifiers::{InstrumentId, Venue},
instruments::Instrument,
orderbook::OrderBook,
};
use ustr::Ustr;
/// Describes one order book snapshot subscription consumed by [`BookSnapshotter`].
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
/// Instrument whose book is published when `parent` is `None`.
pub instrument_id: InstrumentId,
/// Venue passed to the cache's `instruments_by_parent` lookup when `parent` is set.
pub venue: Venue,
/// Optional `(root, instrument class)` pair. When set, a snapshot is published for every
/// instrument the cache returns for this parent instead of for `instrument_id`.
pub parent: Option<(Ustr, InstrumentClass)>,
/// Message bus topic the book snapshots are published on.
pub topic: MStr<Topic>,
/// Snapshot interval; presumably milliseconds, per the field name.
pub interval_ms: NonZeroUsize,
}
/// Shared, interior-mutable map from instrument ID to its [`BookSnapshotInfo`].
///
/// This is the same type as the `snapshot_infos` field of [`BookSnapshotter`].
pub(crate) type BookSnapshotInfos = Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>;
/// Pair of an instrument ID and a non-zero integer.
///
/// NOTE(review): presumably the second element is a snapshot interval in milliseconds; the
/// code using this key is not visible in this section, confirm against the caller.
pub(crate) type BookSnapshotKey = (InstrumentId, NonZeroUsize);
/// Outcome of a book snapshot unsubscribe request.
///
/// NOTE(review): the code producing these values is not visible in this section; the variant
/// descriptions below are inferred from their names and should be confirmed.
pub(crate) enum BookSnapshotUnsubscribeResult {
/// Presumably: there was no matching subscription to remove.
NotSubscribed,
/// Presumably: a subscription count was reduced but the subscription remains.
Decremented,
/// Presumably: the subscription was removed entirely.
Removed,
}
/// Message handler that applies incoming order book data to the books held in the cache.
///
/// Implements `Handler` for both `OrderBookDeltas` and `OrderBookDepth10`.
#[derive(Debug)]
pub struct BookUpdater {
/// Handler ID, built in `new` as `BookUpdater-{instrument_id}`.
pub id: Ustr,
/// Instrument this updater was created for. The handlers look the book up by the
/// instrument ID carried on each message rather than by this field.
pub instrument_id: InstrumentId,
/// Shared cache holding the order books to update.
pub cache: Rc<RefCell<Cache>>,
/// When `true`, a quote is derived from the book after each successful update and
/// published if it differs from the last cached quote.
pub emit_quotes_from_book: bool,
}
impl BookUpdater {
    /// Creates a new [`BookUpdater`] for `instrument_id`.
    ///
    /// The handler ID is the type name and the instrument ID joined by a hyphen.
    pub fn new(
        instrument_id: &InstrumentId,
        cache: Rc<RefCell<Cache>>,
        emit_quotes_from_book: bool,
    ) -> Self {
        let handler_id = format!("{}-{instrument_id}", stringify!(BookUpdater));

        Self {
            id: Ustr::from(handler_id.as_str()),
            instrument_id: *instrument_id,
            cache,
            emit_quotes_from_book,
        }
    }
}
impl Handler<OrderBookDeltas> for BookUpdater {
    /// Returns the handler ID assigned at construction.
    fn id(&self) -> Ustr {
        self.id
    }

    /// Applies `deltas` to the cached order book for `deltas.instrument_id`.
    ///
    /// Does nothing when the cache holds no book for that instrument. If applying the deltas
    /// fails, the error is logged and no quote is emitted. When `emit_quotes_from_book` is
    /// set, a quote derived from the updated book is published if it changed.
    fn handle(&self, deltas: &OrderBookDeltas) {
        // The mutable cache borrow must end before publishing, which borrows the cache again
        let derived = {
            let mut cache_guard = self.cache.borrow_mut();

            let Some(book) = cache_guard.order_book_mut(&deltas.instrument_id) else {
                return;
            };

            if let Err(e) = book.apply_deltas(deltas) {
                log::error!("Failed to apply deltas: {e}");
                return;
            }

            if self.emit_quotes_from_book {
                derive_quote_from_book(book)
            } else {
                None
            }
        };

        if let Some(quote) = derived {
            publish_quote_if_changed(&self.cache, quote);
        }
    }
}
impl Handler<OrderBookDepth10> for BookUpdater {
    /// Returns the handler ID assigned at construction.
    fn id(&self) -> Ustr {
        self.id
    }

    /// Applies `depth` to the cached order book for `depth.instrument_id`.
    ///
    /// Does nothing when the cache holds no book for that instrument. If applying the depth
    /// fails, the error is logged and no quote is emitted. When `emit_quotes_from_book` is
    /// set, a quote derived from the updated book is published if it changed.
    fn handle(&self, depth: &OrderBookDepth10) {
        // The mutable cache borrow must end before publishing, which borrows the cache again
        let derived = {
            let mut cache_guard = self.cache.borrow_mut();

            let Some(book) = cache_guard.order_book_mut(&depth.instrument_id) else {
                return;
            };

            if let Err(e) = book.apply_depth(depth) {
                log::error!("Failed to apply depth: {e}");
                return;
            }

            if self.emit_quotes_from_book {
                derive_quote_from_book(book)
            } else {
                None
            }
        };

        if let Some(quote) = derived {
            publish_quote_if_changed(&self.cache, quote);
        }
    }
}
/// Builds a [`QuoteTick`] from the top of `book`.
///
/// Returns `None` when either side has no best price or size, or when either best size has a
/// raw value of zero. Both timestamps of the quote are taken from `book.ts_last`.
fn derive_quote_from_book(book: &OrderBook) -> Option<QuoteTick> {
    let bid_price = book.best_bid_price()?;
    let ask_price = book.best_ask_price()?;
    let bid_size = book.best_bid_size()?;
    let ask_size = book.best_ask_size()?;

    // A zero-sized level on either side does not produce a quote
    let both_sides_sized = bid_size.raw != 0 && ask_size.raw != 0;

    both_sides_sized.then(|| {
        let ts = book.ts_last;
        QuoteTick::new(
            book.instrument_id,
            bid_price,
            ask_price,
            bid_size,
            ask_size,
            ts,
            ts,
        )
    })
}
pub(crate) fn publish_quote_if_changed(cache: &Rc<RefCell<Cache>>, quote: QuoteTick) {
let publish = {
let cache_ref = cache.borrow();
match cache_ref.quote("e.instrument_id) {
None => true,
Some(last) => {
last.bid_price != quote.bid_price
|| last.ask_price != quote.ask_price
|| last.bid_size != quote.bid_size
|| last.ask_size != quote.ask_size
}
}
};
if !publish {
return;
}
if let Err(e) = cache.borrow_mut().add_quote(quote) {
log::error!("Error on cache insert: {e}");
}
let topic = switchboard::get_quotes_topic(quote.instrument_id);
msgbus::publish_quote(topic, "e);
}
/// Publishes order book snapshots for a set of subscriptions when `snapshot` is called.
#[derive(Debug)]
pub struct BookSnapshotter {
/// Timer name, built in `new` as `OrderBookSnapshots|{interval_ms}`.
pub timer_name: Ustr,
/// Snapshot interval; logged with an `ms` suffix, so presumably milliseconds.
pub interval_ms: NonZeroUsize,
/// Shared map of the subscriptions to publish snapshots for.
pub snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
/// Shared cache the order books (and instruments for parent lookups) are read from.
pub cache: Rc<RefCell<Cache>>,
}
impl BookSnapshotter {
    /// Creates a new [`BookSnapshotter`] for the given interval and subscriptions.
    ///
    /// The timer name is derived from the interval as `OrderBookSnapshots|{interval_ms}`.
    pub fn new(
        interval_ms: NonZeroUsize,
        snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
        cache: Rc<RefCell<Cache>>,
    ) -> Self {
        Self {
            timer_name: Ustr::from(format!("OrderBookSnapshots|{interval_ms}").as_str()),
            interval_ms,
            snapshot_infos,
            cache,
        }
    }

    /// Publishes a snapshot for every current subscription.
    ///
    /// The time event itself is not inspected.
    pub fn snapshot(&self, _event: TimeEvent) {
        // Copy the subscriptions out first so `snapshot_infos` is not borrowed while publishing
        let pending: Vec<BookSnapshotInfo> =
            self.snapshot_infos.borrow().values().cloned().collect();

        log::debug!(
            "BookSnapshotter.snapshot called for {} subscriptions at {}ms",
            pending.len(),
            self.interval_ms,
        );

        let cache = self.cache.borrow();
        pending
            .iter()
            .for_each(|info| self.publish_snapshot(info, &cache));
    }

    /// Publishes the book(s) for one subscription.
    ///
    /// With a parent set, every instrument the cache returns for that parent is published on
    /// the subscription topic; otherwise only the subscription's own instrument is.
    fn publish_snapshot(&self, snap_info: &BookSnapshotInfo, cache: &Ref<Cache>) {
        match snap_info.parent {
            Some((root, class)) => {
                let topic = snap_info.topic;
                for instrument in cache.instruments_by_parent(&snap_info.venue, &root, class) {
                    self.publish_order_book(&instrument.id(), topic, cache);
                }
            }
            None => self.publish_order_book(&snap_info.instrument_id, snap_info.topic, cache),
        }
    }

    /// Publishes the cached order book for `instrument_id` on `topic`.
    ///
    /// Logs an error and skips when the cache has no book for the instrument; logs at debug
    /// level and skips when the book's `update_count` is still zero.
    fn publish_order_book(
        &self,
        instrument_id: &InstrumentId,
        topic: MStr<Topic>,
        cache: &Ref<Cache>,
    ) {
        match cache.order_book(instrument_id) {
            None => {
                log::error!("Cannot publish OrderBook snapshot: no book found for {instrument_id}");
            }
            Some(book) if book.update_count == 0 => {
                log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
            }
            Some(book) => {
                log::debug!(
                    "Publishing OrderBook snapshot for {instrument_id} (update_count={})",
                    book.update_count
                );
                msgbus::publish_book(topic, book);
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use nautilus_core::{UUID4, UnixNanos};
    use rstest::rstest;

    use super::*;

    /// Taking a snapshot for a subscription whose order book is not in the cache must
    /// complete without panicking.
    #[rstest]
    fn snapshot_skips_missing_order_book() {
        let instrument_id = InstrumentId::from("AUD/USD.SIM");
        let interval_ms = NonZeroUsize::new(100).unwrap();

        let info = BookSnapshotInfo {
            instrument_id,
            venue: Venue::new("SIM"),
            parent: None,
            topic: switchboard::get_book_snapshots_topic(instrument_id, interval_ms),
            interval_ms,
        };
        let mut infos = IndexMap::new();
        infos.insert(instrument_id, info);

        // No order book is added to the default cache before the snapshot fires
        let cache = Rc::new(RefCell::new(Cache::default()));
        let snapshotter = BookSnapshotter::new(interval_ms, Rc::new(RefCell::new(infos)), cache);

        snapshotter.snapshot(TimeEvent::new(
            Ustr::from("TEST"),
            UUID4::new(),
            UnixNanos::default(),
            UnixNanos::default(),
        ));
    }
}