pub mod query;
pub use crate::symbology::query::{DateQ, Query};
use crate::{
packed_value, pool,
protocol::symbology::{
self as protocol, OptionDir, OptionStyle, SymbologyUpdateKind, TradeFlags,
},
};
use anyhow::{anyhow, bail, Result};
use arc_swap::ArcSwap;
use bytes::{Buf, BufMut, BytesMut};
use chrono::prelude::*;
use enumflags2::{BitFlag, BitFlags, _internal::RawBitFlags};
use fxhash::{FxHashMap, FxHashSet};
use immutable_chunkmap::{map::MapM as Map, set};
use netidx::{
pack::{Pack, PackError},
path::Path,
pool::Pooled,
};
use once_cell::sync::Lazy;
use paste::paste;
use portable_atomic::{AtomicBool, AtomicU128, Ordering as MemOrdering};
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use schemars::{gen::SchemaGenerator, schema::Schema, JsonSchema};
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use std::{
borrow::Borrow,
cell::RefCell,
cmp::Ordering,
fmt,
hash::{Hash, Hasher},
marker::PhantomData,
ops::{Bound, Deref},
result,
str::FromStr,
sync::{atomic::AtomicU64, Arc},
};
use uuid::Uuid;
pub trait Symbolic {
type Id: Borrow<Uuid>;
fn name(&self) -> &'static str;
fn id(&self) -> Self::Id;
fn is_valid(&self) -> bool;
}
#[derive(JsonSchema)]
#[schemars(transparent)]
pub struct AtomicDecimal(#[schemars(with = "Decimal")] AtomicU128);
impl Default for AtomicDecimal {
fn default() -> Self {
Self::new(dec!(999999999))
}
}
impl Serialize for AtomicDecimal {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
<Decimal as Serialize>::serialize(&self.load(), serializer)
}
}
impl<'de> Deserialize<'de> for AtomicDecimal {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Ok(Self::new(<Decimal as Deserialize>::deserialize(deserializer)?))
}
}
impl AtomicDecimal {
fn new(d: Decimal) -> Self {
Self(AtomicU128::new(u128::from_le_bytes(d.serialize())))
}
pub fn load(&self) -> Decimal {
let buf = self.0.load(MemOrdering::Relaxed);
Decimal::deserialize(buf.to_le_bytes())
}
fn store(&self, d: Decimal) {
let new = u128::from_le_bytes(d.serialize());
self.0.store(new, MemOrdering::Relaxed);
}
}
impl PartialEq for AtomicDecimal {
fn eq(&self, other: &Self) -> bool {
self.0.load(MemOrdering::Relaxed) == other.0.load(MemOrdering::Relaxed)
}
}
impl PartialEq<Decimal> for AtomicDecimal {
fn eq(&self, other: &Decimal) -> bool {
&self.load() == other
}
}
impl Eq for AtomicDecimal {}
impl Clone for AtomicDecimal {
fn clone(&self) -> Self {
Self(AtomicU128::new(self.0.load(MemOrdering::Relaxed)))
}
}
impl fmt::Debug for AtomicDecimal {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.load())
}
}
pub struct AtomicBitFlags<T: BitFlag> {
bits: AtomicU64,
ph: PhantomData<T>,
}
impl<T> AtomicBitFlags<T>
where
T: BitFlag + RawBitFlags<Numeric = u64>,
{
fn new(t: BitFlags<T>) -> Self {
Self { bits: AtomicU64::new(t.bits()), ph: PhantomData }
}
pub fn load(&self) -> BitFlags<T> {
BitFlags::from_bits_truncate(self.bits.load(MemOrdering::Relaxed))
}
pub fn store(&self, t: BitFlags<T>) {
self.bits.store(t.bits(), MemOrdering::Relaxed)
}
}
impl<T> Serialize for AtomicBitFlags<T>
where
T: BitFlag + RawBitFlags<Numeric = u64>,
{
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
<BitFlags<T> as Serialize>::serialize(&self.load(), serializer)
}
}
impl<'de, T> Deserialize<'de> for AtomicBitFlags<T>
where
T: BitFlag + RawBitFlags<Numeric = u64>,
{
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
Ok(Self::new(<BitFlags<T> as Deserialize>::deserialize(deserializer)?))
}
}
impl<T> Clone for AtomicBitFlags<T>
where
T: BitFlag + RawBitFlags<Numeric = u64>,
{
fn clone(&self) -> Self {
Self {
bits: AtomicU64::new(self.bits.load(MemOrdering::Relaxed)),
ph: PhantomData,
}
}
}
impl<T> PartialEq for AtomicBitFlags<T>
where
T: BitFlag + RawBitFlags<Numeric = u64>,
{
fn eq(&self, other: &Self) -> bool {
self.bits.load(MemOrdering::Relaxed) == other.bits.load(MemOrdering::Relaxed)
}
}
impl<T> PartialEq<BitFlags<T>> for AtomicBitFlags<T>
where
T: BitFlag + RawBitFlags<Numeric = u64>,
{
fn eq(&self, other: &BitFlags<T>) -> bool {
&self.load() == other
}
}
impl<T> Eq for AtomicBitFlags<T> where T: BitFlag + RawBitFlags<Numeric = u64> {}
impl<T> fmt::Debug for AtomicBitFlags<T>
where
T: fmt::Debug + BitFlag + RawBitFlags<Numeric = u64>,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self.load())
}
}
type PR<T> = std::result::Result<T, PackError>;
macro_rules! packed_by_id {
($name:ident) => {
impl Pack for $name {
fn const_encoded_len() -> Option<usize> {
<protocol::$name as Pack>::const_encoded_len()
}
fn encoded_len(&self) -> usize {
Pack::encoded_len(&self.id)
}
fn encode(&self, buf: &mut impl BufMut) -> PR<()> {
Pack::encode(&self.id, buf)
}
fn decode(buf: &mut impl Buf) -> PR<Self> {
match $name::get_by_id(&Pack::decode(buf)?) {
Some(t) => Ok(t),
None => Ok(paste! {*[<INVALID_ $name:snake:upper>]}),
}
}
}
};
}
macro_rules! hcstatic {
($lname: literal, $name:ident, $valid:ident) => { hcstatic!($lname, $name, $valid,); };
($lname: literal, $name:ident, $valid:ident, $($extra_vis:vis $extra_field:ident: $extra_typ:ty),*) => {
paste! {
static [<$name:snake:upper _BY_NAME>]: Lazy<ArcSwap<Map<&'static str, $name>>> =
Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
static [<$name:snake:upper _BY_ID>]: Lazy<ArcSwap<Map<protocol::$name, $name>>> =
Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
static [<INVALID_ $name:snake:upper>]: Lazy<$name> = Lazy::new(|| {
let mut by_name = [<$name:snake:upper _BY_NAME>].load_full();
let mut by_id = [<$name:snake:upper _BY_ID>].load_full();
let t = $name::new(
Arc::make_mut(&mut by_name),
Arc::make_mut(&mut by_id),
stringify!([<INVALID_ $name:snake:upper>]),
false,
$($extra_typ::default()),*
).unwrap();
[<$name:snake:upper _BY_NAME>].store(by_name);
[<$name:snake:upper _BY_ID>].store(by_id);
t
});
#[ctor::ctor]
fn [<force_invalid_ $name:snake>]() {
let _ = *[<INVALID_ $name:snake:upper>];
}
#[derive(Debug, Serialize, JsonSchema)]
#[schemars(rename = $lname)]
pub struct [<$name Inner>] {
pub name: &'static str,
pub id: protocol::$name,
$($extra_vis $extra_field: $extra_typ),*
}
#[derive(Debug, Clone, Copy)]
pub struct $name(&'static [<$name Inner>]);
impl $name {
fn new(
by_name: &mut Map<&'static str, $name>,
by_id: &mut Map<protocol::$name, $name>,
s: &str,
validate: bool,
$($extra_field: $extra_typ),*
) -> Result<Self> {
match by_name.get(s) {
Some(t) => Ok(*t),
None => {
if validate {
$valid(s, $(&$extra_field),*)?;
}
let name: &'static str = Box::leak(Box::from(s));
let id = protocol::$name::from(name);
let inner = Box::leak(Box::new(
[<$name Inner>] { name, id, $($extra_field),* }
));
let t = Self(inner);
if by_id.get(&t.id).is_some() {
bail!("hell hath frozen over, there is a sha1 conflict")
}
by_name.insert_cow(t.name, t);
by_id.insert_cow(t.id, t);
Ok(t)
}
}
}
#[allow(dead_code)]
fn remove(
self,
by_name: &mut Map<&'static str, $name>,
by_id: &mut Map<protocol::$name, $name>,
) {
by_name.remove_cow(&self.name);
by_id.remove_cow(&self.id);
}
pub fn get(s: &str) -> Option<Self> {
[<$name:snake:upper _BY_NAME>].load().get(s).copied()
}
fn get_or_insert_default(s: &str) -> Result<Self> {
match [<$name:snake:upper _BY_NAME>].load().get(s).copied() {
Some(t) => Ok(t),
None => {
let mut by_name = [<$name:snake:upper _BY_NAME>].load_full();
let mut by_id = [<$name:snake:upper _BY_ID>].load_full();
let t = $name::new(
Arc::make_mut(&mut by_name),
Arc::make_mut(&mut by_id),
s,
true,
$($extra_typ::default()),*
)?;
[<$name:snake:upper _BY_NAME>].store(by_name);
[<$name:snake:upper _BY_ID>].store(by_id);
Ok(t)
}
}
}
pub fn get_by_id(id: &protocol::$name) -> Option<Self> {
[<$name:snake:upper _BY_ID>].load().get(id).copied()
}
pub fn all() -> Arc<Map<&'static str, $name>> {
[<$name:snake:upper _BY_NAME>].load_full()
}
pub fn all_by_id() -> Arc<Map<protocol::$name, $name>> {
[<$name:snake:upper _BY_ID>].load_full()
}
pub fn fuzzy(pat: &str, limit: usize) -> Pooled<Vec<(i64, $name)>> {
use fuzzy_matcher::{skim::{SkimMatcherV2, SkimScoreConfig}, FuzzyMatcher};
pool!(pool_matches, Vec<(i64, $name)>, 5, 100_000);
let matcher = SkimMatcherV2::default()
.ignore_case()
.score_config(SkimScoreConfig { bonus_consecutive: 15, ..Default::default() });
let mut res = pool_matches().take();
for (name, val) in &*$name::all() {
if val.is_valid() {
if let Some(score) = matcher.fuzzy_match(name, pat) {
res.push((score, *val));
}
}
}
res.sort_by_key(|(k, _)| -*k);
res.truncate(limit);
res
}
pub fn serialize_by_inner<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
<[<$name Inner>] as Serialize>::serialize(self.0, s)
}
pub fn serialize_by_name<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_str(self.name)
}
pub fn deserialize_by_name<'de, D>(d: D) -> Result<$name, D::Error>
where
D: serde::Deserializer<'de>,
{
d.deserialize_str([<$name Visitor>](false))
}
#[allow(dead_code)]
pub(crate) fn deserialize_by_name_default<'de, D>(d: D) -> Result<$name, D::Error>
where
D: serde::Deserializer<'de>,
{
d.deserialize_str([<$name Visitor>](true))
}
}
packed_by_id!($name);
packed_value!($name);
impl Deref for $name {
type Target = [<$name Inner>];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Symbolic for $name {
type Id = protocol::$name;
fn name(&self) -> &'static str {
self.0.name
}
fn id(&self) -> Self::Id {
self.0.id
}
fn is_valid(&self) -> bool {
*self != *[<INVALID_ $name:snake:upper>]
}
}
impl fmt::Display for $name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> {
write!(f, "{}", self.name)
}
}
impl PartialEq for $name {
fn eq(&self, other: &Self) -> bool {
(self.0 as *const [<$name Inner>]) == (other.0 as *const [<$name Inner>])
}
}
impl Eq for $name {}
impl Hash for $name {
fn hash<H: Hasher>(&self, state: &mut H) {
(self.0 as *const [<$name Inner>]).hash(state)
}
}
impl PartialOrd for $name {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
(self.0 as *const [<$name Inner>])
.partial_cmp(&(other.0 as *const [<$name Inner>]))
}
}
impl Ord for $name {
fn cmp(&self, other: &Self) -> Ordering {
(self.0 as *const [<$name Inner>])
.cmp(&(other.0 as *const [<$name Inner>]))
}
}
impl Serialize for $name {
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
where S: Serializer,
{
Serialize::serialize(&self.id, serializer)
}
}
impl JsonSchema for $name {
fn schema_name() -> String {
protocol::$name::schema_name()
}
fn json_schema(gen: &mut SchemaGenerator) -> Schema {
protocol::$name::json_schema(gen)
}
}
struct [<$name Visitor>](bool);
impl<'de> Visitor<'de> for [<$name Visitor>] {
type Value = $name;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "expecting a string")
}
fn visit_str<E>(self, v: &str) -> result::Result<Self::Value, E>
where E: serde::de::Error
{
if self.0 {
$name::get_or_insert_default(v).map_err(|e| {
let e = format!("invalid {} err: {}", stringify!($name), e);
E::custom(e)
})
} else {
Ok($name::get(v).unwrap_or(*[<INVALID_ $name:snake:upper>]))
}
}
}
impl<'de> Deserialize<'de> for $name {
fn deserialize<D>(deserializer: D) -> Result<$name, D::Error>
where D: Deserializer<'de>
{
let id = protocol::$name::deserialize(deserializer)?;
Ok($name::get_by_id(&id).unwrap_or(*[<INVALID_ $name:snake:upper>]))
}
}
}
};
}
macro_rules! make_atomic {
($name_literal:literal, $name:ident) => {
paste! {
#[derive(Debug, JsonSchema)]
pub struct $name(
#[schemars(with = $name_literal)]
ArcSwap<[<$name Inner>]>
);
impl Serialize for $name{
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
where
S: Serializer,
{
let inner = self.load();
inner.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for $name {
fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let inner = [<$name Inner>]::deserialize(deserializer)?;
Ok(Self(ArcSwap::from_pointee(inner)))
}
}
impl From<[<$name Inner>]> for $name {
fn from(value: [<$name Inner>]) -> Self {
Self(ArcSwap::from_pointee(value))
}
}
impl Deref for $name {
type Target = ArcSwap<[<$name Inner>]>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
}
};
}
fn must_be_uppercase_ascii_or_digit_or_dash(s: &str) -> Result<()> {
for c in s.chars() {
if !(c.is_ascii_uppercase() || c.is_ascii_digit() || c == '-') {
bail!("expected uppercase ascii or digit or dash")
}
}
Ok(())
}
fn allow_any(_s: &str) -> Result<()> {
Ok(())
}
hcstatic!("Venue", Venue, must_be_uppercase_ascii_or_digit_or_dash);
hcstatic!("Route", Route, must_be_uppercase_ascii_or_digit_or_dash);
hcstatic!("QuoteSymbol", QuoteSymbol, allow_any);
hcstatic!("TradingSymbol", TradingSymbol, allow_any);
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
#[schemars(transparent)]
pub struct TokenInfo(
#[schemars(with = "FxHashMap<Venue, protocol::TokenInfo>")]
FxHashMap<Venue, protocol::TokenInfo>,
);
impl Deref for TokenInfo {
type Target = FxHashMap<Venue, protocol::TokenInfo>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Into<FxHashMap<protocol::Venue, protocol::TokenInfo>> for &TokenInfo {
fn into(self) -> FxHashMap<protocol::Venue, protocol::TokenInfo> {
self.0.iter().map(|(k, v)| (k.id, v.clone())).collect()
}
}
impl Into<FxHashMap<protocol::Venue, protocol::TokenInfo>> for TokenInfo {
fn into(self) -> FxHashMap<protocol::Venue, protocol::TokenInfo> {
(&self).into()
}
}
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", content = "value")]
pub enum ProductClassInner {
Coin {
token_info: TokenInfo,
},
Fiat,
Equity,
Future {
underlying: Product, settlement: Product, expiration: DateTime<Utc>,
},
Option {
underlying: Product, settlement: Product, dir: OptionDir,
style: OptionStyle,
expiration: DateTime<Utc>,
strike: Decimal,
},
}
impl ProductClassInner {
pub fn kind(&self) -> &'static str {
match self {
Self::Coin { .. } => "Coin",
Self::Fiat => "Fiat",
Self::Equity => "Equity",
Self::Future { .. } => "Future",
Self::Option { .. } => "Option",
}
}
pub fn get_token_info(&self) -> Option<&TokenInfo> {
match self {
Self::Coin { token_info } => Some(token_info),
_ => None,
}
}
pub fn merge_eq(from: &ProductClassInner, to: &ProductClassInner) -> bool {
match (from, to) {
(Self::Coin { .. }, Self::Coin { .. })
| (Self::Equity, Self::Equity)
| (Self::Fiat, Self::Fiat) => true,
(
Self::Future { underlying, settlement, expiration },
Self::Future {
underlying: bunderlying,
settlement: bsettlement,
expiration: bexpiration,
},
) => {
underlying == bunderlying
&& settlement == bsettlement
&& expiration == bexpiration
}
(
Self::Option { underlying, settlement, dir, style, expiration, strike },
Self::Option {
underlying: bunderlying,
settlement: bsettlement,
dir: bdir,
style: bstyle,
expiration: bexpiration,
strike: bstrike,
},
) => {
underlying == bunderlying
&& settlement == bsettlement
&& expiration == bexpiration
&& strike == bstrike
&& dir == bdir
&& style == bstyle
}
_ => false,
}
}
}
impl Default for ProductClassInner {
fn default() -> Self {
Self::Fiat
}
}
impl Into<protocol::ProductClass> for &ProductClassInner {
fn into(self) -> protocol::ProductClass {
match self {
ProductClassInner::Coin { token_info } => {
protocol::ProductClass::Coin { token_info: token_info.into() }
}
ProductClassInner::Fiat => protocol::ProductClass::Fiat,
ProductClassInner::Equity => protocol::ProductClass::Equity,
ProductClassInner::Future { underlying, settlement, expiration } => {
protocol::ProductClass::Future {
underlying: underlying.id,
settlement: settlement.id,
expiration: *expiration,
}
}
ProductClassInner::Option {
underlying,
settlement,
dir,
style,
expiration,
strike,
} => protocol::ProductClass::Option {
underlying: underlying.id,
settlement: settlement.id,
dir: *dir,
style: *style,
expiration: *expiration,
strike: *strike,
},
}
}
}
impl Into<protocol::ProductClass> for ProductClassInner {
fn into(self) -> protocol::ProductClass {
(&self).into()
}
}
fn validate_product(s: &str, class: &ProductClass, _price: &AtomicDecimal) -> Result<()> {
use regex::Regex;
static PROD_PAT: &str = "^[ A-Za-z0-9.]+$";
static PROD: Lazy<Regex> = Lazy::new(|| Regex::new(PROD_PAT).unwrap());
static COIN_PAT: &str = "^[a-zA-Z0-9.]+ Crypto$";
static COIN: Lazy<Regex> = Lazy::new(|| Regex::new(COIN_PAT).unwrap());
static EQUITY_PAT: &str = "^[A-Z0-9]+ [A-Z]{2} Stock$";
static EQUITY: Lazy<Regex> = Lazy::new(|| Regex::new(EQUITY_PAT).unwrap());
static FIAT_PAT: &str = "^[A-Z]{3}$";
static FIAT: Lazy<Regex> = Lazy::new(|| Regex::new(FIAT_PAT).unwrap());
static FUTURE_PAT: &str = "^[A-Z0-9]+([A-Z])([0-9]) [A-Z]{2} Future$";
static FUTURE: Lazy<Regex> = Lazy::new(|| Regex::new(FUTURE_PAT).unwrap());
static OPTION_PAT: &str =
"^[A-Z0-9]+[A-Z][0-9]{2}[0-9]+\\.[0-9]+ [A-Z]{2} (Put|Call)$";
static OPTION: Lazy<Regex> = Lazy::new(|| Regex::new(OPTION_PAT).unwrap());
if !PROD.is_match(s) {
bail!("\"{}\" product names must match '[ A-Za-z0-9.]+'", s)
}
match &**class.load() {
ProductClassInner::Coin { .. } => {
if !COIN.is_match(s) {
bail!("\"{}\" coin products must match {}", s, COIN_PAT)
}
}
ProductClassInner::Equity => {
if !EQUITY.is_match(s) {
bail!("\"{}\" equity products must match {}", s, EQUITY_PAT)
}
}
ProductClassInner::Fiat => {
if !FIAT.is_match(s) {
bail!("\"{}\" fiat products must match {}", s, FIAT_PAT)
}
}
ProductClassInner::Future { underlying: _, settlement: _, expiration: _ } => {
if !FUTURE.is_match(s) {
bail!("\"{}\" futures products must match {}", s, FUTURE_PAT)
}
}
ProductClassInner::Option {
underlying: _,
settlement: _,
dir: _,
style: _,
expiration: _,
strike: _,
} => {
if !OPTION.is_match(s) {
bail!("\"{}\" options products must match {}", s, OPTION_PAT)
}
}
}
Ok(())
}
#[test]
fn test_product_validation() {
validate_product(
"IBM US Stock",
&ProductClassInner::Equity.into(),
&AtomicDecimal::new(dec!(0)),
)
.unwrap();
validate_product(
"BTC Crypto",
&ProductClassInner::Coin { token_info: TokenInfo::default() }.into(),
&AtomicDecimal::new(dec!(0)),
)
.unwrap();
let usd = Product::get_or_insert_default("USD").unwrap();
validate_product(
"ESJ3 US Future",
&ProductClassInner::Future {
underlying: usd,
settlement: usd,
expiration: Utc::now(),
}
.into(),
&AtomicDecimal::new(dec!(0)),
)
.unwrap();
validate_product(
"ESJ253000.00 US Put",
&ProductClassInner::Option {
underlying: usd,
settlement: usd,
dir: OptionDir::Put,
style: OptionStyle::American,
expiration: Utc::now(),
strike: dec!(3000),
}
.into(),
&AtomicDecimal::new(dec!(0)),
)
.unwrap()
}
make_atomic!("ProductClassInner", ProductClass);
impl Default for ProductClass {
fn default() -> Self {
Self(ArcSwap::from_pointee(ProductClassInner::default()))
}
}
impl Into<protocol::ProductClass> for &ProductClass {
fn into(self) -> protocol::ProductClass {
(&**self.0.load()).into()
}
}
impl Into<protocol::ProductClass> for ProductClass {
fn into(self) -> protocol::ProductClass {
(&self).into()
}
}
hcstatic!(
"Product",
Product,
validate_product,
pub class: ProductClass,
pub price_in_limit_dollars: AtomicDecimal
);
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct TradeInfoInner {
pub trading_symbol: TradingSymbol,
pub quote_increment: Decimal,
pub base_increment: Decimal,
pub flags: BitFlags<TradeFlags>,
}
impl TradeInfoInner {
fn is_valid(&self) -> bool {
self.trading_symbol.is_valid()
}
}
type TradeInfoOptInner = Option<TradeInfoInner>;
make_atomic!("TradeInfoOptInner", TradeInfoOpt);
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct QuoteInfoInner {
pub primary: QuoteSymbol,
pub implied: Option<QuoteSymbol>,
pub l2: Option<QuoteSymbol>,
}
impl QuoteInfoInner {
fn is_valid(&self) -> bool {
self.primary.is_valid() && self.implied.map(|s| s.is_valid()).unwrap_or(true)
|| self.l2.map(|s| s.is_valid()).unwrap_or(true)
}
}
type QuoteInfoOptInner = Option<QuoteInfoInner>;
make_atomic!("QuoteInfoOptInner", QuoteInfoOpt);
#[derive(Debug, Serialize, JsonSchema)]
#[schemars(rename = "TradableProduct")]
pub struct TradableProductInner {
pub id: protocol::TradableProduct,
pub name: &'static str,
pub base: Product,
pub quote: Product,
pub venue: Venue,
pub route: Route,
pub quote_info: QuoteInfoOpt,
pub trade_info: TradeInfoOpt,
#[serde(skip)]
removed: AtomicBool,
}
static TRADABLE_PRODUCT: Lazy<
ArcSwap<Map<(Product, Product, Venue, Route), TradableProduct>>,
> = Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
static TRADABLE_PRODUCT_BY_ID: Lazy<
ArcSwap<Map<protocol::TradableProduct, TradableProduct>>,
> = Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
static INVALID_TRADABLE_PRODUCT: Lazy<TradableProduct> = Lazy::new(|| {
let mut by_name = TRADABLE_PRODUCT.load_full();
let mut by_id = TRADABLE_PRODUCT_BY_ID.load_full();
let mut index = INDEX.load_full();
let t = TradableProduct::new(
Arc::make_mut(&mut by_name),
Arc::make_mut(&mut by_id),
Arc::make_mut(&mut index),
*INVALID_PRODUCT,
*INVALID_PRODUCT,
*INVALID_VENUE,
*INVALID_ROUTE,
None,
None,
false,
)
.unwrap();
TRADABLE_PRODUCT.store(by_name);
TRADABLE_PRODUCT_BY_ID.store(by_id);
INDEX.store(index);
t
});
#[ctor::ctor]
fn force_invalid_tradable_product() {
let _ = *INVALID_TRADABLE_PRODUCT;
}
#[derive(Debug, Clone, Copy)]
pub struct TradableProduct(&'static TradableProductInner);
impl TradableProduct {
fn new(
by_name: &mut Map<(Product, Product, Venue, Route), TradableProduct>,
by_id: &mut Map<protocol::TradableProduct, TradableProduct>,
index: &mut TradableProductIndex,
base: Product,
quote: Product,
venue: Venue,
route: Route,
quote_info: Option<QuoteInfoInner>,
trade_info: Option<TradeInfoInner>,
validate: bool,
) -> Result<Self> {
match by_name.get(&(base, quote, venue, route)) {
Some(i) => Ok(*i),
None => {
if validate {
if !base.is_valid() {
bail!("can't construct tradable product with invalid base")
}
if !quote.is_valid() {
bail!("can't construct tradable product with invalid quote")
}
if !venue.is_valid() {
bail!("can't construct tradable product with invalid venue")
}
if !route.is_valid() {
bail!("can't construct tradable product with invalid route")
}
if !quote_info.as_ref().map(|q| q.is_valid()).unwrap_or(true) {
bail!("can't construct tradable product with invalid quote_info")
}
if !trade_info.as_ref().map(|t| t.is_valid()).unwrap_or(true) {
bail!("can't construct tradable product with invalid trade_info")
}
}
let id = protocol::TradableProduct::from_parts(
base.name, quote.name, venue.name, route.name,
);
let name = {
use netidx::utils::escape;
Box::leak(Box::from(format!(
"{}/{}*{}/{}",
escape(base.name, '\\', &['*', '/']),
escape(quote.name, '\\', &['*', '/']),
escape(venue.name, '\\', &['*', '/']),
escape(route.name, '\\', &['*', '/'])
)))
};
let inner = Box::leak(Box::new(TradableProductInner {
id,
name,
base,
quote,
venue,
route,
quote_info: quote_info.into(),
trade_info: trade_info.into(),
removed: AtomicBool::new(false),
}));
let t = TradableProduct(inner);
if by_id.get(&t.id).is_some() {
bail!("tradable sha1 hash collision")
}
by_name.insert_cow((base, quote, venue, route), t);
by_id.insert_cow(t.id, t);
index.insert(t);
Ok(t)
}
}
}
pub fn is_removed(&self) -> bool {
self.removed.load(MemOrdering::Relaxed)
}
pub fn subscriber_paths(&self) -> (Path, Path) {
(
Path::from("by-id").append(&self.id.to_string()),
Path::from("by-name")
.append(self.venue.name)
.append(self.route.name)
.append(self.base.name)
.append(self.quote.name),
)
}
fn remove(
self,
by_name: &mut Map<(Product, Product, Venue, Route), TradableProduct>,
by_id: &mut Map<protocol::TradableProduct, TradableProduct>,
) {
by_name.remove_cow(&(self.base, self.quote, self.venue, self.route));
if let Some(i) = by_id.remove_cow(&self.id) {
i.removed.store(true, MemOrdering::Relaxed);
}
}
pub fn get(
base: Product,
quote: Product,
venue: Venue,
route: Route,
) -> Option<Self> {
TRADABLE_PRODUCT.load().get(&(base, quote, venue, route)).copied()
}
pub fn get_by_id(id: &protocol::TradableProduct) -> Option<Self> {
TRADABLE_PRODUCT_BY_ID.load().get(id).copied()
}
pub fn all() -> Arc<Map<(Product, Product, Venue, Route), TradableProduct>> {
TRADABLE_PRODUCT.load_full()
}
pub fn fuzzy(pat: &str, limit: usize) -> Pooled<Vec<(i64, Self)>> {
use fuzzy_matcher::{
skim::{SkimMatcherV2, SkimScoreConfig},
FuzzyMatcher,
};
pool!(pool_matches, Vec<(i64, TradableProduct)>, 5, 100_000);
let matcher =
SkimMatcherV2::default().ignore_case().score_config(SkimScoreConfig {
bonus_consecutive: 15,
..Default::default()
});
let mut res = pool_matches().take();
for (_, val) in &*Self::all() {
if let Some(score) = matcher.fuzzy_match(val.name, pat) {
res.push((score, *val));
}
}
res.sort_by_key(|(k, _)| -*k);
res.truncate(limit);
res
}
pub fn serialize_by_inner<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
<TradableProductInner as Serialize>::serialize(self.0, s)
}
pub fn serialize_by_name<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_str(self.name)
}
pub fn deserialize_by_name<'de, D>(d: D) -> Result<TradableProduct, D::Error>
where
D: serde::Deserializer<'de>,
{
d.deserialize_str(TradableProductVisitor)
}
}
packed_by_id!(TradableProduct);
packed_value!(TradableProduct);
impl Deref for TradableProduct {
type Target = TradableProductInner;
fn deref(&self) -> &Self::Target {
self.0
}
}
impl Symbolic for TradableProduct {
type Id = protocol::TradableProduct;
fn name(&self) -> &'static str {
self.0.name
}
fn id(&self) -> Self::Id {
self.0.id
}
fn is_valid(&self) -> bool {
*self != *INVALID_TRADABLE_PRODUCT
}
}
impl fmt::Display for TradableProduct {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.name)
}
}
impl FromStr for TradableProduct {
type Err = anyhow::Error;
fn from_str(v: &str) -> Result<Self, Self::Err> {
use netidx::utils::{split_escaped, unescape};
let mut i = split_escaped(v, '\\', '*');
let base_quote =
i.next().ok_or_else(|| anyhow!("tradable product missing base/quote"))?;
let venue_route =
i.next().ok_or_else(|| anyhow!("tradable product missing venu/route"))?;
let mut i = split_escaped(base_quote, '\\', '/');
let base = i.next().ok_or_else(|| anyhow!("tradable product missing base"))?;
let quote = i.next().ok_or_else(|| anyhow!("tradable product missing quote"))?;
let mut i = split_escaped(venue_route, '\\', '/');
let venue = i.next().ok_or_else(|| anyhow!("tradable product missing venue"))?;
let route = i.next().ok_or_else(|| anyhow!("tradable product missing route"))?;
let base = Product::get(&unescape(base, '\\')).unwrap_or(*INVALID_PRODUCT);
let quote = Product::get(&unescape(quote, '\\')).unwrap_or(*INVALID_PRODUCT);
let venue = Venue::get(&unescape(venue, '\\')).unwrap_or(*INVALID_VENUE);
let route = Route::get(&unescape(route, '\\')).unwrap_or(*INVALID_ROUTE);
Ok(TradableProduct::get(base, quote, venue, route)
.unwrap_or(*INVALID_TRADABLE_PRODUCT))
}
}
impl Hash for TradableProduct {
fn hash<H: Hasher>(&self, state: &mut H) {
(self.0 as *const TradableProductInner).hash(state)
}
}
impl PartialEq for TradableProduct {
fn eq(&self, other: &Self) -> bool {
(self.0 as *const _) == (other.0 as *const _)
}
}
impl Eq for TradableProduct {}
impl PartialOrd for TradableProduct {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
(self.0 as *const TradableProductInner)
.partial_cmp(&(other.0 as *const TradableProductInner))
}
}
impl Ord for TradableProduct {
fn cmp(&self, other: &Self) -> Ordering {
(self.0 as *const TradableProductInner)
.cmp(&(other.0 as *const TradableProductInner))
}
}
impl Serialize for TradableProduct {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
Serialize::serialize(&self.id, serializer)
}
}
impl JsonSchema for TradableProduct {
fn schema_name() -> String {
protocol::TradableProduct::schema_name()
}
fn json_schema(gen: &mut SchemaGenerator) -> Schema {
protocol::TradableProduct::json_schema(gen)
}
}
struct TradableProductVisitor;
impl<'de> Visitor<'de> for TradableProductVisitor {
type Value = TradableProduct;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "expected a string in the form {{base}}/{{quote}}*{{venue}}/{{route}}")
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
v.parse::<TradableProduct>().map_err(|e| E::custom(e.to_string()))
}
}
impl<'de> Deserialize<'de> for TradableProduct {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let id = protocol::TradableProduct::deserialize(deserializer)?;
Ok(TradableProduct::get_by_id(&id).unwrap_or(*INVALID_TRADABLE_PRODUCT))
}
}
pub type Set<T> = set::Set<T, 16>;
#[derive(Debug, Clone)]
pub struct TradableProductIndex {
all: Set<TradableProduct>,
by_base: Map<Product, Set<TradableProduct>>,
by_quote: Map<Product, Set<TradableProduct>>,
by_venue: Map<Venue, Set<TradableProduct>>,
by_route: Map<Route, Set<TradableProduct>>,
by_fiat: Map<Product, Set<TradableProduct>>,
by_underlying: Map<Product, Set<TradableProduct>>,
by_settlement: Map<Product, Set<TradableProduct>>,
by_expiration: Map<DateTime<Utc>, Set<TradableProduct>>,
}
static INDEX: Lazy<ArcSwap<TradableProductIndex>> =
Lazy::new(|| ArcSwap::new(Arc::new(TradableProductIndex::new())));
impl TradableProductIndex {
fn new() -> Self {
Self {
all: Set::new(),
by_base: Map::default(),
by_quote: Map::default(),
by_venue: Map::default(),
by_route: Map::default(),
by_fiat: Map::default(),
by_underlying: Map::default(),
by_settlement: Map::default(),
by_expiration: Map::default(),
}
}
fn insert(&mut self, i: TradableProduct) {
fn insert<K: Ord + Clone + Copy + 'static>(
m: &mut Map<K, Set<TradableProduct>>,
k: K,
i: TradableProduct,
) {
let mut set = m.remove_cow(&k).unwrap_or(Set::new());
set.insert_cow(i);
m.insert_cow(k, set);
}
self.all.insert_cow(i);
insert(&mut self.by_base, i.base, i);
insert(&mut self.by_quote, i.quote, i);
insert(&mut self.by_venue, i.venue, i);
insert(&mut self.by_route, i.route, i);
match &**i.base.class.load() {
ProductClassInner::Future { underlying, settlement, expiration } => {
insert(&mut self.by_underlying, *underlying, i);
insert(&mut self.by_settlement, *settlement, i);
insert(&mut self.by_expiration, *expiration, i);
}
ProductClassInner::Option {
underlying,
settlement,
dir: _,
style: _,
expiration,
strike: _,
} => {
insert(&mut self.by_underlying, *underlying, i);
insert(&mut self.by_settlement, *settlement, i);
insert(&mut self.by_expiration, *expiration, i);
}
ProductClassInner::Coin { .. }
| ProductClassInner::Equity
| ProductClassInner::Fiat => (),
}
}
fn remove(&mut self, i: TradableProduct) {
fn remove<K: Ord + Clone + Copy + 'static>(
m: &mut Map<K, Set<TradableProduct>>,
k: K,
i: TradableProduct,
) {
if let Some(mut set) = m.remove_cow(&k) {
set.remove_cow(&i);
if set.len() > 0 {
m.insert_cow(k, set);
}
}
}
self.all.remove_cow(&i);
remove(&mut self.by_base, i.base, i);
remove(&mut self.by_quote, i.quote, i);
remove(&mut self.by_venue, i.venue, i);
remove(&mut self.by_route, i.route, i);
match &**i.base.class.load() {
ProductClassInner::Future { underlying, settlement, expiration } => {
remove(&mut self.by_underlying, *underlying, i);
remove(&mut self.by_settlement, *settlement, i);
remove(&mut self.by_expiration, *expiration, i);
}
ProductClassInner::Option {
underlying,
settlement,
dir: _,
style: _,
expiration,
strike: _,
} => {
remove(&mut self.by_underlying, *underlying, i);
remove(&mut self.by_settlement, *settlement, i);
remove(&mut self.by_expiration, *expiration, i);
}
ProductClassInner::Coin { .. }
| ProductClassInner::Equity
| ProductClassInner::Fiat => (),
}
}
fn query_(&self, q: &Query) -> Set<TradableProduct> {
fn start_of_day(dt: DateTime<Utc>) -> DateTime<Utc> {
let date = dt.naive_utc().date();
let ndt = NaiveDateTime::new(date, NaiveTime::from_hms_opt(0, 0, 0).unwrap());
DateTime::from_utc(ndt, Utc)
}
fn end_of_day(dt: DateTime<Utc>) -> DateTime<Utc> {
let date = dt.naive_utc().date();
let ndt = NaiveDateTime::new(
date,
NaiveTime::from_hms_milli_opt(23, 59, 59, 999).unwrap(),
);
DateTime::from_utc(ndt, Utc)
}
match q {
Query::And(terms) => terms
.iter()
.fold(self.all.clone(), |acc, term| acc.intersect(&self.query_(term))),
Query::Or(terms) => {
terms.iter().fold(Set::new(), |acc, term| acc.union(&self.query_(term)))
}
Query::Not(term) => self.all.diff(&self.query_(term)),
Query::All => self.all.clone(),
Query::Base(t) => self.by_base.get(&t).cloned().unwrap_or_else(Set::new),
Query::Quote(t) => self.by_quote.get(&t).cloned().unwrap_or_else(Set::new),
Query::Route(r) => self.by_route.get(&r).cloned().unwrap_or_else(Set::new),
Query::Venue(v) => self.by_venue.get(&v).cloned().unwrap_or_else(Set::new),
Query::Fiat(t) => self.by_fiat.get(&t).cloned().unwrap_or_else(Set::new),
Query::Underlying(t) => {
self.by_underlying.get(&t).cloned().unwrap_or_else(Set::new)
}
Query::Settlement(t) => {
self.by_settlement.get(&t).cloned().unwrap_or_else(Set::new)
}
Query::Expiration(dq) => match dq {
DateQ::On(dt) => {
let date = dt.naive_utc().date();
self.by_expiration
.range(Bound::Included(start_of_day(*dt)), Bound::Unbounded)
.take_while(|(k, _)| k.naive_utc().date() == date)
.fold(Set::new(), |acc, (_, s)| acc.union(s))
}
DateQ::OnOrAfter(dt) => self
.by_expiration
.range(Bound::Included(start_of_day(*dt)), Bound::Unbounded)
.fold(Set::new(), |acc, (_, s)| acc.union(s)),
DateQ::OnOrBefore(dt) => self
.by_expiration
.range(Bound::Unbounded, Bound::Included(end_of_day(*dt)))
.fold(Set::new(), |acc, (_, s)| acc.union(s)),
DateQ::Between(st, en) => self
.by_expiration
.range(
Bound::Included(start_of_day(*st)),
Bound::Included(end_of_day(*en)),
)
.fold(Set::new(), |acc, (_, s)| acc.union(s)),
},
}
}
pub fn query(q: &Query) -> Set<TradableProduct> {
INDEX.load().query_(q)
}
pub fn all() -> Set<TradableProduct> {
INDEX.load().all.clone()
}
}
static TXNACTIVE: AtomicBool = AtomicBool::new(false);
fn dump_product(
pset: &mut FxHashSet<Product>,
updates: &mut Vec<SymbologyUpdateKind>,
product: &Product,
) {
pset.insert(*product);
let class = match &**product.class.load() {
ProductClassInner::Fiat => protocol::ProductClass::Fiat,
ProductClassInner::Coin { token_info } => {
protocol::ProductClass::Coin { token_info: token_info.into() }
}
ProductClassInner::Equity => protocol::ProductClass::Equity,
ProductClassInner::Future { underlying, settlement, expiration } => {
if !pset.contains(&underlying) {
dump_product(pset, updates, underlying)
}
if !pset.contains(&settlement) {
dump_product(pset, updates, settlement)
}
protocol::ProductClass::Future {
underlying: underlying.id,
settlement: settlement.id,
expiration: *expiration,
}
}
ProductClassInner::Option {
underlying,
settlement,
dir,
style,
expiration,
strike,
} => {
if !pset.contains(&underlying) {
dump_product(pset, updates, underlying)
}
if !pset.contains(&settlement) {
dump_product(pset, updates, settlement)
}
protocol::ProductClass::Option {
underlying: underlying.id,
settlement: settlement.id,
dir: *dir,
style: *style,
expiration: *expiration,
strike: *strike,
}
}
};
updates.push(SymbologyUpdateKind::AddProduct {
name: netidx::chars::Chars::from(product.name),
class,
price_in_limit_dollars: product.price_in_limit_dollars.load(),
})
}
pub fn dump() -> Pooled<Vec<SymbologyUpdateKind>> {
use netidx::chars::Chars;
pool!(pool_pset, FxHashSet<Product>, 2, 1_000_000);
pool!(pool_update, Vec<SymbologyUpdateKind>, 2, 1_000_000);
let mut pset = pool_pset().take();
let mut updates = pool_update().take();
for (_, venue) in &*Venue::all() {
updates.push(SymbologyUpdateKind::AddVenue(Chars::from(venue.name)));
}
for (_, route) in &*Route::all() {
updates.push(SymbologyUpdateKind::AddRoute(Chars::from(route.name)));
}
for (_, trading) in &*TradingSymbol::all() {
updates.push(SymbologyUpdateKind::AddTradingSymbol(Chars::from(trading.name)));
}
for (_, quote) in &*QuoteSymbol::all() {
updates.push(SymbologyUpdateKind::AddQuoteSymbol(Chars::from(quote.name)));
}
for (_, product) in &*Product::all() {
dump_product(&mut pset, &mut updates, product)
}
for (_, tradable) in &*TradableProduct::all() {
let quote_info =
(**tradable.quote_info.load()).as_ref().map(|q| protocol::QuoteInfo {
primary: q.primary.id,
implied: q.implied.map(|i| i.id),
l2: q.l2.map(|l| l.id),
});
let trade_info =
(**tradable.trade_info.load()).as_ref().map(|t| protocol::TradeInfo {
trading_symbol: t.trading_symbol.id,
quote_increment: t.quote_increment,
base_increment: t.base_increment,
flags: t.flags,
});
updates.push(SymbologyUpdateKind::AddTradableProduct {
base: tradable.base.id,
quote: tradable.quote.id,
venue: tradable.venue.id,
route: tradable.route.id,
quote_info,
trade_info,
})
}
updates
}
pub struct Txn {
venue_by_name: Arc<Map<&'static str, Venue>>,
venue_by_id: Arc<Map<protocol::Venue, Venue>>,
route_by_name: Arc<Map<&'static str, Route>>,
route_by_id: Arc<Map<protocol::Route, Route>>,
quote_symbol_by_name: Arc<Map<&'static str, QuoteSymbol>>,
quote_symbol_by_id: Arc<Map<protocol::QuoteSymbol, QuoteSymbol>>,
trading_symbol_by_name: Arc<Map<&'static str, TradingSymbol>>,
trading_symbol_by_id: Arc<Map<protocol::TradingSymbol, TradingSymbol>>,
product_by_name: Arc<Map<&'static str, Product>>,
product_by_id: Arc<Map<protocol::Product, Product>>,
tradable_product_by_name: Arc<Map<(Product, Product, Venue, Route), TradableProduct>>,
tradable_product_by_id: Arc<Map<protocol::TradableProduct, TradableProduct>>,
index: Arc<TradableProductIndex>,
}
impl Drop for Txn {
fn drop(&mut self) {
TXNACTIVE.store(false, MemOrdering::Relaxed)
}
}
impl Txn {
pub fn commit(self) {
VENUE_BY_NAME.store(Arc::clone(&self.venue_by_name));
VENUE_BY_ID.store(Arc::clone(&self.venue_by_id));
ROUTE_BY_NAME.store(Arc::clone(&self.route_by_name));
ROUTE_BY_ID.store(Arc::clone(&self.route_by_id));
QUOTE_SYMBOL_BY_NAME.store(Arc::clone(&self.quote_symbol_by_name));
QUOTE_SYMBOL_BY_ID.store(Arc::clone(&self.quote_symbol_by_id));
TRADING_SYMBOL_BY_NAME.store(Arc::clone(&self.trading_symbol_by_name));
TRADING_SYMBOL_BY_ID.store(Arc::clone(&self.trading_symbol_by_id));
PRODUCT_BY_NAME.store(Arc::clone(&self.product_by_name));
PRODUCT_BY_ID.store(Arc::clone(&self.product_by_id));
TRADABLE_PRODUCT.store(Arc::clone(&self.tradable_product_by_name));
TRADABLE_PRODUCT_BY_ID.store(Arc::clone(&self.tradable_product_by_id));
INDEX.store(Arc::clone(&self.index));
}
pub fn new() -> Result<Self> {
loop {
let cur = TXNACTIVE.load(MemOrdering::Relaxed);
if cur {
bail!("only one transaction may be in progress at a time")
} else {
let res = TXNACTIVE.compare_exchange(
cur,
true,
MemOrdering::Relaxed,
MemOrdering::Relaxed,
);
if let Ok(_) = res {
break;
}
}
}
Ok(Self {
venue_by_name: VENUE_BY_NAME.load_full(),
venue_by_id: VENUE_BY_ID.load_full(),
route_by_name: ROUTE_BY_NAME.load_full(),
route_by_id: ROUTE_BY_ID.load_full(),
quote_symbol_by_name: QUOTE_SYMBOL_BY_NAME.load_full(),
quote_symbol_by_id: QUOTE_SYMBOL_BY_ID.load_full(),
trading_symbol_by_name: TRADING_SYMBOL_BY_NAME.load_full(),
trading_symbol_by_id: TRADING_SYMBOL_BY_ID.load_full(),
product_by_name: PRODUCT_BY_NAME.load_full(),
product_by_id: PRODUCT_BY_ID.load_full(),
tradable_product_by_name: TRADABLE_PRODUCT.load_full(),
tradable_product_by_id: TRADABLE_PRODUCT_BY_ID.load_full(),
index: INDEX.load_full(),
})
}
fn add_tradable_product(
&mut self,
base: &protocol::Product,
quote: &protocol::Product,
venue: &protocol::Venue,
route: &protocol::Route,
quote_info: &Option<protocol::QuoteInfo>,
trade_info: &Option<protocol::TradeInfo>,
) -> Result<()> {
let base = self
.product_by_id
.get(base)
.copied()
.ok_or_else(|| anyhow!("missing base product"))?;
let quote = self
.product_by_id
.get(quote)
.copied()
.ok_or_else(|| anyhow!("missing quote product"))?;
let route = self
.route_by_id
.get(route)
.copied()
.ok_or_else(|| anyhow!("missing route"))?;
let venue = self
.venue_by_id
.get(venue)
.copied()
.ok_or_else(|| anyhow!("missing venue"))?;
let quote_info = quote_info
.map(|q| {
Ok::<_, anyhow::Error>(QuoteInfoInner {
primary: self
.quote_symbol_by_id
.get(&q.primary)
.copied()
.ok_or_else(|| anyhow!("missing primary quote symbol"))?,
implied: q
.implied
.map(|q| {
self.quote_symbol_by_id
.get(&q)
.copied()
.ok_or_else(|| anyhow!("missing implied quote symbol"))
})
.transpose()?,
l2: q
.l2
.map(|q| {
self.quote_symbol_by_id
.get(&q)
.copied()
.ok_or_else(|| anyhow!("missing l2 quote symbol"))
})
.transpose()?,
})
})
.transpose()?;
let trade_info = trade_info
.map(|t| {
Ok::<_, anyhow::Error>(TradeInfoInner {
trading_symbol: self
.trading_symbol_by_id
.get(&t.trading_symbol)
.copied()
.ok_or_else(|| anyhow!("unknown trading symbol"))?,
quote_increment: t.quote_increment,
base_increment: t.base_increment,
flags: t.flags,
})
})
.transpose()?;
match TradableProduct::get(base, quote, venue, route) {
Some(i) => {
i.trade_info.store(Arc::new(trade_info));
i.quote_info.store(Arc::new(quote_info));
}
None => {
TradableProduct::new(
Arc::make_mut(&mut self.tradable_product_by_name),
Arc::make_mut(&mut self.tradable_product_by_id),
Arc::make_mut(&mut self.index),
base,
quote,
venue,
route,
quote_info,
trade_info,
true,
)?;
}
}
Ok(())
}
fn remove_tradable_product(&mut self, id: &protocol::TradableProduct) -> Result<()> {
let i = self
.tradable_product_by_id
.get(id)
.copied()
.ok_or_else(|| anyhow!("no such tradable"))?;
Arc::make_mut(&mut self.index).remove(i);
Ok(i.remove(
Arc::make_mut(&mut self.tradable_product_by_name),
Arc::make_mut(&mut self.tradable_product_by_id),
))
}
fn remove_route(&mut self, route: &protocol::Route) -> Result<()> {
let route = self
.route_by_id
.get(&*route)
.copied()
.ok_or_else(|| anyhow!("no such route"))?;
if TradableProductIndex::query(&Query::Route(route)).len() > 0 {
bail!("cannot remove route tradable products depend on")
} else {
Ok(route.remove(
Arc::make_mut(&mut self.route_by_name),
Arc::make_mut(&mut self.route_by_id),
))
}
}
fn remove_venue(&mut self, venue: &protocol::Venue) -> Result<()> {
let venue = self
.venue_by_id
.get(&*venue)
.copied()
.ok_or_else(|| anyhow!("no such venue"))?;
if TradableProductIndex::query(&Query::Venue(venue)).len() > 0 {
bail!("cannot remove venue tradable products depend on")
} else {
Ok(venue.remove(
Arc::make_mut(&mut self.venue_by_name),
Arc::make_mut(&mut self.venue_by_id),
))
}
}
fn add_product(
&mut self,
name: &str,
class: &protocol::ProductClass,
price_in_limit_dollars: Decimal,
) -> Result<Product> {
let map_token_info = |token_info: &FxHashMap<
protocol::Venue,
protocol::TokenInfo,
>|
-> Result<TokenInfo> {
let mut inner: FxHashMap<Venue, protocol::TokenInfo> =
FxHashMap::with_capacity_and_hasher(token_info.len(), Default::default());
for (venue, token_info) in token_info {
let venue = self
.venue_by_id
.get(venue)
.copied()
.ok_or_else(|| anyhow!("unknown venue in token_info"))?;
inner.insert(venue, token_info.clone());
}
Ok(TokenInfo(inner))
};
let class = match class {
protocol::ProductClass::Coin { token_info } => {
ProductClassInner::Coin { token_info: map_token_info(token_info)? }
}
protocol::ProductClass::Equity => ProductClassInner::Equity,
protocol::ProductClass::Fiat => ProductClassInner::Fiat,
protocol::ProductClass::StableCoin { fiat: _, token_info } => {
ProductClassInner::Coin { token_info: map_token_info(token_info)? }
}
protocol::ProductClass::Future { underlying, settlement, expiration } => {
let underlying = self
.product_by_id
.get(underlying)
.copied()
.ok_or_else(|| anyhow!("unknown future underlying"))?;
let settlement = self
.product_by_id
.get(settlement)
.copied()
.ok_or_else(|| anyhow!("unknown future settlement"))?;
ProductClassInner::Future { underlying, settlement, expiration: *expiration }
}
protocol::ProductClass::Option {
underlying,
settlement,
dir,
style,
expiration,
strike,
} => {
let underlying = self
.product_by_id
.get(underlying)
.copied()
.ok_or_else(|| anyhow!("unknown option underlying"))?;
let settlement = self
.product_by_id
.get(settlement)
.copied()
.ok_or_else(|| anyhow!("unknown option settlement"))?;
ProductClassInner::Option {
underlying,
settlement,
dir: *dir,
style: *style,
expiration: *expiration,
strike: *strike,
}
}
};
match self.product_by_name.get(&name) {
Some(pr) => {
if pr.price_in_limit_dollars != price_in_limit_dollars {
pr.price_in_limit_dollars.store(price_in_limit_dollars)
}
if !ProductClassInner::merge_eq(&*pr.class.load(), &class) {
pr.class.store(Arc::new(class))
} else {
match (class, &**pr.class.load()) {
(
ProductClassInner::Coin { token_info },
ProductClassInner::Coin { token_info: current },
) => {
let mut merged = (*current).clone();
for (venue, ti) in token_info.iter() {
merged.insert(*venue, ti.clone());
}
pr.class.store(Arc::new(ProductClassInner::Coin {
token_info: TokenInfo(merged),
}));
}
(class, _) => pr.class.store(Arc::new(class)),
}
}
Ok(*pr)
}
None => Product::new(
Arc::make_mut(&mut self.product_by_name),
Arc::make_mut(&mut self.product_by_id),
name,
true,
class.into(),
AtomicDecimal::new(price_in_limit_dollars),
),
}
}
fn remove_product(&mut self, product: &protocol::Product) -> Result<()> {
let product = self
.product_by_id
.get(&*product)
.copied()
.ok_or_else(|| anyhow!("no such product"))?;
let q = Query::Or(vec![
Query::Base(product),
Query::Quote(product),
Query::Underlying(product),
Query::Settlement(product),
]);
if TradableProductIndex::query(&q).len() > 0 {
bail!("cannot remove product tradable products depend on")
} else {
Ok(product.remove(
Arc::make_mut(&mut self.product_by_name),
Arc::make_mut(&mut self.product_by_id),
))
}
}
pub fn update(&mut self, up: &protocol::SymbologyUpdateKind) -> Result<()> {
match up {
SymbologyUpdateKind::AddProduct { name, class, price_in_limit_dollars } => {
self.add_product(&name, class, *price_in_limit_dollars).map(|_| ())
}
SymbologyUpdateKind::RemoveProduct(t) => self.remove_product(t),
SymbologyUpdateKind::AddRoute(r) => Route::new(
Arc::make_mut(&mut self.route_by_name),
Arc::make_mut(&mut self.route_by_id),
&r,
true,
)
.map(|_| ()),
SymbologyUpdateKind::RemoveRoute(r) => self.remove_route(r),
SymbologyUpdateKind::AddVenue(v) => Venue::new(
Arc::make_mut(&mut self.venue_by_name),
Arc::make_mut(&mut self.venue_by_id),
&v,
true,
)
.map(|_| ()),
SymbologyUpdateKind::RemoveVenue(v) => self.remove_venue(v),
SymbologyUpdateKind::AddTradableProduct {
base,
quote,
venue,
route,
quote_info,
trade_info,
} => self
.add_tradable_product(base, quote, venue, route, quote_info, trade_info),
SymbologyUpdateKind::RemoveTradableProduct(tradable) => {
self.remove_tradable_product(tradable)
}
SymbologyUpdateKind::SetPriceInLimitDollars(product, pld) => {
let product = self
.product_by_id
.get(&*product)
.copied()
.ok_or_else(|| anyhow!("unknown product"))?;
Ok(product.price_in_limit_dollars.store(*pld))
}
SymbologyUpdateKind::AddQuoteSymbol(s) => QuoteSymbol::new(
Arc::make_mut(&mut self.quote_symbol_by_name),
Arc::make_mut(&mut self.quote_symbol_by_id),
s,
true,
)
.map(|_| ()),
SymbologyUpdateKind::RemoveQuoteSymbol(_) => Ok(()), SymbologyUpdateKind::AddTradingSymbol(name) => TradingSymbol::new(
Arc::make_mut(&mut self.trading_symbol_by_name),
Arc::make_mut(&mut self.trading_symbol_by_id),
name,
true,
)
.map(|_| ()),
SymbologyUpdateKind::RemoveTradingSymbol(_) => Ok(()), SymbologyUpdateKind::Flush(_) => Ok(()),
SymbologyUpdateKind::Snapshot { original_length, compressed } => {
thread_local! {
static BUF: RefCell<BytesMut> = RefCell::new(BytesMut::new());
}
pool!(pool_updates, Vec<SymbologyUpdateKind>, 10, 100_000);
let original_length = *original_length;
if original_length > compressed.len() << 6
|| original_length < compressed.len()
{
bail!("suspicious looking original length {}", original_length)
}
let mut updates = BUF.with(|buf| {
let mut buf = buf.borrow_mut();
buf.resize(original_length, 0u8);
let len =
zstd::bulk::decompress_to_buffer(&compressed[..], &mut buf[..])?;
if len != original_length {
bail!(
"unexpected decompressed length {} expected {}",
len,
original_length
)
}
let mut updates = pool_updates().take();
while buf.has_remaining() {
updates.push(Pack::decode(&mut *buf)?)
}
Ok::<_, anyhow::Error>(updates)
})?;
for up in updates.drain(..) {
self.update(&up)?
}
Ok(())
}
}
}
}