use crate::{
client::symbology as client,
config::Common,
protocol::symbology::{
FlushId, Product, ProductClass, QuoteInfo, QuoteSymbol, Route, SymbologyUpdate,
SymbologyUpdateKind, TradableProduct, TradeInfo, TradingSymbol, Venue,
},
symbology::{self as db, Txn},
};
use anyhow::{anyhow, bail, Result};
use futures::{channel::mpsc, prelude::*};
use fxhash::{FxHashMap, FxHashSet};
use log::{error, warn};
use netidx::{chars::Chars, subscriber::Value};
use netidx_protocols::{call_rpc, rpc::client::Proc};
use parking_lot::Mutex;
use rust_decimal::Decimal;
use rust_decimal_macros::dec;
use std::{collections::HashMap, sync::Arc};
use tokio::{sync::oneshot, task};
fn check_result(v: Value) -> Result<()> {
match v {
Value::Ok => Ok(()),
Value::Error(e) => Err(anyhow!(e)),
_ => bail!("unexpected response"),
}
}
/// Shared table of outstanding flush requests: each `FlushId` maps to the
/// oneshot sender that the update loader task fires after committing the
/// batch in which it saw that flush id.
type Flushes = Arc<Mutex<FxHashMap<FlushId, oneshot::Sender<()>>>>;
/// Fiat products registered by `Client::add_fiats`.
///
/// Each entry is `(product name, value passed as price_in_limit_dollars)`.
pub static FIATS: [(&str, Decimal); 8] = [
    ("AED", dec!(0.25)),
    ("AUD", dec!(0.75)),
    ("CAD", dec!(0.75)),
    ("CHF", dec!(1)),
    ("EUR", dec!(1)),
    ("GBP", dec!(1.5)),
    ("JPY", dec!(0.01)),
    ("USD", dec!(1)),
];
/// Stablecoin products registered by `Client::add_fiats`.
///
/// Each entry is `(product name, name of the underlying fiat product,
/// value passed as price_in_limit_dollars)`.
pub static STABLE: [(&str, &str, Decimal); 2] = [
    ("USDC Crypto", "USD", dec!(1)),
    ("USDT Crypto", "USD", dec!(1)),
];
/// Write-side symbology client.
///
/// Holds one RPC handle per remote procedure under the symbology API base
/// path, the underlying symbology client whose updates are applied by a
/// background loader task, and the table of flushes still waiting to be
/// acknowledged by that task.
pub struct Client {
/// Flush requests that have been issued but not yet acknowledged.
flushes: Flushes,
/// Underlying symbology client; started in `new` with the update sender.
client: client::Client,
/// RPC handle for "flush".
flush: Proc,
/// RPC handle for "set-price-in-limit-dollars".
set_price_in_limit_dollars: Proc,
/// RPC handle for "add-product".
add_product: Proc,
/// RPC handle for "remove-product".
remove_product: Proc,
/// RPC handle for "add-tradable-product".
add_tradable_product: Proc,
/// RPC handle for "remove-tradable-product".
remove_tradable_product: Proc,
/// RPC handle for "add-route".
add_route: Proc,
/// RPC handle for "remove-route".
remove_route: Proc,
/// RPC handle for "add-venue".
add_venue: Proc,
/// RPC handle for "remove-venue".
remove_venue: Proc,
/// RPC handle for "add-quote-symbol".
add_quote_symbol: Proc,
/// RPC handle for "remove-quote-symbol".
remove_quote_symbol: Proc,
/// RPC handle for "add-trading-symbol".
add_trading_symbol: Proc,
/// RPC handle for "remove-trading-symbol".
remove_trading_symbol: Proc,
}
impl Client {
/// Background loop that applies incoming symbology updates in batches.
///
/// Waits for the next update, opens a transaction, then keeps pulling
/// whatever further updates are immediately available from the channel.
/// Flush markers are not applied; instead the matching sender (if one is
/// registered in `flushes`) is set aside. Every other update is applied to
/// the transaction, with failures logged and skipped. Once the channel has
/// nothing more ready, the transaction is committed and the set-aside
/// senders are fired.
///
/// Returns `Ok(())` when the update channel ends, or an error if a
/// transaction cannot be created.
async fn update_loader_task(
    flushes: Flushes,
    mut updates: mpsc::UnboundedReceiver<SymbologyUpdate>,
) -> Result<()> {
    let mut pending_acks = Vec::new();
    while let Some(mut current) = updates.next().await {
        let mut txn = Txn::new()?;
        loop {
            match &current.kind {
                SymbologyUpdateKind::Flush(id) => {
                    if let Some(ack) = flushes.lock().remove(&id) {
                        pending_acks.push(ack);
                    }
                }
                kind => {
                    if let Err(e) = txn.update(kind) {
                        warn!("could not apply symbology update {}", e)
                    }
                }
            }
            // Keep batching for as long as the channel has something ready.
            match updates.try_next() {
                Ok(Some(next)) => current = next,
                Ok(None) | Err(_) => break,
            }
        }
        txn.commit();
        // Acknowledge only after the batch containing the marker is committed.
        for ack in pending_acks.drain(..) {
            let _ = ack.send(());
        }
    }
    Ok(())
}
pub async fn new(common: &Common) -> Result<Self> {
let api_base = common.paths.sym_api();
let subscriber = &common.subscriber;
let flush = Proc::new(subscriber, api_base.append("flush")).await?;
let set_price_in_limit_dollars =
Proc::new(subscriber, api_base.append("set-price-in-limit-dollars")).await?;
let add_product = Proc::new(subscriber, api_base.append("add-product")).await?;
let remove_product =
Proc::new(subscriber, api_base.append("remove-product")).await?;
let add_tradable_product =
Proc::new(subscriber, api_base.append("add-tradable-product")).await?;
let remove_tradable_product =
Proc::new(subscriber, api_base.append("remove-tradable-product")).await?;
let add_route = Proc::new(subscriber, api_base.append("add-route")).await?;
let remove_route = Proc::new(subscriber, api_base.append("remove-route")).await?;
let add_venue = Proc::new(subscriber, api_base.append("add-venue")).await?;
let remove_venue = Proc::new(subscriber, api_base.append("remove-venue")).await?;
let add_quote_symbol =
Proc::new(subscriber, api_base.append("add-quote-symbol")).await?;
let remove_quote_symbol =
Proc::new(subscriber, api_base.append("remove-quote-symbol")).await?;
let add_trading_symbol =
Proc::new(subscriber, api_base.append("add-trading-symbol")).await?;
let remove_trading_symbol =
Proc::new(subscriber, api_base.append("remove-trading-symbol")).await?;
let (tx, rx) = mpsc::unbounded();
let client = client::Client::start(common, None, Some(tx));
let flushes = Arc::new(Mutex::new(HashMap::default()));
task::spawn({
let flushes = flushes.clone();
async move {
if let Err(e) = Self::update_loader_task(flushes, rx).await {
error!("update loader task died on error {}", e)
}
}
});
Ok(Self {
flushes,
client,
flush,
set_price_in_limit_dollars,
add_product,
remove_product,
add_tradable_product,
remove_tradable_product,
add_route,
remove_route,
add_venue,
remove_venue,
add_quote_symbol,
remove_quote_symbol,
add_trading_symbol,
remove_trading_symbol,
})
}
/// Waits for the underlying symbology client to report that it has caught
/// up, then performs a [`flush`](Self::flush) and returns its outcome.
pub async fn wait_caught_up(&self) -> Result<()> {
    self.client.wait_caught_up().await;
    self.flush().await?;
    Ok(())
}
/// Sends a flush marker through the server and waits until the background
/// loader task has committed the batch that contains it.
///
/// A fresh `FlushId` is registered together with a oneshot sender, the
/// `flush` RPC is called with that id, and the call then waits on the
/// matching receiver.
///
/// # Errors
///
/// Returns an error if the RPC call fails or the reply is not `Value::Ok`.
/// In that case the registration is removed again, so a failed flush does
/// not leave a sender behind in the table forever.
pub async fn flush(&self) -> Result<()> {
    let id = FlushId::new();
    let (tx, rx) = oneshot::channel();
    self.flushes.lock().insert(id, tx);
    // Run the call in its own async block so that every failure path
    // (including any `?` inside the macro expansion) ends up in `reply`
    // instead of returning early with the sender still registered.
    let reply: Result<()> =
        async { check_result(call_rpc!(&self.flush, id: id).await?) }.await;
    if let Err(e) = reply {
        // No marker with this id will arrive, so nobody else would remove it.
        self.flushes.lock().remove(&id);
        return Err(e);
    }
    // A receive error only means the sender was dropped without being
    // fired; as before, that is not reported to the caller.
    let _ = rx.await;
    Ok(())
}
/// Calls the `set_price_in_limit_dollars` RPC with the given `product` and
/// `price`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn set_price_in_limit_dollars(
    &self,
    product: Product,
    price: Decimal,
) -> Result<()> {
    let reply =
        call_rpc!(&self.set_price_in_limit_dollars, product: product, price: price)
            .await?;
    check_result(reply)
}
/// Calls the `add_product` RPC with the product `name`, its `class` and
/// its `price_in_limit_dollars`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`. This does not wait for the change to be applied locally;
/// see [`add_product_and_flush`](Self::add_product_and_flush).
pub async fn add_product(
    &self,
    name: Chars,
    class: ProductClass,
    price_in_limit_dollars: Decimal,
) -> Result<()> {
    let reply = call_rpc!(
        &self.add_product,
        name: name,
        class: class,
        price_in_limit_dollars: price_in_limit_dollars
    )
    .await?;
    check_result(reply)
}
/// Calls the `remove_product` RPC for `product`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn remove_product(&self, product: Product) -> Result<()> {
    let reply = call_rpc!(&self.remove_product, product: product).await?;
    check_result(reply)
}
pub async fn add_product_and_flush(
&self,
name: Chars,
class: &db::ProductClass,
price_in_limit_dollars: Option<Decimal>,
) -> Result<db::Product> {
self.add_product(
name.clone(),
class.into(),
price_in_limit_dollars.unwrap_or(dec!(9999999999999)),
)
.await?;
self.flush().await?;
db::Product::get(&name).ok_or_else(|| anyhow!("error adding product {name}"))
}
/// Calls the `add_tradable_product` RPC with the base and quote products,
/// venue, route and the optional quote/trade info.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn add_tradable_product(
    &self,
    base: Product,
    quote: Product,
    venue: Venue,
    route: Route,
    quote_info: Option<QuoteInfo>,
    trade_info: Option<TradeInfo>,
) -> Result<()> {
    let reply = call_rpc!(
        &self.add_tradable_product,
        base: base,
        quote: quote,
        venue: venue,
        route: route,
        quote_info: quote_info,
        trade_info: trade_info
    )
    .await?;
    check_result(reply)
}
/// Calls the `remove_tradable_product` RPC for `tradable`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn remove_tradable_product(&self, tradable: TradableProduct) -> Result<()> {
    let reply = call_rpc!(&self.remove_tradable_product, tradable: tradable).await?;
    check_result(reply)
}
/// Calls the `add_route` RPC with the route name.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn add_route(&self, route: Chars) -> Result<()> {
    let reply = call_rpc!(&self.add_route, route: route).await?;
    check_result(reply)
}
/// Adds a route, flushes, and then looks the route up by name in the local
/// symbology database.
///
/// Fails if adding or flushing fails, or if the route cannot be found
/// after the flush.
pub async fn add_route_and_flush(&self, route: Chars) -> Result<db::Route> {
    self.add_route(route.clone()).await?;
    self.flush().await?;
    match db::Route::get(&route) {
        Some(found) => Ok(found),
        None => Err(anyhow!("route {} not found after adding", route)),
    }
}
/// Calls the `remove_route` RPC for `route`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn remove_route(&self, route: Route) -> Result<()> {
    let reply = call_rpc!(&self.remove_route, route: route).await?;
    check_result(reply)
}
/// Calls the `add_venue` RPC with the venue name.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn add_venue(&self, venue: Chars) -> Result<()> {
    let reply = call_rpc!(&self.add_venue, venue: venue).await?;
    check_result(reply)
}
/// Adds a venue, flushes, and then looks the venue up by name in the local
/// symbology database.
///
/// Fails if adding or flushing fails, or if the venue cannot be found
/// after the flush.
pub async fn add_venue_and_flush(&self, venue: Chars) -> Result<db::Venue> {
    self.add_venue(venue.clone()).await?;
    self.flush().await?;
    match db::Venue::get(&venue) {
        Some(found) => Ok(found),
        None => Err(anyhow!("venue {} not found after adding", venue)),
    }
}
/// Calls the `remove_venue` RPC for `venue`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn remove_venue(&self, venue: Venue) -> Result<()> {
    let reply = call_rpc!(&self.remove_venue, venue: venue).await?;
    check_result(reply)
}
/// Calls the `add_quote_symbol` RPC with the symbol name.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn add_quote_symbol(&self, symbol: Chars) -> Result<()> {
    let reply = call_rpc!(&self.add_quote_symbol, symbol: symbol).await?;
    check_result(reply)
}
/// Adds a quote symbol, flushes, and then looks the symbol up by name in
/// the local symbology database.
///
/// Fails if adding or flushing fails, or if the symbol cannot be found
/// after the flush.
pub async fn add_quote_symbol_and_flush(
    &self,
    symbol: Chars,
) -> Result<db::QuoteSymbol> {
    self.add_quote_symbol(symbol.clone()).await?;
    self.flush().await?;
    match db::QuoteSymbol::get(&symbol) {
        Some(found) => Ok(found),
        None => Err(anyhow!("quote_symbol {} not found after adding", symbol)),
    }
}
/// Calls the `remove_quote_symbol` RPC for `symbol`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn remove_quote_symbol(&self, symbol: QuoteSymbol) -> Result<()> {
    let reply = call_rpc!(&self.remove_quote_symbol, symbol: symbol).await?;
    check_result(reply)
}
/// Calls the `add_trading_symbol` RPC with the symbol name.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn add_trading_symbol(&self, symbol: Chars) -> Result<()> {
    let reply = call_rpc!(&self.add_trading_symbol, symbol: symbol).await?;
    check_result(reply)
}
/// Adds a trading symbol, flushes, and then looks the symbol up by name in
/// the local symbology database.
///
/// Fails if adding or flushing fails, or if the symbol cannot be found
/// after the flush.
pub async fn add_trading_symbol_and_flush(
    &self,
    symbol: Chars,
) -> Result<db::TradingSymbol> {
    self.add_trading_symbol(symbol.clone()).await?;
    self.flush().await?;
    match db::TradingSymbol::get(&symbol) {
        Some(found) => Ok(found),
        None => Err(anyhow!("trading_symbol {} not found after adding", symbol)),
    }
}
/// Calls the `remove_trading_symbol` RPC for `symbol`.
///
/// Fails if the call itself fails or the reply is anything other than
/// `Value::Ok`.
pub async fn remove_trading_symbol(&self, symbol: TradingSymbol) -> Result<()> {
    let reply = call_rpc!(&self.remove_trading_symbol, symbol: symbol).await?;
    check_result(reply)
}
/// Ensures every entry of `FIATS` and `STABLE` exists as a product and
/// checks that all names in `fiats` are among them.
///
/// Fiat products missing from the local database are added first, followed
/// by a flush so that the stablecoins can resolve their underlying fiat.
/// Missing stablecoins are then added, followed by a second flush. Each
/// known name is removed from `fiats` along the way; anything left over at
/// the end is reported as an error.
pub async fn add_fiats(&self, mut fiats: FxHashSet<&str>) -> Result<()> {
    for (name, price) in FIATS.iter() {
        if db::Product::get(name).is_none() {
            self.add_product(Chars::from(*name), ProductClass::Fiat, *price).await?;
        }
        fiats.remove(name);
    }
    self.flush().await?;
    for (name, underlying, price) in STABLE.iter() {
        if db::Product::get(name).is_none() {
            let fiat = match db::Product::get(underlying) {
                Some(product) => product.id,
                None => bail!("missing underlying currency for stablecoin"),
            };
            let class = ProductClass::StableCoin { fiat, token_info: Default::default() };
            self.add_product(Chars::from(*name), class, *price).await?;
        }
        fiats.remove(name);
    }
    self.flush().await?;
    if !fiats.is_empty() {
        bail!("unknown fiats {:?}", fiats)
    }
    Ok(())
}
pub async fn add_venue_and_direct_route(
&self,
venue: &'static str,
) -> Result<(db::Route, db::Venue)> {
if db::Route::get("DIRECT").is_none() {
self.add_route(Chars::from("DIRECT")).await?;
}
if db::Venue::get(venue).is_none() {
self.add_venue(Chars::from(venue)).await?;
}
self.flush().await?;
let route =
db::Route::get("DIRECT").ok_or_else(|| anyhow!("failed to get route"))?;
let venue =
db::Venue::get(venue).ok_or_else(|| anyhow!("failed to get venue"))?;
Ok((route, venue))
}
}