use std::{collections::HashMap, fmt::Debug};
use async_trait::async_trait;
use nautilus_common::providers::{InstrumentProvider, InstrumentStore};
use nautilus_core::time::get_atomic_clock_realtime;
use nautilus_model::{
identifiers::InstrumentId,
instruments::{Instrument, InstrumentAny},
};
use crate::{
common::{
consts::DERIVE_VENUE, enums::DeriveInstrumentType, parse::parse_derive_instrument_any,
},
http::{DeriveHttpClient, error::DeriveHttpError, models::DeriveInstrument},
};
const INSTRUMENT_NOT_FOUND_CODE: i64 = 12001;
pub struct DeriveInstrumentProvider {
store: InstrumentStore,
http_client: DeriveHttpClient,
currencies: Vec<String>,
include_expired: bool,
}
impl Debug for DeriveInstrumentProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct(stringify!(DeriveInstrumentProvider))
.field("store", &self.store)
.field("http_client", &self.http_client)
.field("currencies", &self.currencies)
.field("include_expired", &self.include_expired)
.finish()
}
}
impl DeriveInstrumentProvider {
#[must_use]
pub fn new(http_client: DeriveHttpClient, currencies: Vec<String>) -> Self {
Self {
store: InstrumentStore::new(),
http_client,
currencies,
include_expired: false,
}
}
#[must_use]
pub fn with_expired(
http_client: DeriveHttpClient,
currencies: Vec<String>,
include_expired: bool,
) -> Self {
Self {
store: InstrumentStore::new(),
http_client,
currencies,
include_expired,
}
}
#[must_use]
pub fn currencies(&self) -> &[String] {
&self.currencies
}
#[must_use]
pub const fn include_expired(&self) -> bool {
self.include_expired
}
#[must_use]
pub const fn http_client(&self) -> &DeriveHttpClient {
&self.http_client
}
pub fn add_instruments(&mut self, instruments: Vec<InstrumentAny>) {
self.store.add_bulk(instruments);
}
async fn fetch_instruments(
&self,
currencies: &[String],
expired: bool,
) -> anyhow::Result<Vec<InstrumentAny>> {
let mut instruments = Vec::new();
for currency in currencies {
let definitions =
fetch_instrument_definitions(&self.http_client, currency, expired).await?;
instruments.extend(parse_instrument_definitions(definitions)?);
}
Ok(instruments)
}
}
#[async_trait(?Send)]
impl InstrumentProvider for DeriveInstrumentProvider {
fn store(&self) -> &InstrumentStore {
&self.store
}
fn store_mut(&mut self) -> &mut InstrumentStore {
&mut self.store
}
async fn load_all(&mut self, filters: Option<&HashMap<String, String>>) -> anyhow::Result<()> {
let (currencies, expired) =
resolve_load_filters(&self.currencies, self.include_expired, filters)?;
let instruments = self.fetch_instruments(¤cies, expired).await?;
self.store.clear();
self.add_instruments(instruments);
self.store.set_initialized();
Ok(())
}
async fn load_ids(
&mut self,
instrument_ids: &[InstrumentId],
filters: Option<&HashMap<String, String>>,
) -> anyhow::Result<()> {
let missing: Vec<_> = instrument_ids
.iter()
.filter(|id| !self.store.contains(id))
.collect();
if missing.is_empty() {
return Ok(());
}
let expired = resolve_expired_filter(self.include_expired, filters)?;
let mut currencies: Vec<String> = missing
.iter()
.filter_map(|id| currency_from_instrument_id(id).map(ToOwned::to_owned))
.collect();
currencies.sort();
currencies.dedup();
if !currencies.is_empty() {
let instruments = self.fetch_instruments(¤cies, expired).await?;
self.add_instruments(instruments);
}
if missing.iter().all(|id| self.store.contains(id)) {
return Ok(());
}
if !self.store.is_initialized() {
let existing = self.store.get_all().values().cloned().collect::<Vec<_>>();
self.load_all(filters).await?;
for instrument in existing {
if !self.store.contains(&instrument.id()) {
self.store.add(instrument);
}
}
}
let missing_ids: Vec<_> = instrument_ids
.iter()
.filter(|id| !self.store.contains(id))
.collect();
if missing_ids.is_empty() {
Ok(())
} else {
anyhow::bail!("Derive instruments not found: {missing_ids:?}")
}
}
async fn load(
&mut self,
instrument_id: &InstrumentId,
filters: Option<&HashMap<String, String>>,
) -> anyhow::Result<()> {
if self.store.contains(instrument_id) {
return Ok(());
}
self.load_ids(&[*instrument_id], filters).await
}
}
pub(crate) fn parse_instrument_definitions(
definitions: Vec<DeriveInstrument>,
) -> anyhow::Result<Vec<InstrumentAny>> {
let ts_init = get_atomic_clock_realtime().get_time_ns();
let mut instruments = Vec::with_capacity(definitions.len());
for definition in definitions {
if let Some(instrument) = parse_derive_instrument_any(&definition, ts_init)? {
instruments.push(instrument);
}
}
Ok(instruments)
}
pub(crate) async fn fetch_instrument_definitions(
http_client: &DeriveHttpClient,
currency: &str,
expired: bool,
) -> anyhow::Result<Vec<DeriveInstrument>> {
let (mut definitions, options, erc20s) = tokio::try_join!(
http_client.get_instruments(currency, DeriveInstrumentType::Perp, expired),
http_client.get_instruments(currency, DeriveInstrumentType::Option, expired),
fetch_erc20_instruments(http_client, currency, expired),
)?;
definitions.extend(options);
definitions.extend(erc20s);
Ok(definitions)
}
async fn fetch_erc20_instruments(
http_client: &DeriveHttpClient,
currency: &str,
expired: bool,
) -> Result<Vec<DeriveInstrument>, DeriveHttpError> {
match http_client
.get_instruments(currency, DeriveInstrumentType::Erc20, expired)
.await
{
Ok(rows) => Ok(rows),
Err(DeriveHttpError::JsonRpc { code, .. }) if code == INSTRUMENT_NOT_FOUND_CODE => {
Ok(Vec::new())
}
Err(e) => Err(e),
}
}
fn resolve_load_filters(
default_currencies: &[String],
default_expired: bool,
filters: Option<&HashMap<String, String>>,
) -> anyhow::Result<(Vec<String>, bool)> {
let currencies = filters
.and_then(|map| {
map.get("currency")
.map(|currency| vec![currency.trim().to_string()])
.or_else(|| map.get("currencies").map(|value| split_currencies(value)))
})
.unwrap_or_else(|| default_currencies.to_vec());
let currencies = normalize_currencies(currencies);
anyhow::ensure!(
!currencies.is_empty(),
"DeriveInstrumentProvider requires at least one currency",
);
let expired = resolve_expired_filter(default_expired, filters)?;
Ok((currencies, expired))
}
fn resolve_expired_filter(
default_expired: bool,
filters: Option<&HashMap<String, String>>,
) -> anyhow::Result<bool> {
filters
.and_then(|map| map.get("expired"))
.map(|value| value.parse::<bool>())
.transpose()
.map_err(|e| anyhow::anyhow!("invalid Derive `expired` filter: {e}"))
.map(|value| value.unwrap_or(default_expired))
}
fn split_currencies(value: &str) -> Vec<String> {
normalize_currencies(value.split(',').map(ToOwned::to_owned).collect())
}
fn normalize_currencies(currencies: Vec<String>) -> Vec<String> {
let mut currencies: Vec<_> = currencies
.into_iter()
.map(|currency| currency.trim().to_string())
.filter(|currency| !currency.is_empty())
.collect();
currencies.sort();
currencies.dedup();
currencies
}
fn currency_from_instrument_id(instrument_id: &InstrumentId) -> Option<&str> {
if instrument_id.venue != *DERIVE_VENUE {
return None;
}
instrument_id
.symbol
.as_str()
.split_once('-')
.and_then(|(currency, _)| (!currency.is_empty()).then_some(currency))
}