use std::{collections::HashMap, sync::Arc};
use color_eyre::eyre::Result;
use tokio::{
select,
sync::{mpsc, watch},
task::JoinSet,
};
use tracing::{debug, field::Empty, instrument, Span};
use uuid::Uuid;
use super::exchanges::Exchanges;
use crate::{
config::AppConfig,
exchange_apis::{
binance, order_types,
order_types::{ConceptualOrder, ConceptualOrderType, Order, ProtocolOrderId},
Market,
},
positions::HubToPosition,
protocols::{ProtocolFill, ProtocolFills},
PositionOrderId,
};
#[derive(Clone, Debug, Default, derive_new::new)]
pub struct ExchangeToHub {
pub key: Uuid,
pub market: Market,
pub fill_qty: f64,
pub order: Order<PositionOrderId>,
}
#[derive(Clone, Debug, Default, derive_new::new)]
pub struct HubToExchange {
pub key: Uuid,
pub orders: Vec<Order<PositionOrderId>>,
}
#[instrument(skip_all)]
pub fn init_hub(config_arc: Arc<AppConfig>, parent_js: &mut JoinSet<Result<()>>, exchanges: Arc<Exchanges>) -> mpsc::Sender<PositionToHub> {
let (tx, rx) = mpsc::channel(32);
parent_js.spawn(hub(config_arc.clone(), rx, exchanges));
tx
}
#[derive(Clone, Debug, derive_new::new)]
pub struct PositionToHub {
key: Uuid,
orders: Vec<ConceptualOrder<ProtocolOrderId>>,
position_callback: HubToPosition,
}
#[derive(Clone, Debug, derive_new::new)]
struct PositionLocalKnowledge {
pub key: Uuid,
pub callback: mpsc::Sender<ProtocolFills>,
pub requested_orders: Vec<ConceptualOrder<ProtocolOrderId>>,
}
#[derive(Clone, Debug, Default, derive_new::new)]
struct ExchangeLocalKnowledge {
pub key: Uuid,
pub target_orders: Vec<Order<PositionOrderId>>,
}
#[instrument(skip_all)]
pub async fn hub(config_arc: Arc<AppConfig>, mut rx: mpsc::Receiver<PositionToHub>, exchanges: Arc<Exchanges>) -> Result<()> {
let (fills_tx, mut fills_rx) = mpsc::channel::<ExchangeToHub>(32);
let (orders_tx, orders_rx) = watch::channel::<HubToExchange>(HubToExchange::default());
let mut js = JoinSet::new();
let exchanges_clone = exchanges.clone();
let config_arc_clone = config_arc.clone();
js.spawn(async move {
let mut exchange_runtimes_js = JoinSet::new();
binance::binance_runtime(config_arc_clone, &mut exchange_runtimes_js, fills_tx, orders_rx, exchanges_clone.binance.clone()).await;
unreachable!();
});
let mut positions_local_knowledge: HashMap<Uuid, PositionLocalKnowledge> = HashMap::new();
let mut exchanges_local_knowledge: HashMap<Market, ExchangeLocalKnowledge> = HashMap::new();
loop {
select! {
Some(update_from_position) = rx.recv() => {
handle_update_from_position(update_from_position, &mut positions_local_knowledge, &orders_tx, &mut exchanges_local_knowledge)?;
},
Some(fill) = fills_rx.recv() => {
let exchange_local_knowledge = exchanges_local_knowledge.entry(fill.market).or_default();
exchange_local_knowledge.key = fill.key;
let position_local_knowledge = positions_local_knowledge.get_mut(&fill.order.id.position_id).expect("Can't receive a fill without a position first requesting those orders");
handle_fill(fill, position_local_knowledge).await?;
},
else => break,
}
}
js.join_all().await;
Ok(())
}
#[instrument(skip(orders_tx, positions_local_knowledge), fields(position_local_knowledge = Empty))]
fn handle_update_from_position(
hub_rx: PositionToHub,
positions_local_knowledge: &mut HashMap<Uuid, PositionLocalKnowledge>,
orders_tx: &tokio::sync::watch::Sender<HubToExchange>,
exchanges_local_knowledge: &mut HashMap<Market, ExchangeLocalKnowledge>,
) -> Result<()> {
let position_id = hub_rx.position_callback.position_id;
let position_local_knowledge = positions_local_knowledge
.entry(position_id)
.or_insert(PositionLocalKnowledge::new(Uuid::default(), hub_rx.position_callback.sender, Vec::new()));
Span::current().record("position_local_knowledge", format!("{:?}", position_local_knowledge));
if position_local_knowledge.key != hub_rx.key {
debug!("Key mismatch, ignoring the request.");
return Ok(());
}
position_local_knowledge.requested_orders = hub_rx.orders;
let mut requested_orders_all_positions: Vec<ConceptualOrder<PositionOrderId>> = Vec::new();
for (position_id, plk) in positions_local_knowledge.iter() {
let remap_to_position_id = plk.requested_orders.iter().map(|o| {
let new_id = PositionOrderId::new_from_protocol_id(*position_id, o.id.clone());
ConceptualOrder { id: new_id, ..o.clone() }
});
requested_orders_all_positions.extend(remap_to_position_id);
}
let target_orders = hub_process_orders(requested_orders_all_positions);
debug!(?target_orders);
let binance_futures_orders = target_orders
.iter()
.filter(|o| o.symbol.market == Market::BinanceFutures)
.cloned()
.collect::<Vec<Order<PositionOrderId>>>();
let exchange_local_knowledge = exchanges_local_knowledge.entry(Market::BinanceFutures).or_default();
let passforward = HubToExchange::new(exchange_local_knowledge.key, binance_futures_orders);
orders_tx.send(passforward)?;
Ok(())
}
#[instrument]
async fn handle_fill(fill: ExchangeToHub, position_local_knowledge: &mut PositionLocalKnowledge) -> Result<()> {
position_local_knowledge.key = fill.key;
let vec_fill = vec![ProtocolFill::new(fill.order.id.into(), fill.fill_qty)];
position_local_knowledge.callback.send(ProtocolFills::new(position_local_knowledge.key, vec_fill)).await?;
debug!("Sent fills to position");
Ok(())
}
#[instrument]
fn hub_process_orders(conceptual_orders: Vec<ConceptualOrder<PositionOrderId>>) -> Vec<Order<PositionOrderId>> {
let mut orders: Vec<Order<PositionOrderId>> = Vec::new();
for o in conceptual_orders {
match &o.order_type {
ConceptualOrderType::Market(_) => {
let order = Order::new(o.id, order_types::OrderType::Market, o.symbol.clone(), o.side, o.qty_notional);
orders.push(order);
}
ConceptualOrderType::StopMarket(stop_market) => {
let order = Order::new(
o.id,
order_types::OrderType::StopMarket(order_types::StopMarketOrder::new(stop_market.price)),
o.symbol.clone(),
o.side,
o.qty_notional,
);
orders.push(order);
}
_ => panic!("Unsupported order type"),
}
}
orders
}
mod tests {
#![allow(unused_imports)] use order_types::{ConceptualMarket, ConceptualStopMarket};
use v_utils::trades::Side;
use super::*;
use crate::exchange_apis::Symbol;
#[test]
fn test_hub_process() {
let from_orders = vec![
ConceptualOrder {
id: PositionOrderId::new(Uuid::parse_str("058a3b5d-7ce0-465c-9339-b43261e99b19").unwrap(), "ts:p0.02".to_string(), 0),
order_type: ConceptualOrderType::Market(ConceptualMarket::default()),
symbol: Symbol::new("BTC".to_string(), "USDT".to_string(), Market::BinanceFutures),
side: Side::Buy,
qty_notional: 100.0,
},
ConceptualOrder {
id: PositionOrderId::new(Uuid::parse_str("86acfda1-ef53-4bae-9f20-bbad6cbc8504").unwrap(), "ts:p0.02".to_string(), 1),
order_type: ConceptualOrderType::StopMarket(ConceptualStopMarket::default()),
symbol: Symbol::new("BTC".to_string(), "USDT".to_string(), Market::BinanceFutures),
side: Side::Buy,
qty_notional: 100.0,
},
];
let converted = hub_process_orders(from_orders);
insta::assert_json_snapshot!(converted, @r###"
[
{
"id": {
"position_id": "058a3b5d-7ce0-465c-9339-b43261e99b19",
"protocol_id": "ts:p0.02",
"ordinal": 0
},
"order_type": "Market",
"symbol": {
"base": "BTC",
"quote": "USDT",
"market": "BinanceFutures"
},
"side": "Buy",
"qty_notional": 100.0
},
{
"id": {
"position_id": "86acfda1-ef53-4bae-9f20-bbad6cbc8504",
"protocol_id": "ts:p0.02",
"ordinal": 1
},
"order_type": {
"StopMarket": {
"price": 0.0
}
},
"symbol": {
"base": "BTC",
"quote": "USDT",
"market": "BinanceFutures"
},
"side": "Buy",
"qty_notional": 100.0
}
]
"###);
}
}