ctp_rust/api/
async_md_api.rs

1//! 异步行情API模块
2//!
3//! 基于同步MdApi提供异步封装,使用tokio实现
4
5use crate::api::md_api::{
6    DepthMarketDataField, ForQuoteRspField, MdApi, MdSpiHandler, SpecificInstrumentField,
7};
8use crate::api::CtpApi;
9use crate::error::{CtpError, CtpResult};
10use crate::types::{ReqUserLoginField, RspInfoField, RspUserLoginField};
11use std::sync::Arc;
12use tokio::sync::{mpsc, Mutex, Notify};
13use tokio::time::{timeout, Duration};
14use tracing::{debug, error, warn};
15
16/// 异步事件类型
17#[derive(Debug, Clone)]
18pub enum AsyncMdEvent {
19    /// 连接成功
20    Connected,
21    /// 连接断开
22    Disconnected(i32),
23    /// 心跳超时警告
24    HeartBeatWarning(i32),
25    /// 登录响应
26    LoginResponse {
27        user_login: Option<RspUserLoginField>,
28        rsp_info: Option<RspInfoField>,
29        request_id: i32,
30        is_last: bool,
31    },
32    /// 登出响应
33    LogoutResponse {
34        rsp_info: Option<RspInfoField>,
35        request_id: i32,
36        is_last: bool,
37    },
38    /// 错误响应
39    ErrorResponse {
40        rsp_info: Option<RspInfoField>,
41        request_id: i32,
42        is_last: bool,
43    },
44    /// 订阅行情响应
45    SubMarketDataResponse {
46        specific_instrument: Option<SpecificInstrumentField>,
47        rsp_info: Option<RspInfoField>,
48        request_id: i32,
49        is_last: bool,
50    },
51    /// 取消订阅行情响应
52    UnsubMarketDataResponse {
53        specific_instrument: Option<SpecificInstrumentField>,
54        rsp_info: Option<RspInfoField>,
55        request_id: i32,
56        is_last: bool,
57    },
58    /// 深度行情数据
59    DepthMarketData(DepthMarketDataField),
60    /// 询价响应
61    ForQuoteResponse(ForQuoteRspField),
62}
63
64/// 异步行情API状态
65#[derive(Debug, Clone, Default)]
66pub struct AsyncMdState {
67    pub connected: bool,
68    pub logged_in: bool,
69    pub login_info: Option<RspUserLoginField>,
70}
71
72/// 异步行情API适配器
73pub struct AsyncMdApi {
74    /// 内部同步API
75    inner: Arc<Mutex<MdApi>>,
76    /// 事件发送器
77    event_sender: mpsc::UnboundedSender<AsyncMdEvent>,
78    /// 事件接收器
79    event_receiver: Arc<Mutex<mpsc::UnboundedReceiver<AsyncMdEvent>>>,
80    /// 当前状态
81    state: Arc<Mutex<AsyncMdState>>,
82    /// 连接通知
83    connected_notify: Arc<Notify>,
84    /// 登录通知
85    login_notify: Arc<Notify>,
86}
87
88impl AsyncMdApi {
89    /// 创建异步行情API实例
90    pub async fn new(
91        flow_path: Option<&str>,
92        is_using_udp: bool,
93        is_multicast: bool,
94        is_production_mode: Option<bool>,
95    ) -> CtpResult<Self> {
96        let md_api = MdApi::new(
97            flow_path,
98            is_using_udp,
99            is_multicast,
100            is_production_mode.unwrap_or(false),
101        )?;
102        let (event_sender, event_receiver) = mpsc::unbounded_channel();
103
104        Ok(Self {
105            inner: Arc::new(Mutex::new(md_api)),
106            event_sender,
107            event_receiver: Arc::new(Mutex::new(event_receiver)),
108            state: Arc::new(Mutex::new(AsyncMdState::default())),
109            connected_notify: Arc::new(Notify::new()),
110            login_notify: Arc::new(Notify::new()),
111        })
112    }
113
114    /// 注册前置机地址
115    pub async fn register_front(&self, front_address: &str) -> CtpResult<()> {
116        let mut api = self.inner.lock().await;
117        api.register_front(front_address)
118    }
119
120    /// 初始化API
121    pub async fn init(&self) -> CtpResult<()> {
122        let mut api = self.inner.lock().await;
123
124        // 创建异步事件处理器
125        let handler = AsyncMdHandler::new(
126            self.event_sender.clone(),
127            self.state.clone(),
128            self.connected_notify.clone(),
129            self.login_notify.clone(),
130        );
131
132        // 注册处理器
133        api.register_spi(handler)?;
134
135        // 初始化
136        api.init()
137    }
138
139    /// 等待连接建立(带超时)
140    pub async fn wait_connected(&self, timeout_secs: u64) -> CtpResult<()> {
141        let state = self.state.lock().await;
142        if state.connected {
143            return Ok(());
144        }
145        drop(state);
146
147        match timeout(
148            Duration::from_secs(timeout_secs),
149            self.connected_notify.notified(),
150        )
151        .await
152        {
153            Ok(_) => {
154                let state = self.state.lock().await;
155                if state.connected {
156                    Ok(())
157                } else {
158                    Err(CtpError::InitializationError("连接失败".to_string()))
159                }
160            }
161            Err(_) => Err(CtpError::InitializationError("连接超时".to_string())),
162        }
163    }
164
165    /// 异步登录
166    pub async fn login(
167        &self,
168        req: &ReqUserLoginField,
169        timeout_secs: u64,
170    ) -> CtpResult<RspUserLoginField> {
171        let mut api = self.inner.lock().await;
172        api.req_user_login(req)?;
173        drop(api);
174
175        match timeout(
176            Duration::from_secs(timeout_secs),
177            self.login_notify.notified(),
178        )
179        .await
180        {
181            Ok(_) => {
182                let state = self.state.lock().await;
183                if let Some(login_info) = &state.login_info {
184                    Ok(login_info.clone())
185                } else {
186                    Err(CtpError::InitializationError("登录失败".to_string()))
187                }
188            }
189            Err(_) => Err(CtpError::InitializationError("登录超时".to_string())),
190        }
191    }
192
193    /// 订阅行情数据
194    pub async fn subscribe_market_data(&self, instrument_ids: &[&str]) -> CtpResult<()> {
195        let mut api = self.inner.lock().await;
196        api.subscribe_market_data(instrument_ids)
197    }
198
199    /// 取消订阅行情数据
200    pub async fn unsubscribe_market_data(&self, instrument_ids: &[&str]) -> CtpResult<()> {
201        let mut api = self.inner.lock().await;
202        api.unsubscribe_market_data(instrument_ids)
203    }
204
205    /// 接收下一个事件
206    pub async fn recv_event(&self) -> Option<AsyncMdEvent> {
207        let mut receiver = self.event_receiver.lock().await;
208        receiver.recv().await
209    }
210
211    /// 尝试接收事件(非阻塞)
212    pub async fn try_recv_event(&self) -> Result<AsyncMdEvent, mpsc::error::TryRecvError> {
213        let mut receiver = self.event_receiver.lock().await;
214        receiver.try_recv()
215    }
216
217    /// 获取当前状态
218    pub async fn get_state(&self) -> AsyncMdState {
219        self.state.lock().await.clone()
220    }
221}
222
223/// 异步事件处理器
224#[derive(Clone)]
225struct AsyncMdHandler {
226    event_sender: mpsc::UnboundedSender<AsyncMdEvent>,
227    state: Arc<Mutex<AsyncMdState>>,
228    connected_notify: Arc<Notify>,
229    login_notify: Arc<Notify>,
230}
231
232impl AsyncMdHandler {
233    fn new(
234        event_sender: mpsc::UnboundedSender<AsyncMdEvent>,
235        state: Arc<Mutex<AsyncMdState>>,
236        connected_notify: Arc<Notify>,
237        login_notify: Arc<Notify>,
238    ) -> Self {
239        Self {
240            event_sender,
241            state,
242            connected_notify,
243            login_notify,
244        }
245    }
246}
247
248impl MdSpiHandler for AsyncMdHandler {
249    fn on_front_connected(&mut self) {
250        debug!("异步API: 连接成功");
251
252        // 更新状态
253        if let Ok(mut state) = self.state.try_lock() {
254            state.connected = true;
255        }
256
257        // 通知等待者
258        self.connected_notify.notify_waiters();
259
260        // 发送事件
261        let _ = self.event_sender.send(AsyncMdEvent::Connected);
262    }
263
264    fn on_front_disconnected(&mut self, reason: i32) {
265        warn!("异步API: 连接断开, 原因: {}", reason);
266
267        // 更新状态
268        if let Ok(mut state) = self.state.try_lock() {
269            state.connected = false;
270            state.logged_in = false;
271            state.login_info = None;
272        }
273
274        // 发送事件
275        let _ = self.event_sender.send(AsyncMdEvent::Disconnected(reason));
276    }
277
278    fn on_heart_beat_warning(&mut self, time_lapse: i32) {
279        warn!("异步API: 心跳超时警告, 时间间隔: {}秒", time_lapse);
280        let _ = self
281            .event_sender
282            .send(AsyncMdEvent::HeartBeatWarning(time_lapse));
283    }
284
285    fn on_rsp_user_login(
286        &mut self,
287        user_login: Option<RspUserLoginField>,
288        rsp_info: Option<RspInfoField>,
289        request_id: i32,
290        is_last: bool,
291    ) {
292        debug!("异步API: 收到登录响应");
293
294        // 检查是否成功
295        let success = rsp_info.as_ref().map_or(true, |info| info.is_success());
296
297        if success && is_last {
298            if let Ok(mut state) = self.state.try_lock() {
299                state.logged_in = true;
300                state.login_info = user_login.clone();
301            }
302            // 通知登录完成
303            self.login_notify.notify_waiters();
304        }
305
306        // 发送事件
307        let _ = self.event_sender.send(AsyncMdEvent::LoginResponse {
308            user_login,
309            rsp_info,
310            request_id,
311            is_last,
312        });
313    }
314
315    fn on_rsp_user_logout(
316        &mut self,
317        _user_logout: Option<()>,
318        rsp_info: Option<RspInfoField>,
319        request_id: i32,
320        is_last: bool,
321    ) {
322        debug!("异步API: 收到登出响应");
323
324        if is_last {
325            if let Ok(mut state) = self.state.try_lock() {
326                state.logged_in = false;
327                state.login_info = None;
328            }
329        }
330
331        let _ = self.event_sender.send(AsyncMdEvent::LogoutResponse {
332            rsp_info,
333            request_id,
334            is_last,
335        });
336    }
337
338    fn on_rsp_error(&mut self, rsp_info: Option<RspInfoField>, request_id: i32, is_last: bool) {
339        error!("异步API: 收到错误响应");
340        let _ = self.event_sender.send(AsyncMdEvent::ErrorResponse {
341            rsp_info,
342            request_id,
343            is_last,
344        });
345    }
346
347    fn on_rsp_sub_market_data(
348        &mut self,
349        specific_instrument: Option<SpecificInstrumentField>,
350        rsp_info: Option<RspInfoField>,
351        request_id: i32,
352        is_last: bool,
353    ) {
354        debug!("异步API: 收到订阅行情响应");
355        let _ = self.event_sender.send(AsyncMdEvent::SubMarketDataResponse {
356            specific_instrument,
357            rsp_info,
358            request_id,
359            is_last,
360        });
361    }
362
363    fn on_rsp_unsub_market_data(
364        &mut self,
365        specific_instrument: Option<SpecificInstrumentField>,
366        rsp_info: Option<RspInfoField>,
367        request_id: i32,
368        is_last: bool,
369    ) {
370        debug!("异步API: 收到取消订阅响应");
371        let _ = self
372            .event_sender
373            .send(AsyncMdEvent::UnsubMarketDataResponse {
374                specific_instrument,
375                rsp_info,
376                request_id,
377                is_last,
378            });
379    }
380
381    fn on_rtn_depth_market_data(&mut self, market_data: DepthMarketDataField) {
382        // 这里不使用debug,因为行情数据量大
383        let _ = self
384            .event_sender
385            .send(AsyncMdEvent::DepthMarketData(market_data));
386    }
387
388    fn on_rtn_for_quote_rsp(&mut self, for_quote_rsp: ForQuoteRspField) {
389        debug!("异步API: 收到询价响应");
390        let _ = self
391            .event_sender
392            .send(AsyncMdEvent::ForQuoteResponse(for_quote_rsp));
393    }
394}