use std::{collections::HashSet, sync::Arc};
use alloy::primitives::Address;
use async_trait::async_trait;
use nautilus_common::{
clients::ExecutionClient,
messages::execution::{
BatchCancelOrders, CancelAllOrders, CancelOrder, GenerateFillReports,
GenerateOrderStatusReport, GenerateOrderStatusReports, GeneratePositionStatusReports,
ModifyOrder, QueryAccount, QueryOrder, SubmitOrder, SubmitOrderList,
},
};
use nautilus_core::UnixNanos;
use nautilus_live::ExecutionClientCore;
use nautilus_model::{
accounts::AccountAny,
defi::{
SharedChain, Token,
validation::validate_address,
wallet::{TokenBalance, WalletBalance},
},
enums::OmsType,
identifiers::{AccountId, ClientId, Venue},
reports::{ExecutionMassStatus, FillReport, OrderStatusReport, PositionStatusReport},
types::{AccountBalance, MarginBalance, Money},
};
use crate::{
cache::BlockchainCache, config::BlockchainExecutionClientConfig,
contracts::erc20::Erc20Contract, rpc::http::BlockchainHttpRpcClient,
};
#[derive(Debug)]
pub struct BlockchainExecutionClient {
core: ExecutionClientCore,
cache: BlockchainCache,
chain: SharedChain,
wallet_address: Address,
wallet_balance: WalletBalance,
erc20_contract: Erc20Contract,
http_rpc_client: Arc<BlockchainHttpRpcClient>,
}
impl BlockchainExecutionClient {
pub fn new(
core_client: ExecutionClientCore,
config: BlockchainExecutionClientConfig,
) -> anyhow::Result<Self> {
let chain = Arc::new(config.chain);
let cache = BlockchainCache::new(chain.clone());
let http_rpc_client = Arc::new(BlockchainHttpRpcClient::new(
config.http_rpc_url.clone(),
config.rpc_requests_per_second,
));
let wallet_address = validate_address(config.wallet_address.as_str())?;
let erc20_contract = Erc20Contract::new(http_rpc_client.clone(), true);
let mut token_universe = HashSet::new();
if let Some(specified_tokens) = config.tokens {
for token in specified_tokens {
let token_address = validate_address(token.as_str())?;
token_universe.insert(token_address);
}
}
let wallet_balance = WalletBalance::new(token_universe);
Ok(Self {
core: core_client,
wallet_balance,
chain,
cache,
erc20_contract,
http_rpc_client,
wallet_address,
})
}
async fn fetch_native_currency_balance(&self) -> anyhow::Result<Money> {
let balance_u256 = self
.http_rpc_client
.get_balance(&self.wallet_address, None)
.await?;
let native_currency = self.chain.native_currency();
let balance = Money::from_wei(balance_u256, native_currency);
Ok(balance)
}
async fn fetch_token_balance(
&mut self,
token_address: &Address,
) -> anyhow::Result<TokenBalance> {
let token = if let Some(token) = self.cache.get_token(token_address) {
token.to_owned()
} else {
let token_info = self.erc20_contract.fetch_token_info(token_address).await?;
let token = Token::new(
self.chain.clone(),
*token_address,
token_info.name,
token_info.symbol,
token_info.decimals,
);
self.cache.add_token(token.clone()).await?;
token
};
let amount = self
.erc20_contract
.balance_of(token_address, &self.wallet_address)
.await?;
let token_balance = TokenBalance::new(amount, token);
Ok(token_balance)
}
async fn refresh_wallet_balances(&mut self) -> anyhow::Result<()> {
let native_currency_balance = self.fetch_native_currency_balance().await?;
log::info!(
"Initializing wallet balance with native currency balance: {} {}",
native_currency_balance.as_decimal(),
native_currency_balance.currency
);
self.wallet_balance
.set_native_currency_balance(native_currency_balance);
if self.wallet_balance.is_token_universe_initialized() {
let tokens: Vec<Address> = self
.wallet_balance
.token_universe
.clone()
.into_iter()
.collect();
for token in tokens {
if let Ok(token_balance) = self.fetch_token_balance(&token).await {
log::info!("Adding token balance to the wallet: {token_balance}");
self.wallet_balance.add_token_balance(token_balance);
}
}
} else {
}
Ok(())
}
}
#[async_trait(?Send)]
impl ExecutionClient for BlockchainExecutionClient {
fn is_connected(&self) -> bool {
self.core.is_connected()
}
fn client_id(&self) -> ClientId {
self.core.client_id
}
fn account_id(&self) -> AccountId {
self.core.account_id
}
fn venue(&self) -> Venue {
self.core.venue
}
fn oms_type(&self) -> OmsType {
self.core.oms_type
}
fn get_account(&self) -> Option<AccountAny> {
todo!("implement get_account")
}
fn generate_account_state(
&self,
_balances: Vec<AccountBalance>,
_margins: Vec<MarginBalance>,
_reported: bool,
_ts_event: UnixNanos,
) -> anyhow::Result<()> {
todo!("implement generate_account_state")
}
fn start(&mut self) -> anyhow::Result<()> {
todo!("implement start")
}
fn stop(&mut self) -> anyhow::Result<()> {
todo!("implement stop")
}
fn submit_order(&self, _cmd: &SubmitOrder) -> anyhow::Result<()> {
todo!("implement submit_order")
}
fn submit_order_list(&self, _cmd: &SubmitOrderList) -> anyhow::Result<()> {
todo!("implement submit_order_list")
}
fn modify_order(&self, _cmd: &ModifyOrder) -> anyhow::Result<()> {
todo!("implement modify_order")
}
fn cancel_order(&self, _cmd: &CancelOrder) -> anyhow::Result<()> {
todo!("implement cancel_order")
}
fn cancel_all_orders(&self, _cmd: &CancelAllOrders) -> anyhow::Result<()> {
todo!("implement cancel_all_orders")
}
fn batch_cancel_orders(&self, _cmd: &BatchCancelOrders) -> anyhow::Result<()> {
todo!("implement batch_cancel_orders")
}
fn query_account(&self, _cmd: &QueryAccount) -> anyhow::Result<()> {
todo!("implement query_account")
}
fn query_order(&self, _cmd: &QueryOrder) -> anyhow::Result<()> {
todo!("implement query_order")
}
async fn connect(&mut self) -> anyhow::Result<()> {
if self.core.is_connected() {
log::warn!("Blockchain execution client already connected");
return Ok(());
}
log::info!(
"Connecting to blockchain execution client on chain {}",
self.chain.name
);
self.refresh_wallet_balances().await?;
self.core.set_connected();
log::info!(
"Blockchain execution client connected on chain {}",
self.chain.name
);
Ok(())
}
async fn disconnect(&mut self) -> anyhow::Result<()> {
self.core.set_disconnected();
Ok(())
}
async fn generate_order_status_report(
&self,
_cmd: &GenerateOrderStatusReport,
) -> anyhow::Result<Option<OrderStatusReport>> {
todo!("implement generate_order_status_report")
}
async fn generate_order_status_reports(
&self,
_cmd: &GenerateOrderStatusReports,
) -> anyhow::Result<Vec<OrderStatusReport>> {
todo!("implement generate_order_status_reports")
}
async fn generate_fill_reports(
&self,
_cmd: GenerateFillReports,
) -> anyhow::Result<Vec<FillReport>> {
todo!("implement generate_fill_reports")
}
async fn generate_position_status_reports(
&self,
_cmd: &GeneratePositionStatusReports,
) -> anyhow::Result<Vec<PositionStatusReport>> {
todo!("implement generate_position_status_reports")
}
async fn generate_mass_status(
&self,
_lookback_mins: Option<u64>,
) -> anyhow::Result<Option<ExecutionMassStatus>> {
todo!("implement generate_mass_status")
}
}