use crate::{
config::Common,
pool,
protocol::{
alerts::Alert,
limits::{LimitScope, LimitSet},
orderflow::{
AberrantFill, Fill, FillId, FromOms, NormalCptyReject, NormalFill, OmsReject,
Order, OrderId, ToOms,
},
Account, Dir,
},
symbology::{Product, Route, TradableProduct, Venue},
};
use anyhow::{anyhow, bail, Result};
use futures::{channel::mpsc as fmpsc, StreamExt};
use fxhash::FxHashMap;
use log::debug;
use netidx::{
pool::Pooled,
subscriber::{Event, FromValue, SubId, UpdatesFlags, Value},
};
use netidx_archive::logfile::Seek;
use netidx_core::path::Path;
use rust_decimal::Decimal;
use std::{collections::HashMap, ops::Deref, result, sync::Arc};
use tokio::{
sync::{broadcast, RwLock},
task::JoinHandle,
};
pub mod alerts;
pub mod limits;
pub mod managed_order;
pub mod managed_product;
pub mod orderflow;
pub mod secrets;
pub mod simple_orderflow;
pub mod symbology;
pub mod symbology_loader;
/// Shared state behind a [`Client`] handle.
///
/// Bundles one handle per backend API together with the alert broadcast
/// channel and the table of orders tracked by this client.
pub struct ClientInner {
/// Configuration the client was created with (a clone of the `Common`
/// passed to `Client::new`).
common: Common,
/// Symbology loader client; `Client::new` waits for it to catch up before
/// connecting anything else.
symbology: symbology_loader::Client,
/// Orderflow connection, created with the channel id obtained from
/// `simple_orderflow`.
orderflow: orderflow::Client,
/// Query API used for order/fill listings and order-state lookups.
simple_orderflow: simple_orderflow::OmsQueryApi,
/// Limits API handle.
limits: limits::OmsLimitsApi,
/// Secrets query API handle.
secrets: secrets::SecretsQueryApi,
/// Sending side of the alert broadcast: `alerts()` subscribes to it and
/// the task started by `subscribe_to_alerts` publishes into it.
alerts_tx: broadcast::Sender<Pooled<Vec<Alert>>>,
/// Receiver created together with `alerts_tx`; never read in this file.
/// NOTE(review): presumably held so the channel always has at least one
/// receiver and sends do not fail — confirm.
_alerts_rx: broadcast::Receiver<Pooled<Vec<Alert>>>,
/// Account looked up under the name "DEFAULT" at construction time.
default_account: Account,
/// Orders registered through `managed_order`, keyed by order id; the task
/// started by `run` updates their state.
managed_orders: RwLock<FxHashMap<OrderId, Arc<managed_order::ManagedOrder>>>,
}
/// Handle to a [`ClientInner`] stored in an `Arc`; cloning the handle clones
/// the `Arc`, so all clones share the same underlying state.
#[derive(Clone)]
pub struct Client(Arc<ClientInner>);
/// Exposes the fields and methods of [`ClientInner`] directly on [`Client`].
impl Deref for Client {
    type Target = ClientInner;

    /// Borrows the shared inner state out of the `Arc`.
    fn deref(&self) -> &ClientInner {
        &self.0
    }
}
impl Client {
/// Connects every backend API and starts the orderflow processing task.
///
/// Symbology is loaded first and awaited until it has caught up. The simple
/// orderflow API is connected next and its channel id is handed to the
/// orderflow connection, followed by the limits and secrets APIs.
///
/// Returns the client together with the handle of the task spawned by `run`.
///
/// # Errors
///
/// Fails as soon as any of the connection steps fails.
pub async fn new(common: &Common) -> Result<(Self, JoinHandle<Result<()>>)> {
    debug!("client loading symbology");
    let symbology = symbology_loader::Client::new(common).await?;
    debug!("waiting for symbology...");
    symbology.wait_caught_up().await?;

    debug!("client connecting simple orderflow");
    let simple_orderflow = simple_orderflow::OmsQueryApi::new(common).await?;
    let channel_id = simple_orderflow.get_channel_id().await?;

    debug!("client connecting orderflow");
    let orderflow = orderflow::Client::new(common, Some(channel_id)).await?;

    debug!("client connecting limits api");
    let limits = limits::OmsLimitsApi::new(common).await?;

    debug!("client connecting secrets api");
    let secrets = secrets::SecretsQueryApi::new(common).await?;

    let (alerts_tx, _alerts_rx) = broadcast::channel::<Pooled<Vec<Alert>>>(10000);
    debug!("client started");

    let inner = ClientInner {
        common: common.clone(),
        symbology,
        orderflow,
        simple_orderflow,
        limits,
        secrets,
        alerts_tx,
        _alerts_rx,
        default_account: Account::get("DEFAULT"),
        managed_orders: RwLock::new(HashMap::default()),
    };
    let client = Self(Arc::new(inner));
    let orderflow_task = client.run();
    Ok((client, orderflow_task))
}
pub async fn initialize(
common: Option<&Common>,
) -> Result<(Self, tokio::task::JoinHandle<Result<()>>)> {
let common = match common {
Some(common) => common.clone(),
None => Common::load_default().await?,
};
Self::new(&common).await
}
/// Returns the configuration this client was created with.
pub fn common(&self) -> &Common {
    &self.0.common
}
/// Returns the symbology loader client.
pub fn symbology(&self) -> &symbology_loader::Client {
    &self.0.symbology
}
/// Returns the orderflow connection.
pub fn orderflow(&self) -> &orderflow::Client {
    &self.0.orderflow
}
/// Returns the simple orderflow query API.
pub fn simple_orderflow(&self) -> &simple_orderflow::OmsQueryApi {
    &self.0.simple_orderflow
}
/// Returns the limits API.
pub fn limits(&self) -> &limits::OmsLimitsApi {
    &self.0.limits
}
/// Returns the secrets query API.
pub fn secrets(&self) -> &secrets::SecretsQueryApi {
    &self.0.secrets
}
/// Creates a new receiver on the alert broadcast channel.
///
/// Batches are published by the task started in `subscribe_to_alerts`.
pub fn alerts(&self) -> broadcast::Receiver<Pooled<Vec<Alert>>> {
    self.0.alerts_tx.subscribe()
}
pub fn subscribe_to_alerts(&self, path: Path, seek: Seek) {
pool!(alerts_pool, Vec<Alert>, 10000, 1000);
let common = self.common.clone();
let tx = self.alerts_tx.clone();
tokio::spawn(async move {
let api = alerts::AlertsApi::new(&common, path).await?;
let (inner_tx, mut rx) = fmpsc::channel::<Pooled<Vec<(SubId, Event)>>>(10000);
api.last_alert().updates(UpdatesFlags::BEGIN_WITH_LAST, inner_tx);
api.start_session(&seek).await?;
loop {
if let Some(updates) = rx.next().await {
let mut alerts = alerts_pool().take();
for (_, event) in updates.iter() {
match event {
Event::Unsubscribed => (),
Event::Update(Value::Null) => (),
Event::Update(value) => {
if let Ok(alert) = Alert::from_value(value.clone()) {
alerts.push(alert);
}
}
}
}
if !alerts.is_empty() {
tx.send(alerts).unwrap();
}
}
}
#[allow(unreachable_code)]
Ok::<(), anyhow::Error>(())
});
}
/// Returns the account that was looked up as "DEFAULT" when the client was
/// constructed.
pub fn default_account(&self) -> Account {
    self.0.default_account
}
/// Looks up the tradable product for `base`/`quote` on `venue` via `route`
/// and wraps it in a `ManagedProduct`.
///
/// # Errors
///
/// Fails if either product name is unknown, or if no tradable product exists
/// for the given combination.
pub fn managed_product(
    &self,
    base: &str,
    quote: &str,
    venue: Venue,
    route: Route,
) -> Result<managed_product::ManagedProduct> {
    // `ok_or_else` keeps the error formatting off the success path.
    let base = Product::get(base).ok_or_else(|| anyhow!("no such product: {}", base))?;
    let quote = Product::get(quote).ok_or_else(|| anyhow!("no such product: {}", quote))?;
    let tradable_product =
        TradableProduct::get(base, quote, venue, route).ok_or_else(|| {
            anyhow!(
                "no tradable product found for ({}, {}, {}, {}): is symbology up, and the cpty loaded?",
                base, quote, venue, route
            )
        })?;
    Ok(managed_product::ManagedProduct::new(&self.common, tradable_product))
}
/// Creates a `ManagedOrder` for `order_id`, records it in the client's order
/// table (replacing any previous entry under the same id), and returns a
/// shared handle to it.
pub async fn managed_order(
    &self,
    order_id: OrderId,
) -> Arc<managed_order::ManagedOrder> {
    let tracked = Arc::new(managed_order::ManagedOrder::new(order_id));
    self.managed_orders.write().await.insert(order_id, Arc::clone(&tracked));
    tracked
}
/// Sends a limit order and returns the `ManagedOrder` tracking it.
///
/// The order is registered in the managed-order table *before* it is sent:
/// the task started by `run` only updates orders it can find in that table,
/// so registering afterwards could miss an early ack or fill.
///
/// # Errors
///
/// Fails if the order cannot be sent; in that case the registration is
/// rolled back so no stale entry is left behind.
pub async fn send_limit_order(
    &self,
    product: TradableProduct,
    account: Account,
    dir: Dir,
    price: Decimal,
    quantity: Decimal,
) -> Result<Arc<managed_order::ManagedOrder>> {
    let oid = self.orderflow.orderid();
    let mo = self.managed_order(oid).await;
    let sent = self.orderflow.send_one(&ToOms::Order(Order {
        id: oid,
        timestamp: None,
        target: product,
        account,
        dir,
        price,
        quantity,
    }));
    if let Err(e) = sent {
        self.managed_orders.write().await.remove(&oid);
        return Err(e.into());
    }
    Ok(mo)
}
/// Lists the open orders for `set`/`scope` and fetches their details.
///
/// # Errors
///
/// Fails if either the listing or the detail query fails.
pub async fn list_open_orders(
    &self,
    set: LimitSet,
    scope: LimitScope,
) -> Result<Pooled<Vec<Order>>> {
    let open_ids = self.simple_orderflow.list_open(set, scope).await?;
    Ok(self.simple_orderflow.get_order_details(&open_ids).await?)
}
/// Lists the fills for `set`/`scope` and fetches their details.
///
/// Only the fill id of each listed (order id, fill id) pair is used for the
/// detail query.
///
/// # Errors
///
/// Fails if either the listing or the detail query fails.
pub async fn list_fills(
    &self,
    set: LimitSet,
    scope: LimitScope,
) -> Result<Pooled<Vec<result::Result<NormalFill, AberrantFill>>>> {
    let listed = self.simple_orderflow.list_fills(set, scope).await?;
    let mut fill_ids: Vec<FillId> = Vec::new();
    for (_, fill_id) in listed.iter() {
        fill_ids.push(*fill_id);
    }
    let request = Pooled::orphan(fill_ids);
    Ok(self.simple_orderflow.get_fill_details(&request).await?)
}
/// Spawns the task that consumes messages from the orderflow connection and
/// refreshes the state of managed orders.
///
/// For every ack, normal fill (including corrections and reversals), out,
/// reject, or not-out message, the matching managed order (if any) has its
/// state re-queried through the simple orderflow API and stored.
///
/// The task returns `Ok(())` once `recv_one` yields an error. It returns an
/// error if a state query fails, if the query returns no item, or if a
/// message outside the set above is received.
fn run(&self) -> tokio::task::JoinHandle<Result<()>> {
    let t = self.clone();
    tokio::task::spawn(async move {
        while let Ok(msg) = t.orderflow.recv_one().await {
            debug!("from orderflow: {:?}", msg);
            match msg {
                FromOms::OrderAck(Ok(oid))
                | FromOms::Fill(Fill::Normal(Ok(NormalFill { id: oid, .. })))
                | FromOms::Fill(Fill::Correction(Ok(NormalFill { id: oid, .. })))
                | FromOms::Fill(Fill::Reversal(Ok(NormalFill { id: oid, .. })))
                | FromOms::Out(Ok(oid))
                | FromOms::OmsReject(OmsReject { id: oid, .. })
                | FromOms::CptyReject(Ok(NormalCptyReject { id: oid, .. }))
                | FromOms::NotOut(oid) => {
                    // Clone the handle out and release the read guard before
                    // the state query, so writers registering new orders are
                    // not blocked for the duration of that await.
                    let tracked = t.managed_orders.read().await.get(&oid).cloned();
                    if let Some(mo) = tracked {
                        let res = t
                            .simple_orderflow
                            .get_order_state(&Pooled::orphan(vec![oid]))
                            .await?;
                        let state = res
                            .first()
                            .ok_or_else(|| anyhow!("expected exactly one item in vec"))?;
                        mo.set_order_state(*state);
                    }
                }
                other => bail!("aberrant response: {:?}", other),
            }
        }
        Ok(())
    })
}
}
pub mod utils {
use crate::protocol::orderflow::{AberrantFill, NormalFill};
use crate::protocol::Dir;
use anyhow::{anyhow, Result};
use rust_decimal::Decimal;
use std::result;
/// Returns `true` when `price` is strictly more aggressive than `than` for
/// the given direction: higher for a buy, lower for a sell.
pub fn is_more_agg_than(price: Decimal, than: Decimal, dir: Dir) -> bool {
    // Order the pair so that "more aggressive" is always "left > right".
    let (candidate, reference) = match dir {
        Dir::Buy => (price, than),
        Dir::Sell => (than, price),
    };
    candidate > reference
}
/// Rounds `x` to a multiple of `increment`, always moving towards zero.
///
/// The quotient `x / increment` is truncated, so the result never has a
/// larger magnitude than `x`. This holds for negative increments as well;
/// for positive increments it matches the previous floor/ceil logic.
///
/// # Panics
///
/// A zero `increment` divides by zero, which panics in `Decimal`'s `/`
/// operator.
pub fn round_to_nearest_towards_zero(x: Decimal, increment: Decimal) -> Decimal {
    (x / increment).trunc() * increment
}
/// Borrows every fill as a `NormalFill`.
///
/// # Errors
///
/// Fails on the first aberrant fill encountered.
pub fn all_normal(
    fills: &Vec<result::Result<NormalFill, AberrantFill>>,
) -> Result<Vec<&NormalFill>> {
    fills
        .iter()
        .map(|fill| fill.as_ref().map_err(|_| anyhow!("there are aberrant fills")))
        .collect()
}
#[cfg(test)]
mod test {
    use super::*;
    use rust_decimal_macros::dec;

    /// Checks zero, a positive, and a negative input against a 0.25 increment.
    #[test]
    fn it_rounds_toward_zero() {
        let increment = dec!(0.25);
        let cases = [
            (dec!(0), dec!(0)),
            (dec!(1.33), dec!(1.25)),
            (dec!(-1.33), dec!(-1.25)),
        ];
        for (input, expected) in cases.iter() {
            assert_eq!(round_to_nearest_towards_zero(*input, increment), *expected);
        }
    }
}
}
/// Lazily resolved handles for a fixed set of venues.
///
/// Each static calls `Venue::get` on first access and panics, naming the
/// venue, if the lookup does not succeed.
/// NOTE(review): presumably symbology must be loaded before first use — confirm.
pub mod venues {
    use crate::symbology::Venue;
    use once_cell::sync::Lazy;

    pub static COINBASE: Lazy<Venue> =
        Lazy::new(|| Venue::get("COINBASE").expect("venue COINBASE not found"));
    pub static UNISWAPV3F100: Lazy<Venue> =
        Lazy::new(|| Venue::get("UNISWAPV3F100").expect("venue UNISWAPV3F100 not found"));
    pub static UNISWAPV3F500: Lazy<Venue> =
        Lazy::new(|| Venue::get("UNISWAPV3F500").expect("venue UNISWAPV3F500 not found"));
    pub static B2C2: Lazy<Venue> =
        Lazy::new(|| Venue::get("B2C2").expect("venue B2C2 not found"));
}
/// Lazily resolved handles for a fixed set of routes.
///
/// The static calls `Route::get` on first access and panics, naming the
/// route, if the lookup does not succeed.
/// NOTE(review): presumably symbology must be loaded before first use — confirm.
pub mod routes {
    use crate::symbology::Route;
    use once_cell::sync::Lazy;

    pub static DIRECT: Lazy<Route> =
        Lazy::new(|| Route::get("DIRECT").expect("route DIRECT not found"));
}