ctp_rust/api/
async_trader_api.rs

1//! 异步交易API模块
2//!
3//! 基于同步TraderApi提供异步封装,使用tokio实现
4
5use crate::api::trader_api::{
6    InputOrderField, InvestorPositionField, OrderField, ReqAuthenticateField, RspAuthenticateField,
7    TradeField, TraderApi, TraderSpiHandler, TradingAccountField,
8};
9use crate::api::CtpApi;
10use crate::error::{CtpError, CtpResult};
11use crate::types::{
12    InputOrderActionField, QryInvestorPositionField, QryTradingAccountField, ReqUserLoginField,
13    RspInfoField, RspUserLoginField,
14};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, Mutex, Notify};
18use tokio::time::{timeout, Duration};
19use tracing::{debug, error, warn};
20
/// Events emitted by the asynchronous trader API.
///
/// Every SPI callback handled by `AsyncTraderHandler` is converted into one of
/// these variants and pushed onto the event channel. The `*Response` variants
/// carry the callback arguments unchanged: the optional payload, the optional
/// response info, the request id and the `is_last` flag.
#[derive(Debug, Clone)]
pub enum AsyncTraderEvent {
    /// The front connection was established.
    Connected,
    /// The front connection was lost; carries the reason code passed to the callback.
    Disconnected(i32),
    /// Heartbeat timeout warning; carries the time lapse passed to the callback
    /// (logged as seconds by the handler).
    HeartBeatWarning(i32),
    /// Response to an authentication request.
    AuthenticateResponse {
        rsp_authenticate: Option<RspAuthenticateField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to a login request.
    LoginResponse {
        user_login: Option<RspUserLoginField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to a logout request.
    LogoutResponse {
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to an order-insert request.
    OrderInsertResponse {
        input_order: Option<InputOrderField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to an order-action request.
    OrderActionResponse {
        input_order_action: Option<InputOrderActionField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to a trading-account query.
    QryTradingAccountResponse {
        trading_account: Option<TradingAccountField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to an investor-position query.
    QryInvestorPositionResponse {
        investor_position: Option<InvestorPositionField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to an order query.
    QryOrderResponse {
        order: Option<OrderField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Response to a trade query.
    QryTradeResponse {
        trade: Option<TradeField>,
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
    /// Order return (order status notification).
    OrderReturn(OrderField),
    /// Trade return (fill notification).
    TradeReturn(TradeField),
    /// Error response reported through the generic error callback.
    ErrorResponse {
        rsp_info: Option<RspInfoField>,
        request_id: i32,
        is_last: bool,
    },
}
103
/// Snapshot of the asynchronous trader API's session state.
///
/// Maintained by the SPI handler and returned (cloned) by
/// `AsyncTraderApi::get_state`. All fields start out `false` / `None`.
#[derive(Debug, Clone, Default)]
pub struct AsyncTraderState {
    /// Set on front connection, cleared on disconnection.
    pub connected: bool,
    /// Set after a successful final authentication response, cleared on disconnection.
    pub authenticated: bool,
    /// Set after a successful final login response, cleared on logout or disconnection.
    pub logged_in: bool,
    /// Payload of the last successful authentication response, if any.
    pub auth_info: Option<RspAuthenticateField>,
    /// Payload of the last successful login response, if any.
    pub login_info: Option<RspUserLoginField>,
}
113
/// An in-flight request awaiting its response.
///
/// Cloning is cheap: both fields are `Arc`s, so the copy stored in the pending
/// map and the copy held by the waiting task share the same notifier and slot.
#[derive(Debug, Clone)]
struct PendingRequest {
    /// Signalled by the SPI handler once the response has been stored.
    notify: Arc<Notify>,
    /// Slot the SPI handler fills with the response event.
    response_data: Arc<Mutex<Option<AsyncTraderEvent>>>,
}
120
/// Asynchronous adapter around the synchronous `TraderApi`.
///
/// All shared pieces are `Arc`-wrapped so they can be handed to the SPI
/// handler registered in `init`.
pub struct AsyncTraderApi {
    /// Underlying synchronous API.
    inner: Arc<Mutex<TraderApi>>,
    /// Sending half of the event channel; cloned into the SPI handler.
    event_sender: mpsc::UnboundedSender<AsyncTraderEvent>,
    /// Receiving half of the event channel.
    event_receiver: Arc<Mutex<mpsc::UnboundedReceiver<AsyncTraderEvent>>>,
    /// Current session state.
    state: Arc<Mutex<AsyncTraderState>>,
    /// Signalled when the front connection is established.
    connected_notify: Arc<Notify>,
    /// Signalled when authentication succeeds.
    auth_notify: Arc<Notify>,
    /// Signalled when login succeeds.
    login_notify: Arc<Notify>,
    /// In-flight requests keyed by request id (request_id -> PendingRequest).
    pending_requests: Arc<Mutex<HashMap<i32, PendingRequest>>>,
}
140
141impl AsyncTraderApi {
142    /// 创建异步交易API实例
143    pub async fn new(flow_path: Option<&str>, is_production_mode: Option<bool>) -> CtpResult<Self> {
144        let trader_api = TraderApi::new(flow_path, is_production_mode)?;
145        let (event_sender, event_receiver) = mpsc::unbounded_channel();
146
147        Ok(Self {
148            inner: Arc::new(Mutex::new(trader_api)),
149            event_sender,
150            event_receiver: Arc::new(Mutex::new(event_receiver)),
151            state: Arc::new(Mutex::new(AsyncTraderState::default())),
152            connected_notify: Arc::new(Notify::new()),
153            auth_notify: Arc::new(Notify::new()),
154            login_notify: Arc::new(Notify::new()),
155            pending_requests: Arc::new(Mutex::new(HashMap::new())),
156        })
157    }
158
159    /// 注册前置机地址
160    pub async fn register_front(&self, front_address: &str) -> CtpResult<()> {
161        let mut api = self.inner.lock().await;
162        api.register_front(front_address)
163    }
164
165    /// 初始化API
166    pub async fn init(&self) -> CtpResult<()> {
167        let mut api = self.inner.lock().await;
168
169        // 创建异步事件处理器
170        let handler = AsyncTraderHandler::new(
171            self.event_sender.clone(),
172            self.state.clone(),
173            self.connected_notify.clone(),
174            self.auth_notify.clone(),
175            self.login_notify.clone(),
176            self.pending_requests.clone(),
177        );
178
179        // 注册处理器
180        api.register_spi(handler)?;
181
182        // 初始化
183        api.init()
184    }
185
186    /// 等待连接建立(带超时)
187    pub async fn wait_connected(&self, timeout_secs: u64) -> CtpResult<()> {
188        let state = self.state.lock().await;
189        if state.connected {
190            return Ok(());
191        }
192        drop(state);
193
194        match timeout(
195            Duration::from_secs(timeout_secs),
196            self.connected_notify.notified(),
197        )
198        .await
199        {
200            Ok(_) => {
201                let state = self.state.lock().await;
202                if state.connected {
203                    Ok(())
204                } else {
205                    Err(CtpError::InitializationError("连接失败".to_string()))
206                }
207            }
208            Err(_) => Err(CtpError::InitializationError("连接超时".to_string())),
209        }
210    }
211
212    /// 异步认证
213    pub async fn authenticate(
214        &self,
215        req: &ReqAuthenticateField,
216        timeout_secs: u64,
217    ) -> CtpResult<RspAuthenticateField> {
218        let mut api = self.inner.lock().await;
219        let request_id = api.req_authenticate(req)?;
220        drop(api);
221
222        let pending_request = PendingRequest {
223            notify: Arc::new(Notify::new()),
224            response_data: Arc::new(Mutex::new(None)),
225        };
226
227        // 注册待处理请求
228        {
229            let mut pending = self.pending_requests.lock().await;
230            pending.insert(request_id, pending_request.clone());
231        }
232
233        match timeout(
234            Duration::from_secs(timeout_secs),
235            pending_request.notify.notified(),
236        )
237        .await
238        {
239            Ok(_) => {
240                let state = self.state.lock().await;
241                if let Some(auth_info) = &state.auth_info {
242                    Ok(auth_info.clone())
243                } else {
244                    Err(CtpError::InitializationError("认证失败".to_string()))
245                }
246            }
247            Err(_) => {
248                // 清理待处理请求
249                let mut pending = self.pending_requests.lock().await;
250                pending.remove(&request_id);
251                Err(CtpError::InitializationError("认证超时".to_string()))
252            }
253        }
254    }
255
256    /// 异步登录
257    pub async fn login(
258        &self,
259        req: &ReqUserLoginField,
260        timeout_secs: u64,
261    ) -> CtpResult<RspUserLoginField> {
262        let mut api = self.inner.lock().await;
263        let request_id = api.req_user_login(req)?;
264        drop(api);
265
266        let pending_request = PendingRequest {
267            notify: Arc::new(Notify::new()),
268            response_data: Arc::new(Mutex::new(None)),
269        };
270
271        // 注册待处理请求
272        {
273            let mut pending = self.pending_requests.lock().await;
274            pending.insert(request_id, pending_request.clone());
275        }
276
277        match timeout(
278            Duration::from_secs(timeout_secs),
279            pending_request.notify.notified(),
280        )
281        .await
282        {
283            Ok(_) => {
284                let state = self.state.lock().await;
285                if let Some(login_info) = &state.login_info {
286                    Ok(login_info.clone())
287                } else {
288                    Err(CtpError::InitializationError("登录失败".to_string()))
289                }
290            }
291            Err(_) => {
292                // 清理待处理请求
293                let mut pending = self.pending_requests.lock().await;
294                pending.remove(&request_id);
295                Err(CtpError::InitializationError("登录超时".to_string()))
296            }
297        }
298    }
299
300    /// 异步报单录入
301    pub async fn order_insert(
302        &self,
303        req: &InputOrderField,
304        timeout_secs: u64,
305    ) -> CtpResult<AsyncTraderEvent> {
306        let mut api = self.inner.lock().await;
307        let request_id = api.req_order_insert(req)?;
308        drop(api);
309
310        self.wait_for_response(request_id, timeout_secs).await
311    }
312
313    /// 异步报单操作
314    pub async fn order_action(
315        &self,
316        req: &InputOrderActionField,
317        timeout_secs: u64,
318    ) -> CtpResult<AsyncTraderEvent> {
319        let mut api = self.inner.lock().await;
320        let request_id = api.req_order_action(req)?;
321        drop(api);
322
323        self.wait_for_response(request_id, timeout_secs).await
324    }
325
326    /// 异步查询交易账户
327    pub async fn qry_trading_account(
328        &self,
329        req: &QryTradingAccountField,
330        timeout_secs: u64,
331    ) -> CtpResult<Vec<TradingAccountField>> {
332        let mut api = self.inner.lock().await;
333        let request_id = api.req_qry_trading_account(req)?;
334        drop(api);
335
336        // 收集所有响应数据
337        let mut results = Vec::new();
338        let mut is_finished = false;
339
340        let start_time = std::time::Instant::now();
341        let timeout_duration = Duration::from_secs(timeout_secs);
342
343        while !is_finished && start_time.elapsed() < timeout_duration {
344            if let Some(event) = self.recv_event().await {
345                match event {
346                    AsyncTraderEvent::QryTradingAccountResponse {
347                        trading_account,
348                        rsp_info,
349                        request_id: resp_id,
350                        is_last,
351                    } if resp_id == request_id => {
352                        if let Some(rsp) = rsp_info {
353                            if !rsp.is_success() {
354                                return Err(CtpError::BusinessError(
355                                    rsp.error_id,
356                                    rsp.get_error_msg().unwrap_or_default(),
357                                ));
358                            }
359                        }
360                        if let Some(account) = trading_account {
361                            results.push(account);
362                        }
363                        is_finished = is_last;
364                    }
365                    _ => continue,
366                }
367            }
368        }
369
370        if is_finished {
371            Ok(results)
372        } else {
373            Err(CtpError::InitializationError("查询超时".to_string()))
374        }
375    }
376
377    /// 异步查询投资者持仓
378    pub async fn qry_investor_position(
379        &self,
380        req: &QryInvestorPositionField,
381        timeout_secs: u64,
382    ) -> CtpResult<Vec<InvestorPositionField>> {
383        let mut api = self.inner.lock().await;
384        let request_id = api.req_qry_investor_position(req)?;
385        drop(api);
386
387        // 收集所有响应数据
388        let mut results = Vec::new();
389        let mut is_finished = false;
390
391        let start_time = std::time::Instant::now();
392        let timeout_duration = Duration::from_secs(timeout_secs);
393
394        while !is_finished && start_time.elapsed() < timeout_duration {
395            if let Some(event) = self.recv_event().await {
396                match event {
397                    AsyncTraderEvent::QryInvestorPositionResponse {
398                        investor_position,
399                        rsp_info,
400                        request_id: resp_id,
401                        is_last,
402                    } if resp_id == request_id => {
403                        if let Some(rsp) = rsp_info {
404                            if !rsp.is_success() {
405                                return Err(CtpError::BusinessError(
406                                    rsp.error_id,
407                                    rsp.get_error_msg().unwrap_or_default(),
408                                ));
409                            }
410                        }
411                        if let Some(position) = investor_position {
412                            results.push(position);
413                        }
414                        is_finished = is_last;
415                    }
416                    _ => continue,
417                }
418            }
419        }
420
421        if is_finished {
422            Ok(results)
423        } else {
424            Err(CtpError::InitializationError("查询超时".to_string()))
425        }
426    }
427
428    /// 等待指定请求的响应
429    async fn wait_for_response(
430        &self,
431        request_id: i32,
432        timeout_secs: u64,
433    ) -> CtpResult<AsyncTraderEvent> {
434        let pending_request = PendingRequest {
435            notify: Arc::new(Notify::new()),
436            response_data: Arc::new(Mutex::new(None)),
437        };
438
439        // 注册待处理请求
440        {
441            let mut pending = self.pending_requests.lock().await;
442            pending.insert(request_id, pending_request.clone());
443        }
444
445        match timeout(
446            Duration::from_secs(timeout_secs),
447            pending_request.notify.notified(),
448        )
449        .await
450        {
451            Ok(_) => {
452                let response_data = pending_request.response_data.lock().await;
453                if let Some(event) = response_data.as_ref() {
454                    Ok(event.clone())
455                } else {
456                    Err(CtpError::InitializationError("响应数据为空".to_string()))
457                }
458            }
459            Err(_) => {
460                // 清理待处理请求
461                let mut pending = self.pending_requests.lock().await;
462                pending.remove(&request_id);
463                Err(CtpError::InitializationError("请求超时".to_string()))
464            }
465        }
466    }
467
468    /// 接收下一个事件
469    pub async fn recv_event(&self) -> Option<AsyncTraderEvent> {
470        let mut receiver = self.event_receiver.lock().await;
471        receiver.recv().await
472    }
473
474    /// 尝试接收事件(非阻塞)
475    pub async fn try_recv_event(&self) -> Result<AsyncTraderEvent, mpsc::error::TryRecvError> {
476        let mut receiver = self.event_receiver.lock().await;
477        receiver.try_recv()
478    }
479
480    /// 获取当前状态
481    pub async fn get_state(&self) -> AsyncTraderState {
482        self.state.lock().await.clone()
483    }
484
485    /// 释放资源
486    pub async fn release(&self) -> CtpResult<()> {
487        let mut api = self.inner.lock().await;
488        api.release();
489        Ok(())
490    }
491}
492
/// SPI handler that forwards synchronous callbacks into the async world.
///
/// Holds clones of the shared pieces owned by `AsyncTraderApi`.
#[derive(Clone)]
struct AsyncTraderHandler {
    /// Sending half of the event channel.
    event_sender: mpsc::UnboundedSender<AsyncTraderEvent>,
    /// Shared session state updated from the callbacks.
    state: Arc<Mutex<AsyncTraderState>>,
    /// Signalled when the front connection is established.
    connected_notify: Arc<Notify>,
    /// Signalled when authentication succeeds.
    auth_notify: Arc<Notify>,
    /// Signalled when login succeeds.
    login_notify: Arc<Notify>,
    /// In-flight requests keyed by request id.
    pending_requests: Arc<Mutex<HashMap<i32, PendingRequest>>>,
}
503
504impl AsyncTraderHandler {
505    fn new(
506        event_sender: mpsc::UnboundedSender<AsyncTraderEvent>,
507        state: Arc<Mutex<AsyncTraderState>>,
508        connected_notify: Arc<Notify>,
509        auth_notify: Arc<Notify>,
510        login_notify: Arc<Notify>,
511        pending_requests: Arc<Mutex<HashMap<i32, PendingRequest>>>,
512    ) -> Self {
513        Self {
514            event_sender,
515            state,
516            connected_notify,
517            auth_notify,
518            login_notify,
519            pending_requests,
520        }
521    }
522
523    /// 通知待处理的请求
524    fn notify_pending_request(&self, request_id: i32, event: AsyncTraderEvent) {
525        if let Ok(mut pending) = self.pending_requests.try_lock() {
526            if let Some(req) = pending.remove(&request_id) {
527                // 设置响应数据
528                if let Ok(mut data) = req.response_data.try_lock() {
529                    *data = Some(event);
530                }
531                // 通知等待者
532                req.notify.notify_waiters();
533            }
534        }
535    }
536}
537
538impl TraderSpiHandler for AsyncTraderHandler {
539    fn on_front_connected(&mut self) {
540        debug!("异步交易API: 连接成功");
541
542        // 更新状态
543        if let Ok(mut state) = self.state.try_lock() {
544            state.connected = true;
545        }
546
547        // 通知等待者
548        self.connected_notify.notify_waiters();
549
550        // 发送事件
551        let _ = self.event_sender.send(AsyncTraderEvent::Connected);
552    }
553
554    fn on_front_disconnected(&mut self, reason: i32) {
555        warn!("异步交易API: 连接断开, 原因: {}", reason);
556
557        // 更新状态
558        if let Ok(mut state) = self.state.try_lock() {
559            state.connected = false;
560            state.authenticated = false;
561            state.logged_in = false;
562            state.auth_info = None;
563            state.login_info = None;
564        }
565
566        // 发送事件
567        let _ = self
568            .event_sender
569            .send(AsyncTraderEvent::Disconnected(reason));
570    }
571
572    fn on_heart_beat_warning(&mut self, time_lapse: i32) {
573        warn!("异步交易API: 心跳超时警告, 时间间隔: {}秒", time_lapse);
574        let _ = self
575            .event_sender
576            .send(AsyncTraderEvent::HeartBeatWarning(time_lapse));
577    }
578
579    fn on_rsp_authenticate(
580        &mut self,
581        rsp_authenticate: Option<RspAuthenticateField>,
582        rsp_info: Option<RspInfoField>,
583        request_id: i32,
584        is_last: bool,
585    ) {
586        debug!("异步交易API: 收到认证响应");
587
588        // 检查是否成功
589        let success = rsp_info.as_ref().map_or(true, |info| info.is_success());
590
591        if success && is_last {
592            if let Ok(mut state) = self.state.try_lock() {
593                state.authenticated = true;
594                state.auth_info = rsp_authenticate.clone();
595            }
596            // 通知认证完成
597            self.auth_notify.notify_waiters();
598        }
599
600        // 发送事件
601        let event = AsyncTraderEvent::AuthenticateResponse {
602            rsp_authenticate,
603            rsp_info,
604            request_id,
605            is_last,
606        };
607
608        let _ = self.event_sender.send(event.clone());
609
610        // 通知待处理的请求
611        self.notify_pending_request(request_id, event);
612    }
613
614    fn on_rsp_user_login(
615        &mut self,
616        user_login: Option<RspUserLoginField>,
617        rsp_info: Option<RspInfoField>,
618        request_id: i32,
619        is_last: bool,
620    ) {
621        debug!("异步交易API: 收到登录响应");
622
623        // 检查是否成功
624        let success = rsp_info.as_ref().map_or(true, |info| info.is_success());
625
626        if success && is_last {
627            if let Ok(mut state) = self.state.try_lock() {
628                state.logged_in = true;
629                state.login_info = user_login.clone();
630            }
631            // 通知登录完成
632            self.login_notify.notify_waiters();
633        }
634
635        // 发送事件
636        let event = AsyncTraderEvent::LoginResponse {
637            user_login,
638            rsp_info,
639            request_id,
640            is_last,
641        };
642
643        let _ = self.event_sender.send(event.clone());
644
645        // 通知待处理的请求
646        self.notify_pending_request(request_id, event);
647    }
648
649    fn on_rsp_user_logout(
650        &mut self,
651        _user_logout: Option<()>,
652        rsp_info: Option<RspInfoField>,
653        request_id: i32,
654        is_last: bool,
655    ) {
656        debug!("异步交易API: 收到登出响应");
657
658        if is_last {
659            if let Ok(mut state) = self.state.try_lock() {
660                state.logged_in = false;
661                state.login_info = None;
662            }
663        }
664
665        let event = AsyncTraderEvent::LogoutResponse {
666            rsp_info,
667            request_id,
668            is_last,
669        };
670
671        let _ = self.event_sender.send(event.clone());
672
673        // 通知待处理的请求
674        self.notify_pending_request(request_id, event);
675    }
676
677    fn on_rsp_error(&mut self, rsp_info: Option<RspInfoField>, request_id: i32, is_last: bool) {
678        error!("异步交易API: 收到错误响应");
679
680        let event = AsyncTraderEvent::ErrorResponse {
681            rsp_info,
682            request_id,
683            is_last,
684        };
685
686        let _ = self.event_sender.send(event.clone());
687
688        // 通知待处理的请求
689        self.notify_pending_request(request_id, event);
690    }
691
692    fn on_rsp_order_insert(
693        &mut self,
694        input_order: Option<InputOrderField>,
695        rsp_info: Option<RspInfoField>,
696        request_id: i32,
697        is_last: bool,
698    ) {
699        debug!("异步交易API: 收到报单录入响应");
700
701        let event = AsyncTraderEvent::OrderInsertResponse {
702            input_order,
703            rsp_info,
704            request_id,
705            is_last,
706        };
707
708        let _ = self.event_sender.send(event.clone());
709
710        // 通知待处理的请求
711        self.notify_pending_request(request_id, event);
712    }
713
714    fn on_rsp_order_action(
715        &mut self,
716        input_order_action: Option<InputOrderActionField>,
717        rsp_info: Option<RspInfoField>,
718        request_id: i32,
719        is_last: bool,
720    ) {
721        debug!("异步交易API: 收到报单操作响应");
722
723        let event = AsyncTraderEvent::OrderActionResponse {
724            input_order_action,
725            rsp_info,
726            request_id,
727            is_last,
728        };
729
730        let _ = self.event_sender.send(event.clone());
731
732        // 通知待处理的请求
733        self.notify_pending_request(request_id, event);
734    }
735
736    fn on_rsp_qry_trading_account(
737        &mut self,
738        trading_account: Option<TradingAccountField>,
739        rsp_info: Option<RspInfoField>,
740        request_id: i32,
741        is_last: bool,
742    ) {
743        debug!("异步交易API: 收到查询交易账户响应");
744
745        let _ = self
746            .event_sender
747            .send(AsyncTraderEvent::QryTradingAccountResponse {
748                trading_account,
749                rsp_info,
750                request_id,
751                is_last,
752            });
753    }
754
755    fn on_rsp_qry_investor_position(
756        &mut self,
757        investor_position: Option<InvestorPositionField>,
758        rsp_info: Option<RspInfoField>,
759        request_id: i32,
760        is_last: bool,
761    ) {
762        debug!("异步交易API: 收到查询投资者持仓响应");
763
764        let _ = self
765            .event_sender
766            .send(AsyncTraderEvent::QryInvestorPositionResponse {
767                investor_position,
768                rsp_info,
769                request_id,
770                is_last,
771            });
772    }
773
774    fn on_rsp_qry_order(
775        &mut self,
776        order: Option<OrderField>,
777        rsp_info: Option<RspInfoField>,
778        request_id: i32,
779        is_last: bool,
780    ) {
781        debug!("异步交易API: 收到查询报单响应");
782
783        let _ = self.event_sender.send(AsyncTraderEvent::QryOrderResponse {
784            order,
785            rsp_info,
786            request_id,
787            is_last,
788        });
789    }
790
791    fn on_rsp_qry_trade(
792        &mut self,
793        trade: Option<TradeField>,
794        rsp_info: Option<RspInfoField>,
795        request_id: i32,
796        is_last: bool,
797    ) {
798        debug!("异步交易API: 收到查询成交响应");
799
800        let _ = self.event_sender.send(AsyncTraderEvent::QryTradeResponse {
801            trade,
802            rsp_info,
803            request_id,
804            is_last,
805        });
806    }
807
808    fn on_rtn_order(&mut self, order: OrderField) {
809        debug!("异步交易API: 收到报单回报");
810        let _ = self.event_sender.send(AsyncTraderEvent::OrderReturn(order));
811    }
812
813    fn on_rtn_trade(&mut self, trade: TradeField) {
814        debug!("异步交易API: 收到成交回报");
815        let _ = self.event_sender.send(AsyncTraderEvent::TradeReturn(trade));
816    }
817}