use std::collections::HashMap;
use std::time::{Duration, Instant};
use crate::core::Instrument;
use crate::param::{AccountGroupId, AccountId, DEFAULT_ACCOUNT_GROUP};
use super::builder::MarketDataSync;
use super::error::{
AlreadyRegistered, MarketDataError, PushForError, RegistrationError, UnknownInstrumentId,
};
use super::instrument_id::InstrumentId;
use super::internals::{QuoteState, Slot, SlotQuotes, SlotTtls};
use super::lock::{MarketDataLock, ServiceTtlGate};
use super::quote::{Quote, QuoteTtl};
use super::resolution::{AccountInfo, QuoteResolution};
use super::ttl::TtlSetting;
pub(crate) struct InstrumentRegistry {
by_id: HashMap<InstrumentId, u32>,
by_instrument: HashMap<Instrument, InstrumentId>,
next_auto_id: u64,
next_slot: u32,
}
impl InstrumentRegistry {
pub(crate) fn new() -> Self {
Self {
by_id: HashMap::new(),
by_instrument: HashMap::new(),
next_slot: 0,
next_auto_id: 0,
}
}
fn alloc_slot(&mut self) -> u32 {
let slot = self.next_slot;
self.next_slot += 1;
slot
}
fn next_auto_id(&mut self) -> InstrumentId {
loop {
let candidate = InstrumentId(self.next_auto_id);
self.next_auto_id += 1;
if !self.by_id.contains_key(&candidate) {
return candidate;
}
}
}
fn insert_id(&mut self, instrument_id: InstrumentId, slot: u32) {
self.by_id.insert(instrument_id, slot);
if self.next_auto_id == instrument_id.0 {
self.next_auto_id += 1;
}
}
}
pub struct MarketDataService<Sync: MarketDataSync> {
pub(crate) default_ttl: Option<Duration>,
pub(crate) account_ttl: Sync::Lock<HashMap<AccountId, TtlSetting>>,
pub(crate) group_ttl: Sync::Lock<HashMap<AccountGroupId, TtlSetting>>,
pub(crate) registry: Sync::Lock<InstrumentRegistry>,
pub(crate) slots: Sync::Lock<Vec<Slot<Sync>>>,
pub(crate) sync: Sync,
pub(crate) has_service_level_ttl: Sync::Gate,
}
impl<Sync: MarketDataSync> MarketDataService<Sync> {
pub fn register(&self, instrument: Instrument) -> Result<InstrumentId, AlreadyRegistered> {
self.register_inner(instrument, None)
}
pub fn register_with_ttl(
&self,
instrument: Instrument,
ttl: QuoteTtl,
) -> Result<InstrumentId, AlreadyRegistered> {
self.register_inner(instrument, Some(TtlSetting::from_quote_ttl(ttl)))
}
fn register_inner(
&self,
instrument: Instrument,
instrument_ttl: Option<TtlSetting>,
) -> Result<InstrumentId, AlreadyRegistered> {
{
let guard = self.registry.read();
if guard.by_instrument.contains_key(&instrument) {
return Err(AlreadyRegistered { instrument });
}
}
let mut reg = self.registry.write();
if reg.by_instrument.contains_key(&instrument) {
return Err(AlreadyRegistered { instrument });
}
let instrument_id = reg.next_auto_id();
let slot_idx = reg.alloc_slot();
reg.by_instrument.insert(instrument, instrument_id);
reg.insert_id(instrument_id, slot_idx);
self.ensure_slot_storage(slot_idx, instrument_ttl);
Ok(instrument_id)
}
pub fn register_with_id(
&self,
instrument: Instrument,
instrument_id: InstrumentId,
) -> Result<InstrumentId, RegistrationError> {
self.register_with_id_inner(instrument, instrument_id, None)
}
pub fn register_with_id_and_ttl(
&self,
instrument: Instrument,
instrument_id: InstrumentId,
ttl: QuoteTtl,
) -> Result<InstrumentId, RegistrationError> {
self.register_with_id_inner(
instrument,
instrument_id,
Some(TtlSetting::from_quote_ttl(ttl)),
)
}
fn register_with_id_inner(
&self,
instrument: Instrument,
instrument_id: InstrumentId,
instrument_ttl: Option<TtlSetting>,
) -> Result<InstrumentId, RegistrationError> {
let mut reg = self.registry.write();
if reg.by_instrument.contains_key(&instrument) {
return Err(RegistrationError::DuplicateInstrument { instrument });
}
if reg.by_id.contains_key(&instrument_id) {
return Err(RegistrationError::DuplicateId { instrument_id });
}
let slot_idx = reg.alloc_slot();
reg.by_instrument.insert(instrument, instrument_id);
reg.insert_id(instrument_id, slot_idx);
self.ensure_slot_storage(slot_idx, instrument_ttl);
Ok(instrument_id)
}
pub fn set_account_ttl(&self, account_id: AccountId, ttl: QuoteTtl) {
self.account_ttl
.write()
.insert(account_id, TtlSetting::from_quote_ttl(ttl));
self.has_service_level_ttl.mark_present();
}
pub fn clear_account_ttl(&self, account_id: AccountId) {
self.account_ttl.write().remove(&account_id);
}
pub fn set_account_group_ttl(&self, account_group_id: AccountGroupId, ttl: QuoteTtl) {
self.group_ttl
.write()
.insert(account_group_id, TtlSetting::from_quote_ttl(ttl));
self.has_service_level_ttl.mark_present();
}
pub fn clear_account_group_ttl(&self, account_group_id: AccountGroupId) {
self.group_ttl.write().remove(&account_group_id);
}
pub fn set_instrument_ttl(
&self,
instrument_id: InstrumentId,
ttl: QuoteTtl,
) -> Result<(), UnknownInstrumentId> {
self.with_slot_ttls(instrument_id, |ttls| {
ttls.instrument = Some(TtlSetting::from_quote_ttl(ttl));
})
}
pub fn clear_instrument_ttl(
&self,
instrument_id: InstrumentId,
) -> Result<(), UnknownInstrumentId> {
self.with_slot_ttls(instrument_id, |ttls| {
ttls.instrument = None;
})
}
pub fn set_instrument_account_ttl(
&self,
instrument_id: InstrumentId,
account_id: AccountId,
ttl: QuoteTtl,
) -> Result<(), UnknownInstrumentId> {
self.with_slot_ttls(instrument_id, |ttls| {
ttls.accounts
.insert(account_id, TtlSetting::from_quote_ttl(ttl));
})
}
pub fn clear_instrument_account_ttl(
&self,
instrument_id: InstrumentId,
account_id: AccountId,
) -> Result<(), UnknownInstrumentId> {
self.with_slot_ttls(instrument_id, |ttls| {
ttls.accounts.remove(&account_id);
})
}
pub fn set_instrument_account_group_ttl(
&self,
instrument_id: InstrumentId,
account_group_id: AccountGroupId,
ttl: QuoteTtl,
) -> Result<(), UnknownInstrumentId> {
self.with_slot_ttls(instrument_id, |ttls| {
ttls.groups
.insert(account_group_id, TtlSetting::from_quote_ttl(ttl));
})
}
pub fn clear_instrument_account_group_ttl(
&self,
instrument_id: InstrumentId,
account_group_id: AccountGroupId,
) -> Result<(), UnknownInstrumentId> {
self.with_slot_ttls(instrument_id, |ttls| {
ttls.groups.remove(&account_group_id);
})
}
fn with_slot_ttls(
&self,
instrument_id: InstrumentId,
mutate: impl FnOnce(&mut SlotTtls),
) -> Result<(), UnknownInstrumentId> {
let slot_idx = {
let guard = self.registry.read();
guard
.by_id
.get(&instrument_id)
.copied()
.ok_or(UnknownInstrumentId { instrument_id })?
};
let guard = self.slots.read();
if let Some(slot) = guard.get(slot_idx as usize) {
let mut ttls = slot.ttls.write();
mutate(&mut ttls);
}
Ok(())
}
pub fn clear(&self, instrument_id: InstrumentId) {
let slot_idx = {
let guard = self.registry.read();
match guard.by_id.get(&instrument_id).copied() {
Some(idx) => idx,
None => return,
}
};
let guard = self.slots.read();
if let Some(slot) = guard.get(slot_idx as usize) {
let mut quotes = slot.quotes.write();
quotes.accounts.clear();
quotes.groups.clear();
}
}
pub fn push(
&self,
instrument_id: InstrumentId,
quote: Quote,
) -> Result<(), UnknownInstrumentId> {
self.store_default_by_id(instrument_id, |_prev| quote)
}
pub fn push_patch(
&self,
instrument_id: InstrumentId,
quote: Quote,
) -> Result<(), UnknownInstrumentId> {
self.store_default_by_id(instrument_id, |prev| {
prev.unwrap_or_default().patched_with(quote)
})
}
fn store_default_by_id(
&self,
instrument_id: InstrumentId,
build: impl FnOnce(Option<Quote>) -> Quote,
) -> Result<(), UnknownInstrumentId> {
let slot_idx = self.slot_for_id(instrument_id)?;
let now = Instant::now();
let guard = self.slots.read();
if let Some(slot) = guard.get(slot_idx as usize) {
let mut quotes = slot.quotes.write();
store_into(&mut quotes.groups, DEFAULT_ACCOUNT_GROUP, now, build);
}
Ok(())
}
pub fn push_for(
&self,
instrument_id: InstrumentId,
quote: Quote,
account_ids: &[AccountId],
account_group_ids: &[AccountGroupId],
) -> Result<(), PushForError> {
self.store_for(instrument_id, account_ids, account_group_ids, |_prev| quote)
}
pub fn push_for_patch(
&self,
instrument_id: InstrumentId,
quote: Quote,
account_ids: &[AccountId],
account_group_ids: &[AccountGroupId],
) -> Result<(), PushForError> {
self.store_for(instrument_id, account_ids, account_group_ids, |prev| {
prev.unwrap_or_default().patched_with(quote)
})
}
fn store_for(
&self,
instrument_id: InstrumentId,
account_ids: &[AccountId],
account_group_ids: &[AccountGroupId],
build: impl Fn(Option<Quote>) -> Quote,
) -> Result<(), PushForError> {
if account_ids.is_empty() && account_group_ids.is_empty() {
return Err(PushForError::NoTarget);
}
let slot_idx = {
let guard = self.registry.read();
guard
.by_id
.get(&instrument_id)
.copied()
.ok_or(PushForError::UnknownInstrument { instrument_id })?
};
let now = Instant::now();
let guard = self.slots.read();
if let Some(slot) = guard.get(slot_idx as usize) {
let mut quotes = slot.quotes.write();
for &account_id in account_ids {
store_into(&mut quotes.accounts, account_id, now, &build);
}
for &account_group_id in account_group_ids {
store_into(&mut quotes.groups, account_group_id, now, &build);
}
}
Ok(())
}
pub fn push_by_instrument(&self, instrument: &Instrument, quote: Quote) -> InstrumentId {
let (instrument_id, slot_idx) = self.resolve_or_register_named(instrument);
self.store_default_at(slot_idx, |_prev| quote);
instrument_id
}
pub fn push_by_instrument_patch(&self, instrument: &Instrument, quote: Quote) -> InstrumentId {
let (instrument_id, slot_idx) = self.resolve_or_register_named(instrument);
self.store_default_at(slot_idx, |prev| {
prev.unwrap_or_default().patched_with(quote)
});
instrument_id
}
pub fn get(
&self,
instrument_id: InstrumentId,
account_id: AccountId,
account_info: &impl AccountInfo,
resolution: QuoteResolution,
) -> Result<Quote, MarketDataError> {
let slot_idx = {
let guard = self.registry.read();
match guard.by_id.get(&instrument_id).copied() {
Some(idx) => idx,
None => return Err(MarketDataError::UnknownInstrument),
}
};
let guard = self.slots.read();
let slot = guard
.get(slot_idx as usize)
.ok_or(MarketDataError::QuoteUnavailable)?;
let mut group_cell: Option<Option<AccountGroupId>> = None;
let selected = {
let quotes = slot.quotes.read();
select_quote(
"es,
account_id,
account_info,
resolution,
&mut group_cell,
)
.ok_or(MarketDataError::QuoteUnavailable)?
};
let effective_ttl = {
let ttls = slot.ttls.read();
self.effective_ttl(&ttls, account_id, account_info, &mut group_cell)
};
if let Some(ttl) = effective_ttl {
if Instant::now().saturating_duration_since(selected.pushed_at) >= ttl {
return Err(MarketDataError::QuoteExpired(selected.quote));
}
}
Ok(selected.quote)
}
fn effective_ttl(
&self,
ttls: &SlotTtls,
account_id: AccountId,
account_info: &impl AccountInfo,
group_cell: &mut Option<Option<AccountGroupId>>,
) -> Option<Duration> {
if let Some(setting) = ttls.accounts.get(&account_id) {
return setting.as_duration();
}
if let Some(group) = resolve_group(account_info, group_cell) {
if let Some(setting) = ttls.groups.get(&group) {
return setting.as_duration();
}
}
if let Some(setting) = ttls.groups.get(&DEFAULT_ACCOUNT_GROUP) {
return setting.as_duration();
}
if self.has_service_level_ttl.is_possibly_present() {
if let Some(setting) = self.account_ttl.read().get(&account_id) {
return setting.as_duration();
}
let service_group_ttl = self.group_ttl.read();
if let Some(group) = resolve_group(account_info, group_cell) {
if let Some(setting) = service_group_ttl.get(&group) {
return setting.as_duration();
}
}
if let Some(setting) = service_group_ttl.get(&DEFAULT_ACCOUNT_GROUP) {
return setting.as_duration();
}
}
if let Some(setting) = ttls.instrument {
return setting.as_duration();
}
self.default_ttl
}
pub fn resolve(&self, instrument: &Instrument) -> Option<InstrumentId> {
self.registry.read().by_instrument.get(instrument).copied()
}
fn slot_for_id(&self, instrument_id: InstrumentId) -> Result<u32, UnknownInstrumentId> {
let guard = self.registry.read();
guard
.by_id
.get(&instrument_id)
.copied()
.ok_or(UnknownInstrumentId { instrument_id })
}
fn resolve_or_register_named(&self, instrument: &Instrument) -> (InstrumentId, u32) {
{
let guard = self.registry.read();
if let Some(&instrument_id) = guard.by_instrument.get(instrument) {
let slot_idx = guard.by_id[&instrument_id];
return (instrument_id, slot_idx);
}
}
let mut reg = self.registry.write();
if let Some(&instrument_id) = reg.by_instrument.get(instrument) {
let slot_idx = reg.by_id[&instrument_id];
return (instrument_id, slot_idx);
}
let instrument_id = reg.next_auto_id();
let slot_idx = reg.alloc_slot();
reg.by_instrument.insert(instrument.clone(), instrument_id);
reg.insert_id(instrument_id, slot_idx);
self.ensure_slot_storage(slot_idx, None);
(instrument_id, slot_idx)
}
fn store_default_at(&self, slot_idx: u32, build: impl FnOnce(Option<Quote>) -> Quote) {
let now = Instant::now();
let guard = self.slots.read();
if let Some(slot) = guard.get(slot_idx as usize) {
let mut quotes = slot.quotes.write();
store_into(&mut quotes.groups, DEFAULT_ACCOUNT_GROUP, now, build);
}
}
fn ensure_slot_storage(&self, slot_idx: u32, instrument_ttl: Option<TtlSetting>) {
let mut slots = self.slots.write();
debug_assert_eq!(
slot_idx as usize,
slots.len(),
"slot index must match slots.len()"
);
slots.push(Slot::new(&self.sync, instrument_ttl));
}
}
fn store_into<BucketKey: std::hash::Hash + Eq>(
map: &mut HashMap<BucketKey, QuoteState>,
key: BucketKey,
now: Instant,
build: impl FnOnce(Option<Quote>) -> Quote,
) {
let prev = map.get(&key).map(|state| state.quote);
map.insert(
key,
QuoteState {
quote: build(prev),
pushed_at: now,
},
);
}
fn resolve_group(
account_info: &impl AccountInfo,
group_cell: &mut Option<Option<AccountGroupId>>,
) -> Option<AccountGroupId> {
*group_cell.get_or_insert_with(|| account_info.group())
}
fn select_quote(
quotes: &SlotQuotes,
account_id: AccountId,
account_info: &impl AccountInfo,
resolution: QuoteResolution,
group_cell: &mut Option<Option<AccountGroupId>>,
) -> Option<QuoteState> {
if let Some(state) = quotes.accounts.get(&account_id) {
return Some(*state);
}
match resolution {
QuoteResolution::AccountOnly => None,
QuoteResolution::AccountThenGroup => resolve_group(account_info, group_cell)
.and_then(|group| quotes.groups.get(&group).copied()),
QuoteResolution::AccountThenGroupThenDefault => {
if let Some(group) = resolve_group(account_info, group_cell) {
if let Some(state) = quotes.groups.get(&group) {
return Some(*state);
}
}
quotes.groups.get(&DEFAULT_ACCOUNT_GROUP).copied()
}
}
}