use async_trait::async_trait;
use reqwest::Client;
use serde_json::{json, Value};
use crate::core::types::*;
use crate::core::traits::*;
use super::endpoints::{
proto_id, FutuEndpoints, FutuOrderType, FutuTrdSide, ModifyOrderOp,
TrdEnv, TrdMarket, SecMarket, format_symbol, infer_sec_market,
};
use super::auth::FutuAuth;
use super::parser::FutuParser;
pub struct FutuConnector {
_client: Client,
auth: FutuAuth,
endpoints: FutuEndpoints,
trd_env: TrdEnv,
acc_id: u64,
trd_market: TrdMarket,
}
impl FutuConnector {
pub fn new(auth: FutuAuth) -> Self {
Self {
_client: Client::new(),
endpoints: FutuEndpoints::default(),
trd_env: TrdEnv::Real,
acc_id: 0,
trd_market: TrdMarket::Us,
auth,
}
}
pub fn from_env() -> Self {
Self::new(FutuAuth::from_env())
}
pub fn with_env(mut self, env: TrdEnv) -> Self {
self.trd_env = env;
self
}
pub fn with_market(mut self, market: TrdMarket) -> Self {
self.trd_market = market;
self
}
pub fn with_acc_id(mut self, acc_id: u64) -> Self {
self.acc_id = acc_id;
self
}
async fn proto_call(
&self,
proto_id: u32,
request: Value,
) -> ExchangeResult<Value> {
Err(ExchangeError::UnsupportedOperation(format!(
"Futu OpenD TCP+Protobuf transport not connected. \
OpenD address: {}. Proto ID: {}, request: {}",
self.endpoints.address(),
proto_id,
request
)))
}
fn trd_header(&self) -> Value {
json!({
"trdEnv": self.trd_env.as_i32(),
"accID": self.acc_id,
"trdMarket": self.trd_market.as_i32(),
})
}
fn format_sym(&self, symbol: &Symbol) -> String {
let sec_market = infer_sec_market(symbol);
format_symbol(symbol, sec_market)
}
fn map_side(side: OrderSide) -> FutuTrdSide {
match side {
OrderSide::Buy => FutuTrdSide::Buy,
OrderSide::Sell => FutuTrdSide::Sell,
}
}
}
impl ExchangeIdentity for FutuConnector {
fn exchange_name(&self) -> &'static str {
"futu"
}
fn exchange_id(&self) -> ExchangeId {
ExchangeId::Futu
}
fn is_testnet(&self) -> bool {
matches!(self.trd_env, TrdEnv::Simulate)
}
fn supported_account_types(&self) -> Vec<AccountType> {
vec![AccountType::Spot] }
}
#[async_trait]
impl MarketData for FutuConnector {
async fn get_price(
&self,
symbol: Symbol,
_account_type: AccountType,
) -> ExchangeResult<Price> {
let code = self.format_sym(&symbol);
let request = json!({
"securityList": [{"market": 1, "code": code}]
});
let response = self.proto_call(proto_id::QOT_GET_SECURITY_SNAPSHOT, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_price(s2c)
}
async fn get_ticker(
&self,
symbol: Symbol,
_account_type: AccountType,
) -> ExchangeResult<Ticker> {
let code = self.format_sym(&symbol);
let request = json!({
"securityList": [{"market": 1, "code": code}]
});
let response = self.proto_call(proto_id::QOT_GET_SECURITY_SNAPSHOT, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_ticker(s2c, &symbol.base)
}
async fn get_orderbook(
&self,
symbol: Symbol,
depth: Option<u16>,
_account_type: AccountType,
) -> ExchangeResult<OrderBook> {
let code = self.format_sym(&symbol);
let request = json!({
"security": {"market": 1, "code": code},
"num": depth.unwrap_or(10),
});
let response = self.proto_call(proto_id::QOT_GET_ORDER_BOOK, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_orderbook(s2c)
}
async fn get_klines(
&self,
symbol: Symbol,
interval: &str,
limit: Option<u16>,
_account_type: AccountType,
_end_time: Option<i64>,
) -> ExchangeResult<Vec<Kline>> {
let code = self.format_sym(&symbol);
let kl_type = match interval {
"1m" | "1min" => 1,
"5m" | "5min" => 2,
"15m" | "15min" => 3,
"30m" | "30min" => 4,
"1h" | "60m" | "60min" => 5,
"1d" | "1day" | "D" => 6,
"1w" | "1week" | "W" => 7,
"1M" | "1mon" => 8,
_ => 6, };
let max_count = limit.unwrap_or(200) as i32;
let request = json!({
"security": {"market": 1, "code": code},
"klType": kl_type,
"reqNum": max_count,
});
let response = self.proto_call(proto_id::QOT_REQUEST_HISTORY_KL, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_klines(s2c)
}
async fn ping(&self) -> ExchangeResult<()> {
let request = json!({ "time": 0u64 });
self.proto_call(proto_id::KEEP_ALIVE, request).await?;
Ok(())
}
}
#[async_trait]
impl Trading for FutuConnector {
async fn place_order(&self, req: OrderRequest) -> ExchangeResult<PlaceOrderResponse> {
let symbol_code = self.format_sym(&req.symbol);
let trd_side = Self::map_side(req.side);
let (order_type_val, price, aux_price, time_in_force_val) =
match &req.order_type {
OrderType::Market => (
FutuOrderType::Market.as_i32(),
0.0f64,
None::<f64>,
None::<i32>,
),
OrderType::Limit { price } => (
FutuOrderType::Normal.as_i32(),
*price,
None,
None,
),
OrderType::StopMarket { stop_price } => (
FutuOrderType::EnhancedLimit.as_i32(),
0.0,
Some(*stop_price),
None,
),
OrderType::StopLimit { stop_price, limit_price } => (
FutuOrderType::StopLimit.as_i32(),
*limit_price,
Some(*stop_price),
None,
),
OrderType::PostOnly { price } => (
FutuOrderType::SpecialLimit.as_i32(),
*price,
None,
None,
),
OrderType::Ioc { price } => (
FutuOrderType::Normal.as_i32(),
price.unwrap_or(0.0),
None,
Some(2i32), ),
OrderType::Fok { price } => (
FutuOrderType::Normal.as_i32(),
*price,
None,
Some(4i32), ),
OrderType::Oco { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support native OCO orders".to_string(),
));
}
OrderType::Bracket { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support native Bracket orders".to_string(),
));
}
OrderType::TrailingStop { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support trailing stop orders".to_string(),
));
}
OrderType::Iceberg { price, .. } => (
FutuOrderType::Normal.as_i32(),
*price,
None,
None,
),
OrderType::Twap { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support TWAP orders".to_string(),
));
}
OrderType::Gtd { price, .. } => (
FutuOrderType::Normal.as_i32(),
*price,
None,
None,
),
OrderType::ReduceOnly { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu stocks do not support ReduceOnly orders".to_string(),
));
}
OrderType::Oto { .. } | OrderType::ConditionalPlan { .. } | OrderType::DcaRecurring { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support this order type".to_string(),
));
}
};
let sec_market = infer_sec_market(&req.symbol);
let mut request_body = json!({
"header": self.trd_header(),
"trdSide": trd_side.as_i32(),
"orderType": order_type_val,
"code": symbol_code,
"qty": req.quantity,
"price": price,
"secMarket": sec_market.as_i32(),
});
if let Some(ap) = aux_price {
request_body["auxPrice"] = json!(ap);
}
if let Some(tif) = time_in_force_val {
request_body["timeInForce"] = json!(tif);
}
if let Some(ref cid) = req.client_order_id {
request_body["remark"] = json!(cid);
}
let response = self.proto_call(proto_id::TRD_PLACE_ORDER, request_body).await?;
let s2c = FutuParser::check_response(&response)?;
let order = FutuParser::parse_place_order(s2c, &symbol_code)?;
Ok(PlaceOrderResponse::Simple(order))
}
async fn cancel_order(&self, req: CancelRequest) -> ExchangeResult<Order> {
let order_id = match &req.scope {
CancelScope::Single { order_id } => order_id.clone(),
CancelScope::Batch { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support native batch cancel. Use CancelAll trait or cancel individually.".to_string(),
));
}
CancelScope::All { .. } | CancelScope::BySymbol { .. } => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support native cancel-all. Cancel orders individually.".to_string(),
));
}
CancelScope::ByLabel(_) | CancelScope::ByCurrencyKind { .. } | CancelScope::ScheduledAt(_) => {
return Err(ExchangeError::UnsupportedOperation(
"Futu does not support this cancel scope".to_string(),
));
}
};
let order_id_u64: u64 = order_id.parse().map_err(|_| {
ExchangeError::InvalidRequest(format!("invalid order ID: {}", order_id))
})?;
let request = json!({
"header": self.trd_header(),
"modifyOrderOp": ModifyOrderOp::Cancel.as_i32(),
"orderID": order_id_u64,
"qty": 0,
"price": 0,
});
let response = self.proto_call(proto_id::TRD_MODIFY_ORDER, request).await?;
let s2c = FutuParser::check_response(&response)?;
if let Some(order_obj) = s2c.get("order") {
if order_obj.is_object() {
return FutuParser::parse_order(order_obj);
}
}
Ok(Order {
id: order_id,
client_order_id: None,
symbol: req.symbol
.map(|s| s.base)
.unwrap_or_default(),
side: OrderSide::Buy,
order_type: OrderType::Market,
status: OrderStatus::Canceled,
price: None,
stop_price: None,
quantity: 0.0,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: None,
time_in_force: TimeInForce::Gtc,
})
}
async fn get_order(
&self,
_symbol: &str,
order_id: &str,
_account_type: AccountType,
) -> ExchangeResult<Order> {
let order_id_u64: u64 = order_id.parse().map_err(|_| {
ExchangeError::InvalidRequest(format!("invalid order ID: {}", order_id))
})?;
let request = json!({
"header": self.trd_header(),
"filterConditions": {
"orderIDList": [order_id_u64],
}
});
let response = self.proto_call(proto_id::TRD_GET_ORDER_LIST, request).await?;
let s2c = FutuParser::check_response(&response)?;
let orders = FutuParser::parse_order_list(s2c)?;
orders.into_iter()
.find(|o| o.id == order_id)
.ok_or_else(|| ExchangeError::NotFound(format!("order {} not found", order_id)))
}
async fn get_open_orders(
&self,
symbol: Option<&str>,
_account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
let mut filter_conditions = json!({
"orderStatusFilterList": [6, 7], });
if let Some(sym) = symbol {
filter_conditions["codeList"] = json!([sym]);
}
let request = json!({
"header": self.trd_header(),
"filterConditions": filter_conditions,
});
let response = self.proto_call(proto_id::TRD_GET_ORDER_LIST, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_order_list(s2c)
}
async fn get_order_history(
&self,
filter: OrderHistoryFilter,
_account_type: AccountType,
) -> ExchangeResult<Vec<Order>> {
let mut filter_conditions = json!({});
if let Some(sym) = &filter.symbol {
filter_conditions["codeList"] = json!([self.format_sym(sym)]);
}
if let Some(start) = filter.start_time {
filter_conditions["beginTime"] = json!((start / 1000).to_string());
}
if let Some(end) = filter.end_time {
filter_conditions["endTime"] = json!((end / 1000).to_string());
}
let request = json!({
"header": self.trd_header(),
"filterConditions": filter_conditions,
});
let response = self.proto_call(proto_id::TRD_GET_HIST_ORDER_LIST, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_order_list(s2c)
}
}
#[async_trait]
impl Account for FutuConnector {
async fn get_balance(&self, query: BalanceQuery) -> ExchangeResult<Vec<Balance>> {
let request = json!({
"header": self.trd_header(),
"refreshBalance": true,
});
let response = self.proto_call(proto_id::TRD_GET_FUNDS, request).await?;
let s2c = FutuParser::check_response(&response)?;
let currency = match self.trd_market {
TrdMarket::Hk => "HKD",
TrdMarket::Us => "USD",
TrdMarket::CnSh | TrdMarket::CnSz => "CNY",
TrdMarket::Sg => "SGD",
};
let mut balances = FutuParser::parse_funds(s2c, currency)?;
if let Some(asset) = &query.asset {
balances.retain(|b| b.asset.eq_ignore_ascii_case(asset));
}
Ok(balances)
}
async fn get_account_info(&self, account_type: AccountType) -> ExchangeResult<AccountInfo> {
let request = json!({
"header": self.trd_header(),
"refreshBalance": true,
});
let response = self.proto_call(proto_id::TRD_GET_FUNDS, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_account_info(s2c, account_type)
}
async fn get_fees(&self, symbol: Option<&str>) -> ExchangeResult<FeeInfo> {
let (maker, taker) = match self.trd_market {
TrdMarket::Hk => (0.0003, 0.0003), TrdMarket::Us => (0.0, 0.0), TrdMarket::CnSh | TrdMarket::CnSz => (0.0003, 0.0003),
TrdMarket::Sg => (0.0003, 0.0003),
};
Ok(FeeInfo {
maker_rate: maker,
taker_rate: taker,
symbol: symbol.map(|s| s.to_string()),
tier: Some("standard".to_string()),
})
}
}
#[async_trait]
impl Positions for FutuConnector {
async fn get_positions(&self, query: PositionQuery) -> ExchangeResult<Vec<Position>> {
let mut filter_conditions = json!({});
if let Some(sym) = &query.symbol {
filter_conditions["codeList"] = json!([self.format_sym(sym)]);
}
let request = json!({
"header": self.trd_header(),
"filterConditions": filter_conditions,
});
let response = self.proto_call(proto_id::TRD_GET_POSITION_LIST, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_position_list(s2c)
}
async fn get_funding_rate(
&self,
_symbol: &str,
_account_type: AccountType,
) -> ExchangeResult<FundingRate> {
Err(ExchangeError::UnsupportedOperation(
"Futu is a stock/ETF broker — funding rates are only applicable to \
perpetual futures exchanges."
.to_string(),
))
}
async fn modify_position(&self, req: PositionModification) -> ExchangeResult<()> {
match req {
PositionModification::ClosePosition { symbol, account_type } => {
let query = PositionQuery {
symbol: Some(symbol.clone()),
account_type,
};
let positions = self.get_positions(query).await?;
let position = positions.into_iter()
.find(|p| p.symbol == format_symbol(&symbol, infer_sec_market(&symbol)))
.ok_or_else(|| ExchangeError::NotFound(
format!("no open position for {}", symbol.base)
))?;
let close_side = match position.side {
PositionSide::Long | PositionSide::Both => OrderSide::Sell,
PositionSide::Short => OrderSide::Buy,
};
let close_req = OrderRequest {
symbol,
side: close_side,
order_type: OrderType::Market,
quantity: position.quantity.abs(),
time_in_force: TimeInForce::Gtc,
account_type,
client_order_id: None,
reduce_only: false,
};
self.place_order(close_req).await?;
Ok(())
}
PositionModification::SetLeverage { .. } => {
Err(ExchangeError::UnsupportedOperation(
"Futu stock accounts do not support leverage adjustment via API.".to_string(),
))
}
PositionModification::SetMarginMode { .. } => {
Err(ExchangeError::UnsupportedOperation(
"Futu stock accounts do not support margin mode switching via API.".to_string(),
))
}
PositionModification::AddMargin { .. } | PositionModification::RemoveMargin { .. } => {
Err(ExchangeError::UnsupportedOperation(
"Futu stock accounts do not support manual margin adjustment via API.".to_string(),
))
}
PositionModification::SetTpSl { .. } => {
Err(ExchangeError::UnsupportedOperation(
"Futu does not support setting TP/SL on positions directly. \
Place separate conditional orders instead."
.to_string(),
))
}
PositionModification::SwitchPositionMode { .. } | PositionModification::MovePositions { .. } => {
Err(ExchangeError::UnsupportedOperation(
"Futu does not support this position modification".to_string(),
))
}
}
}
}
#[async_trait]
impl AmendOrder for FutuConnector {
async fn amend_order(&self, req: AmendRequest) -> ExchangeResult<Order> {
let order_id_u64: u64 = req.order_id.parse().map_err(|_| {
ExchangeError::InvalidRequest(format!("invalid order ID: {}", req.order_id))
})?;
if req.fields.price.is_none() && req.fields.quantity.is_none() {
return Err(ExchangeError::InvalidRequest(
"amend_order requires at least one of: price, quantity".to_string(),
));
}
let qty = req.fields.quantity.unwrap_or(0.0);
let price = req.fields.price.unwrap_or(0.0);
let request = json!({
"header": self.trd_header(),
"modifyOrderOp": ModifyOrderOp::Normal.as_i32(),
"orderID": order_id_u64,
"qty": qty,
"price": price,
});
let response = self.proto_call(proto_id::TRD_MODIFY_ORDER, request).await?;
let s2c = FutuParser::check_response(&response)?;
if let Some(order_obj) = s2c.get("order") {
if order_obj.is_object() {
return FutuParser::parse_order(order_obj);
}
}
Ok(Order {
id: req.order_id,
client_order_id: None,
symbol: req.symbol.base,
side: OrderSide::Buy, order_type: if price > 0.0 {
OrderType::Limit { price }
} else {
OrderType::Market
},
status: OrderStatus::Open,
price: req.fields.price,
stop_price: req.fields.trigger_price,
quantity: qty,
filled_quantity: 0.0,
average_price: None,
commission: None,
commission_asset: None,
created_at: 0,
updated_at: None,
time_in_force: TimeInForce::Gtc,
})
}
}
impl FutuConnector {
pub async fn get_account_list(&self) -> ExchangeResult<Vec<(u64, i32)>> {
let request = json!({ "trdCategory": 1 }); let response = self.proto_call(proto_id::TRD_GET_ACC_LIST, request).await?;
let s2c = FutuParser::check_response(&response)?;
FutuParser::parse_acc_list(s2c)
}
pub async fn unlock_trade(&self) -> ExchangeResult<()> {
let password = self.auth.trade_password.as_deref().unwrap_or("");
let request = json!({
"isPwdMd5": false,
"pwd": password,
"securityFirm": 1, });
let response = self.proto_call(proto_id::TRD_UNLOCK_TRADE, request).await?;
FutuParser::check_response(&response)?;
Ok(())
}
pub async fn get_broker_queue(&self, symbol: Symbol) -> ExchangeResult<BrokerQueue> {
let code = self.format_sym(&symbol);
let request = json!({
"security": {"market": SecMarket::Hk.as_i32(), "code": code},
});
let _response = self.proto_call(proto_id::QOT_GET_STATIC_INFO, request).await?;
Err(ExchangeError::UnsupportedOperation(
"BrokerQueue requires LV2 subscription and full protobuf transport".to_string(),
))
}
pub async fn get_fills(&self, symbol: Option<&str>) -> ExchangeResult<Vec<UserTrade>> {
let mut filter_conditions = json!({});
if let Some(sym) = symbol {
filter_conditions["codeList"] = json!([sym]);
}
let request = json!({
"header": self.trd_header(),
"filterConditions": filter_conditions,
});
let _response = self.proto_call(proto_id::TRD_GET_ORDER_FILL_LIST, request).await?;
Ok(vec![])
}
}
#[derive(Debug, Clone)]
pub struct BrokerQueue {
pub symbol: String,
pub bid_brokers: Vec<BrokerInfo>,
pub ask_brokers: Vec<BrokerInfo>,
}
#[derive(Debug, Clone)]
pub struct BrokerInfo {
pub broker_id: u32,
pub broker_name: String,
pub position: u32,
}