use std::{sync::Arc, time::Duration};
use dashmap::DashMap;
use nautilus_common::{live::get_runtime, messages::DataEvent, providers::InstrumentProvider};
use nautilus_core::{AtomicMap, UnixNanos, time::AtomicTime};
use nautilus_model::{
identifiers::InstrumentId,
instruments::{Instrument, InstrumentAny},
};
use ustr::Ustr;
use super::{PolymarketDataClient, runtime::is_instrument_expired};
use crate::{filters::InstrumentFilter, http::gamma::PolymarketGammaHttpClient};
#[derive(Clone, Copy, Debug)]
pub(crate) struct TokenMeta {
pub(crate) instrument_id: InstrumentId,
pub(crate) price_precision: u8,
pub(crate) size_precision: u8,
}
pub(crate) fn cache_instrument(
instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
token_meta: &Arc<DashMap<Ustr, TokenMeta>>,
instrument: &InstrumentAny,
) {
let instrument_id = instrument.id();
token_meta.insert(
Ustr::from(instrument.raw_symbol().as_str()),
TokenMeta {
instrument_id,
price_precision: instrument.price_precision(),
size_precision: instrument.size_precision(),
},
);
instruments.insert(instrument_id, instrument.clone());
}
pub(super) fn cache_instrument_if_active(
now_ns: UnixNanos,
instruments: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
token_meta: &Arc<DashMap<Ustr, TokenMeta>>,
instrument: &InstrumentAny,
) -> bool {
if is_instrument_expired(instrument, now_ns) {
return false;
}
cache_instrument(instruments, token_meta, instrument);
true
}
pub(super) fn cache_and_publish_instruments(
instruments_cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
token_meta: &Arc<DashMap<Ustr, TokenMeta>>,
data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
now_ns: UnixNanos,
instruments: Vec<InstrumentAny>,
) -> usize {
let mut total = 0;
for instrument in instruments {
if !cache_instrument_if_active(now_ns, instruments_cache, token_meta, &instrument) {
log::debug!(
"Skipping expired instrument {} during live cache publish",
instrument.id()
);
continue;
}
let instrument_id = instrument.id();
total += 1;
if let Err(e) = data_sender.send(DataEvent::Instrument(instrument)) {
log::warn!("Failed to publish instrument {instrument_id}: {e}");
}
}
total
}
pub(super) async fn refresh_scoped_instruments(
http_client: PolymarketGammaHttpClient,
instrument_config: Option<crate::config::PolymarketInstrumentProviderConfig>,
filters: Vec<Arc<dyn InstrumentFilter>>,
instruments_cache: &Arc<AtomicMap<InstrumentId, InstrumentAny>>,
token_meta: &Arc<DashMap<Ustr, TokenMeta>>,
data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
clock: &'static AtomicTime,
) -> anyhow::Result<usize> {
let Some(instrument_config) = instrument_config else {
return Ok(0);
};
let refreshed =
crate::providers::fetch_configured_instruments(&http_client, &instrument_config, &filters)
.await?;
Ok(cache_and_publish_instruments(
instruments_cache,
token_meta,
data_sender,
clock.get_time_ns(),
refreshed,
))
}
impl PolymarketDataClient {
pub(super) async fn bootstrap_instruments(&mut self) -> anyhow::Result<()> {
self.provider.initialize(false).await?;
let total = cache_and_publish_instruments(
&self.instruments,
&self.token_meta,
&self.data_sender,
self.clock.get_time_ns(),
self.provider
.store()
.list_all()
.into_iter()
.cloned()
.collect::<Vec<_>>(),
);
log::info!("Published {total} Polymarket instruments to data engine");
Ok(())
}
pub(super) fn spawn_instrument_refresh_task(&mut self) {
let Some(interval_mins) = self.config.update_instruments_interval_mins else {
return;
};
if interval_mins == 0 || self.config.instrument_config.is_none() {
return;
}
let interval = Duration::from_secs(interval_mins.saturating_mul(60));
let cancellation = self.cancellation_token.clone();
let http_client = self.provider.http_client().clone();
let instrument_config = self.config.instrument_config.clone();
let filters = self.provider.filters();
let instruments_cache = self.instruments.clone();
let token_meta = self.token_meta.clone();
let data_sender = self.data_sender.clone();
let clock = self.clock;
let handle = get_runtime().spawn(async move {
log::debug!("Polymarket instrument refresh task started");
loop {
tokio::select! {
() = tokio::time::sleep(interval) => {}
() = cancellation.cancelled() => {
log::debug!("Polymarket instrument refresh task cancelled");
break;
}
}
match refresh_scoped_instruments(
http_client.clone(),
instrument_config.clone(),
filters.clone(),
&instruments_cache,
&token_meta,
&data_sender,
clock,
)
.await
{
Ok(total) => {
if total > 0 {
log::info!(
"Refreshed {total} Polymarket instruments into the live cache"
);
}
}
Err(e) => {
log::error!("Failed to refresh Polymarket instruments: {e}");
}
}
}
log::debug!("Polymarket instrument refresh task ended");
});
self.tasks.push(handle);
}
}
#[cfg(test)]
mod tests {
use nautilus_core::UnixNanos;
use nautilus_model::{
enums::AssetClass,
identifiers::Symbol,
instruments::BinaryOption,
types::{Currency, Price, Quantity},
};
use rstest::rstest;
use super::*;
fn stub_instrument(
raw_symbol: &str,
price_increment: Price,
size_increment: Quantity,
) -> InstrumentAny {
let price_precision = price_increment.precision;
let size_precision = size_increment.precision;
InstrumentAny::BinaryOption(BinaryOption::new(
InstrumentId::from(format!("{raw_symbol}.POLYMARKET").as_str()),
Symbol::new(raw_symbol),
AssetClass::Alternative,
Currency::pUSD(),
UnixNanos::default(),
UnixNanos::from(u64::MAX),
price_precision,
size_precision,
price_increment,
size_increment,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
None,
UnixNanos::default(),
UnixNanos::default(),
))
}
#[rstest]
#[case::p3_s2("token-a", Price::from("0.001"), Quantity::from("0.01"))]
#[case::p5_s4("token-b", Price::from("0.00001"), Quantity::from("0.0001"))]
fn cache_instrument_writes_both_maps(
#[case] raw_symbol: &str,
#[case] price_increment: Price,
#[case] size_increment: Quantity,
) {
let instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>> = Arc::new(AtomicMap::new());
let token_meta: Arc<DashMap<Ustr, TokenMeta>> = Arc::new(DashMap::new());
let inst = stub_instrument(raw_symbol, price_increment, size_increment);
let expected_id = inst.id();
let expected_token = Ustr::from(raw_symbol);
let expected_price_precision = price_increment.precision;
let expected_size_precision = size_increment.precision;
cache_instrument(&instruments, &token_meta, &inst);
let loaded = instruments.load();
let cached = loaded
.get(&expected_id)
.expect("instrument inserted into live cache");
assert_eq!(cached.id(), expected_id);
assert_eq!(cached.raw_symbol().as_str(), raw_symbol);
let meta = token_meta
.get(&expected_token)
.expect("token_meta inserted for raw_symbol");
assert_eq!(meta.instrument_id, expected_id);
assert_eq!(meta.price_precision, expected_price_precision);
assert_eq!(meta.size_precision, expected_size_precision);
}
#[rstest]
fn cache_instrument_overwrites_precisions_on_second_call() {
let instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>> = Arc::new(AtomicMap::new());
let token_meta: Arc<DashMap<Ustr, TokenMeta>> = Arc::new(DashMap::new());
let raw_symbol = "token-overwrite";
let first = stub_instrument(raw_symbol, Price::from("0.01"), Quantity::from("0.1"));
cache_instrument(&instruments, &token_meta, &first);
let second = stub_instrument(raw_symbol, Price::from("0.0001"), Quantity::from("0.001"));
cache_instrument(&instruments, &token_meta, &second);
let meta = token_meta
.get(&Ustr::from(raw_symbol))
.expect("token_meta present after overwrite");
assert_eq!(meta.price_precision, 4);
assert_eq!(meta.size_precision, 3);
assert_eq!(token_meta.len(), 1);
assert_eq!(instruments.load().len(), 1);
}
#[rstest]
fn cache_instrument_maintains_dual_cache_invariant() {
let instruments: Arc<AtomicMap<InstrumentId, InstrumentAny>> = Arc::new(AtomicMap::new());
let token_meta: Arc<DashMap<Ustr, TokenMeta>> = Arc::new(DashMap::new());
let samples = [
stub_instrument("token-1", Price::from("0.001"), Quantity::from("0.01")),
stub_instrument("token-2", Price::from("0.0001"), Quantity::from("0.01")),
stub_instrument("token-3", Price::from("0.00001"), Quantity::from("0.001")),
];
for inst in &samples {
cache_instrument(&instruments, &token_meta, inst);
}
let loaded = instruments.load();
assert_eq!(loaded.len(), samples.len());
for inst in loaded.values() {
let token_id = Ustr::from(inst.raw_symbol().as_str());
let meta = token_meta
.get(&token_id)
.unwrap_or_else(|| panic!("missing token_meta for {token_id}"));
assert_eq!(meta.instrument_id, inst.id());
}
}
}