1use crate::api::trader_api::{
6 InputOrderField, InvestorPositionField, OrderField, ReqAuthenticateField, RspAuthenticateField,
7 TradeField, TraderApi, TraderSpiHandler, TradingAccountField,
8};
9use crate::api::CtpApi;
10use crate::error::{CtpError, CtpResult};
11use crate::types::{
12 InputOrderActionField, QryInvestorPositionField, QryTradingAccountField, ReqUserLoginField,
13 RspInfoField, RspUserLoginField,
14};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, Mutex, Notify};
18use tokio::time::{timeout, Duration};
19use tracing::{debug, error, warn};
20
/// Events produced by the asynchronous trader wrapper.
///
/// Each variant is built by one of the `AsyncTraderHandler` callbacks and pushed
/// onto the unbounded event channel that `AsyncTraderApi::recv_event` reads from.
/// The response variants mirror the arguments of the corresponding callback:
/// an optional payload, the optional `rsp_info`, the `request_id` the response
/// belongs to, and the `is_last` flag.
#[derive(Debug, Clone)]
pub enum AsyncTraderEvent {
    /// Sent from `on_front_connected`.
    Connected,
    /// Sent from `on_front_disconnected`; carries the reason code given to the callback.
    Disconnected(i32),
    /// Sent from `on_heart_beat_warning`; carries the time lapse given to the callback.
    HeartBeatWarning(i32),
    /// Response to an authentication request (`on_rsp_authenticate`).
    AuthenticateResponse {
        /// Authentication payload, if the callback received one.
        rsp_authenticate: Option<RspAuthenticateField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
    /// Response to a login request (`on_rsp_user_login`).
    LoginResponse {
        /// Login payload, if the callback received one.
        user_login: Option<RspUserLoginField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
    /// Response to a logout request (`on_rsp_user_logout`); the callback's payload is not kept.
    LogoutResponse {
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
    /// Response to an order-insert request (`on_rsp_order_insert`).
    OrderInsertResponse {
        /// Order payload, if the callback received one.
        input_order: Option<InputOrderField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
    /// Response to an order-action request (`on_rsp_order_action`).
    OrderActionResponse {
        /// Order-action payload, if the callback received one.
        input_order_action: Option<InputOrderActionField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
    /// One record of a trading-account query (`on_rsp_qry_trading_account`).
    QryTradingAccountResponse {
        /// Account record, if the callback received one.
        trading_account: Option<TradingAccountField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag; `qry_trading_account` stops collecting once it is `true`.
        is_last: bool,
    },
    /// One record of an investor-position query (`on_rsp_qry_investor_position`).
    QryInvestorPositionResponse {
        /// Position record, if the callback received one.
        investor_position: Option<InvestorPositionField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag; `qry_investor_position` stops collecting once it is `true`.
        is_last: bool,
    },
    /// One record of an order query (`on_rsp_qry_order`).
    QryOrderResponse {
        /// Order record, if the callback received one.
        order: Option<OrderField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
    /// One record of a trade query (`on_rsp_qry_trade`).
    QryTradeResponse {
        /// Trade record, if the callback received one.
        trade: Option<TradeField>,
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
    /// Order return delivered through `on_rtn_order`.
    OrderReturn(OrderField),
    /// Trade return delivered through `on_rtn_trade`.
    TradeReturn(TradeField),
    /// Error response delivered through `on_rsp_error`.
    ErrorResponse {
        /// Response status information, if the callback received one.
        rsp_info: Option<RspInfoField>,
        /// Identifier of the request this response answers.
        request_id: i32,
        /// `is_last` flag as passed to the callback.
        is_last: bool,
    },
}
103
/// Snapshot of the connection/session state maintained by the handler callbacks.
///
/// `Default` yields the all-`false` / `None` state used before any callback has run.
#[derive(Debug, Clone, Default)]
pub struct AsyncTraderState {
    /// Set by `on_front_connected`, cleared by `on_front_disconnected`.
    pub connected: bool,
    /// Set by `on_rsp_authenticate` on a successful last response; cleared on disconnect.
    pub authenticated: bool,
    /// Set by `on_rsp_user_login` on a successful last response; cleared on logout or disconnect.
    pub logged_in: bool,
    /// Payload stored alongside `authenticated`; cleared on disconnect.
    pub auth_info: Option<RspAuthenticateField>,
    /// Payload stored alongside `logged_in`; cleared on logout or disconnect.
    pub login_info: Option<RspUserLoginField>,
}
113
/// Bookkeeping for one in-flight request that an async caller is waiting on.
///
/// Both fields are `Arc`s, so a clone stored in the pending-request map shares
/// them with the copy kept by the waiting caller.
#[derive(Debug, Clone)]
struct PendingRequest {
    /// Signalled by the handler once a response for the request has arrived.
    notify: Arc<Notify>,
    /// Slot the handler fills with the response event before signalling `notify`.
    response_data: Arc<Mutex<Option<AsyncTraderEvent>>>,
}
120
/// Async facade over `TraderApi`.
///
/// Requests are issued through the wrapped API; responses arrive via an
/// `AsyncTraderHandler` registered in `init`, which shares the channel, state,
/// notifiers and pending-request map held here.
pub struct AsyncTraderApi {
    /// The wrapped trader API, serialised behind an async mutex.
    inner: Arc<Mutex<TraderApi>>,
    /// Sending half of the event channel; cloned into the handler by `init`.
    event_sender: mpsc::UnboundedSender<AsyncTraderEvent>,
    /// Receiving half of the event channel, read by `recv_event` / `try_recv_event`.
    event_receiver: Arc<Mutex<mpsc::UnboundedReceiver<AsyncTraderEvent>>>,
    /// Connection/session state updated by the handler callbacks.
    state: Arc<Mutex<AsyncTraderState>>,
    /// Signalled by the handler in `on_front_connected`; awaited by `wait_connected`.
    connected_notify: Arc<Notify>,
    /// Signalled by the handler on a successful authentication response.
    /// NOTE(review): nothing in this file awaits it.
    auth_notify: Arc<Notify>,
    /// Signalled by the handler on a successful login response.
    /// NOTE(review): nothing in this file awaits it.
    login_notify: Arc<Notify>,
    /// In-flight requests keyed by request id, shared with the handler.
    pending_requests: Arc<Mutex<HashMap<i32, PendingRequest>>>,
}
140
141impl AsyncTraderApi {
142 pub async fn new(flow_path: Option<&str>, is_production_mode: Option<bool>) -> CtpResult<Self> {
144 let trader_api = TraderApi::new(flow_path, is_production_mode)?;
145 let (event_sender, event_receiver) = mpsc::unbounded_channel();
146
147 Ok(Self {
148 inner: Arc::new(Mutex::new(trader_api)),
149 event_sender,
150 event_receiver: Arc::new(Mutex::new(event_receiver)),
151 state: Arc::new(Mutex::new(AsyncTraderState::default())),
152 connected_notify: Arc::new(Notify::new()),
153 auth_notify: Arc::new(Notify::new()),
154 login_notify: Arc::new(Notify::new()),
155 pending_requests: Arc::new(Mutex::new(HashMap::new())),
156 })
157 }
158
159 pub async fn register_front(&self, front_address: &str) -> CtpResult<()> {
161 let mut api = self.inner.lock().await;
162 api.register_front(front_address)
163 }
164
165 pub async fn init(&self) -> CtpResult<()> {
167 let mut api = self.inner.lock().await;
168
169 let handler = AsyncTraderHandler::new(
171 self.event_sender.clone(),
172 self.state.clone(),
173 self.connected_notify.clone(),
174 self.auth_notify.clone(),
175 self.login_notify.clone(),
176 self.pending_requests.clone(),
177 );
178
179 api.register_spi(handler)?;
181
182 api.init()
184 }
185
186 pub async fn wait_connected(&self, timeout_secs: u64) -> CtpResult<()> {
188 let state = self.state.lock().await;
189 if state.connected {
190 return Ok(());
191 }
192 drop(state);
193
194 match timeout(
195 Duration::from_secs(timeout_secs),
196 self.connected_notify.notified(),
197 )
198 .await
199 {
200 Ok(_) => {
201 let state = self.state.lock().await;
202 if state.connected {
203 Ok(())
204 } else {
205 Err(CtpError::InitializationError("连接失败".to_string()))
206 }
207 }
208 Err(_) => Err(CtpError::InitializationError("连接超时".to_string())),
209 }
210 }
211
212 pub async fn authenticate(
214 &self,
215 req: &ReqAuthenticateField,
216 timeout_secs: u64,
217 ) -> CtpResult<RspAuthenticateField> {
218 let mut api = self.inner.lock().await;
219 let request_id = api.req_authenticate(req)?;
220 drop(api);
221
222 let pending_request = PendingRequest {
223 notify: Arc::new(Notify::new()),
224 response_data: Arc::new(Mutex::new(None)),
225 };
226
227 {
229 let mut pending = self.pending_requests.lock().await;
230 pending.insert(request_id, pending_request.clone());
231 }
232
233 match timeout(
234 Duration::from_secs(timeout_secs),
235 pending_request.notify.notified(),
236 )
237 .await
238 {
239 Ok(_) => {
240 let state = self.state.lock().await;
241 if let Some(auth_info) = &state.auth_info {
242 Ok(auth_info.clone())
243 } else {
244 Err(CtpError::InitializationError("认证失败".to_string()))
245 }
246 }
247 Err(_) => {
248 let mut pending = self.pending_requests.lock().await;
250 pending.remove(&request_id);
251 Err(CtpError::InitializationError("认证超时".to_string()))
252 }
253 }
254 }
255
256 pub async fn login(
258 &self,
259 req: &ReqUserLoginField,
260 timeout_secs: u64,
261 ) -> CtpResult<RspUserLoginField> {
262 let mut api = self.inner.lock().await;
263 let request_id = api.req_user_login(req)?;
264 drop(api);
265
266 let pending_request = PendingRequest {
267 notify: Arc::new(Notify::new()),
268 response_data: Arc::new(Mutex::new(None)),
269 };
270
271 {
273 let mut pending = self.pending_requests.lock().await;
274 pending.insert(request_id, pending_request.clone());
275 }
276
277 match timeout(
278 Duration::from_secs(timeout_secs),
279 pending_request.notify.notified(),
280 )
281 .await
282 {
283 Ok(_) => {
284 let state = self.state.lock().await;
285 if let Some(login_info) = &state.login_info {
286 Ok(login_info.clone())
287 } else {
288 Err(CtpError::InitializationError("登录失败".to_string()))
289 }
290 }
291 Err(_) => {
292 let mut pending = self.pending_requests.lock().await;
294 pending.remove(&request_id);
295 Err(CtpError::InitializationError("登录超时".to_string()))
296 }
297 }
298 }
299
300 pub async fn order_insert(
302 &self,
303 req: &InputOrderField,
304 timeout_secs: u64,
305 ) -> CtpResult<AsyncTraderEvent> {
306 let mut api = self.inner.lock().await;
307 let request_id = api.req_order_insert(req)?;
308 drop(api);
309
310 self.wait_for_response(request_id, timeout_secs).await
311 }
312
313 pub async fn order_action(
315 &self,
316 req: &InputOrderActionField,
317 timeout_secs: u64,
318 ) -> CtpResult<AsyncTraderEvent> {
319 let mut api = self.inner.lock().await;
320 let request_id = api.req_order_action(req)?;
321 drop(api);
322
323 self.wait_for_response(request_id, timeout_secs).await
324 }
325
326 pub async fn qry_trading_account(
328 &self,
329 req: &QryTradingAccountField,
330 timeout_secs: u64,
331 ) -> CtpResult<Vec<TradingAccountField>> {
332 let mut api = self.inner.lock().await;
333 let request_id = api.req_qry_trading_account(req)?;
334 drop(api);
335
336 let mut results = Vec::new();
338 let mut is_finished = false;
339
340 let start_time = std::time::Instant::now();
341 let timeout_duration = Duration::from_secs(timeout_secs);
342
343 while !is_finished && start_time.elapsed() < timeout_duration {
344 if let Some(event) = self.recv_event().await {
345 match event {
346 AsyncTraderEvent::QryTradingAccountResponse {
347 trading_account,
348 rsp_info,
349 request_id: resp_id,
350 is_last,
351 } if resp_id == request_id => {
352 if let Some(rsp) = rsp_info {
353 if !rsp.is_success() {
354 return Err(CtpError::BusinessError(
355 rsp.error_id,
356 rsp.get_error_msg().unwrap_or_default(),
357 ));
358 }
359 }
360 if let Some(account) = trading_account {
361 results.push(account);
362 }
363 is_finished = is_last;
364 }
365 _ => continue,
366 }
367 }
368 }
369
370 if is_finished {
371 Ok(results)
372 } else {
373 Err(CtpError::InitializationError("查询超时".to_string()))
374 }
375 }
376
377 pub async fn qry_investor_position(
379 &self,
380 req: &QryInvestorPositionField,
381 timeout_secs: u64,
382 ) -> CtpResult<Vec<InvestorPositionField>> {
383 let mut api = self.inner.lock().await;
384 let request_id = api.req_qry_investor_position(req)?;
385 drop(api);
386
387 let mut results = Vec::new();
389 let mut is_finished = false;
390
391 let start_time = std::time::Instant::now();
392 let timeout_duration = Duration::from_secs(timeout_secs);
393
394 while !is_finished && start_time.elapsed() < timeout_duration {
395 if let Some(event) = self.recv_event().await {
396 match event {
397 AsyncTraderEvent::QryInvestorPositionResponse {
398 investor_position,
399 rsp_info,
400 request_id: resp_id,
401 is_last,
402 } if resp_id == request_id => {
403 if let Some(rsp) = rsp_info {
404 if !rsp.is_success() {
405 return Err(CtpError::BusinessError(
406 rsp.error_id,
407 rsp.get_error_msg().unwrap_or_default(),
408 ));
409 }
410 }
411 if let Some(position) = investor_position {
412 results.push(position);
413 }
414 is_finished = is_last;
415 }
416 _ => continue,
417 }
418 }
419 }
420
421 if is_finished {
422 Ok(results)
423 } else {
424 Err(CtpError::InitializationError("查询超时".to_string()))
425 }
426 }
427
428 async fn wait_for_response(
430 &self,
431 request_id: i32,
432 timeout_secs: u64,
433 ) -> CtpResult<AsyncTraderEvent> {
434 let pending_request = PendingRequest {
435 notify: Arc::new(Notify::new()),
436 response_data: Arc::new(Mutex::new(None)),
437 };
438
439 {
441 let mut pending = self.pending_requests.lock().await;
442 pending.insert(request_id, pending_request.clone());
443 }
444
445 match timeout(
446 Duration::from_secs(timeout_secs),
447 pending_request.notify.notified(),
448 )
449 .await
450 {
451 Ok(_) => {
452 let response_data = pending_request.response_data.lock().await;
453 if let Some(event) = response_data.as_ref() {
454 Ok(event.clone())
455 } else {
456 Err(CtpError::InitializationError("响应数据为空".to_string()))
457 }
458 }
459 Err(_) => {
460 let mut pending = self.pending_requests.lock().await;
462 pending.remove(&request_id);
463 Err(CtpError::InitializationError("请求超时".to_string()))
464 }
465 }
466 }
467
468 pub async fn recv_event(&self) -> Option<AsyncTraderEvent> {
470 let mut receiver = self.event_receiver.lock().await;
471 receiver.recv().await
472 }
473
474 pub async fn try_recv_event(&self) -> Result<AsyncTraderEvent, mpsc::error::TryRecvError> {
476 let mut receiver = self.event_receiver.lock().await;
477 receiver.try_recv()
478 }
479
480 pub async fn get_state(&self) -> AsyncTraderState {
482 self.state.lock().await.clone()
483 }
484
485 pub async fn release(&self) -> CtpResult<()> {
487 let mut api = self.inner.lock().await;
488 api.release();
489 Ok(())
490 }
491}
492
/// Callback handler that bridges `TraderSpiHandler` callbacks into the async side.
///
/// All fields are handles shared with the `AsyncTraderApi` that created the
/// handler in `init`.
#[derive(Clone)]
struct AsyncTraderHandler {
    /// Channel on which every callback publishes its event.
    event_sender: mpsc::UnboundedSender<AsyncTraderEvent>,
    /// Connection/session state updated by the callbacks.
    state: Arc<Mutex<AsyncTraderState>>,
    /// Signalled from `on_front_connected`.
    connected_notify: Arc<Notify>,
    /// Signalled from `on_rsp_authenticate` on a successful last response.
    auth_notify: Arc<Notify>,
    /// Signalled from `on_rsp_user_login` on a successful last response.
    login_notify: Arc<Notify>,
    /// In-flight requests keyed by request id; entries are removed when answered.
    pending_requests: Arc<Mutex<HashMap<i32, PendingRequest>>>,
}
503
504impl AsyncTraderHandler {
505 fn new(
506 event_sender: mpsc::UnboundedSender<AsyncTraderEvent>,
507 state: Arc<Mutex<AsyncTraderState>>,
508 connected_notify: Arc<Notify>,
509 auth_notify: Arc<Notify>,
510 login_notify: Arc<Notify>,
511 pending_requests: Arc<Mutex<HashMap<i32, PendingRequest>>>,
512 ) -> Self {
513 Self {
514 event_sender,
515 state,
516 connected_notify,
517 auth_notify,
518 login_notify,
519 pending_requests,
520 }
521 }
522
523 fn notify_pending_request(&self, request_id: i32, event: AsyncTraderEvent) {
525 if let Ok(mut pending) = self.pending_requests.try_lock() {
526 if let Some(req) = pending.remove(&request_id) {
527 if let Ok(mut data) = req.response_data.try_lock() {
529 *data = Some(event);
530 }
531 req.notify.notify_waiters();
533 }
534 }
535 }
536}
537
538impl TraderSpiHandler for AsyncTraderHandler {
539 fn on_front_connected(&mut self) {
540 debug!("异步交易API: 连接成功");
541
542 if let Ok(mut state) = self.state.try_lock() {
544 state.connected = true;
545 }
546
547 self.connected_notify.notify_waiters();
549
550 let _ = self.event_sender.send(AsyncTraderEvent::Connected);
552 }
553
554 fn on_front_disconnected(&mut self, reason: i32) {
555 warn!("异步交易API: 连接断开, 原因: {}", reason);
556
557 if let Ok(mut state) = self.state.try_lock() {
559 state.connected = false;
560 state.authenticated = false;
561 state.logged_in = false;
562 state.auth_info = None;
563 state.login_info = None;
564 }
565
566 let _ = self
568 .event_sender
569 .send(AsyncTraderEvent::Disconnected(reason));
570 }
571
572 fn on_heart_beat_warning(&mut self, time_lapse: i32) {
573 warn!("异步交易API: 心跳超时警告, 时间间隔: {}秒", time_lapse);
574 let _ = self
575 .event_sender
576 .send(AsyncTraderEvent::HeartBeatWarning(time_lapse));
577 }
578
579 fn on_rsp_authenticate(
580 &mut self,
581 rsp_authenticate: Option<RspAuthenticateField>,
582 rsp_info: Option<RspInfoField>,
583 request_id: i32,
584 is_last: bool,
585 ) {
586 debug!("异步交易API: 收到认证响应");
587
588 let success = rsp_info.as_ref().map_or(true, |info| info.is_success());
590
591 if success && is_last {
592 if let Ok(mut state) = self.state.try_lock() {
593 state.authenticated = true;
594 state.auth_info = rsp_authenticate.clone();
595 }
596 self.auth_notify.notify_waiters();
598 }
599
600 let event = AsyncTraderEvent::AuthenticateResponse {
602 rsp_authenticate,
603 rsp_info,
604 request_id,
605 is_last,
606 };
607
608 let _ = self.event_sender.send(event.clone());
609
610 self.notify_pending_request(request_id, event);
612 }
613
614 fn on_rsp_user_login(
615 &mut self,
616 user_login: Option<RspUserLoginField>,
617 rsp_info: Option<RspInfoField>,
618 request_id: i32,
619 is_last: bool,
620 ) {
621 debug!("异步交易API: 收到登录响应");
622
623 let success = rsp_info.as_ref().map_or(true, |info| info.is_success());
625
626 if success && is_last {
627 if let Ok(mut state) = self.state.try_lock() {
628 state.logged_in = true;
629 state.login_info = user_login.clone();
630 }
631 self.login_notify.notify_waiters();
633 }
634
635 let event = AsyncTraderEvent::LoginResponse {
637 user_login,
638 rsp_info,
639 request_id,
640 is_last,
641 };
642
643 let _ = self.event_sender.send(event.clone());
644
645 self.notify_pending_request(request_id, event);
647 }
648
649 fn on_rsp_user_logout(
650 &mut self,
651 _user_logout: Option<()>,
652 rsp_info: Option<RspInfoField>,
653 request_id: i32,
654 is_last: bool,
655 ) {
656 debug!("异步交易API: 收到登出响应");
657
658 if is_last {
659 if let Ok(mut state) = self.state.try_lock() {
660 state.logged_in = false;
661 state.login_info = None;
662 }
663 }
664
665 let event = AsyncTraderEvent::LogoutResponse {
666 rsp_info,
667 request_id,
668 is_last,
669 };
670
671 let _ = self.event_sender.send(event.clone());
672
673 self.notify_pending_request(request_id, event);
675 }
676
677 fn on_rsp_error(&mut self, rsp_info: Option<RspInfoField>, request_id: i32, is_last: bool) {
678 error!("异步交易API: 收到错误响应");
679
680 let event = AsyncTraderEvent::ErrorResponse {
681 rsp_info,
682 request_id,
683 is_last,
684 };
685
686 let _ = self.event_sender.send(event.clone());
687
688 self.notify_pending_request(request_id, event);
690 }
691
692 fn on_rsp_order_insert(
693 &mut self,
694 input_order: Option<InputOrderField>,
695 rsp_info: Option<RspInfoField>,
696 request_id: i32,
697 is_last: bool,
698 ) {
699 debug!("异步交易API: 收到报单录入响应");
700
701 let event = AsyncTraderEvent::OrderInsertResponse {
702 input_order,
703 rsp_info,
704 request_id,
705 is_last,
706 };
707
708 let _ = self.event_sender.send(event.clone());
709
710 self.notify_pending_request(request_id, event);
712 }
713
714 fn on_rsp_order_action(
715 &mut self,
716 input_order_action: Option<InputOrderActionField>,
717 rsp_info: Option<RspInfoField>,
718 request_id: i32,
719 is_last: bool,
720 ) {
721 debug!("异步交易API: 收到报单操作响应");
722
723 let event = AsyncTraderEvent::OrderActionResponse {
724 input_order_action,
725 rsp_info,
726 request_id,
727 is_last,
728 };
729
730 let _ = self.event_sender.send(event.clone());
731
732 self.notify_pending_request(request_id, event);
734 }
735
736 fn on_rsp_qry_trading_account(
737 &mut self,
738 trading_account: Option<TradingAccountField>,
739 rsp_info: Option<RspInfoField>,
740 request_id: i32,
741 is_last: bool,
742 ) {
743 debug!("异步交易API: 收到查询交易账户响应");
744
745 let _ = self
746 .event_sender
747 .send(AsyncTraderEvent::QryTradingAccountResponse {
748 trading_account,
749 rsp_info,
750 request_id,
751 is_last,
752 });
753 }
754
755 fn on_rsp_qry_investor_position(
756 &mut self,
757 investor_position: Option<InvestorPositionField>,
758 rsp_info: Option<RspInfoField>,
759 request_id: i32,
760 is_last: bool,
761 ) {
762 debug!("异步交易API: 收到查询投资者持仓响应");
763
764 let _ = self
765 .event_sender
766 .send(AsyncTraderEvent::QryInvestorPositionResponse {
767 investor_position,
768 rsp_info,
769 request_id,
770 is_last,
771 });
772 }
773
774 fn on_rsp_qry_order(
775 &mut self,
776 order: Option<OrderField>,
777 rsp_info: Option<RspInfoField>,
778 request_id: i32,
779 is_last: bool,
780 ) {
781 debug!("异步交易API: 收到查询报单响应");
782
783 let _ = self.event_sender.send(AsyncTraderEvent::QryOrderResponse {
784 order,
785 rsp_info,
786 request_id,
787 is_last,
788 });
789 }
790
791 fn on_rsp_qry_trade(
792 &mut self,
793 trade: Option<TradeField>,
794 rsp_info: Option<RspInfoField>,
795 request_id: i32,
796 is_last: bool,
797 ) {
798 debug!("异步交易API: 收到查询成交响应");
799
800 let _ = self.event_sender.send(AsyncTraderEvent::QryTradeResponse {
801 trade,
802 rsp_info,
803 request_id,
804 is_last,
805 });
806 }
807
808 fn on_rtn_order(&mut self, order: OrderField) {
809 debug!("异步交易API: 收到报单回报");
810 let _ = self.event_sender.send(AsyncTraderEvent::OrderReturn(order));
811 }
812
813 fn on_rtn_trade(&mut self, trade: TradeField) {
814 debug!("异步交易API: 收到成交回报");
815 let _ = self.event_sender.send(AsyncTraderEvent::TradeReturn(trade));
816 }
817}