1use crate::api::md_api::{
6 DepthMarketDataField, ForQuoteRspField, MdApi, MdSpiHandler, SpecificInstrumentField,
7};
8use crate::api::CtpApi;
9use crate::error::{CtpError, CtpResult};
10use crate::types::{ReqUserLoginField, RspInfoField, RspUserLoginField};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex, Notify};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, warn};
15
16#[derive(Debug, Clone)]
18pub enum AsyncMdEvent {
19 Connected,
21 Disconnected(i32),
23 HeartBeatWarning(i32),
25 LoginResponse {
27 user_login: Option<RspUserLoginField>,
28 rsp_info: Option<RspInfoField>,
29 request_id: i32,
30 is_last: bool,
31 },
32 LogoutResponse {
34 rsp_info: Option<RspInfoField>,
35 request_id: i32,
36 is_last: bool,
37 },
38 ErrorResponse {
40 rsp_info: Option<RspInfoField>,
41 request_id: i32,
42 is_last: bool,
43 },
44 SubMarketDataResponse {
46 specific_instrument: Option<SpecificInstrumentField>,
47 rsp_info: Option<RspInfoField>,
48 request_id: i32,
49 is_last: bool,
50 },
51 UnsubMarketDataResponse {
53 specific_instrument: Option<SpecificInstrumentField>,
54 rsp_info: Option<RspInfoField>,
55 request_id: i32,
56 is_last: bool,
57 },
58 DepthMarketData(DepthMarketDataField),
60 ForQuoteResponse(ForQuoteRspField),
62}
63
64#[derive(Debug, Clone, Default)]
66pub struct AsyncMdState {
67 pub connected: bool,
68 pub logged_in: bool,
69 pub login_info: Option<RspUserLoginField>,
70}
71
72pub struct AsyncMdApi {
74 inner: Arc<Mutex<MdApi>>,
76 event_sender: mpsc::UnboundedSender<AsyncMdEvent>,
78 event_receiver: Arc<Mutex<mpsc::UnboundedReceiver<AsyncMdEvent>>>,
80 state: Arc<Mutex<AsyncMdState>>,
82 connected_notify: Arc<Notify>,
84 login_notify: Arc<Notify>,
86}
87
88impl AsyncMdApi {
89 pub async fn new(
91 flow_path: Option<&str>,
92 is_using_udp: bool,
93 is_multicast: bool,
94 is_production_mode: Option<bool>,
95 ) -> CtpResult<Self> {
96 let md_api = MdApi::new(
97 flow_path,
98 is_using_udp,
99 is_multicast,
100 is_production_mode.unwrap_or(false),
101 )?;
102 let (event_sender, event_receiver) = mpsc::unbounded_channel();
103
104 Ok(Self {
105 inner: Arc::new(Mutex::new(md_api)),
106 event_sender,
107 event_receiver: Arc::new(Mutex::new(event_receiver)),
108 state: Arc::new(Mutex::new(AsyncMdState::default())),
109 connected_notify: Arc::new(Notify::new()),
110 login_notify: Arc::new(Notify::new()),
111 })
112 }
113
114 pub async fn register_front(&self, front_address: &str) -> CtpResult<()> {
116 let mut api = self.inner.lock().await;
117 api.register_front(front_address)
118 }
119
120 pub async fn init(&self) -> CtpResult<()> {
122 let mut api = self.inner.lock().await;
123
124 let handler = AsyncMdHandler::new(
126 self.event_sender.clone(),
127 self.state.clone(),
128 self.connected_notify.clone(),
129 self.login_notify.clone(),
130 );
131
132 api.register_spi(handler)?;
134
135 api.init()
137 }
138
139 pub async fn wait_connected(&self, timeout_secs: u64) -> CtpResult<()> {
141 let state = self.state.lock().await;
142 if state.connected {
143 return Ok(());
144 }
145 drop(state);
146
147 match timeout(
148 Duration::from_secs(timeout_secs),
149 self.connected_notify.notified(),
150 )
151 .await
152 {
153 Ok(_) => {
154 let state = self.state.lock().await;
155 if state.connected {
156 Ok(())
157 } else {
158 Err(CtpError::InitializationError("连接失败".to_string()))
159 }
160 }
161 Err(_) => Err(CtpError::InitializationError("连接超时".to_string())),
162 }
163 }
164
165 pub async fn login(
167 &self,
168 req: &ReqUserLoginField,
169 timeout_secs: u64,
170 ) -> CtpResult<RspUserLoginField> {
171 let mut api = self.inner.lock().await;
172 api.req_user_login(req)?;
173 drop(api);
174
175 match timeout(
176 Duration::from_secs(timeout_secs),
177 self.login_notify.notified(),
178 )
179 .await
180 {
181 Ok(_) => {
182 let state = self.state.lock().await;
183 if let Some(login_info) = &state.login_info {
184 Ok(login_info.clone())
185 } else {
186 Err(CtpError::InitializationError("登录失败".to_string()))
187 }
188 }
189 Err(_) => Err(CtpError::InitializationError("登录超时".to_string())),
190 }
191 }
192
193 pub async fn subscribe_market_data(&self, instrument_ids: &[&str]) -> CtpResult<()> {
195 let mut api = self.inner.lock().await;
196 api.subscribe_market_data(instrument_ids)
197 }
198
199 pub async fn unsubscribe_market_data(&self, instrument_ids: &[&str]) -> CtpResult<()> {
201 let mut api = self.inner.lock().await;
202 api.unsubscribe_market_data(instrument_ids)
203 }
204
205 pub async fn recv_event(&self) -> Option<AsyncMdEvent> {
207 let mut receiver = self.event_receiver.lock().await;
208 receiver.recv().await
209 }
210
211 pub async fn try_recv_event(&self) -> Result<AsyncMdEvent, mpsc::error::TryRecvError> {
213 let mut receiver = self.event_receiver.lock().await;
214 receiver.try_recv()
215 }
216
217 pub async fn get_state(&self) -> AsyncMdState {
219 self.state.lock().await.clone()
220 }
221}
222
223#[derive(Clone)]
225struct AsyncMdHandler {
226 event_sender: mpsc::UnboundedSender<AsyncMdEvent>,
227 state: Arc<Mutex<AsyncMdState>>,
228 connected_notify: Arc<Notify>,
229 login_notify: Arc<Notify>,
230}
231
232impl AsyncMdHandler {
233 fn new(
234 event_sender: mpsc::UnboundedSender<AsyncMdEvent>,
235 state: Arc<Mutex<AsyncMdState>>,
236 connected_notify: Arc<Notify>,
237 login_notify: Arc<Notify>,
238 ) -> Self {
239 Self {
240 event_sender,
241 state,
242 connected_notify,
243 login_notify,
244 }
245 }
246}
247
248impl MdSpiHandler for AsyncMdHandler {
249 fn on_front_connected(&mut self) {
250 debug!("异步API: 连接成功");
251
252 if let Ok(mut state) = self.state.try_lock() {
254 state.connected = true;
255 }
256
257 self.connected_notify.notify_waiters();
259
260 let _ = self.event_sender.send(AsyncMdEvent::Connected);
262 }
263
264 fn on_front_disconnected(&mut self, reason: i32) {
265 warn!("异步API: 连接断开, 原因: {}", reason);
266
267 if let Ok(mut state) = self.state.try_lock() {
269 state.connected = false;
270 state.logged_in = false;
271 state.login_info = None;
272 }
273
274 let _ = self.event_sender.send(AsyncMdEvent::Disconnected(reason));
276 }
277
278 fn on_heart_beat_warning(&mut self, time_lapse: i32) {
279 warn!("异步API: 心跳超时警告, 时间间隔: {}秒", time_lapse);
280 let _ = self
281 .event_sender
282 .send(AsyncMdEvent::HeartBeatWarning(time_lapse));
283 }
284
285 fn on_rsp_user_login(
286 &mut self,
287 user_login: Option<RspUserLoginField>,
288 rsp_info: Option<RspInfoField>,
289 request_id: i32,
290 is_last: bool,
291 ) {
292 debug!("异步API: 收到登录响应");
293
294 let success = rsp_info.as_ref().map_or(true, |info| info.is_success());
296
297 if success && is_last {
298 if let Ok(mut state) = self.state.try_lock() {
299 state.logged_in = true;
300 state.login_info = user_login.clone();
301 }
302 self.login_notify.notify_waiters();
304 }
305
306 let _ = self.event_sender.send(AsyncMdEvent::LoginResponse {
308 user_login,
309 rsp_info,
310 request_id,
311 is_last,
312 });
313 }
314
315 fn on_rsp_user_logout(
316 &mut self,
317 _user_logout: Option<()>,
318 rsp_info: Option<RspInfoField>,
319 request_id: i32,
320 is_last: bool,
321 ) {
322 debug!("异步API: 收到登出响应");
323
324 if is_last {
325 if let Ok(mut state) = self.state.try_lock() {
326 state.logged_in = false;
327 state.login_info = None;
328 }
329 }
330
331 let _ = self.event_sender.send(AsyncMdEvent::LogoutResponse {
332 rsp_info,
333 request_id,
334 is_last,
335 });
336 }
337
338 fn on_rsp_error(&mut self, rsp_info: Option<RspInfoField>, request_id: i32, is_last: bool) {
339 error!("异步API: 收到错误响应");
340 let _ = self.event_sender.send(AsyncMdEvent::ErrorResponse {
341 rsp_info,
342 request_id,
343 is_last,
344 });
345 }
346
347 fn on_rsp_sub_market_data(
348 &mut self,
349 specific_instrument: Option<SpecificInstrumentField>,
350 rsp_info: Option<RspInfoField>,
351 request_id: i32,
352 is_last: bool,
353 ) {
354 debug!("异步API: 收到订阅行情响应");
355 let _ = self.event_sender.send(AsyncMdEvent::SubMarketDataResponse {
356 specific_instrument,
357 rsp_info,
358 request_id,
359 is_last,
360 });
361 }
362
363 fn on_rsp_unsub_market_data(
364 &mut self,
365 specific_instrument: Option<SpecificInstrumentField>,
366 rsp_info: Option<RspInfoField>,
367 request_id: i32,
368 is_last: bool,
369 ) {
370 debug!("异步API: 收到取消订阅响应");
371 let _ = self
372 .event_sender
373 .send(AsyncMdEvent::UnsubMarketDataResponse {
374 specific_instrument,
375 rsp_info,
376 request_id,
377 is_last,
378 });
379 }
380
381 fn on_rtn_depth_market_data(&mut self, market_data: DepthMarketDataField) {
382 let _ = self
384 .event_sender
385 .send(AsyncMdEvent::DepthMarketData(market_data));
386 }
387
388 fn on_rtn_for_quote_rsp(&mut self, for_quote_rsp: ForQuoteRspField) {
389 debug!("异步API: 收到询价响应");
390 let _ = self
391 .event_sender
392 .send(AsyncMdEvent::ForQuoteResponse(for_quote_rsp));
393 }
394}