qvopenapi_async/
client.rs

1use std::{
2    collections::HashMap,
3    sync::{Arc, Mutex, RwLock},
4    thread::sleep,
5    time::{Duration, Instant},
6};
7
8use crate::context::*;
9use log::*;
10use qvopenapi::{
11    error::*, models::*, AbstractQvOpenApiClient, QvOpenApiClient, QvOpenApiRequest, WindowHelper,
12};
13use serde_json::{json, Value};
14
15type TrContextMap = HashMap<i32, Arc<TrContext>>;
16const INITIAL_TR_INDEX: i32 = 3;
17const MAX_TR_INDEX: i32 = 255;
18const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
19
20pub struct QvOpenApiAsyncClient {
21    delegate: Arc<dyn AbstractQvOpenApiClient + Send + Sync>,
22    tr_context_map: Arc<RwLock<TrContextMap>>,
23    next_tr_index: Mutex<i32>,
24    connected_info: Arc<RwLock<Option<ConnectResponse>>>,
25    is_connecting: Arc<RwLock<bool>>,
26    is_dropping: Arc<Mutex<bool>>,
27    hwnd: isize,
28}
29
30impl QvOpenApiAsyncClient {
31    pub fn new() -> Result<QvOpenApiAsyncClient, QvOpenApiError> {
32        // Create a window
33        let client = Arc::new(QvOpenApiClient::new()?);
34        let window_helper = WindowHelper::new();
35        let hwnd = window_helper.run(client.as_ref())?;
36        client.set_hwnd(hwnd);
37
38        Ok(Self::new_custom(client.clone(), hwnd))
39    }
40
41    pub fn new_custom(
42        delgate: Arc<dyn AbstractQvOpenApiClient + Send + Sync>,
43        hwnd: isize,
44    ) -> QvOpenApiAsyncClient {
45        let client = QvOpenApiAsyncClient {
46            delegate: delgate.clone(),
47            tr_context_map: Arc::new(RwLock::new(HashMap::new())),
48            next_tr_index: Mutex::new(INITIAL_TR_INDEX),
49            connected_info: Arc::new(RwLock::new(None)),
50            is_connecting: Arc::new(RwLock::new(false)),
51            is_dropping: Arc::new(Mutex::new(false)),
52            hwnd,
53        };
54
55        client.setup_callbacks(delgate);
56
57        let cloned_context_map = client.tr_context_map.clone();
58        let cloned_is_dropping = client.is_dropping.clone();
59        {
60            std::thread::spawn(move || loop {
61                let is_dropping = *cloned_is_dropping.clone().lock().unwrap();
62                if is_dropping {
63                    break;
64                }
65                Self::check_expired(cloned_context_map.clone());
66                sleep(Duration::from_millis(100));
67            });
68        }
69
70        client
71    }
72
73    fn set_context(
74        &self,
75        tr_index: i32,
76        tr_type: TrType,
77    ) -> Result<Arc<TrContext>, QvOpenApiError> {
78        let mut map = self.tr_context_map.write().unwrap();
79        if map.contains_key(&tr_index) {
80            Err(QvOpenApiError::BadRequestError {
81                message: format!("Already using tr_index {}", tr_index),
82            })
83        } else {
84            let context = Arc::new(TrContext::new(tr_index, tr_type.clone()));
85            map.insert(tr_index, context.clone());
86
87            if matches!(tr_type, TrType::CONNECT) {
88                let mut locked = self.is_connecting.write().unwrap();
89                *locked = true;
90            }
91
92            Ok(context)
93        }
94    }
95
96    pub fn connect(
97        &self,
98        account_type: AccountType,
99        id: &str,
100        password: &str,
101        cert_password: &str,
102    ) -> TrFuture {
103        TrFuture::new(self.do_connect(self.hwnd, account_type, id, password, cert_password))
104    }
105
106    fn do_connect(
107        &self,
108        new_hwnd: isize,
109        account_type: AccountType,
110        id: &str,
111        password: &str,
112        cert_password: &str,
113    ) -> Result<Arc<TrContext>, QvOpenApiError> {
114        let context = self.set_context(TR_INDEX_CONNECT, TrType::CONNECT)?;
115        match self
116            .delegate
117            .connect(new_hwnd, account_type, id, password, cert_password)
118        {
119            Ok(_) => Ok(context),
120            Err(err) => {
121                let mut context_map = self.tr_context_map.write().unwrap();
122                context_map.remove(&TR_INDEX_CONNECT);
123                Err(err)
124            }
125        }
126    }
127
128    pub fn get_connect_info(&self) -> Result<Value, QvOpenApiError> {
129        let connected_info = self.connected_info.read().unwrap();
130        match &*connected_info {
131            Some(res) => Ok(json!(res)),
132            None => Err(QvOpenApiError::NotConnectedError),
133        }
134    }
135
136    pub fn query(&self, req: Arc<dyn QvOpenApiRequest>) -> TrFuture {
137        TrFuture::new(self.do_query(req))
138    }
139
140    fn do_query(&self, req: Arc<dyn QvOpenApiRequest>) -> Result<Arc<TrContext>, QvOpenApiError> {
141        let tr_index = self.get_next_tr_index();
142        let context = self.set_context(tr_index, TrType::QUERY)?;
143        match self.delegate.query(tr_index, req) {
144            Ok(_) => Ok(context),
145            Err(err) => {
146                let mut context_map = self.tr_context_map.write().unwrap();
147                context_map.remove(&tr_index);
148                Err(err)
149            }
150        }
151    }
152
153    pub fn disconnect(&self) -> Result<(), QvOpenApiError> {
154        self.delegate.disconnect()
155    }
156
157    fn setup_callbacks(&self, delagate: Arc<dyn AbstractQvOpenApiClient + Send + Sync>) {
158        {
159            let context_map_lock = self.tr_context_map.clone();
160            let is_connecting_lock = self.is_connecting.clone();
161            let connected_info_lock = self.connected_info.clone();
162            delagate.on_connect(Box::new(move |res| {
163                let mut connected_info = connected_info_lock.write().unwrap();
164                let mut is_connecting_locked = is_connecting_lock.write().unwrap();
165                Self::handle_callback(
166                    context_map_lock.clone(),
167                    TR_INDEX_CONNECT,
168                    res,
169                    |context, res| {
170                        *connected_info = Some(res.clone());
171                        context.on_connect(res)
172                    },
173                );
174                *is_connecting_locked = false;
175            }));
176        }
177        {
178            let context_map_lock = self.tr_context_map.clone();
179            delagate.on_data(Box::new(move |res| {
180                Self::handle_callback(
181                    context_map_lock.clone(),
182                    res.tr_index,
183                    res,
184                    |context, res| context.on_data(res),
185                );
186            }));
187        }
188        {
189            let context_map_lock = self.tr_context_map.clone();
190            delagate.on_complete(Box::new(move |tr_index| {
191                Self::handle_callback(context_map_lock.clone(), tr_index, (), |context, _res| {
192                    context.on_complete()
193                });
194            }));
195        }
196        {
197            let context_map_lock = self.tr_context_map.clone();
198            let is_connecting_lock = self.is_connecting.clone();
199            let connected_info_lock = self.connected_info.clone();
200            delagate.on_disconnect(Box::new(move || {
201                let mut connected_info = connected_info_lock.write().unwrap();
202                let mut context_map = context_map_lock.write().unwrap();
203                let mut is_connecting_locked = is_connecting_lock.write().unwrap();
204
205                // make all requests end when disconnect
206                for context in context_map.values() {
207                    context.on_disconnect();
208                }
209                context_map.clear();
210                *is_connecting_locked = false;
211                *connected_info = None;
212            }));
213        }
214        {
215            let context_map_lock = self.tr_context_map.clone();
216            let is_connecting_lock = self.is_connecting.clone();
217            let connected_info_lock = self.connected_info.clone();
218            let delagate_clone = self.delegate.clone();
219            delagate.on_socket_error(Box::new(move || {
220                let mut connected_info = connected_info_lock.write().unwrap();
221                let mut context_map = context_map_lock.write().unwrap();
222                let mut is_connecting_locked = is_connecting_lock.write().unwrap();
223
224                // make all requests end when disconnect
225                for context in context_map.values() {
226                    context.on_disconnect();
227                }
228                context_map.clear();
229                *is_connecting_locked = false;
230                *connected_info = None;
231                delagate_clone.disconnect().unwrap();
232            }));
233        }
234        {
235            let context_map_lock = self.tr_context_map.clone();
236            let is_connecting_lock = self.is_connecting.clone();
237            delagate.on_message(Box::new(move |res| {
238                let is_connecting_locked = is_connecting_lock.read().unwrap();
239
240                // If connecting, all messages should direct to connect context
241                let tr_index = match *is_connecting_locked {
242                    true => TR_INDEX_CONNECT,
243                    false => res.tr_index,
244                };
245                Self::handle_callback(context_map_lock.clone(), tr_index, (), |context, _res| {
246                    context.on_message(res.clone())
247                })
248            }));
249        }
250        {
251            let context_map_lock = self.tr_context_map.clone();
252            let is_connecting_lock = self.is_connecting.clone();
253            delagate.on_error(Box::new(move |res| {
254                let is_connecting_locked = is_connecting_lock.read().unwrap();
255
256                // If connecting, all messages should direct to connect context
257                let tr_index = match *is_connecting_locked {
258                    true => TR_INDEX_CONNECT,
259                    false => res.tr_index,
260                };
261                Self::handle_callback(context_map_lock.clone(), tr_index, (), |context, _res| {
262                    context.on_error_response(res.clone())
263                })
264            }));
265        }
266    }
267
268    fn handle_callback<F, R>(
269        context_map_lock: Arc<RwLock<TrContextMap>>,
270        tr_index: i32,
271        res: R,
272        mut callback: F,
273    ) where
274        F: FnMut(&TrContext, R) -> bool,
275    {
276        let mut completed = false;
277        {
278            let context_map = context_map_lock.read().unwrap();
279            match context_map.get(&tr_index) {
280                Some(context) => {
281                    completed = callback(context, res);
282                }
283                None => {
284                    if tr_index != TR_INDEX_CONNECT {
285                        error!("Context for tr_index {} is not found", tr_index);
286                    }
287                }
288            }
289        }
290
291        if completed {
292            let mut context_map = context_map_lock.write().unwrap();
293            context_map.remove(&tr_index);
294        }
295    }
296
297    fn get_next_tr_index(&self) -> i32 {
298        let mut locked = self.next_tr_index.lock().unwrap();
299        let ret: i32 = *locked;
300        *locked += 1;
301        if *locked > MAX_TR_INDEX {
302            *locked = INITIAL_TR_INDEX;
303        }
304        ret
305    }
306
307    fn check_expired(context_map_lock: Arc<RwLock<TrContextMap>>) {
308        let mut expired_vec: Vec<Arc<TrContext>> = Vec::new();
309        {
310            let now = Instant::now();
311            let context_map = context_map_lock.read().unwrap();
312            for context in context_map.values() {
313                let elapsed = now.duration_since(context.request_timestamp);
314                if elapsed > DEFAULT_TIMEOUT {
315                    expired_vec.push(context.clone());
316                }
317            }
318        }
319
320        if !expired_vec.is_empty() {
321            let mut context_map = context_map_lock.write().unwrap();
322            for expired in expired_vec {
323                expired.on_timeout();
324                context_map.remove(&expired.tr_index);
325            }
326        }
327    }
328}
329
330impl Drop for QvOpenApiAsyncClient {
331    fn drop(&mut self) {
332        {
333            let mut is_dropping = self.is_dropping.lock().unwrap();
334            *is_dropping = true;
335        }
336        {
337            let mut context_map = self.tr_context_map.write().unwrap();
338            for context in context_map.values() {
339                context.on_custom_error(QvOpenApiError::UnknownError);
340            }
341            context_map.clear();
342        }
343    }
344}