mod support;
use anyhow::{anyhow as err, Error, Result};
use chrono::{TimeDelta, Utc};
use dydx::config::ClientConfig;
use dydx::indexer::{
CandleResolution, ClientId, Feed, GetCandlesOpts, IndexerClient, ListPerpetualMarketsOpts,
PerpetualMarket, Price, Quantity, Subaccount, SubaccountsMessage, Ticker, TradesMessage,
};
use dydx::node::{
Account, NodeClient, OrderBuilder, OrderId, OrderSide, Wallet,
SHORT_TERM_ORDER_MAXIMUM_LIFETIME,
};
use std::fmt;
use support::constants::TEST_MNEMONIC;
use support::order_book::LiveOrderBook;
use tokio::{
select,
sync::mpsc,
time::{sleep, Duration},
};
pub struct Parameters {
ticker: Ticker,
position_size: Quantity,
shorter_span: TimeDelta,
longer_span: TimeDelta,
}
pub struct Variables {
position: Quantity,
shorter_channel: Channel,
longer_channel: Channel,
state: State,
}
enum State {
Waiting,
InTrend(OrderSide),
}
pub struct Channel {
high: Price,
low: Price,
}
impl fmt::Display for Channel {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[{}, {}]", self.low, self.high)?;
Ok(())
}
}
pub struct TrendFollower {
client: NodeClient,
indexer: IndexerClient,
account: Account,
subaccount: Subaccount,
market: PerpetualMarket,
generator: OrderBuilder,
trades_feed: Feed<TradesMessage>,
subaccounts_feed: Feed<SubaccountsMessage>,
order_book: LiveOrderBook,
channel_rx: mpsc::UnboundedReceiver<Channel>,
parameters: Parameters,
variables: Variables,
}
impl TrendFollower {
pub async fn connect() -> Result<Self> {
support::crypto::init_crypto_provider();
let config = ClientConfig::from_file("client/tests/testnet.toml").await?;
let mut client = NodeClient::connect(config.node).await?;
let mut indexer = IndexerClient::new(config.indexer.clone());
let wallet = Wallet::from_mnemonic(TEST_MNEMONIC)?;
let mut account = wallet.account(0, &mut client).await?;
let subaccount = account.subaccount(0)?;
let ticker = Ticker::from("ETH-USD");
let market = indexer
.markets()
.get_perpetual_markets(Some(ListPerpetualMarketsOpts {
ticker: Some(ticker.clone()),
limit: None,
}))
.await?
.remove(&ticker)
.ok_or_else(|| err!("{ticker} not found in markets query response"))?;
let generator = OrderBuilder::new(market.clone(), subaccount.clone());
client
.close_position(
&mut account,
subaccount.clone(),
market.clone(),
None,
ClientId::random(),
)
.await?;
let trades_feed = indexer.feed().trades(&ticker, false).await?;
let orders_feed = indexer.feed().orders(&ticker, false).await?;
let subaccounts_feed = indexer
.feed()
.subaccounts(subaccount.clone(), false)
.await?;
let order_book = LiveOrderBook::new(orders_feed);
let position_size: Quantity = "0.001".parse()?;
let shorter_span = TimeDelta::minutes(10);
let longer_span = TimeDelta::minutes(30);
let shorter_channel = calculate_channel(&indexer, &ticker, shorter_span).await?;
let longer_channel = calculate_channel(&indexer, &ticker, longer_span).await?;
tracing::info!("Watching channel: {longer_channel}");
let (tx, channel_rx) = mpsc::unbounded_channel();
tokio::spawn(Self::channel_fetcher(
tx,
IndexerClient::new(config.indexer),
ticker.clone(),
shorter_span,
));
let parameters = Parameters {
ticker,
position_size,
shorter_span,
longer_span,
};
let variables = Variables {
position: 0.into(),
shorter_channel,
longer_channel,
state: State::Waiting,
};
Ok(Self {
client,
indexer,
account,
subaccount,
market,
generator,
trades_feed,
subaccounts_feed,
order_book,
channel_rx,
parameters,
variables,
})
}
async fn entrypoint(mut self) {
loop {
if let Err(err) = self.step().await {
tracing::error!("Bot update failed: {err}");
}
}
}
async fn step(&mut self) -> Result<()> {
select! {
msg = self.trades_feed.recv() => {
if let Some(msg) = msg {
self.handle_trades_message(msg).await?;
}
}
msg = self.subaccounts_feed.recv() => {
if let Some(msg) = msg {
self.handle_subaccounts_message(msg).await?;
}
}
channel = self.channel_rx.recv() => {
if let Some(channel) = channel {
self.variables.shorter_channel = channel;
}
}
_ = self.order_book.changed() => {
self.handle_order_book().await?;
}
}
Ok(())
}
async fn handle_trades_message(&mut self, msg: TradesMessage) -> Result<()> {
match msg {
TradesMessage::Initial(_upd) => {}
TradesMessage::Update(_upd) => {}
}
Ok(())
}
async fn handle_subaccounts_message(&mut self, msg: SubaccountsMessage) -> Result<()> {
match msg {
SubaccountsMessage::Initial(upd) => {
let positions = upd.contents.subaccount.open_perpetual_positions;
if let Some(position) = positions.get(&self.parameters.ticker) {
self.variables.position = position.size.clone();
tracing::info!("Position: {}", self.variables.position);
}
}
SubaccountsMessage::Update(upd) => {
if let Some(ref positions) = upd
.contents
.first()
.ok_or_else(|| err!("Subaccount message does not have data!"))?
.perpetual_positions
{
let size = positions
.iter()
.find(|p| (p.market == self.parameters.ticker))
.map(|p| p.size.clone());
if let Some(size) = size {
self.variables.position = size;
tracing::info!("Position: {}", self.variables.position);
}
}
}
}
Ok(())
}
async fn handle_order_book(&mut self) -> Result<()> {
let spread = self
.order_book
.borrow()
.spread()
.map(|spread| (spread.bid.price.clone(), spread.ask.price.clone()));
if let Some((bid, ask)) = spread {
let price = Price((bid.0 + ask.0) / 2);
match self.variables.state {
State::Waiting => {
if price > self.variables.longer_channel.high {
tracing::info!("Channel broken at {price}. Placing buy order.");
self.place_limit_order(OrderSide::Buy, price).await?;
self.variables.state = State::InTrend(OrderSide::Buy);
self.variables.shorter_channel =
self.get_channel(self.parameters.shorter_span).await?;
tracing::info!("In-trend channel: {}", self.variables.shorter_channel);
} else if price < self.variables.longer_channel.low {
tracing::info!("Channel broken at {price}. Placing sell order.");
self.place_limit_order(OrderSide::Sell, price).await?;
self.variables.state = State::InTrend(OrderSide::Sell);
self.variables.shorter_channel =
self.get_channel(self.parameters.shorter_span).await?;
tracing::info!("In-trend channel: {}", self.variables.shorter_channel);
}
}
State::InTrend(side) => {
let break_price = match side {
OrderSide::Buy => {
if price < self.variables.shorter_channel.low {
Some(price)
} else {
None
}
}
OrderSide::Sell => {
if price > self.variables.shorter_channel.high {
Some(price)
} else {
None
}
}
_ => None,
};
if let Some(price) = break_price {
tracing::info!(
"Leaving trend at {price}, channel: {}. Closing position.",
self.variables.shorter_channel
);
self.close_position().await?;
self.variables.state = State::Waiting;
self.variables.longer_channel =
self.get_channel(self.parameters.longer_span).await?;
tracing::info!("Watching channel {}.", self.variables.longer_channel);
}
}
}
}
Ok(())
}
async fn place_limit_order(&mut self, side: OrderSide, price: Price) -> Result<OrderId> {
let current_block = self.client.latest_block_height().await?;
let (id, order) = self
.generator
.clone()
.limit(side, price, self.parameters.position_size.clone())
.until(current_block.ahead(SHORT_TERM_ORDER_MAXIMUM_LIFETIME))
.build(ClientId::random())?;
let hash = self.client.place_order(&mut self.account, order).await?;
tracing::info!("Placing {side:?} order: {hash} (ID: {})", id.client_id);
Ok(id)
}
async fn _cancel_order(&mut self, id: OrderId) -> Result<()> {
let current_block = self.client.latest_block_height().await?;
let until = current_block.ahead(10);
let c_id = id.client_id;
let hash = self
.client
.cancel_order(&mut self.account, id, until)
.await?;
tracing::info!("Cancelling order: {hash} (ID: {c_id})");
Ok(())
}
async fn close_position(&mut self) -> Result<()> {
self.client
.close_position(
&mut self.account,
self.subaccount.clone(),
self.market.clone(),
None,
ClientId::random(),
)
.await
.map(|_| ())
.map_err(|e| err!("Failed closing position: {e}"))
}
async fn get_channel(&self, span: TimeDelta) -> Result<Channel> {
calculate_channel(&self.indexer, &self.parameters.ticker, span).await
}
async fn channel_fetcher(
tx: mpsc::UnboundedSender<Channel>,
indexer: IndexerClient,
ticker: Ticker,
span: TimeDelta,
) -> Result<Channel> {
loop {
sleep(Duration::from_secs(30)).await;
let result = calculate_channel(&indexer, &ticker, span).await?;
tx.send(result)?;
}
}
}
async fn calculate_channel(
indexer: &IndexerClient,
ticker: &Ticker,
span: TimeDelta,
) -> Result<Channel> {
let now = Utc::now();
let opts = GetCandlesOpts {
from_iso: Some(now - span),
to_iso: Some(now),
limit: None,
};
let candles = indexer
.markets()
.get_perpetual_market_candles(ticker, CandleResolution::M1, Some(opts))
.await?;
if candles.is_empty() {
return Err(err!("Candles response is empty"));
}
let high = candles
.iter()
.max_by_key(|c| c.high.clone())
.unwrap()
.high
.clone();
let low = candles
.iter()
.min_by_key(|c| c.low.clone())
.unwrap()
.low
.clone();
Ok(Channel { low, high })
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt().try_init().map_err(Error::msg)?;
#[cfg(feature = "telemetry")]
support::telemetry::metrics_dashboard().await?;
let follower = TrendFollower::connect().await?;
follower.entrypoint().await;
Ok(())
}