use std::{
cell::{Ref, RefCell},
num::NonZeroUsize,
rc::Rc,
};
use indexmap::IndexMap;
use nautilus_common::{
cache::Cache,
msgbus::{self, Handler, MStr, Topic},
timer::TimeEvent,
};
use nautilus_model::{
data::{OrderBookDeltas, OrderBookDepth10},
identifiers::{InstrumentId, Venue},
instruments::Instrument,
};
use ustr::Ustr;
#[derive(Clone, Debug)]
pub struct BookSnapshotInfo {
pub instrument_id: InstrumentId,
pub venue: Venue,
pub is_composite: bool,
pub root: Ustr,
pub topic: MStr<Topic>,
pub interval_ms: NonZeroUsize,
}
pub(crate) type BookSnapshotInfos = Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>;
pub(crate) type BookSnapshotKey = (InstrumentId, NonZeroUsize);
pub(crate) enum BookSnapshotUnsubscribeResult {
NotSubscribed,
Decremented,
Removed,
}
#[derive(Debug)]
pub struct BookUpdater {
pub id: Ustr,
pub instrument_id: InstrumentId,
pub cache: Rc<RefCell<Cache>>,
}
impl BookUpdater {
pub fn new(instrument_id: &InstrumentId, cache: Rc<RefCell<Cache>>) -> Self {
Self {
id: Ustr::from(&format!("{}-{}", stringify!(BookUpdater), instrument_id)),
instrument_id: *instrument_id,
cache,
}
}
}
impl Handler<OrderBookDeltas> for BookUpdater {
fn id(&self) -> Ustr {
self.id
}
fn handle(&self, deltas: &OrderBookDeltas) {
if let Some(book) = self
.cache
.borrow_mut()
.order_book_mut(&deltas.instrument_id)
&& let Err(e) = book.apply_deltas(deltas)
{
log::error!("Failed to apply deltas: {e}");
}
}
}
impl Handler<OrderBookDepth10> for BookUpdater {
fn id(&self) -> Ustr {
self.id
}
fn handle(&self, depth: &OrderBookDepth10) {
if let Some(book) = self.cache.borrow_mut().order_book_mut(&depth.instrument_id)
&& let Err(e) = book.apply_depth(depth)
{
log::error!("Failed to apply depth: {e}");
}
}
}
#[derive(Debug)]
pub struct BookSnapshotter {
pub timer_name: Ustr,
pub interval_ms: NonZeroUsize,
pub snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
pub cache: Rc<RefCell<Cache>>,
}
impl BookSnapshotter {
pub fn new(
interval_ms: NonZeroUsize,
snapshot_infos: Rc<RefCell<IndexMap<InstrumentId, BookSnapshotInfo>>>,
cache: Rc<RefCell<Cache>>,
) -> Self {
let timer_name = format!("OrderBookSnapshots|{interval_ms}");
Self {
timer_name: Ustr::from(&timer_name),
interval_ms,
snapshot_infos,
cache,
}
}
pub fn snapshot(&self, _event: TimeEvent) {
let snapshot_infos: Vec<BookSnapshotInfo> =
self.snapshot_infos.borrow().values().cloned().collect();
log::debug!(
"BookSnapshotter.snapshot called for {} subscriptions at {}ms",
snapshot_infos.len(),
self.interval_ms,
);
let cache = self.cache.borrow();
for snap_info in snapshot_infos {
self.publish_snapshot(&snap_info, &cache);
}
}
fn publish_snapshot(&self, snap_info: &BookSnapshotInfo, cache: &Ref<Cache>) {
if snap_info.is_composite {
let topic = snap_info.topic;
let underlying = snap_info.root;
for instrument in cache.instruments(&snap_info.venue, Some(&underlying)) {
self.publish_order_book(&instrument.id(), topic, cache);
}
} else {
self.publish_order_book(&snap_info.instrument_id, snap_info.topic, cache);
}
}
fn publish_order_book(
&self,
instrument_id: &InstrumentId,
topic: MStr<Topic>,
cache: &Ref<Cache>,
) {
let book = cache
.order_book(instrument_id)
.unwrap_or_else(|| panic!("OrderBook for {instrument_id} was not in cache"));
if book.update_count == 0 {
log::debug!("OrderBook not yet updated for snapshot: {instrument_id}");
return;
}
log::debug!(
"Publishing OrderBook snapshot for {instrument_id} (update_count={})",
book.update_count
);
msgbus::publish_book(topic, book);
}
}