1use async_trait::async_trait;
22use reqwest::Client;
23use serde_json::{json, Value};
24
25use crate::core::types::*;
26use crate::core::traits::*;
27
28use super::endpoints::{
29 proto_id, FutuEndpoints, FutuOrderType, FutuTrdSide, ModifyOrderOp,
30 TrdEnv, TrdMarket, SecMarket, format_symbol, infer_sec_market,
31};
32use super::auth::FutuAuth;
33use super::parser::FutuParser;
34
35pub struct FutuConnector {
51 _client: Client,
52 auth: FutuAuth,
53 endpoints: FutuEndpoints,
54 trd_env: TrdEnv,
56 acc_id: u64,
58 trd_market: TrdMarket,
60}
61
62impl FutuConnector {
63 pub fn new(auth: FutuAuth) -> Self {
65 Self {
66 _client: Client::new(),
67 endpoints: FutuEndpoints::default(),
68 trd_env: TrdEnv::Real,
69 acc_id: 0,
70 trd_market: TrdMarket::Us,
71 auth,
72 }
73 }
74
75 pub fn from_env() -> Self {
77 Self::new(FutuAuth::from_env())
78 }
79
80 pub fn with_env(mut self, env: TrdEnv) -> Self {
82 self.trd_env = env;
83 self
84 }
85
86 pub fn with_market(mut self, market: TrdMarket) -> Self {
88 self.trd_market = market;
89 self
90 }
91
92 pub fn with_acc_id(mut self, acc_id: u64) -> Self {
94 self.acc_id = acc_id;
95 self
96 }
97
98 async fn proto_call(
119 &self,
120 proto_id: u32,
121 request: Value,
122 ) -> ExchangeResult<Value> {
123 Err(ExchangeError::UnsupportedOperation(format!(
124 "Futu OpenD TCP+Protobuf transport not connected. \
125 OpenD address: {}. Proto ID: {}, request: {}",
126 self.endpoints.address(),
127 proto_id,
128 request
129 )))
130 }
131
132 fn trd_header(&self) -> Value {
138 json!({
139 "trdEnv": self.trd_env.as_i32(),
140 "accID": self.acc_id,
141 "trdMarket": self.trd_market.as_i32(),
142 })
143 }
144
145 fn format_sym(&self, symbol: &Symbol) -> String {
150 let sec_market = infer_sec_market(symbol);
151 format_symbol(symbol, sec_market)
152 }
153
154 fn map_side(side: OrderSide) -> FutuTrdSide {
159 match side {
160 OrderSide::Buy => FutuTrdSide::Buy,
161 OrderSide::Sell => FutuTrdSide::Sell,
162 }
163 }
164}
165
166impl ExchangeIdentity for FutuConnector {
171 fn exchange_name(&self) -> &'static str {
172 "futu"
173 }
174
175 fn exchange_id(&self) -> ExchangeId {
176 ExchangeId::Futu
177 }
178
179 fn is_testnet(&self) -> bool {
180 matches!(self.trd_env, TrdEnv::Simulate)
181 }
182
183 fn supported_account_types(&self) -> Vec<AccountType> {
184 vec![AccountType::Spot] }
186}
187
188#[async_trait]
193impl MarketData for FutuConnector {
194 async fn get_price(
195 &self,
196 symbol: SymbolInput<'_>,
197 _account_type: AccountType,
198 ) -> ExchangeResult<Price> {
199 let code: String = match symbol { SymbolInput::Raw(s) => s.to_string(), SymbolInput::Canonical(c) => c.to_concat() };
200 let request = json!({
201 "securityList": [{"market": 1, "code": code}]
202 });
203 let response = self.proto_call(proto_id::QOT_GET_SECURITY_SNAPSHOT, request).await?;
204 let s2c = FutuParser::check_response(&response)?;
205 FutuParser::parse_price(s2c)
206 }
207
208 async fn get_ticker(
209 &self,
210 symbol: SymbolInput<'_>,
211 _account_type: AccountType,
212 ) -> ExchangeResult<Ticker> {
213 let code: String = match symbol { SymbolInput::Raw(s) => s.to_string(), SymbolInput::Canonical(c) => c.to_concat() };
214 let request = json!({
215 "securityList": [{"market": 1, "code": code.clone()}]
216 });
217 let response = self.proto_call(proto_id::QOT_GET_SECURITY_SNAPSHOT, request).await?;
218 let s2c = FutuParser::check_response(&response)?;
219 FutuParser::parse_ticker(s2c, &code)
220 }
221
222 async fn get_orderbook(
223 &self,
224 symbol: SymbolInput<'_>,
225 depth: Option<u16>,
226 _account_type: AccountType,
227 ) -> ExchangeResult<OrderBook> {
228 let code: String = match symbol { SymbolInput::Raw(s) => s.to_string(), SymbolInput::Canonical(c) => c.to_concat() };
229 let request = json!({
230 "security": {"market": 1, "code": code},
231 "num": depth.unwrap_or(10),
232 });
233 let response = self.proto_call(proto_id::QOT_GET_ORDER_BOOK, request).await?;
234 let s2c = FutuParser::check_response(&response)?;
235 FutuParser::parse_orderbook(s2c)
236 }
237
238 async fn get_klines(
239 &self,
240 symbol: SymbolInput<'_>,
241 interval: &str,
242 limit: Option<u16>,
243 _account_type: AccountType,
244 _end_time: Option<i64>,
245 ) -> ExchangeResult<Vec<Kline>> {
246 let code: String = match symbol { SymbolInput::Raw(s) => s.to_string(), SymbolInput::Canonical(c) => c.to_concat() };
247 let kl_type = match interval {
250 "1m" | "1min" => 1,
251 "5m" | "5min" => 2,
252 "15m" | "15min" => 3,
253 "30m" | "30min" => 4,
254 "1h" | "60m" | "60min" => 5,
255 "1d" | "1day" | "D" => 6,
256 "1w" | "1week" | "W" => 7,
257 "1M" | "1mon" => 8,
258 _ => 6, };
260 let max_count = limit.unwrap_or(200) as i32;
261 let request = json!({
262 "security": {"market": 1, "code": code},
263 "klType": kl_type,
264 "reqNum": max_count,
265 });
266 let response = self.proto_call(proto_id::QOT_REQUEST_HISTORY_KL, request).await?;
267 let s2c = FutuParser::check_response(&response)?;
268 FutuParser::parse_klines(s2c)
269 }
270
271 async fn ping(&self) -> ExchangeResult<()> {
272 let request = json!({ "time": 0u64 });
273 self.proto_call(proto_id::KEEP_ALIVE, request).await?;
274 Ok(())
275 }
276}
277
278#[async_trait]
283impl Trading for FutuConnector {
284 async fn place_order(&self, req: OrderRequest) -> ExchangeResult<PlaceOrderResponse> {
299 let symbol_code = self.format_sym(&req.symbol);
300 let trd_side = Self::map_side(req.side);
301
302 let (order_type_val, price, aux_price, time_in_force_val) =
303 match &req.order_type {
304 OrderType::Market => (
305 FutuOrderType::Market.as_i32(),
306 0.0f64,
307 None::<f64>,
308 None::<i32>,
309 ),
310 OrderType::Limit { price } => (
311 FutuOrderType::Normal.as_i32(),
312 *price,
313 None,
314 None,
315 ),
316 OrderType::StopMarket { stop_price } => (
317 FutuOrderType::EnhancedLimit.as_i32(),
318 0.0,
319 Some(*stop_price),
320 None,
321 ),
322 OrderType::StopLimit { stop_price, limit_price } => (
323 FutuOrderType::StopLimit.as_i32(),
324 *limit_price,
325 Some(*stop_price),
326 None,
327 ),
328 OrderType::PostOnly { price } => (
329 FutuOrderType::SpecialLimit.as_i32(),
330 *price,
331 None,
332 None,
333 ),
334 OrderType::Ioc { price } => (
335 FutuOrderType::Normal.as_i32(),
336 price.unwrap_or(0.0),
337 None,
338 Some(2i32), ),
340 OrderType::Fok { price } => (
341 FutuOrderType::Normal.as_i32(),
342 *price,
343 None,
344 Some(4i32), ),
346 OrderType::Oco { .. } => {
347 return Err(ExchangeError::UnsupportedOperation(
348 "Futu does not support native OCO orders".to_string(),
349 ));
350 }
351 OrderType::Bracket { .. } => {
352 return Err(ExchangeError::UnsupportedOperation(
353 "Futu does not support native Bracket orders".to_string(),
354 ));
355 }
356 OrderType::TrailingStop { .. } => {
357 return Err(ExchangeError::UnsupportedOperation(
358 "Futu does not support trailing stop orders".to_string(),
359 ));
360 }
361 OrderType::Iceberg { price, .. } => (
362 FutuOrderType::Normal.as_i32(),
364 *price,
365 None,
366 None,
367 ),
368 OrderType::Twap { .. } => {
369 return Err(ExchangeError::UnsupportedOperation(
370 "Futu does not support TWAP orders".to_string(),
371 ));
372 }
373 OrderType::Gtd { price, .. } => (
374 FutuOrderType::Normal.as_i32(),
375 *price,
376 None,
377 None,
378 ),
379 OrderType::ReduceOnly { .. } => {
380 return Err(ExchangeError::UnsupportedOperation(
381 "Futu stocks do not support ReduceOnly orders".to_string(),
382 ));
383 }
384 OrderType::Oto { .. } | OrderType::ConditionalPlan { .. } | OrderType::DcaRecurring { .. } => {
385 return Err(ExchangeError::UnsupportedOperation(
386 "Futu does not support this order type".to_string(),
387 ));
388 }
389 };
390
391 let sec_market = infer_sec_market(&req.symbol);
393
394 let mut request_body = json!({
395 "header": self.trd_header(),
396 "trdSide": trd_side.as_i32(),
397 "orderType": order_type_val,
398 "code": symbol_code,
399 "qty": req.quantity,
400 "price": price,
401 "secMarket": sec_market.as_i32(),
402 });
403
404 if let Some(ap) = aux_price {
406 request_body["auxPrice"] = json!(ap);
407 }
408 if let Some(tif) = time_in_force_val {
409 request_body["timeInForce"] = json!(tif);
410 }
411 if let Some(ref cid) = req.client_order_id {
412 request_body["remark"] = json!(cid);
413 }
414
415 let response = self.proto_call(proto_id::TRD_PLACE_ORDER, request_body).await?;
416 let s2c = FutuParser::check_response(&response)?;
417 let order = FutuParser::parse_place_order(s2c, &symbol_code)?;
418 Ok(PlaceOrderResponse::Simple(order))
419 }
420
421 async fn cancel_order(&self, req: CancelRequest) -> ExchangeResult<Order> {
426 let order_id = match &req.scope {
427 CancelScope::Single { order_id } => order_id.clone(),
428 CancelScope::Batch { .. } => {
429 return Err(ExchangeError::UnsupportedOperation(
430 "Futu does not support native batch cancel. Use CancelAll trait or cancel individually.".to_string(),
431 ));
432 }
433 CancelScope::All { .. } | CancelScope::BySymbol { .. } => {
434 return Err(ExchangeError::UnsupportedOperation(
435 "Futu does not support native cancel-all. Cancel orders individually.".to_string(),
436 ));
437 }
438 CancelScope::ByLabel(_) | CancelScope::ByCurrencyKind { .. } | CancelScope::ScheduledAt(_) => {
439 return Err(ExchangeError::UnsupportedOperation(
440 "Futu does not support this cancel scope".to_string(),
441 ));
442 }
443 };
444
445 let order_id_u64: u64 = order_id.parse().map_err(|_| {
446 ExchangeError::InvalidRequest(format!("invalid order ID: {}", order_id))
447 })?;
448
449 let request = json!({
450 "header": self.trd_header(),
451 "modifyOrderOp": ModifyOrderOp::Cancel.as_i32(),
452 "orderID": order_id_u64,
453 "qty": 0,
454 "price": 0,
455 });
456
457 let response = self.proto_call(proto_id::TRD_MODIFY_ORDER, request).await?;
458 let s2c = FutuParser::check_response(&response)?;
459
460 if let Some(order_obj) = s2c.get("order") {
463 if order_obj.is_object() {
464 return FutuParser::parse_order(order_obj);
465 }
466 }
467
468 Ok(Order {
470 id: order_id,
471 client_order_id: None,
472 symbol: req.symbol
473 .map(|s| s.base)
474 .unwrap_or_default(),
475 side: OrderSide::Buy,
476 order_type: OrderType::Market,
477 status: OrderStatus::Canceled,
478 price: None,
479 stop_price: None,
480 quantity: 0.0,
481 filled_quantity: 0.0,
482 average_price: None,
483 commission: None,
484 commission_asset: None,
485 created_at: 0,
486 updated_at: None,
487 time_in_force: TimeInForce::Gtc,
488 })
489 }
490
491 async fn get_order(
496 &self,
497 _symbol: &str,
498 order_id: &str,
499 _account_type: AccountType,
500 ) -> ExchangeResult<Order> {
501 let order_id_u64: u64 = order_id.parse().map_err(|_| {
502 ExchangeError::InvalidRequest(format!("invalid order ID: {}", order_id))
503 })?;
504
505 let request = json!({
506 "header": self.trd_header(),
507 "filterConditions": {
508 "orderIDList": [order_id_u64],
509 }
510 });
511
512 let response = self.proto_call(proto_id::TRD_GET_ORDER_LIST, request).await?;
513 let s2c = FutuParser::check_response(&response)?;
514 let orders = FutuParser::parse_order_list(s2c)?;
515
516 orders.into_iter()
517 .find(|o| o.id == order_id)
518 .ok_or_else(|| ExchangeError::NotFound(format!("order {} not found", order_id)))
519 }
520
521 async fn get_open_orders(
525 &self,
526 symbol: Option<&str>,
527 _account_type: AccountType,
528 ) -> ExchangeResult<Vec<Order>> {
529 let mut filter_conditions = json!({
530 "orderStatusFilterList": [6, 7], });
533
534 if let Some(sym) = symbol {
535 filter_conditions["codeList"] = json!([sym]);
536 }
537
538 let request = json!({
539 "header": self.trd_header(),
540 "filterConditions": filter_conditions,
541 });
542
543 let response = self.proto_call(proto_id::TRD_GET_ORDER_LIST, request).await?;
544 let s2c = FutuParser::check_response(&response)?;
545 FutuParser::parse_order_list(s2c)
546 }
547
548 async fn get_order_history(
550 &self,
551 filter: OrderHistoryFilter,
552 _account_type: AccountType,
553 ) -> ExchangeResult<Vec<Order>> {
554 let mut filter_conditions = json!({});
555
556 if let Some(sym) = &filter.symbol {
557 filter_conditions["codeList"] = json!([self.format_sym(sym)]);
558 }
559 if let Some(start) = filter.start_time {
560 filter_conditions["beginTime"] = json!((start / 1000).to_string());
562 }
563 if let Some(end) = filter.end_time {
564 filter_conditions["endTime"] = json!((end / 1000).to_string());
565 }
566
567 let request = json!({
568 "header": self.trd_header(),
569 "filterConditions": filter_conditions,
570 });
571
572 let response = self.proto_call(proto_id::TRD_GET_HIST_ORDER_LIST, request).await?;
573 let s2c = FutuParser::check_response(&response)?;
574 FutuParser::parse_order_list(s2c)
575 }
576}
577
578#[async_trait]
583impl Account for FutuConnector {
584 async fn get_balance(&self, query: BalanceQuery) -> ExchangeResult<Vec<Balance>> {
589 let request = json!({
590 "header": self.trd_header(),
591 "refreshBalance": true,
592 });
593
594 let response = self.proto_call(proto_id::TRD_GET_FUNDS, request).await?;
595 let s2c = FutuParser::check_response(&response)?;
596
597 let currency = match self.trd_market {
599 TrdMarket::Hk => "HKD",
600 TrdMarket::Us => "USD",
601 TrdMarket::CnSh | TrdMarket::CnSz => "CNY",
602 TrdMarket::Sg => "SGD",
603 };
604
605 let mut balances = FutuParser::parse_funds(s2c, currency)?;
606
607 if let Some(asset) = &query.asset {
609 balances.retain(|b| b.asset.eq_ignore_ascii_case(asset));
610 }
611
612 Ok(balances)
613 }
614
615 async fn get_account_info(&self, account_type: AccountType) -> ExchangeResult<AccountInfo> {
617 let request = json!({
618 "header": self.trd_header(),
619 "refreshBalance": true,
620 });
621
622 let response = self.proto_call(proto_id::TRD_GET_FUNDS, request).await?;
623 let s2c = FutuParser::check_response(&response)?;
624 FutuParser::parse_account_info(s2c, account_type)
625 }
626
627 async fn get_fees(&self, symbol: Option<&str>) -> ExchangeResult<FeeInfo> {
633 let (maker, taker) = match self.trd_market {
635 TrdMarket::Hk => (0.0003, 0.0003), TrdMarket::Us => (0.0, 0.0), TrdMarket::CnSh | TrdMarket::CnSz => (0.0003, 0.0003),
638 TrdMarket::Sg => (0.0003, 0.0003),
639 };
640
641 Ok(FeeInfo {
642 maker_rate: maker,
643 taker_rate: taker,
644 symbol: symbol.map(|s| s.to_string()),
645 tier: Some("standard".to_string()),
646 })
647 }
648}
649
650#[async_trait]
655impl Positions for FutuConnector {
656 async fn get_positions(&self, query: PositionQuery) -> ExchangeResult<Vec<Position>> {
661 let mut filter_conditions = json!({});
662
663 if let Some(sym) = &query.symbol {
664 filter_conditions["codeList"] = json!([self.format_sym(sym)]);
665 }
666
667 let request = json!({
668 "header": self.trd_header(),
669 "filterConditions": filter_conditions,
670 });
671
672 let response = self.proto_call(proto_id::TRD_GET_POSITION_LIST, request).await?;
673 let s2c = FutuParser::check_response(&response)?;
674 FutuParser::parse_position_list(s2c)
675 }
676
677 async fn get_funding_rate(
679 &self,
680 _symbol: &str,
681 _account_type: AccountType,
682 ) -> ExchangeResult<FundingRate> {
683 Err(ExchangeError::UnsupportedOperation(
684 "Futu is a stock/ETF broker — funding rates are only applicable to \
685 perpetual futures exchanges."
686 .to_string(),
687 ))
688 }
689
690 async fn modify_position(&self, req: PositionModification) -> ExchangeResult<()> {
696 match req {
697 PositionModification::ClosePosition { symbol, account_type } => {
698 let query = PositionQuery {
700 symbol: Some(symbol.clone()),
701 account_type,
702 };
703 let positions = self.get_positions(query).await?;
704 let position = positions.into_iter()
705 .find(|p| p.symbol == format_symbol(&symbol, infer_sec_market(&symbol)))
706 .ok_or_else(|| ExchangeError::NotFound(
707 format!("no open position for {}", symbol.base)
708 ))?;
709
710 let close_side = match position.side {
711 PositionSide::Long | PositionSide::Both => OrderSide::Sell,
712 PositionSide::Short => OrderSide::Buy,
713 };
714
715 let close_req = OrderRequest {
716 symbol,
717 side: close_side,
718 order_type: OrderType::Market,
719 quantity: position.quantity.abs(),
720 time_in_force: TimeInForce::Gtc,
721 account_type,
722 client_order_id: None,
723 reduce_only: false,
724 };
725
726 self.place_order(close_req).await?;
727 Ok(())
728 }
729
730 PositionModification::SetLeverage { .. } => {
731 Err(ExchangeError::UnsupportedOperation(
732 "Futu stock accounts do not support leverage adjustment via API.".to_string(),
733 ))
734 }
735
736 PositionModification::SetMarginMode { .. } => {
737 Err(ExchangeError::UnsupportedOperation(
738 "Futu stock accounts do not support margin mode switching via API.".to_string(),
739 ))
740 }
741
742 PositionModification::AddMargin { .. } | PositionModification::RemoveMargin { .. } => {
743 Err(ExchangeError::UnsupportedOperation(
744 "Futu stock accounts do not support manual margin adjustment via API.".to_string(),
745 ))
746 }
747
748 PositionModification::SetTpSl { .. } => {
749 Err(ExchangeError::UnsupportedOperation(
750 "Futu does not support setting TP/SL on positions directly. \
751 Place separate conditional orders instead."
752 .to_string(),
753 ))
754 }
755
756 PositionModification::SwitchPositionMode { .. } | PositionModification::MovePositions { .. } => {
757 Err(ExchangeError::UnsupportedOperation(
758 "Futu does not support this position modification".to_string(),
759 ))
760 }
761 }
762 }
763}
764
765#[async_trait]
770impl AmendOrder for FutuConnector {
771 async fn amend_order(&self, req: AmendRequest) -> ExchangeResult<Order> {
776 let order_id_u64: u64 = req.order_id.parse().map_err(|_| {
777 ExchangeError::InvalidRequest(format!("invalid order ID: {}", req.order_id))
778 })?;
779
780 if req.fields.price.is_none() && req.fields.quantity.is_none() {
782 return Err(ExchangeError::InvalidRequest(
783 "amend_order requires at least one of: price, quantity".to_string(),
784 ));
785 }
786
787 let qty = req.fields.quantity.unwrap_or(0.0);
792 let price = req.fields.price.unwrap_or(0.0);
793
794 let request = json!({
795 "header": self.trd_header(),
796 "modifyOrderOp": ModifyOrderOp::Normal.as_i32(),
797 "orderID": order_id_u64,
798 "qty": qty,
799 "price": price,
800 });
801
802 let response = self.proto_call(proto_id::TRD_MODIFY_ORDER, request).await?;
803 let s2c = FutuParser::check_response(&response)?;
804
805 if let Some(order_obj) = s2c.get("order") {
807 if order_obj.is_object() {
808 return FutuParser::parse_order(order_obj);
809 }
810 }
811
812 Ok(Order {
814 id: req.order_id,
815 client_order_id: None,
816 symbol: req.symbol.base,
817 side: OrderSide::Buy, order_type: if price > 0.0 {
819 OrderType::Limit { price }
820 } else {
821 OrderType::Market
822 },
823 status: OrderStatus::Open,
824 price: req.fields.price,
825 stop_price: req.fields.trigger_price,
826 quantity: qty,
827 filled_quantity: 0.0,
828 average_price: None,
829 commission: None,
830 commission_asset: None,
831 created_at: 0,
832 updated_at: None,
833 time_in_force: TimeInForce::Gtc,
834 })
835 }
836}
837
838impl FutuConnector {
843 pub async fn get_account_list(&self) -> ExchangeResult<Vec<(u64, i32)>> {
848 let request = json!({ "trdCategory": 1 }); let response = self.proto_call(proto_id::TRD_GET_ACC_LIST, request).await?;
850 let s2c = FutuParser::check_response(&response)?;
851 FutuParser::parse_acc_list(s2c)
852 }
853
854 pub async fn unlock_trade(&self) -> ExchangeResult<()> {
858 let password = self.auth.trade_password.as_deref().unwrap_or("");
859 let request = json!({
860 "isPwdMd5": false,
861 "pwd": password,
862 "securityFirm": 1, });
864 let response = self.proto_call(proto_id::TRD_UNLOCK_TRADE, request).await?;
865 FutuParser::check_response(&response)?;
866 Ok(())
867 }
868
869 pub async fn get_broker_queue(&self, symbol: Symbol) -> ExchangeResult<BrokerQueue> {
871 let code = self.format_sym(&symbol);
872 let request = json!({
873 "security": {"market": SecMarket::Hk.as_i32(), "code": code},
874 });
875 let _response = self.proto_call(proto_id::QOT_GET_STATIC_INFO, request).await?;
876 Err(ExchangeError::UnsupportedOperation(
877 "BrokerQueue requires LV2 subscription and full protobuf transport".to_string(),
878 ))
879 }
880
881 pub async fn get_fills(&self, symbol: Option<&str>) -> ExchangeResult<Vec<UserTrade>> {
883 let mut filter_conditions = json!({});
884 if let Some(sym) = symbol {
885 filter_conditions["codeList"] = json!([sym]);
886 }
887 let request = json!({
888 "header": self.trd_header(),
889 "filterConditions": filter_conditions,
890 });
891 let _response = self.proto_call(proto_id::TRD_GET_ORDER_FILL_LIST, request).await?;
892 Ok(vec![])
894 }
895}
896
897#[derive(Debug, Clone)]
903pub struct BrokerQueue {
904 pub symbol: String,
905 pub bid_brokers: Vec<BrokerInfo>,
906 pub ask_brokers: Vec<BrokerInfo>,
907}
908
/// One broker entry in a broker queue.
///
/// Plain value type: it can be compared, hashed and cloned.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BrokerInfo {
    /// Numeric broker identifier.
    pub broker_id: u32,
    /// Display name of the broker.
    pub broker_name: String,
    /// NOTE(review): presumably the broker's level/position in the queue — confirm.
    pub position: u32,
}