pub mod index;
pub mod query;
pub use crate::symbology::query::{DateQ, Query};
use crate::{
hcstr::Str,
packed_value, pool,
protocol::symbology::{self as protocol, OptionDir, SymbologyUpdateKind, TradeFlags},
};
use anyhow::{anyhow, bail, Result};
use arc_swap::ArcSwap;
use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::prelude::*;
use enumflags2::{BitFlag, BitFlags, _internal::RawBitFlags};
use fxhash::{FxHashMap, FxHashSet};
use immutable_chunkmap::map::MapL as Map;
use md5::{Digest, Md5};
use netidx::{
pack::{Pack, PackError},
path::Path,
pool::Pooled,
};
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use paste::paste;
use portable_atomic::{AtomicBool, AtomicU128, Ordering as MemOrdering};
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use schemars::{gen::SchemaGenerator, schema::Schema, JsonSchema};
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use smallvec::SmallVec;
use std::{
borrow::Borrow,
cell::RefCell,
cmp::Ordering,
fmt,
hash::{Hash, Hasher},
marker::PhantomData,
mem::MaybeUninit,
ops::Deref,
result,
str::FromStr,
sync::{
atomic::{AtomicU64, AtomicUsize},
Arc,
},
};
use uuid::Uuid;
/// Common interface for symbology entities that carry a name and an id.
pub trait Symbolic {
/// Identifier type; it must be borrowable as a [`Uuid`].
type Id: Borrow<Uuid>;
/// Returns the entity's name.
fn name(&self) -> Str;
/// Returns the entity's identifier.
fn id(&self) -> Self::Id;
/// Reports whether the entity is valid. The implementations in this file
/// compare the value against a per-type invalid placeholder.
fn is_valid(&self) -> bool;
}
// A `Decimal` kept inside an `AtomicU128` as its 16-byte serialized form, so
// it can be read and written through a shared reference. The JSON schema is
// declared to be that of a plain `Decimal`.
// (Plain `//` comments are used on purpose: schemars turns `///` doc comments
// into schema descriptions.)
#[derive(JsonSchema)]
#[schemars(transparent)]
pub struct AtomicDecimal(#[schemars(with = "Decimal")] AtomicU128);
impl Default for AtomicDecimal {
fn default() -> Self {
Self::new(dec!(999999999))
}
}
impl Serialize for AtomicDecimal {
    /// Serializes the currently stored value exactly as a plain `Decimal`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        // Fully qualified on purpose: `Decimal` also has an inherent
        // `serialize` method that returns raw bytes.
        let current: Decimal = self.load();
        <Decimal as Serialize>::serialize(&current, serializer)
    }
}
impl<'de> Deserialize<'de> for AtomicDecimal {
    /// Deserializes a plain `Decimal` and wraps it.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        // Fully qualified on purpose: `Decimal` also has an inherent
        // `deserialize` that takes raw bytes.
        <Decimal as Deserialize>::deserialize(deserializer).map(Self::new)
    }
}
impl AtomicDecimal {
    /// Wraps `d`, storing its 16-byte serialization as a little-endian `u128`.
    fn new(d: Decimal) -> Self {
        let raw = u128::from_le_bytes(d.serialize());
        Self(AtomicU128::new(raw))
    }

    /// Reads the current value with relaxed ordering.
    pub fn load(&self) -> Decimal {
        let bytes = self.0.load(MemOrdering::Relaxed).to_le_bytes();
        Decimal::deserialize(bytes)
    }

    /// Overwrites the current value with `d` using relaxed ordering.
    fn store(&self, d: Decimal) {
        let raw = u128::from_le_bytes(d.serialize());
        self.0.store(raw, MemOrdering::Relaxed);
    }
}
impl PartialEq for AtomicDecimal {
    /// Compares the raw stored bits of both sides (relaxed loads).
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.0.load(MemOrdering::Relaxed);
        let rhs = other.0.load(MemOrdering::Relaxed);
        lhs == rhs
    }
}
impl PartialEq<Decimal> for AtomicDecimal {
    /// Compares the currently stored decimal with `other` by value.
    fn eq(&self, other: &Decimal) -> bool {
        let current = self.load();
        current == *other
    }
}
// Marker impl: equality is the raw-bit comparison defined by `PartialEq` above.
impl Eq for AtomicDecimal {}
impl Clone for AtomicDecimal {
    /// Creates an independent atomic initialised with the current raw bits.
    fn clone(&self) -> Self {
        let raw = self.0.load(MemOrdering::Relaxed);
        Self(AtomicU128::new(raw))
    }
}
impl fmt::Debug for AtomicDecimal {
    /// Prints the stored decimal using its `Display` form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let current = self.load();
        f.write_fmt(format_args!("{}", current))
    }
}
/// A set of `BitFlags<T>` whose raw bits live in an `AtomicU64`.
pub struct AtomicBitFlags<T: BitFlag> {
// Raw flag bits.
bits: AtomicU64,
// Ties the container to the flag type without storing a `T`.
ph: PhantomData<T>,
}
impl<T> AtomicBitFlags<T>
where
    T: BitFlag + RawBitFlags<Numeric = u64>,
{
    /// Builds the container from an initial flag set.
    fn new(t: BitFlags<T>) -> Self {
        let raw = t.bits();
        Self { bits: AtomicU64::new(raw), ph: PhantomData }
    }

    /// Reads the flags with relaxed ordering; bits that do not correspond to
    /// a flag are dropped.
    pub fn load(&self) -> BitFlags<T> {
        let raw = self.bits.load(MemOrdering::Relaxed);
        BitFlags::from_bits_truncate(raw)
    }

    /// Replaces the stored flags with `t` using relaxed ordering.
    pub fn store(&self, t: BitFlags<T>) {
        let raw = t.bits();
        self.bits.store(raw, MemOrdering::Relaxed)
    }
}
impl<T> Serialize for AtomicBitFlags<T>
where
    T: BitFlag + RawBitFlags<Numeric = u64>,
{
    /// Serializes the currently stored flags exactly as a `BitFlags<T>`.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let current: BitFlags<T> = self.load();
        <BitFlags<T> as Serialize>::serialize(&current, serializer)
    }
}
impl<'de, T> Deserialize<'de> for AtomicBitFlags<T>
where
    T: BitFlag + RawBitFlags<Numeric = u64>,
{
    /// Deserializes a `BitFlags<T>` and wraps it.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        <BitFlags<T> as Deserialize>::deserialize(deserializer).map(Self::new)
    }
}
impl<T> Clone for AtomicBitFlags<T>
where
    T: BitFlag + RawBitFlags<Numeric = u64>,
{
    /// Creates an independent atomic initialised with the current raw bits.
    fn clone(&self) -> Self {
        let raw = self.bits.load(MemOrdering::Relaxed);
        Self { bits: AtomicU64::new(raw), ph: PhantomData }
    }
}
impl<T> PartialEq for AtomicBitFlags<T>
where
    T: BitFlag + RawBitFlags<Numeric = u64>,
{
    /// Compares the raw stored bits of both sides (relaxed loads).
    fn eq(&self, other: &Self) -> bool {
        let lhs = self.bits.load(MemOrdering::Relaxed);
        let rhs = other.bits.load(MemOrdering::Relaxed);
        lhs == rhs
    }
}
impl<T> PartialEq<BitFlags<T>> for AtomicBitFlags<T>
where
    T: BitFlag + RawBitFlags<Numeric = u64>,
{
    /// Compares the currently loaded flags with `other`.
    fn eq(&self, other: &BitFlags<T>) -> bool {
        let current = self.load();
        &current == other
    }
}
// Marker impl: equality is the raw-bit comparison defined by `PartialEq` above.
impl<T> Eq for AtomicBitFlags<T> where T: BitFlag + RawBitFlags<Numeric = u64> {}
impl<T> fmt::Debug for AtomicBitFlags<T>
where
    T: fmt::Debug + BitFlag + RawBitFlags<Numeric = u64>,
{
    /// Prints the `Debug` form of the currently loaded flags.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let current = self.load();
        f.write_fmt(format_args!("{:?}", current))
    }
}
/// Shorthand for results of `Pack` encode/decode operations.
type PR<T> = std::result::Result<T, PackError>;
// Implements `Pack` for an interned handle type `$name` by packing only its
// `id` field. Decoding reads an id and looks the handle up with
// `$name::get_by_id`; an unknown id decodes to the `INVALID_<NAME>`
// placeholder instead of failing.
macro_rules! packed_by_id {
($name:ident) => {
impl Pack for $name {
// The constant length is delegated to the protocol id type.
fn const_encoded_len() -> Option<usize> {
<protocol::$name as Pack>::const_encoded_len()
}
fn encoded_len(&self) -> usize {
Pack::encoded_len(&self.id)
}
fn encode(&self, buf: &mut impl BufMut) -> PR<()> {
Pack::encode(&self.id, buf)
}
fn decode(buf: &mut impl Buf) -> PR<Self> {
match $name::get_by_id(&Pack::decode(buf)?) {
Some(t) => Ok(t),
// Unknown id: fall back to the invalid placeholder.
None => Ok(paste! {*[<INVALID_ $name:snake:upper>]}),
}
}
}
};
}
// Generates an interned, `Copy` handle type `$name` backed by leaked
// (`'static`) storage, together with its global lookup tables.
//
// Arguments:
// - `$lname`: name used for the JSON schema of the generated `<Name>Inner`.
// - `$name`: identifier of the generated handle type.
// - `$valid`: validation function called as `$valid(name, &extra...)`.
// - `$sz`: number of slots per leaked storage chunk (64 in the short form).
// - extra fields: additional `vis name: Type` fields stored in `<Name>Inner`;
//   their types must provide `default()`.
//
// Generated items: `<NAME>_BY_NAME` / `<NAME>_BY_ID` lookup maps, the
// `INVALID_<NAME>` placeholder, the `<Name>Root` chunk allocator and its
// global `ROOT_<NAME>`, the `<Name>Inner` data struct, the `$name` handle and
// its trait implementations.
macro_rules! hcstatic {
// Short form: chunk size 64, no extra fields.
($lname:literal, $name:ident, $valid:ident) => { hcstatic!($lname, $name, $valid, 64,); };
($lname:literal,
$name:ident,
$valid:ident,
$sz:literal,
$($extra_vis:vis $extra_field:ident: $extra_typ:ty),*
) => {
paste! {
// Global lookup table keyed by name.
static [<$name:snake:upper _BY_NAME>]: Lazy<ArcSwap<Map<Str, $name>>> =
Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
// Global lookup table keyed by protocol id.
static [<$name:snake:upper _BY_ID>]: Lazy<ArcSwap<Map<protocol::$name, $name>>> =
Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
// Placeholder handle returned when a lookup fails. It is created without
// validation, with default extra fields, and is registered in both maps.
static [<INVALID_ $name:snake:upper>]: Lazy<$name> = Lazy::new(|| {
let mut by_name = [<$name:snake:upper _BY_NAME>].load_full();
let mut by_id = [<$name:snake:upper _BY_ID>].load_full();
let t = $name::new(
Arc::make_mut(&mut by_name),
Arc::make_mut(&mut by_id),
stringify!([<INVALID_ $name:snake:upper>]),
false,
$($extra_typ::default()),*
).unwrap();
[<$name:snake:upper _BY_NAME>].store(by_name);
[<$name:snake:upper _BY_ID>].store(by_id);
t
});
// Chunk allocator: hands out `'static` references into leaked chunks of
// `$sz` slots. `pos` is the next free slot in the current chunk.
struct [<$name Root>] {
data: &'static mut [MaybeUninit<[<$name Inner>]>],
pos: usize
}
impl [<$name Root>] {
// Allocates a chunk of `$sz` uninitialized slots and leaks it.
fn new_data() -> &'static mut [MaybeUninit<[<$name Inner>]>] {
let mut data = Vec::with_capacity($sz);
data.extend((0..$sz).into_iter().map(|_| MaybeUninit::uninit()));
data.leak()
}
fn new() -> Self {
Self {
data: Self::new_data(),
pos: 0
}
}
// Stores `t` in the next free slot and returns a `'static` reference to it.
// When the current chunk is full a fresh chunk is started; the old chunk was
// leaked, so references into it stay usable.
fn insert(&mut self, t: [<$name Inner>]) -> &'static [<$name Inner>] {
if self.pos >= self.data.len() {
self.pos = 0;
self.data = Self::new_data();
}
self.data[self.pos] = MaybeUninit::new(t);
// The slot was initialized on the line above and lives in leaked storage.
let t = unsafe { &*self.data[self.pos].as_ptr() };
self.pos += 1;
t
}
}
// NOTE(review): presumably sound because the root is only reached through
// the `Mutex` in `ROOT_<NAME>` below — confirm.
unsafe impl Send for [<$name Root>] {}
unsafe impl Sync for [<$name Root>] {}
static [<ROOT_ $name:snake:upper>]: Lazy<Mutex<[<$name Root>]>> =
Lazy::new(|| Mutex::new([<$name Root>]::new()));
// Forces creation of the invalid placeholder from a `ctor` constructor.
#[ctor::ctor]
fn [<force_invalid_ $name:snake>]() {
let _ = *[<INVALID_ $name:snake:upper>];
}
// Data stored for each interned value: its name, its protocol id and the
// macro caller's extra fields.
#[derive(Debug, Serialize, JsonSchema)]
#[schemars(rename = $lname)]
pub struct [<$name Inner>] {
pub name: Str,
pub id: protocol::$name,
$($extra_vis $extra_field: $extra_typ),*
}
// Copyable handle: a `'static` reference to the interned data.
#[derive(Debug, Clone, Copy)]
pub struct $name(&'static [<$name Inner>]);
impl $name {
// Returns the handle already registered under `s` in `by_name`, or creates
// one: optionally validates, derives the id from the name, stores the data
// in the root allocator and registers the handle in both maps. Fails when
// validation fails, when `s` cannot be converted to `Str`, or when the
// derived id is already present in `by_id`.
fn new(
by_name: &mut Map<Str, $name>,
by_id: &mut Map<protocol::$name, $name>,
s: &str,
validate: bool,
$($extra_field: $extra_typ),*
) -> Result<Self> {
match by_name.get(s) {
Some(t) => Ok(*t),
None => {
if validate {
$valid(s, $(&$extra_field),*)?;
}
let name = Str::try_from(s)?;
let id = protocol::$name::from(name);
let inner = [<ROOT_ $name:snake:upper>].lock().insert(
[<$name Inner>] { name, id, $($extra_field),* }
);
let t = Self(inner);
// A different name that maps to the same id is reported as an error.
if by_id.get(&t.id).is_some() {
bail!("hell hath frozen over, there is a sha1 conflict")
}
by_name.insert_cow(t.name, t);
by_id.insert_cow(t.id, t);
Ok(t)
}
}
}
// Unregisters the handle from both maps; the stored data itself is untouched.
#[allow(dead_code)]
fn remove(
self,
by_name: &mut Map<Str, $name>,
by_id: &mut Map<protocol::$name, $name>,
) {
by_name.remove_cow(&self.name);
by_id.remove_cow(&self.id);
}
// Looks a handle up by name in the global table.
pub fn get(s: &str) -> Option<Self> {
[<$name:snake:upper _BY_NAME>].load().get(s).copied()
}
// Looks `s` up in the global table and, when absent, creates a validated
// entry with default extra fields and publishes the updated tables.
// NOTE(review): the tables are loaded and stored in separate steps, so
// concurrent callers could presumably overwrite each other's update —
// confirm that callers serialize access.
fn get_or_insert_default(s: &str) -> Result<Self> {
match [<$name:snake:upper _BY_NAME>].load().get(s).copied() {
Some(t) => Ok(t),
None => {
let mut by_name = [<$name:snake:upper _BY_NAME>].load_full();
let mut by_id = [<$name:snake:upper _BY_ID>].load_full();
let t = $name::new(
Arc::make_mut(&mut by_name),
Arc::make_mut(&mut by_id),
s,
true,
$($extra_typ::default()),*
)?;
[<$name:snake:upper _BY_NAME>].store(by_name);
[<$name:snake:upper _BY_ID>].store(by_id);
Ok(t)
}
}
}
// Looks a handle up by protocol id in the global table.
pub fn get_by_id(id: &protocol::$name) -> Option<Self> {
[<$name:snake:upper _BY_ID>].load().get(id).copied()
}
// Returns the current name-keyed table.
pub fn all() -> Arc<Map<Str, $name>> {
[<$name:snake:upper _BY_NAME>].load_full()
}
// Returns the current id-keyed table.
pub fn all_by_id() -> Arc<Map<protocol::$name, $name>> {
[<$name:snake:upper _BY_ID>].load_full()
}
// Fuzzy-matches `pat` against the names of all valid entries and returns at
// most `limit` `(score, handle)` pairs, highest score first.
pub fn fuzzy(pat: &str, limit: usize) -> Pooled<Vec<(i64, $name)>> {
use fuzzy_matcher::{skim::{SkimMatcherV2, SkimScoreConfig}, FuzzyMatcher};
pool!(pool_matches, Vec<(i64, $name)>, 5, 100_000);
let matcher = SkimMatcherV2::default()
.ignore_case()
.score_config(SkimScoreConfig { bonus_consecutive: 15, ..Default::default() });
let mut res = pool_matches().take();
for (name, val) in &*$name::all() {
if val.is_valid() {
if let Some(score) = matcher.fuzzy_match(name, pat) {
res.push((score, *val));
}
}
}
// Negated key: sort descending by score.
res.sort_by_key(|(k, _)| -*k);
res.truncate(limit);
res
}
// Serializes the full inner data rather than just the id.
pub fn serialize_by_inner<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
<[<$name Inner>] as Serialize>::serialize(self.0, s)
}
// Serializes the handle as its name string.
pub fn serialize_by_name<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
s.serialize_str(&self.name)
}
// Deserializes from a name string; an unknown name yields the invalid
// placeholder.
pub fn deserialize_by_name<'de, D>(d: D) -> Result<$name, D::Error>
where
D: serde::Deserializer<'de>,
{
d.deserialize_str([<$name Visitor>](false))
}
// Deserializes from a name string; an unknown name is inserted with default
// extra fields (after validation).
#[allow(dead_code)]
pub(crate) fn deserialize_by_name_default<'de, D>(d: D) -> Result<$name, D::Error>
where
D: serde::Deserializer<'de>,
{
d.deserialize_str([<$name Visitor>](true))
}
}
packed_by_id!($name);
packed_value!($name);
// The handle dereferences to its inner data.
impl Deref for $name {
type Target = [<$name Inner>];
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl Symbolic for $name {
type Id = protocol::$name;
fn name(&self) -> Str {
self.0.name
}
fn id(&self) -> Self::Id {
self.0.id
}
// Valid means "not the invalid placeholder".
fn is_valid(&self) -> bool {
*self != *[<INVALID_ $name:snake:upper>]
}
}
impl fmt::Display for $name {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> result::Result<(), fmt::Error> {
write!(f, "{}", self.name)
}
}
// Equality, hashing and ordering all use the address of the inner data, not
// its contents.
impl PartialEq for $name {
fn eq(&self, other: &Self) -> bool {
(self.0 as *const [<$name Inner>]) == (other.0 as *const [<$name Inner>])
}
}
impl Eq for $name {}
impl Hash for $name {
fn hash<H: Hasher>(&self, state: &mut H) {
(self.0 as *const [<$name Inner>]).hash(state)
}
}
impl PartialOrd for $name {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
(self.0 as *const [<$name Inner>])
.partial_cmp(&(other.0 as *const [<$name Inner>]))
}
}
impl Ord for $name {
fn cmp(&self, other: &Self) -> Ordering {
(self.0 as *const [<$name Inner>])
.cmp(&(other.0 as *const [<$name Inner>]))
}
}
// The default serde form of a handle is its protocol id.
impl Serialize for $name {
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
where S: Serializer,
{
Serialize::serialize(&self.id, serializer)
}
}
// The JSON schema is that of the protocol id type.
impl JsonSchema for $name {
fn schema_name() -> String {
protocol::$name::schema_name()
}
fn json_schema(gen: &mut SchemaGenerator) -> Schema {
protocol::$name::json_schema(gen)
}
}
// String visitor used by the by-name deserializers. The flag selects whether
// unknown names are inserted (`true`) or mapped to the invalid placeholder
// (`false`).
struct [<$name Visitor>](bool);
impl<'de> Visitor<'de> for [<$name Visitor>] {
type Value = $name;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "expecting a string")
}
fn visit_str<E>(self, v: &str) -> result::Result<Self::Value, E>
where E: serde::de::Error
{
if self.0 {
$name::get_or_insert_default(v).map_err(|e| {
let e = format!("invalid {} err: {}", stringify!($name), e);
E::custom(e)
})
} else {
Ok($name::get(v).unwrap_or(*[<INVALID_ $name:snake:upper>]))
}
}
}
// The default serde form reads a protocol id and resolves it; an unknown id
// yields the invalid placeholder.
impl<'de> Deserialize<'de> for $name {
fn deserialize<D>(deserializer: D) -> Result<$name, D::Error>
where D: Deserializer<'de>
{
let id = protocol::$name::deserialize(deserializer)?;
Ok($name::get_by_id(&id).unwrap_or(*[<INVALID_ $name:snake:upper>]))
}
}
}
};
}
// Generates a wrapper type `$name` around `ArcSwap<<Name>Inner>` so that the
// inner value can be swapped through a shared reference.
//
// `$name_literal` is the type name used for the JSON schema of the wrapped
// field. The wrapper serializes and deserializes exactly like the inner
// value, can be built from an inner value with `From`, and dereferences to
// the `ArcSwap`.
macro_rules! make_atomic {
($name_literal:literal, $name:ident) => {
paste! {
#[derive(Debug, JsonSchema)]
pub struct $name(
#[schemars(with = $name_literal)]
ArcSwap<[<$name Inner>]>
);
impl Serialize for $name{
fn serialize<S>(&self, serializer: S) -> result::Result<S::Ok, S::Error>
where
S: Serializer,
{
// Serialize whatever value is currently loaded.
let inner = self.load();
inner.serialize(serializer)
}
}
impl<'de> Deserialize<'de> for $name {
fn deserialize<D>(deserializer: D) -> result::Result<Self, D::Error>
where
D: Deserializer<'de>,
{
let inner = [<$name Inner>]::deserialize(deserializer)?;
Ok(Self(ArcSwap::from_pointee(inner)))
}
}
impl From<[<$name Inner>]> for $name {
fn from(value: [<$name Inner>]) -> Self {
Self(ArcSwap::from_pointee(value))
}
}
impl Deref for $name {
type Target = ArcSwap<[<$name Inner>]>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
}
};
}
/// Name validator: accepts `s` only when every character is an ASCII
/// uppercase letter, an ASCII digit or `-`. The empty string is accepted.
fn must_be_uppercase_ascii_or_digit_or_dash(s: &str) -> Result<()> {
    let acceptable = |c: char| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '-';
    if s.chars().all(acceptable) {
        Ok(())
    } else {
        bail!("expected uppercase ascii or digit or dash")
    }
}
/// Name validator that accepts every string.
fn allow_any(_s: &str) -> Result<()> {
Ok(())
}
// Interned symbol types. Venue and route names are restricted to uppercase
// ASCII letters, digits and dashes; quote and trading symbols accept any
// name and use a larger storage chunk size.
hcstatic!("Venue", Venue, must_be_uppercase_ascii_or_digit_or_dash);
hcstatic!("Route", Route, must_be_uppercase_ascii_or_digit_or_dash);
hcstatic!("QuoteSymbol", QuoteSymbol, allow_any, 8124,);
hcstatic!("TradingSymbol", TradingSymbol, allow_any, 8124,);
// Token information keyed by venue handle.
// (Plain `//` comments are used on purpose: schemars turns `///` doc comments
// into schema descriptions.)
#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
#[schemars(transparent)]
pub struct TokenInfo(
#[schemars(with = "FxHashMap<Venue, protocol::TokenInfo>")]
FxHashMap<Venue, protocol::TokenInfo>,
);
impl Deref for TokenInfo {
    type Target = FxHashMap<Venue, protocol::TokenInfo>;

    /// Exposes the wrapped map read-only.
    fn deref(&self) -> &Self::Target {
        let Self(map) = self;
        map
    }
}
impl Into<FxHashMap<protocol::Venue, protocol::TokenInfo>> for &TokenInfo {
fn into(self) -> FxHashMap<protocol::Venue, protocol::TokenInfo> {
self.0.iter().map(|(k, v)| (k.id, v.clone())).collect()
}
}
impl Into<FxHashMap<protocol::Venue, protocol::TokenInfo>> for TokenInfo {
    /// Owned variant of the conversion; delegates to the borrowed one.
    fn into(self) -> FxHashMap<protocol::Venue, protocol::TokenInfo> {
        let borrowed: &TokenInfo = &self;
        borrowed.into()
    }
}
// Classification of a product. Serialized as an adjacently tagged enum with
// the variant name under "type" and its data under "value".
// (Plain `//` comments are used on purpose: schemars turns `///` doc comments
// into schema descriptions.)
#[derive(Debug, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "type", content = "value")]
pub enum ProductClassInner {
// Carries per-venue token information.
Coin { token_info: Box<TokenInfo> },
Fiat,
Equity,
Commodity,
Energy,
Metal,
// Derivatives reference their underlying product and a multiplier.
Future { underlying: Product, multiplier: Decimal },
Option { underlying: Product, multiplier: Decimal },
}
impl ProductClassInner {
    /// Returns the variant name as a static string.
    pub fn kind(&self) -> &'static str {
        match self {
            Self::Coin { .. } => "Coin",
            Self::Fiat => "Fiat",
            Self::Equity => "Equity",
            Self::Commodity => "Commodity",
            Self::Energy => "Energy",
            Self::Metal => "Metal",
            Self::Future { .. } => "Future",
            Self::Option { .. } => "Option",
        }
    }

    /// Returns the token info of a `Coin`, `None` for every other class.
    pub fn get_token_info(&self) -> Option<&TokenInfo> {
        if let Self::Coin { token_info } = self {
            Some(&**token_info)
        } else {
            None
        }
    }

    /// Reports whether `from` and `to` are the same class for merging
    /// purposes: the variants must match, and futures/options must also
    /// agree on underlying and multiplier. Coin token info is not compared.
    pub fn merge_eq(from: &ProductClassInner, to: &ProductClassInner) -> bool {
        match (from, to) {
            (
                Self::Future { underlying: lhs_under, multiplier: lhs_mult },
                Self::Future { underlying: rhs_under, multiplier: rhs_mult },
            )
            | (
                Self::Option { underlying: lhs_under, multiplier: lhs_mult },
                Self::Option { underlying: rhs_under, multiplier: rhs_mult },
            ) => lhs_under == rhs_under && lhs_mult == rhs_mult,
            (Self::Coin { .. }, Self::Coin { .. }) => true,
            (Self::Fiat, Self::Fiat) => true,
            (Self::Equity, Self::Equity) => true,
            (Self::Commodity, Self::Commodity) => true,
            (Self::Energy, Self::Energy) => true,
            (Self::Metal, Self::Metal) => true,
            _ => false,
        }
    }
}
impl Default for ProductClassInner {
/// The default product class is `Fiat`.
fn default() -> Self {
Self::Fiat
}
}
impl Into<protocol::ProductClass> for &ProductClassInner {
fn into(self) -> protocol::ProductClass {
match self {
ProductClassInner::Coin { token_info } => {
protocol::ProductClass::Coin { token_info: (&**token_info).into() }
}
ProductClassInner::Fiat => protocol::ProductClass::Fiat,
ProductClassInner::Equity => protocol::ProductClass::Equity,
ProductClassInner::Commodity => protocol::ProductClass::Commodity,
ProductClassInner::Energy => protocol::ProductClass::Energy,
ProductClassInner::Metal => protocol::ProductClass::Metal,
ProductClassInner::Future { underlying, multiplier } => {
protocol::ProductClass::Future {
underlying: underlying.id,
multiplier: *multiplier,
}
}
ProductClassInner::Option { underlying, multiplier } => {
protocol::ProductClass::Option {
underlying: underlying.id,
multiplier: *multiplier,
}
}
}
}
}
impl Into<protocol::ProductClass> for ProductClassInner {
    /// Owned variant of the conversion; delegates to the borrowed one.
    fn into(self) -> protocol::ProductClass {
        let borrowed: &ProductClassInner = &self;
        borrowed.into()
    }
}
/// Validates a product name against the general product pattern and then
/// against the pattern of its product class.
///
/// `_price` is unused; it is accepted so the function matches the validator
/// signature expected by `hcstatic!` for `Product`.
///
/// # Errors
/// Returns an error naming the offending string and the pattern it failed.
fn validate_product(s: &str, class: &ProductClass, _price: &AtomicDecimal) -> Result<()> {
    use regex::Regex;
    static PROD_PAT: &str = "^[- A-Za-z0-9.]+$";
    static PROD: Lazy<Regex> = Lazy::new(|| Regex::new(PROD_PAT).unwrap());
    static COIN_PAT: &str = "^[a-zA-Z0-9.]+ Crypto$";
    static COIN: Lazy<Regex> = Lazy::new(|| Regex::new(COIN_PAT).unwrap());
    static EQUITY_PAT: &str = "^[A-Z0-9]+ [A-Z]{2} Stock$";
    static EQUITY: Lazy<Regex> = Lazy::new(|| Regex::new(EQUITY_PAT).unwrap());
    static COMMODITY_PAT: &str = "^[A-Z0-9]+ Commodity$";
    static COMMODITY: Lazy<Regex> = Lazy::new(|| Regex::new(COMMODITY_PAT).unwrap());
    // NOTE(review): unlike the other class patterns, the energy and metal
    // patterns have no trailing `$`, so they accept suffixes after the class
    // word — confirm whether that is intended.
    static ENERGY_PAT: &str = "^[A-Z0-9]+ Energy";
    static ENERGY: Lazy<Regex> = Lazy::new(|| Regex::new(ENERGY_PAT).unwrap());
    static METAL_PAT: &str = "^[A-Z0-9]+ Metal";
    static METAL: Lazy<Regex> = Lazy::new(|| Regex::new(METAL_PAT).unwrap());
    static FIAT_PAT: &str = "^[A-Z]{3}$";
    static FIAT: Lazy<Regex> = Lazy::new(|| Regex::new(FIAT_PAT).unwrap());
    static FUTURE_PAT: &str = "^[A-Z0-9]+([A-Z])([0-9]+) [A-Z]{2} Future$";
    static FUTURE: Lazy<Regex> = Lazy::new(|| Regex::new(FUTURE_PAT).unwrap());
    static OPTION_PAT: &str =
        "^[A-Z0-9]+[A-Z][0-9]+ (P|C)-?[0-9]+(\\.[0-9]+)? [A-Z]{2} Option$";
    static OPTION: Lazy<Regex> = Lazy::new(|| Regex::new(OPTION_PAT).unwrap());

    if !PROD.is_match(s) {
        bail!("\"{}\" product names must match '{}'", s, PROD_PAT)
    }
    // Pick the class-specific regex together with the wording used in its
    // error message; only the selected regex is ever compiled.
    let current = class.load();
    let (regex, label, pattern): (&Regex, &str, &str) = match &**current {
        ProductClassInner::Coin { .. } => (&*COIN, "coin", COIN_PAT),
        ProductClassInner::Equity => (&*EQUITY, "equity", EQUITY_PAT),
        ProductClassInner::Commodity => (&*COMMODITY, "commodity", COMMODITY_PAT),
        ProductClassInner::Energy => (&*ENERGY, "energy", ENERGY_PAT),
        ProductClassInner::Metal => (&*METAL, "metal", METAL_PAT),
        ProductClassInner::Fiat => (&*FIAT, "fiat", FIAT_PAT),
        ProductClassInner::Future { .. } => (&*FUTURE, "futures", FUTURE_PAT),
        ProductClassInner::Option { .. } => (&*OPTION, "options", OPTION_PAT),
    };
    if regex.is_match(s) {
        Ok(())
    } else {
        bail!("\"{}\" {} products must match {}", s, label, pattern)
    }
}
/// Checks that one well-formed name per tested class passes validation.
#[test]
fn test_product_validation() {
    // Validates `name` as a product of class `class`, panicking on rejection.
    let check = |name: &str, class: ProductClassInner| {
        validate_product(name, &class.into(), &AtomicDecimal::new(dec!(0))).unwrap()
    };
    check("IBM US Stock", ProductClassInner::Equity);
    check(
        "BTC Crypto",
        ProductClassInner::Coin { token_info: Box::new(TokenInfo::default()) },
    );
    let usd = Product::get_or_insert_default("USD").unwrap();
    check(
        "ESJ3 US Future",
        ProductClassInner::Future { underlying: usd, multiplier: dec!(1) },
    );
    check(
        "ESJ2 P3000.00 US Option",
        ProductClassInner::Option { underlying: usd, multiplier: dec!(1) },
    )
}
// `ProductClass`: an atomically swappable `ProductClassInner`.
make_atomic!("ProductClassInner", ProductClass);
impl Default for ProductClass {
    /// Wraps the default `ProductClassInner`.
    fn default() -> Self {
        let inner = ProductClassInner::default();
        Self(ArcSwap::from_pointee(inner))
    }
}
impl Into<protocol::ProductClass> for &ProductClass {
fn into(self) -> protocol::ProductClass {
(&**self.0.load()).into()
}
}
impl Into<protocol::ProductClass> for ProductClass {
    /// Owned variant of the conversion; delegates to the borrowed one.
    fn into(self) -> protocol::ProductClass {
        let borrowed: &ProductClass = &self;
        borrowed.into()
    }
}
/// Parses an expiration code into midnight UTC of the expiration date.
///
/// Accepted forms, where `M` is a month code letter
/// (`F G H J K M N Q U V X Z` for January..December):
/// - `MY`   — one-digit year within the current decade, day 1
/// - `MYY`  — two-digit year within the current century, day 1
/// - `MYDD` — one-digit year within the current decade, followed by the day
///
/// The decade and century are taken from the current UTC date. Returns
/// `None` for an unknown month code, an unsupported length, non-numeric
/// year/day digits, or a date that does not exist.
fn parse_expiration(d: &str) -> Option<DateTime<Utc>> {
    use atoi::atoi;
    let month = match d.chars().next()? {
        'F' => 1,
        'G' => 2,
        'H' => 3,
        'J' => 4,
        'K' => 5,
        'M' => 6,
        'N' => 7,
        'Q' => 8,
        'U' => 9,
        'V' => 10,
        'X' => 11,
        'Z' => 12,
        _ => return None,
    };
    let current_year = Utc::now().date_naive().year();
    let decade = current_year - current_year % 10;
    let century = current_year - current_year % 100;
    let digits = d.as_bytes();
    // `year` is the full calendar year: the parsed digits are offset by the
    // current decade or century right here.
    let year = match d.len() {
        2 | 4 => atoi::<i32>(&digits[1..2]).map(|y| decade + y),
        3 => atoi::<i32>(&digits[1..]).map(|y| century + y),
        _ => None,
    }?;
    let day = if d.len() == 4 { atoi::<u32>(&digits[2..])? } else { 1 };
    // Fix: `year` already includes the decade/century; adding `decade` again
    // produced dates roughly two thousand years in the future.
    let exp_date = NaiveDate::from_ymd_opt(year, month, day)?;
    Some(Utc.from_utc_datetime(&NaiveDateTime::new(exp_date, NaiveTime::MIN)))
}
// Interned products: names are checked by `validate_product`, storage chunks
// hold 512 entries, and each product additionally carries its class and a
// price in limit dollars.
hcstatic!(
"Product",
Product,
validate_product,
512,
pub class: ProductClass,
pub price_in_limit_dollars: AtomicDecimal
);
impl Product {
    /// Returns the expiration of a future or option, parsed from the
    /// expiration code at the end of the first word of the product name
    /// (e.g. `J3` in `ESJ3 US Future`). Returns `None` for other classes or
    /// when the name carries no parsable expiration code.
    pub fn expiration(&self) -> Option<DateTime<Utc>> {
        let class = self.0.class.load();
        match &**class {
            ProductClassInner::Option { .. } | ProductClassInner::Future { .. } => {
                let name = self.0.name.as_str();
                name.split_once(" ").and_then(|(sym, _)| {
                    // The month code is the last non-digit character of the
                    // symbol; everything from there on is the expiration
                    // code. Fix: the previous scan stopped one character too
                    // early ("ESJ3" -> "SJ3"), so the code never parsed.
                    let month_at = sym.rfind(|c: char| !c.is_ascii_digit())?;
                    // As before, a symbol consisting only of an expiration
                    // code (no root in front of it) is rejected.
                    if month_at == 0 {
                        return None;
                    }
                    parse_expiration(&sym[month_at..])
                })
            }
            ProductClassInner::Coin { token_info: _ }
            | ProductClassInner::Commodity
            | ProductClassInner::Energy
            | ProductClassInner::Equity
            | ProductClassInner::Fiat
            | ProductClassInner::Metal => None,
        }
    }

    /// Returns the direction of an option, read from the first character
    /// after the first space of the name (`P` for put, `C` for call).
    /// Returns `None` for every other class or an unrecognized character.
    pub fn option_dir(&self) -> Option<OptionDir> {
        let class = self.0.class.load();
        match &**class {
            ProductClassInner::Option { .. } => {
                let name = self.0.name.as_str();
                let (_, rest) = name.split_once(" ")?;
                match rest.chars().next()? {
                    'P' => Some(OptionDir::Put),
                    'C' => Some(OptionDir::Call),
                    _ => None,
                }
            }
            ProductClassInner::Future { .. }
            | ProductClassInner::Coin { token_info: _ }
            | ProductClassInner::Commodity
            | ProductClassInner::Energy
            | ProductClassInner::Equity
            | ProductClassInner::Fiat
            | ProductClassInner::Metal => None,
        }
    }

    /// Returns the strike of an option, parsed from the second word of the
    /// name with its leading `P`/`C` removed (e.g. `3000.00` in
    /// `ESJ2 P3000.00 US Option`). Returns `None` for every other class or
    /// when the strike does not parse as a decimal.
    pub fn option_strike(&self) -> Option<Decimal> {
        let class = self.0.class.load();
        match &**class {
            ProductClassInner::Option { .. } => {
                let name = self.0.name.as_str();
                let (_, rest) = name.split_once(" ")?;
                // Fix: only the word directly after the symbol is the strike.
                // Parsing the whole remainder ("3000.00 US Option") always
                // failed, so the strike was never returned.
                let strike = rest.split(' ').next()?;
                strike.trim_start_matches(&['P', 'C']).parse::<Decimal>().ok()
            }
            ProductClassInner::Future { .. }
            | ProductClassInner::Coin { token_info: _ }
            | ProductClassInner::Commodity
            | ProductClassInner::Energy
            | ProductClassInner::Equity
            | ProductClassInner::Fiat
            | ProductClassInner::Metal => None,
        }
    }

    /// Returns the underlying product of a future or option, `None` for
    /// every other class.
    pub fn underlying(&self) -> Option<Product> {
        let class = self.0.class.load();
        match &**class {
            ProductClassInner::Option { underlying, .. }
            | ProductClassInner::Future { underlying, .. } => Some(*underlying),
            ProductClassInner::Coin { token_info: _ }
            | ProductClassInner::Commodity
            | ProductClassInner::Energy
            | ProductClassInner::Equity
            | ProductClassInner::Fiat
            | ProductClassInner::Metal => None,
        }
    }
}
// Trading parameters of a tradable product: the symbol used for trading, the
// quote and base increments, and the trade flags.
// (Plain `//` comments are used on purpose: schemars turns `///` doc comments
// into schema descriptions.)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct TradeInfoInner {
pub trading_symbol: TradingSymbol,
pub quote_increment: Decimal,
pub base_increment: Decimal,
pub flags: BitFlags<TradeFlags>,
}
impl TradeInfoInner {
fn is_valid(&self) -> bool {
self.trading_symbol.is_valid()
}
}
// `TradeInfoOpt`: an atomically swappable, optional `TradeInfoInner`.
type TradeInfoOptInner = Option<TradeInfoInner>;
make_atomic!("TradeInfoOptInner", TradeInfoOpt);
// Quote symbols of a tradable product: a mandatory primary symbol plus
// optional implied and l2 symbols.
// (Plain `//` comments are used on purpose: schemars turns `///` doc comments
// into schema descriptions.)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
pub struct QuoteInfoInner {
pub primary: QuoteSymbol,
pub implied: Option<QuoteSymbol>,
pub l2: Option<QuoteSymbol>,
}
impl QuoteInfoInner {
fn is_valid(&self) -> bool {
self.primary.is_valid() && self.implied.map(|s| s.is_valid()).unwrap_or(true)
|| self.l2.map(|s| s.is_valid()).unwrap_or(true)
}
}
// `QuoteInfoOpt`: an atomically swappable, optional `QuoteInfoInner`.
type QuoteInfoOptInner = Option<QuoteInfoInner>;
make_atomic!("QuoteInfoOptInner", QuoteInfoOpt);
/// Position marker returned by `TradableProduct::snapshot`. Passing it to a
/// later snapshot call reports only the products added after it was taken.
#[derive(Debug, Clone, Copy)]
pub struct TradableProductSnap {
// Storage chunk that was current when the snapshot was taken.
data: *const [MaybeUninit<TradableProductInner>],
// Number of used slots in that chunk at that time.
pos: usize,
// Value of `TRADABLE_PRODUCT_COUNT` at that time.
total: usize,
}
// NOTE(review): the raw pointer is only compared for identity in the code
// visible here, never dereferenced — confirm that holds everywhere before
// relying on these impls.
unsafe impl Send for TradableProductSnap {}
unsafe impl Sync for TradableProductSnap {}
/// Chunk allocator for tradable products. Unlike the allocators generated by
/// `hcstatic!`, full chunks stay reachable through `prev` so that snapshots
/// can walk every stored product.
struct TradableProductRoot {
// Current chunk.
data: &'static mut [MaybeUninit<TradableProductInner>],
// Next free slot in the current chunk.
pos: usize,
// The chunk that was filled before this one, if any.
prev: Option<&'static TradableProductRoot>,
}
impl TradableProductRoot {
    /// Allocates a chunk of `SZ` uninitialized slots and leaks it.
    fn new_data() -> &'static mut [MaybeUninit<TradableProductInner>] {
        const SZ: usize = 512;
        let mut data = Vec::with_capacity(SZ);
        data.extend((0..SZ).map(|_| MaybeUninit::uninit()));
        data.leak()
    }

    /// Creates a root with one empty chunk and no history.
    fn new() -> Self {
        Self { data: Self::new_data(), pos: 0, prev: None }
    }

    /// Stores `t` in the next free slot and returns a `'static` reference to
    /// it. When the current chunk is full, the whole root is moved into a
    /// leaked `prev` link and a fresh chunk is started.
    fn insert(&mut self, t: TradableProductInner) -> &'static TradableProductInner {
        if self.pos >= self.data.len() {
            // Move the full root out, leaving a temporary empty one behind.
            let prev = std::mem::replace(
                self,
                Self { data: Vec::new().leak(), pos: 0, prev: None },
            );
            *self = Self {
                data: Self::new_data(),
                pos: 0,
                prev: Some(Box::leak(Box::new(prev))),
            };
        }
        self.data[self.pos] = MaybeUninit::new(t);
        // SAFETY: the slot was initialized on the line above and lives in a
        // leaked chunk; slots below `pos` are not written again by this type.
        let t = unsafe { &*self.data[self.pos].as_ptr() };
        self.pos += 1;
        t
    }

    /// Calls `on_new` for every product stored since `previous` was taken
    /// (or for every stored product when `previous` is `None`), oldest
    /// first, and returns a marker for the current position.
    fn snapshot<F: FnMut(TradableProduct)>(
        &self,
        previous: Option<TradableProductSnap>,
        mut on_new: F,
    ) -> TradableProductSnap {
        let mut cur = self;
        // Chunks to report, collected newest first.
        let mut snaps = SmallVec::<[&Self; 128]>::new();
        let mut pos;
        match previous {
            None => {
                pos = 0;
                loop {
                    snaps.push(cur);
                    match cur.prev {
                        None => break,
                        Some(p) => {
                            cur = p;
                        }
                    }
                }
            }
            Some(prev) => {
                pos = prev.pos;
                // Walk back until the chunk the previous snapshot ended in.
                loop {
                    snaps.push(cur);
                    if cur.data as *const _ == prev.data {
                        break;
                    } else {
                        match cur.prev {
                            None => break,
                            Some(p) => {
                                cur = p;
                            }
                        }
                    }
                }
            }
        }
        // Fix: visit the chunks oldest first. `pos` is an offset into the
        // oldest collected chunk (where the previous snapshot ended); every
        // newer chunk is reported from its start. Visiting newest first
        // applied the offset to the wrong chunk, which could skip or repeat
        // products, or panic when the offset exceeded the newest chunk's
        // fill level.
        for cur in snaps.into_iter().rev() {
            for t in &cur.data[pos..cur.pos] {
                // SAFETY: slots below `cur.pos` were initialized by `insert`.
                on_new(unsafe { TradableProduct(&*t.as_ptr()) })
            }
            pos = 0;
        }
        TradableProductSnap {
            data: self.data as *const _,
            pos: self.pos,
            total: TRADABLE_PRODUCT_COUNT.load(MemOrdering::Relaxed),
        }
    }
}
// NOTE(review): presumably sound because the root is only reached through
// the `Mutex` in `ROOT_TRADABLE_PRODUCT` below — confirm.
unsafe impl Send for TradableProductRoot {}
unsafe impl Sync for TradableProductRoot {}
// Global allocator for tradable product storage.
static ROOT_TRADABLE_PRODUCT: Lazy<Mutex<TradableProductRoot>> =
Lazy::new(|| Mutex::new(TradableProductRoot::new()));
// Number of tradable products created so far; incremented by
// `TradableProduct::new` and recorded in snapshots.
static TRADABLE_PRODUCT_COUNT: AtomicUsize = AtomicUsize::new(0);
// Data stored for each tradable product: its protocol id, the base/quote
// products, venue and route it is made of, and its swappable quote and trade
// info.
// (Plain `//` comments are used on purpose: schemars turns `///` doc comments
// into schema descriptions.)
#[derive(Debug, Serialize, JsonSchema)]
#[schemars(rename = "TradableProduct")]
pub struct TradableProductInner {
pub id: protocol::TradableProduct,
pub base: Product,
pub quote: Product,
pub venue: Venue,
pub route: Route,
pub quote_info: QuoteInfoOpt,
pub trade_info: TradeInfoOpt,
// Set when the product is removed from the lookup tables; not serialized.
#[serde(skip)]
removed: AtomicBool,
}
// Global lookup table keyed by (base, quote, venue, route).
static TRADABLE_PRODUCT: Lazy<
ArcSwap<Map<(Product, Product, Venue, Route), TradableProduct>>,
> = Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
// Global lookup table keyed by protocol id.
static TRADABLE_PRODUCT_BY_ID: Lazy<
ArcSwap<Map<protocol::TradableProduct, TradableProduct>>,
> = Lazy::new(|| ArcSwap::new(Arc::new(Map::new())));
// Placeholder returned when a lookup fails. It is built from the invalid
// product, venue and route placeholders, without validation and without
// quote or trade info, and is registered in both tables.
static INVALID_TRADABLE_PRODUCT: Lazy<TradableProduct> = Lazy::new(|| {
let mut by_name = TRADABLE_PRODUCT.load_full();
let mut by_id = TRADABLE_PRODUCT_BY_ID.load_full();
let t = TradableProduct::new(
Arc::make_mut(&mut by_name),
Arc::make_mut(&mut by_id),
*INVALID_PRODUCT,
*INVALID_PRODUCT,
*INVALID_VENUE,
*INVALID_ROUTE,
None,
None,
false,
)
.unwrap();
TRADABLE_PRODUCT.store(by_name);
TRADABLE_PRODUCT_BY_ID.store(by_id);
t
});
/// Forces creation of the invalid tradable product placeholder from a `ctor`
/// constructor.
#[ctor::ctor]
fn force_invalid_tradable_product() {
let _ = *INVALID_TRADABLE_PRODUCT;
}
// Per-thread scratch buffer reused when formatting tradable product names.
thread_local! {
static TP_NAME: RefCell<String> = RefCell::new(String::new());
}
/// Copyable handle to an interned tradable product: a `'static` reference to
/// its stored data.
#[derive(Debug, Clone, Copy)]
pub struct TradableProduct(&'static TradableProductInner);
impl TradableProduct {
fn new(
by_name: &mut Map<(Product, Product, Venue, Route), TradableProduct>,
by_id: &mut Map<protocol::TradableProduct, TradableProduct>,
base: Product,
quote: Product,
venue: Venue,
route: Route,
quote_info: Option<QuoteInfoInner>,
trade_info: Option<TradeInfoInner>,
validate: bool,
) -> Result<Self> {
match by_name.get(&(base, quote, venue, route)) {
Some(i) => Ok(*i),
None => {
if validate {
if !base.is_valid() {
bail!("can't construct tradable product with invalid base")
}
if !quote.is_valid() {
bail!("can't construct tradable product with invalid quote")
}
if !venue.is_valid() {
bail!("can't construct tradable product with invalid venue")
}
if !route.is_valid() {
bail!("can't construct tradable product with invalid route")
}
if !quote_info.as_ref().map(|q| q.is_valid()).unwrap_or(true) {
bail!("can't construct tradable product with invalid quote_info")
}
if !trade_info.as_ref().map(|t| t.is_valid()).unwrap_or(true) {
bail!("can't construct tradable product with invalid trade_info")
}
}
let id = protocol::TradableProduct::from_parts(
&base.name,
"e.name,
&venue.name,
&route.name,
);
let inner = ROOT_TRADABLE_PRODUCT.lock().insert(TradableProductInner {
id,
base,
quote,
venue,
route,
quote_info: quote_info.into(),
trade_info: trade_info.into(),
removed: AtomicBool::new(false),
});
let t = TradableProduct(inner);
if by_id.get(&t.id).is_some() {
bail!("tradable sha1 hash collision")
}
by_name.insert_cow((base, quote, venue, route), t);
by_id.insert_cow(t.id, t);
TRADABLE_PRODUCT_COUNT.fetch_add(1, MemOrdering::Relaxed);
Ok(t)
}
}
}
pub fn snapshot<F: FnMut(TradableProduct)>(
prev: Option<TradableProductSnap>,
new: F,
) -> TradableProductSnap {
match prev {
None => ROOT_TRADABLE_PRODUCT.lock().snapshot(prev, new),
Some(p) => {
if p.total < TRADABLE_PRODUCT_COUNT.load(MemOrdering::Relaxed) {
ROOT_TRADABLE_PRODUCT.lock().snapshot(prev, new)
} else {
p
}
}
}
}
pub fn fmt_name<T: fmt::Write>(&self, dst: &mut T) -> fmt::Result {
use netidx::utils::escape;
write!(
dst,
"{}/{}*{}/{}",
escape(&self.base.name, '\\', &['*', '/']),
escape(&self.quote.name, '\\', &['*', '/']),
escape(&self.venue.name, '\\', &['*', '/']),
escape(&self.route.name, '\\', &['*', '/'])
)
}
pub fn is_removed(&self) -> bool {
self.removed.load(MemOrdering::Relaxed)
}
pub fn subscriber_paths(&self) -> (Path, Path) {
(
Path::from("by-id").append(&self.id.to_string()),
Path::from("by-name")
.append(&self.venue.name)
.append(&self.route.name)
.append(&self.base.name)
.append(&self.quote.name),
)
}
pub fn is_match(&self, q: &Query) -> bool {
match q {
Query::And(terms) => terms.iter().all(|q| self.is_match(q)),
Query::Or(terms) => terms.iter().any(|q| self.is_match(q)),
Query::Not(term) => !self.is_match(term),
Query::All => true,
Query::Base(p) => self.base == *p,
Query::Quote(p) => self.quote == *p,
Query::Route(r) => self.route == *r,
Query::Venue(v) => self.venue == *v,
Query::Underlying(p) => {
self.base.underlying() == Some(*p) || self.quote.underlying() == Some(*p)
}
Query::Expiration(dq) => match self.base.expiration() {
None => false,
Some(exp) => match dq {
DateQ::On(dt) => dt.naive_utc().date() == exp.naive_utc().date(),
DateQ::OnOrAfter(dt) => exp >= *dt,
DateQ::OnOrBefore(dt) => exp <= *dt,
DateQ::Between(st, en) => exp >= *st && exp <= *en,
},
},
}
}
pub fn tick_size(&self) -> Decimal {
(**self.trade_info.load())
.as_ref()
.map(|o| o.quote_increment)
.unwrap_or(Decimal::new(1, 1))
}
fn remove(
self,
by_name: &mut Map<(Product, Product, Venue, Route), TradableProduct>,
by_id: &mut Map<protocol::TradableProduct, TradableProduct>,
) {
by_name.remove_cow(&(self.base, self.quote, self.venue, self.route));
if let Some(i) = by_id.remove_cow(&self.id) {
i.removed.store(true, MemOrdering::Relaxed);
}
}
pub fn get(
base: Product,
quote: Product,
venue: Venue,
route: Route,
) -> Option<Self> {
TRADABLE_PRODUCT.load().get(&(base, quote, venue, route)).copied()
}
pub fn get_by_id(id: &protocol::TradableProduct) -> Option<Self> {
TRADABLE_PRODUCT_BY_ID.load().get(id).copied()
}
pub fn all() -> Arc<Map<(Product, Product, Venue, Route), TradableProduct>> {
TRADABLE_PRODUCT.load_full()
}
/// Fuzzy-matches `pat` against the formatted name of every tradable product.
///
/// Returns at most `limit` `(score, product)` pairs, highest score first.
/// Matching is case-insensitive and uses a consecutive-match bonus of 15.
pub fn fuzzy(pat: &str, limit: usize) -> Pooled<Vec<(i64, Self)>> {
    use fuzzy_matcher::{
        skim::{SkimMatcherV2, SkimScoreConfig},
        FuzzyMatcher,
    };
    pool!(pool_matches, Vec<(i64, TradableProduct)>, 5, 100_000);
    let scoring = SkimScoreConfig { bonus_consecutive: 15, ..Default::default() };
    let matcher = SkimMatcherV2::default().ignore_case().score_config(scoring);
    let mut hits = pool_matches().take();
    let universe = Self::all();
    TP_NAME.with(|scratch| {
        let mut scratch = scratch.borrow_mut();
        for (_, candidate) in &*universe {
            // reuse the thread-local buffer for each candidate's name
            scratch.clear();
            candidate.fmt_name(&mut *scratch).unwrap();
            match matcher.fuzzy_match(&*scratch, pat) {
                Some(score) => hits.push((score, *candidate)),
                None => (),
            }
        }
    });
    // negate the score so the ascending sort yields best matches first
    hits.sort_by_key(|(score, _)| -*score);
    hits.truncate(limit);
    hits
}
/// Serializes the full inner record (rather than just the id) with `s`.
pub fn serialize_by_inner<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let inner: &TradableProductInner = self.0;
    <TradableProductInner as Serialize>::serialize(inner, s)
}
/// Serializes this product as its formatted name string.
///
/// The name is rendered into the thread-local `TP_NAME` buffer and handed
/// to `s.serialize_str`.
pub fn serialize_by_name<S>(&self, s: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    TP_NAME.with(|scratch| {
        let mut rendered = scratch.borrow_mut();
        rendered.clear();
        self.fmt_name(&mut *rendered).unwrap();
        s.serialize_str(rendered.as_str())
    })
}
/// Deserializes a product from its name string by driving `d` with
/// `TradableProductVisitor`.
pub fn deserialize_by_name<'de, D>(d: D) -> Result<TradableProduct, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let visitor = TradableProductVisitor;
    d.deserialize_str(visitor)
}
}
// NOTE(review): these macro invocations presumably generate the `Pack`-related
// impls for `TradableProduct`; confirm against the macro definitions.
packed_by_id!(TradableProduct);
packed_value!(TradableProduct);
/// Gives field access to the wrapped `TradableProductInner` through the handle.
impl Deref for TradableProduct {
    type Target = TradableProductInner;

    fn deref(&self) -> &Self::Target {
        let inner: &TradableProductInner = self.0;
        inner
    }
}
impl Symbolic for TradableProduct {
type Id = protocol::TradableProduct;
fn name(&self) -> Str {
TP_NAME.with(|buf| {
let mut buf = buf.borrow_mut();
buf.clear();
self.fmt_name(&mut *buf).unwrap();
while buf.as_bytes().len() > 255 {
buf.pop();
}
Str::try_from(buf.as_str()).unwrap()
})
}
fn id(&self) -> Self::Id {
self.0.id
}
fn is_valid(&self) -> bool {
*self != *INVALID_TRADABLE_PRODUCT
}
}
/// Displays the product using the same text that `fmt_name` produces.
impl fmt::Display for TradableProduct {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let outcome = self.fmt_name(f);
        outcome
    }
}
/// Parses the `{base}/{quote}*{venue}/{route}` form.
///
/// `\` is the escape character for both the `*` and `/` separators.
///
/// # Errors
/// Fails only when one of the four components is structurally missing.
/// Components that parse but are unknown resolve to the corresponding
/// `INVALID_*` sentinel, and an unknown combination yields
/// `INVALID_TRADABLE_PRODUCT` rather than an error.
impl FromStr for TradableProduct {
    type Err = anyhow::Error;

    fn from_str(v: &str) -> Result<Self, Self::Err> {
        use netidx::utils::{split_escaped, unescape};
        // first split on '*': "<base>/<quote>" and "<venue>/<route>"
        let mut halves = split_escaped(v, '\\', '*');
        let base_quote = halves
            .next()
            .ok_or_else(|| anyhow!("tradable product missing base/quote"))?;
        let venue_route = halves
            .next()
            .ok_or_else(|| anyhow!("tradable product missing venue/route"))?;
        let mut parts = split_escaped(base_quote, '\\', '/');
        let base = parts.next().ok_or_else(|| anyhow!("tradable product missing base"))?;
        let quote =
            parts.next().ok_or_else(|| anyhow!("tradable product missing quote"))?;
        let mut parts = split_escaped(venue_route, '\\', '/');
        let venue =
            parts.next().ok_or_else(|| anyhow!("tradable product missing venue"))?;
        let route =
            parts.next().ok_or_else(|| anyhow!("tradable product missing route"))?;
        // unknown names fall back to the invalid sentinels instead of failing
        let base = Product::get(&unescape(base, '\\')).unwrap_or(*INVALID_PRODUCT);
        let quote = Product::get(&unescape(quote, '\\')).unwrap_or(*INVALID_PRODUCT);
        let venue = Venue::get(&unescape(venue, '\\')).unwrap_or(*INVALID_VENUE);
        let route = Route::get(&unescape(route, '\\')).unwrap_or(*INVALID_ROUTE);
        Ok(TradableProduct::get(base, quote, venue, route)
            .unwrap_or(*INVALID_TRADABLE_PRODUCT))
    }
}
impl Hash for TradableProduct {
fn hash<H: Hasher>(&self, state: &mut H) {
(self.0 as *const TradableProductInner).hash(state)
}
}
/// Two handles are equal exactly when they point at the same inner record.
impl PartialEq for TradableProduct {
    fn eq(&self, other: &Self) -> bool {
        std::ptr::eq(self.0, other.0)
    }
}

impl Eq for TradableProduct {}
impl PartialOrd for TradableProduct {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
(self.0 as *const TradableProductInner)
.partial_cmp(&(other.0 as *const TradableProductInner))
}
}
/// Orders handles by the address of the inner record they point at.
impl Ord for TradableProduct {
    fn cmp(&self, other: &Self) -> Ordering {
        let lhs: *const TradableProductInner = self.0;
        let rhs: *const TradableProductInner = other.0;
        lhs.cmp(&rhs)
    }
}
impl Serialize for TradableProduct {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
Serialize::serialize(&self.id, serializer)
}
}
/// Reuses the schema of `protocol::TradableProduct`, the id this type
/// serializes as.
impl JsonSchema for TradableProduct {
    fn schema_name() -> String {
        <protocol::TradableProduct as JsonSchema>::schema_name()
    }

    fn json_schema(gen: &mut SchemaGenerator) -> Schema {
        <protocol::TradableProduct as JsonSchema>::json_schema(gen)
    }
}
/// Serde visitor that parses a tradable product from its name string
/// via the `FromStr` implementation.
struct TradableProductVisitor;

impl<'de> Visitor<'de> for TradableProductVisitor {
    type Value = TradableProduct;

    /// Describes the expected input.
    ///
    /// Serde prefixes this text with "expected " in its error messages, so the
    /// text itself must be a bare noun phrase.
    fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("a string in the form {base}/{quote}*{venue}/{route}")
    }

    /// Parses `v`, converting a parse failure into the deserializer's
    /// custom error carrying the same message.
    fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
    where
        E: serde::de::Error,
    {
        v.parse::<TradableProduct>().map_err(|e| E::custom(e.to_string()))
    }
}
/// Deserializes an id and resolves it against the published by-id map;
/// an unknown id resolves to `INVALID_TRADABLE_PRODUCT` instead of an error.
impl<'de> Deserialize<'de> for TradableProduct {
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: Deserializer<'de>,
    {
        <protocol::TradableProduct as Deserialize>::deserialize(deserializer).map(|id| {
            TradableProduct::get_by_id(&id).unwrap_or(*INVALID_TRADABLE_PRODUCT)
        })
    }
}
/// Process-wide flag that is `true` while a `Txn` exists: `Txn::new` sets it
/// (failing if it is already set) and dropping the `Txn` clears it.
static TXNACTIVE: AtomicBool = AtomicBool::new(false);
/// Appends an `AddProduct` update for `product` to `updates`.
///
/// For futures and options the underlying product is emitted first (unless
/// `pset` says it was already emitted), so an underlying always precedes the
/// products that reference it. `pset` records every product visited.
fn dump_product(
    pset: &mut FxHashSet<Product>,
    updates: &mut Vec<SymbologyUpdateKind>,
    product: &Product,
) {
    // mark before recursing so a product is never emitted twice
    pset.insert(*product);
    let current = product.class.load();
    let class = match &**current {
        ProductClassInner::Fiat => protocol::ProductClass::Fiat,
        ProductClassInner::Equity => protocol::ProductClass::Equity,
        ProductClassInner::Commodity => protocol::ProductClass::Commodity,
        ProductClassInner::Energy => protocol::ProductClass::Energy,
        ProductClassInner::Metal => protocol::ProductClass::Metal,
        ProductClassInner::Coin { token_info } => {
            protocol::ProductClass::Coin { token_info: (&**token_info).into() }
        }
        ProductClassInner::Future { underlying, multiplier } => {
            if !pset.contains(underlying) {
                dump_product(pset, updates, underlying)
            }
            protocol::ProductClass::Future {
                underlying: underlying.id,
                multiplier: *multiplier,
            }
        }
        ProductClassInner::Option { underlying, multiplier } => {
            if !pset.contains(underlying) {
                dump_product(pset, updates, underlying)
            }
            protocol::ProductClass::Option {
                underlying: underlying.id,
                multiplier: *multiplier,
            }
        }
    };
    let update = SymbologyUpdateKind::AddProduct {
        name: product.name.as_chars(),
        class,
        price_in_limit_dollars: product.price_in_limit_dollars.load(),
    };
    updates.push(update)
}
/// Produces the list of `Add*` updates that describes the currently
/// published symbology.
///
/// Order of emission: venues, routes, trading symbols, quote symbols,
/// products (underlyings before their derivatives), then tradable products.
pub fn dump() -> Pooled<Vec<SymbologyUpdateKind>> {
    pool!(pool_pset, FxHashSet<Product>, 2, 1_000_000);
    pool!(pool_update, Vec<SymbologyUpdateKind>, 2, 1_000_000);
    let mut seen = pool_pset().take();
    let mut out = pool_update().take();

    let venues = Venue::all();
    out.extend(
        (&*venues)
            .into_iter()
            .map(|(_, venue)| SymbologyUpdateKind::AddVenue(venue.name.as_chars())),
    );
    let routes = Route::all();
    out.extend(
        (&*routes)
            .into_iter()
            .map(|(_, route)| SymbologyUpdateKind::AddRoute(route.name.as_chars())),
    );
    let trading_symbols = TradingSymbol::all();
    out.extend((&*trading_symbols).into_iter().map(|(_, trading)| {
        SymbologyUpdateKind::AddTradingSymbol(trading.name.as_chars())
    }));
    let quote_symbols = QuoteSymbol::all();
    out.extend(
        (&*quote_symbols)
            .into_iter()
            .map(|(_, quote)| SymbologyUpdateKind::AddQuoteSymbol(quote.name.as_chars())),
    );

    let products = Product::all();
    for (_, product) in &*products {
        dump_product(&mut seen, &mut out, product)
    }

    let tradables = TradableProduct::all();
    for (_, tradable) in &*tradables {
        let quote_guard = tradable.quote_info.load();
        let quote_info = (**quote_guard).as_ref().map(|q| protocol::QuoteInfo {
            primary: q.primary.id,
            implied: q.implied.map(|i| i.id),
            l2: q.l2.map(|l| l.id),
        });
        let trade_guard = tradable.trade_info.load();
        let trade_info = (**trade_guard).as_ref().map(|t| protocol::TradeInfo {
            trading_symbol: t.trading_symbol.id,
            quote_increment: t.quote_increment,
            base_increment: t.base_increment,
            flags: t.flags,
        });
        out.push(SymbologyUpdateKind::AddTradableProduct {
            base: tradable.base.id,
            quote: tradable.quote.id,
            venue: tradable.venue.id,
            route: tradable.route.id,
            quote_info,
            trade_info,
        })
    }
    out
}
/// Packs the output of `dump()` into a single compressed `Snapshot` update.
///
/// Returns `(md5, snapshot)` where `md5` is the MD5 digest of the compressed
/// bytes and `snapshot` carries the compressed bytes together with the
/// length of the uncompressed encoding.
///
/// # Errors
/// Fails if encoding an update fails, if the compressor could not be
/// created or reports an error, or if the compressed size exceeds `u32::MAX`.
pub fn dump_squashed() -> Result<(Bytes, SymbologyUpdateKind)> {
thread_local! {
// scratch buffer reused across calls on the same thread
static BUF: RefCell<BytesMut> = RefCell::new(BytesMut::new());
// compressor is built once per thread; a construction error is kept and
// reported on every call
static COMP: RefCell<Result<zstd::bulk::Compressor<'static>>> = RefCell::new({
match zstd::bulk::Compressor::new(16) {
Err(e) => Err(e.into()),
Ok(mut comp) => match comp.multithread(num_cpus::get() as u32) {
Err(e) => Err(e.into()),
Ok(()) => Ok(comp)
}
}
});
}
BUF.with(|buf| {
let mut buf = buf.borrow_mut();
buf.clear();
// 1. encode every update back to back
for up in dump().drain(..) {
Pack::encode(&up, &mut *buf)?;
}
// 2. split the encoded bytes off, leaving `buf` empty for the output
let original = buf.split().freeze();
// destination is sized to the uncompressed length
// NOTE(review): presumably compress_to_buffer errors if the output would
// not fit (compressed larger than original) — confirm
buf.resize(original.len(), 0u8);
let clen = COMP.with(|comp| {
let mut comp = comp.borrow_mut();
let comp = comp.as_mut().map_err(|e| anyhow!("{:?}", e))?;
let len = comp.compress_to_buffer(&original[..], &mut buf[..])?;
Ok::<_, anyhow::Error>(len)
})?;
if clen > u32::MAX as usize {
bail!("snapshot is too big")
}
// 3. shrink to the bytes actually written and split them off
buf.resize(clen, 0u8);
let compressed = buf.split().freeze();
// 4. digest of the compressed payload, also carved out of `buf`
let mut md5 = Md5::default();
md5.update(&*compressed);
let md5_hash = md5.finalize();
buf.extend_from_slice(&md5_hash);
let md5 = buf.split().freeze();
let up =
SymbologyUpdateKind::Snapshot { original_length: original.len(), compressed };
Ok((md5, up))
})
}
/// A symbology transaction.
///
/// Holds handles to every lookup map, taken from the published globals in
/// `Txn::new`. The add/remove methods modify these through `Arc::make_mut`,
/// and `commit` stores them back into the globals. Dropping a `Txn` clears
/// the `TXNACTIVE` flag.
pub struct Txn {
// venues, by name and by protocol id
venue_by_name: Arc<Map<Str, Venue>>,
venue_by_id: Arc<Map<protocol::Venue, Venue>>,
// routes
route_by_name: Arc<Map<Str, Route>>,
route_by_id: Arc<Map<protocol::Route, Route>>,
// quote symbols
quote_symbol_by_name: Arc<Map<Str, QuoteSymbol>>,
quote_symbol_by_id: Arc<Map<protocol::QuoteSymbol, QuoteSymbol>>,
// trading symbols
trading_symbol_by_name: Arc<Map<Str, TradingSymbol>>,
trading_symbol_by_id: Arc<Map<protocol::TradingSymbol, TradingSymbol>>,
// products
product_by_name: Arc<Map<Str, Product>>,
product_by_id: Arc<Map<protocol::Product, Product>>,
// tradable products; the "name" key is the (base, quote, venue, route) tuple
tradable_product_by_name: Arc<Map<(Product, Product, Venue, Route), TradableProduct>>,
tradable_product_by_id: Arc<Map<protocol::TradableProduct, TradableProduct>>,
}
/// Clears the transaction-active flag when the transaction ends, whether or
/// not it was committed.
impl Drop for Txn {
    fn drop(&mut self) {
        // `Release` keeps everything this transaction did (including the
        // stores made by `commit`) ordered before the flag is seen as clear
        // by a thread that reads it with acquire semantics.
        TXNACTIVE.store(false, MemOrdering::Release)
    }
}
impl Txn {
pub fn commit(self) {
VENUE_BY_NAME.store(Arc::clone(&self.venue_by_name));
VENUE_BY_ID.store(Arc::clone(&self.venue_by_id));
ROUTE_BY_NAME.store(Arc::clone(&self.route_by_name));
ROUTE_BY_ID.store(Arc::clone(&self.route_by_id));
QUOTE_SYMBOL_BY_NAME.store(Arc::clone(&self.quote_symbol_by_name));
QUOTE_SYMBOL_BY_ID.store(Arc::clone(&self.quote_symbol_by_id));
TRADING_SYMBOL_BY_NAME.store(Arc::clone(&self.trading_symbol_by_name));
TRADING_SYMBOL_BY_ID.store(Arc::clone(&self.trading_symbol_by_id));
PRODUCT_BY_NAME.store(Arc::clone(&self.product_by_name));
PRODUCT_BY_ID.store(Arc::clone(&self.product_by_id));
TRADABLE_PRODUCT.store(Arc::clone(&self.tradable_product_by_name));
TRADABLE_PRODUCT_BY_ID.store(Arc::clone(&self.tradable_product_by_id));
}
/// Starts a transaction over the currently published symbology.
///
/// # Errors
/// Fails if another transaction is already active; only one may exist at a
/// time in the process.
pub fn new() -> Result<Self> {
    // A single strong compare-exchange both tests and takes the flag, so no
    // retry loop is needed: failure means the flag was already set.
    // `Acquire` on success keeps the map loads below from being ordered
    // ahead of taking the flag.
    if TXNACTIVE
        .compare_exchange(false, true, MemOrdering::Acquire, MemOrdering::Relaxed)
        .is_err()
    {
        bail!("only one transaction may be in progress at a time")
    }
    Ok(Self {
        venue_by_name: VENUE_BY_NAME.load_full(),
        venue_by_id: VENUE_BY_ID.load_full(),
        route_by_name: ROUTE_BY_NAME.load_full(),
        route_by_id: ROUTE_BY_ID.load_full(),
        quote_symbol_by_name: QUOTE_SYMBOL_BY_NAME.load_full(),
        quote_symbol_by_id: QUOTE_SYMBOL_BY_ID.load_full(),
        trading_symbol_by_name: TRADING_SYMBOL_BY_NAME.load_full(),
        trading_symbol_by_id: TRADING_SYMBOL_BY_ID.load_full(),
        product_by_name: PRODUCT_BY_NAME.load_full(),
        product_by_id: PRODUCT_BY_ID.load_full(),
        tradable_product_by_name: TRADABLE_PRODUCT.load_full(),
        tradable_product_by_id: TRADABLE_PRODUCT_BY_ID.load_full(),
    })
}
/// Adds a tradable product, or refreshes its quote/trade info if it is
/// already part of this transaction's view.
///
/// All ids are resolved against this transaction's maps, so entities added
/// earlier in the same transaction are visible.
///
/// # Errors
/// Fails if the base, quote, route, venue, or any referenced quote/trading
/// symbol is unknown, or if creating the new product fails.
pub fn add_tradable_product(
    &mut self,
    base: &protocol::Product,
    quote: &protocol::Product,
    venue: &protocol::Venue,
    route: &protocol::Route,
    quote_info: &Option<protocol::QuoteInfo>,
    trade_info: &Option<protocol::TradeInfo>,
) -> Result<()> {
    let base = self
        .product_by_id
        .get(base)
        .copied()
        .ok_or_else(|| anyhow!("missing base product"))?;
    let quote = self
        .product_by_id
        .get(quote)
        .copied()
        .ok_or_else(|| anyhow!("missing quote product"))?;
    let route = self
        .route_by_id
        .get(route)
        .copied()
        .ok_or_else(|| anyhow!("missing route"))?;
    let venue = self
        .venue_by_id
        .get(venue)
        .copied()
        .ok_or_else(|| anyhow!("missing venue"))?;
    let quote_info = quote_info
        .map(|q| {
            Ok::<_, anyhow::Error>(QuoteInfoInner {
                primary: self
                    .quote_symbol_by_id
                    .get(&q.primary)
                    .copied()
                    .ok_or_else(|| anyhow!("missing primary quote symbol"))?,
                implied: q
                    .implied
                    .map(|q| {
                        self.quote_symbol_by_id
                            .get(&q)
                            .copied()
                            .ok_or_else(|| anyhow!("missing implied quote symbol"))
                    })
                    .transpose()?,
                l2: q
                    .l2
                    .map(|q| {
                        self.quote_symbol_by_id
                            .get(&q)
                            .copied()
                            .ok_or_else(|| anyhow!("missing l2 quote symbol"))
                    })
                    .transpose()?,
            })
        })
        .transpose()?;
    let trade_info = trade_info
        .map(|t| {
            Ok::<_, anyhow::Error>(TradeInfoInner {
                trading_symbol: self
                    .trading_symbol_by_id
                    .get(&t.trading_symbol)
                    .copied()
                    .ok_or_else(|| anyhow!("unknown trading symbol"))?,
                quote_increment: t.quote_increment,
                base_increment: t.base_increment,
                flags: t.flags,
            })
        })
        .transpose()?;
    // Look the product up in THIS transaction's map, not the published
    // global one: otherwise a product removed earlier in this transaction
    // would be updated in place instead of re-added, and one added earlier
    // in this transaction would be created a second time.
    let existing =
        self.tradable_product_by_name.get(&(base, quote, venue, route)).copied();
    match existing {
        Some(i) => {
            i.trade_info.store(Arc::new(trade_info));
            i.quote_info.store(Arc::new(quote_info));
        }
        None => {
            TradableProduct::new(
                Arc::make_mut(&mut self.tradable_product_by_name),
                Arc::make_mut(&mut self.tradable_product_by_id),
                base,
                quote,
                venue,
                route,
                quote_info,
                trade_info,
                true,
            )?;
        }
    }
    Ok(())
}
pub fn remove_tradable_product(
&mut self,
id: &protocol::TradableProduct,
) -> Result<()> {
let i = self
.tradable_product_by_id
.get(id)
.copied()
.ok_or_else(|| anyhow!("no such tradable"))?;
Ok(i.remove(
Arc::make_mut(&mut self.tradable_product_by_name),
Arc::make_mut(&mut self.tradable_product_by_id),
))
}
/// Adds a route named `route` to this transaction by delegating to
/// `Route::new` with the transaction's route maps.
pub fn add_route(&mut self, route: &str) -> Result<Route> {
    let by_name = Arc::make_mut(&mut self.route_by_name);
    let by_id = Arc::make_mut(&mut self.route_by_id);
    Route::new(by_name, by_id, route, true)
}
pub fn remove_route(&mut self, route: &protocol::Route) -> Result<()> {
let route = self
.route_by_id
.get(&*route)
.copied()
.ok_or_else(|| anyhow!("no such route"))?;
Ok(route.remove(
Arc::make_mut(&mut self.route_by_name),
Arc::make_mut(&mut self.route_by_id),
))
}
/// Adds a venue named `venue` to this transaction by delegating to
/// `Venue::new` with the transaction's venue maps.
pub fn add_venue(&mut self, venue: &str) -> Result<Venue> {
    let by_name = Arc::make_mut(&mut self.venue_by_name);
    let by_id = Arc::make_mut(&mut self.venue_by_id);
    Venue::new(by_name, by_id, venue, true)
}
pub fn remove_venue(&mut self, venue: &protocol::Venue) -> Result<()> {
let venue = self
.venue_by_id
.get(&*venue)
.copied()
.ok_or_else(|| anyhow!("no such venue"))?;
Ok(venue.remove(
Arc::make_mut(&mut self.venue_by_name),
Arc::make_mut(&mut self.venue_by_id),
))
}
/// Adds the product `name`, or updates it in place if the transaction
/// already knows a product with that name.
///
/// For an existing product the limit-dollar price is stored when it differs,
/// and the class is replaced; when both the stored and the incoming class
/// are `Coin` and `merge_eq` holds, the per-venue token info is merged
/// instead, with incoming entries overriding stored ones.
///
/// # Errors
/// Fails if the class references an unknown venue (token info) or an unknown
/// underlying product, or if `Product::new` fails.
pub fn add_product(
&mut self,
name: &str,
class: &protocol::ProductClass,
price_in_limit_dollars: Decimal,
) -> Result<Product> {
// Re-keys protocol token info from protocol venue ids to the
// transaction's `Venue` handles; any unknown venue is an error.
let map_token_info = |token_info: &FxHashMap<
protocol::Venue,
protocol::TokenInfo,
>|
-> Result<TokenInfo> {
let mut inner: FxHashMap<Venue, protocol::TokenInfo> =
FxHashMap::with_capacity_and_hasher(token_info.len(), Default::default());
for (venue, token_info) in token_info {
let venue = self
.venue_by_id
.get(venue)
.copied()
.ok_or_else(|| anyhow!("unknown venue in token_info"))?;
inner.insert(venue, token_info.clone());
}
Ok(TokenInfo(inner))
};
// Translate the protocol class into the internal representation,
// resolving ids against this transaction's maps.
let class = match class {
protocol::ProductClass::Coin { token_info } => ProductClassInner::Coin {
token_info: Box::new(map_token_info(token_info)?),
},
protocol::ProductClass::Equity => ProductClassInner::Equity,
protocol::ProductClass::Commodity => ProductClassInner::Commodity,
protocol::ProductClass::Energy => ProductClassInner::Energy,
protocol::ProductClass::Metal => ProductClassInner::Metal,
protocol::ProductClass::Fiat => ProductClassInner::Fiat,
// stable coins are stored as plain coins; the `fiat` field is dropped
protocol::ProductClass::StableCoin { fiat: _, token_info } => {
ProductClassInner::Coin {
token_info: Box::new(map_token_info(token_info)?),
}
}
protocol::ProductClass::Future { underlying, multiplier } => {
let underlying = self
.product_by_id
.get(underlying)
.copied()
.ok_or_else(|| anyhow!("unknown future underlying"))?;
ProductClassInner::Future { underlying, multiplier: *multiplier }
}
protocol::ProductClass::Option { underlying, multiplier } => {
let underlying = self
.product_by_id
.get(underlying)
.copied()
.ok_or_else(|| anyhow!("unknown option underlying"))?;
ProductClassInner::Option { underlying, multiplier: *multiplier }
}
};
match self.product_by_name.get(&*name) {
// existing product: update it in place and return the same handle
Some(pr) => {
if pr.price_in_limit_dollars != price_in_limit_dollars {
pr.price_in_limit_dollars.store(price_in_limit_dollars)
}
if !ProductClassInner::merge_eq(&*pr.class.load(), &class) {
pr.class.store(Arc::new(class))
} else {
match (class, &**pr.class.load()) {
(
ProductClassInner::Coin { token_info },
ProductClassInner::Coin { token_info: current },
) => {
// start from the stored token info and overlay the incoming
// entries venue by venue
let mut merged = (*current).clone();
for (venue, ti) in token_info.iter() {
merged.insert(*venue, ti.clone());
}
pr.class.store(Arc::new(ProductClassInner::Coin {
token_info: Box::new(TokenInfo(merged)),
}));
}
(class, _) => pr.class.store(Arc::new(class)),
}
}
Ok(*pr)
}
// unknown name: create the product in this transaction's maps
None => Product::new(
Arc::make_mut(&mut self.product_by_name),
Arc::make_mut(&mut self.product_by_id),
name,
true,
class.into(),
AtomicDecimal::new(price_in_limit_dollars),
),
}
}
pub fn remove_product(&mut self, product: &protocol::Product) -> Result<()> {
let product = self
.product_by_id
.get(&*product)
.copied()
.ok_or_else(|| anyhow!("no such product"))?;
Ok(product.remove(
Arc::make_mut(&mut self.product_by_name),
Arc::make_mut(&mut self.product_by_id),
))
}
/// Adds a quote symbol named `s` to this transaction by delegating to
/// `QuoteSymbol::new` with the transaction's quote-symbol maps.
pub fn add_quote_symbol(&mut self, s: &str) -> Result<QuoteSymbol> {
    let by_name = Arc::make_mut(&mut self.quote_symbol_by_name);
    let by_id = Arc::make_mut(&mut self.quote_symbol_by_id);
    QuoteSymbol::new(by_name, by_id, s, true)
}
/// Removes the quote symbol with the given id from this transaction.
///
/// Removing an id the transaction does not know is a no-op that still
/// returns `Ok(())`.
pub fn remove_quote_symbol(&mut self, s: &protocol::QuoteSymbol) -> Result<()> {
    // Resolve against this transaction's map rather than the published
    // global one, so a symbol added earlier in the same transaction can be
    // removed and one already removed here is not removed twice.
    let found = self.quote_symbol_by_id.get(s).copied();
    if let Some(symbol) = found {
        symbol.remove(
            Arc::make_mut(&mut self.quote_symbol_by_name),
            Arc::make_mut(&mut self.quote_symbol_by_id),
        );
    }
    Ok(())
}
/// Adds a trading symbol named `s` to this transaction by delegating to
/// `TradingSymbol::new` with the transaction's trading-symbol maps.
pub fn add_trading_symbol(&mut self, s: &str) -> Result<TradingSymbol> {
    let by_name = Arc::make_mut(&mut self.trading_symbol_by_name);
    let by_id = Arc::make_mut(&mut self.trading_symbol_by_id);
    TradingSymbol::new(by_name, by_id, s, true)
}
/// Removes the trading symbol with the given id from this transaction.
///
/// Removing an id the transaction does not know is a no-op that still
/// returns `Ok(())`.
pub fn remove_trading_symbol(&mut self, s: &protocol::TradingSymbol) -> Result<()> {
    // Resolve against this transaction's map rather than the published
    // global one, so a symbol added earlier in the same transaction can be
    // removed and one already removed here is not removed twice.
    let found = self.trading_symbol_by_id.get(s).copied();
    if let Some(symbol) = found {
        symbol.remove(
            Arc::make_mut(&mut self.trading_symbol_by_name),
            Arc::make_mut(&mut self.trading_symbol_by_id),
        );
    }
    Ok(())
}
/// Applies a single symbology update to this transaction.
///
/// Each `Add*`/`Remove*` variant is forwarded to the matching method;
/// `Flush` does nothing. A `Snapshot` is decompressed, decoded into its
/// individual updates, and those are applied in order through this same
/// method.
///
/// # Errors
/// Propagates the error of the method an update is forwarded to. A snapshot
/// additionally fails if its declared length looks implausible, if
/// decompression fails or yields a different length, or if decoding fails.
/// Updates applied before a failure are not rolled back here.
pub fn update(&mut self, up: &protocol::SymbologyUpdateKind) -> Result<()> {
match up {
SymbologyUpdateKind::AddProduct { name, class, price_in_limit_dollars } => {
self.add_product(&name, class, *price_in_limit_dollars).map(|_| ())
}
SymbologyUpdateKind::RemoveProduct(t) => self.remove_product(t),
SymbologyUpdateKind::AddRoute(r) => self.add_route(r).map(|_| ()),
SymbologyUpdateKind::RemoveRoute(r) => self.remove_route(r),
SymbologyUpdateKind::AddVenue(v) => self.add_venue(v).map(|_| ()),
SymbologyUpdateKind::RemoveVenue(v) => self.remove_venue(v),
SymbologyUpdateKind::AddTradableProduct {
base,
quote,
venue,
route,
quote_info,
trade_info,
} => self
.add_tradable_product(base, quote, venue, route, quote_info, trade_info),
SymbologyUpdateKind::RemoveTradableProduct(tradable) => {
self.remove_tradable_product(tradable)
}
// stores the new price directly on the product found in this transaction
SymbologyUpdateKind::SetPriceInLimitDollars(product, pld) => {
let product = self
.product_by_id
.get(&*product)
.copied()
.ok_or_else(|| anyhow!("unknown product"))?;
Ok(product.price_in_limit_dollars.store(*pld))
}
SymbologyUpdateKind::AddQuoteSymbol(s) => {
self.add_quote_symbol(s).map(|_| ())
}
SymbologyUpdateKind::RemoveQuoteSymbol(s) => self.remove_quote_symbol(s),
SymbologyUpdateKind::AddTradingSymbol(name) => {
self.add_trading_symbol(name).map(|_| ())
}
SymbologyUpdateKind::RemoveTradingSymbol(s) => self.remove_trading_symbol(s),
SymbologyUpdateKind::Flush(_) => Ok(()),
SymbologyUpdateKind::Snapshot { original_length, compressed } => {
thread_local! {
// scratch buffer for the decompressed bytes, reused per thread
static BUF: RefCell<BytesMut> = RefCell::new(BytesMut::new());
}
pool!(pool_updates, Vec<SymbologyUpdateKind>, 10, 100_000);
let original_length = *original_length;
// Reject declared lengths above 64x the compressed size or below the
// compressed size before allocating a buffer of that length.
if original_length > compressed.len() << 6
|| original_length < compressed.len()
{
bail!("suspicious looking original length {}", original_length)
}
let mut updates = BUF.with(|buf| {
let mut buf = buf.borrow_mut();
buf.resize(original_length, 0u8);
let len =
zstd::bulk::decompress_to_buffer(&compressed[..], &mut buf[..])?;
if len != original_length {
bail!(
"unexpected decompressed length {} expected {}",
len,
original_length
)
}
// decode back-to-back updates until the buffer is exhausted
let mut updates = pool_updates().take();
while buf.has_remaining() {
updates.push(Pack::decode(&mut *buf)?)
}
Ok::<_, anyhow::Error>(updates)
})?;
// the buffer borrow is released here, so applying the decoded updates
// (which may recurse into this method) does not conflict with it
for up in updates.drain(..) {
self.update(&up)?
}
Ok(())
}
}
}
}