1use std::{
2 collections::HashMap,
3 sync::{Arc, Mutex, RwLock},
4 thread::sleep,
5 time::{Duration, Instant},
6};
7
8use crate::context::*;
9use log::*;
10use qvopenapi::{
11 error::*, models::*, AbstractQvOpenApiClient, QvOpenApiClient, QvOpenApiRequest, WindowHelper,
12};
13use serde_json::{json, Value};
14
15type TrContextMap = HashMap<i32, Arc<TrContext>>;
/// First transaction index handed out to queries.
/// NOTE(review): lower values are presumably reserved (e.g. for the connect request) — confirm.
const INITIAL_TR_INDEX: i32 = 3;
/// Largest transaction index handed out before the counter wraps back to `INITIAL_TR_INDEX`.
const MAX_TR_INDEX: i32 = 255;
/// How long a request may stay pending before the expiry check times it out.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
19
20pub struct QvOpenApiAsyncClient {
21 delegate: Arc<dyn AbstractQvOpenApiClient + Send + Sync>,
22 tr_context_map: Arc<RwLock<TrContextMap>>,
23 next_tr_index: Mutex<i32>,
24 connected_info: Arc<RwLock<Option<ConnectResponse>>>,
25 is_connecting: Arc<RwLock<bool>>,
26 is_dropping: Arc<Mutex<bool>>,
27 hwnd: isize,
28}
29
30impl QvOpenApiAsyncClient {
31 pub fn new() -> Result<QvOpenApiAsyncClient, QvOpenApiError> {
32 let client = Arc::new(QvOpenApiClient::new()?);
34 let window_helper = WindowHelper::new();
35 let hwnd = window_helper.run(client.as_ref())?;
36 client.set_hwnd(hwnd);
37
38 Ok(Self::new_custom(client.clone(), hwnd))
39 }
40
41 pub fn new_custom(
42 delgate: Arc<dyn AbstractQvOpenApiClient + Send + Sync>,
43 hwnd: isize,
44 ) -> QvOpenApiAsyncClient {
45 let client = QvOpenApiAsyncClient {
46 delegate: delgate.clone(),
47 tr_context_map: Arc::new(RwLock::new(HashMap::new())),
48 next_tr_index: Mutex::new(INITIAL_TR_INDEX),
49 connected_info: Arc::new(RwLock::new(None)),
50 is_connecting: Arc::new(RwLock::new(false)),
51 is_dropping: Arc::new(Mutex::new(false)),
52 hwnd,
53 };
54
55 client.setup_callbacks(delgate);
56
57 let cloned_context_map = client.tr_context_map.clone();
58 let cloned_is_dropping = client.is_dropping.clone();
59 {
60 std::thread::spawn(move || loop {
61 let is_dropping = *cloned_is_dropping.clone().lock().unwrap();
62 if is_dropping {
63 break;
64 }
65 Self::check_expired(cloned_context_map.clone());
66 sleep(Duration::from_millis(100));
67 });
68 }
69
70 client
71 }
72
73 fn set_context(
74 &self,
75 tr_index: i32,
76 tr_type: TrType,
77 ) -> Result<Arc<TrContext>, QvOpenApiError> {
78 let mut map = self.tr_context_map.write().unwrap();
79 if map.contains_key(&tr_index) {
80 Err(QvOpenApiError::BadRequestError {
81 message: format!("Already using tr_index {}", tr_index),
82 })
83 } else {
84 let context = Arc::new(TrContext::new(tr_index, tr_type.clone()));
85 map.insert(tr_index, context.clone());
86
87 if matches!(tr_type, TrType::CONNECT) {
88 let mut locked = self.is_connecting.write().unwrap();
89 *locked = true;
90 }
91
92 Ok(context)
93 }
94 }
95
96 pub fn connect(
97 &self,
98 account_type: AccountType,
99 id: &str,
100 password: &str,
101 cert_password: &str,
102 ) -> TrFuture {
103 TrFuture::new(self.do_connect(self.hwnd, account_type, id, password, cert_password))
104 }
105
106 fn do_connect(
107 &self,
108 new_hwnd: isize,
109 account_type: AccountType,
110 id: &str,
111 password: &str,
112 cert_password: &str,
113 ) -> Result<Arc<TrContext>, QvOpenApiError> {
114 let context = self.set_context(TR_INDEX_CONNECT, TrType::CONNECT)?;
115 match self
116 .delegate
117 .connect(new_hwnd, account_type, id, password, cert_password)
118 {
119 Ok(_) => Ok(context),
120 Err(err) => {
121 let mut context_map = self.tr_context_map.write().unwrap();
122 context_map.remove(&TR_INDEX_CONNECT);
123 Err(err)
124 }
125 }
126 }
127
128 pub fn get_connect_info(&self) -> Result<Value, QvOpenApiError> {
129 let connected_info = self.connected_info.read().unwrap();
130 match &*connected_info {
131 Some(res) => Ok(json!(res)),
132 None => Err(QvOpenApiError::NotConnectedError),
133 }
134 }
135
136 pub fn query(&self, req: Arc<dyn QvOpenApiRequest>) -> TrFuture {
137 TrFuture::new(self.do_query(req))
138 }
139
140 fn do_query(&self, req: Arc<dyn QvOpenApiRequest>) -> Result<Arc<TrContext>, QvOpenApiError> {
141 let tr_index = self.get_next_tr_index();
142 let context = self.set_context(tr_index, TrType::QUERY)?;
143 match self.delegate.query(tr_index, req) {
144 Ok(_) => Ok(context),
145 Err(err) => {
146 let mut context_map = self.tr_context_map.write().unwrap();
147 context_map.remove(&tr_index);
148 Err(err)
149 }
150 }
151 }
152
153 pub fn disconnect(&self) -> Result<(), QvOpenApiError> {
154 self.delegate.disconnect()
155 }
156
157 fn setup_callbacks(&self, delagate: Arc<dyn AbstractQvOpenApiClient + Send + Sync>) {
158 {
159 let context_map_lock = self.tr_context_map.clone();
160 let is_connecting_lock = self.is_connecting.clone();
161 let connected_info_lock = self.connected_info.clone();
162 delagate.on_connect(Box::new(move |res| {
163 let mut connected_info = connected_info_lock.write().unwrap();
164 let mut is_connecting_locked = is_connecting_lock.write().unwrap();
165 Self::handle_callback(
166 context_map_lock.clone(),
167 TR_INDEX_CONNECT,
168 res,
169 |context, res| {
170 *connected_info = Some(res.clone());
171 context.on_connect(res)
172 },
173 );
174 *is_connecting_locked = false;
175 }));
176 }
177 {
178 let context_map_lock = self.tr_context_map.clone();
179 delagate.on_data(Box::new(move |res| {
180 Self::handle_callback(
181 context_map_lock.clone(),
182 res.tr_index,
183 res,
184 |context, res| context.on_data(res),
185 );
186 }));
187 }
188 {
189 let context_map_lock = self.tr_context_map.clone();
190 delagate.on_complete(Box::new(move |tr_index| {
191 Self::handle_callback(context_map_lock.clone(), tr_index, (), |context, _res| {
192 context.on_complete()
193 });
194 }));
195 }
196 {
197 let context_map_lock = self.tr_context_map.clone();
198 let is_connecting_lock = self.is_connecting.clone();
199 let connected_info_lock = self.connected_info.clone();
200 delagate.on_disconnect(Box::new(move || {
201 let mut connected_info = connected_info_lock.write().unwrap();
202 let mut context_map = context_map_lock.write().unwrap();
203 let mut is_connecting_locked = is_connecting_lock.write().unwrap();
204
205 for context in context_map.values() {
207 context.on_disconnect();
208 }
209 context_map.clear();
210 *is_connecting_locked = false;
211 *connected_info = None;
212 }));
213 }
214 {
215 let context_map_lock = self.tr_context_map.clone();
216 let is_connecting_lock = self.is_connecting.clone();
217 let connected_info_lock = self.connected_info.clone();
218 let delagate_clone = self.delegate.clone();
219 delagate.on_socket_error(Box::new(move || {
220 let mut connected_info = connected_info_lock.write().unwrap();
221 let mut context_map = context_map_lock.write().unwrap();
222 let mut is_connecting_locked = is_connecting_lock.write().unwrap();
223
224 for context in context_map.values() {
226 context.on_disconnect();
227 }
228 context_map.clear();
229 *is_connecting_locked = false;
230 *connected_info = None;
231 delagate_clone.disconnect().unwrap();
232 }));
233 }
234 {
235 let context_map_lock = self.tr_context_map.clone();
236 let is_connecting_lock = self.is_connecting.clone();
237 delagate.on_message(Box::new(move |res| {
238 let is_connecting_locked = is_connecting_lock.read().unwrap();
239
240 let tr_index = match *is_connecting_locked {
242 true => TR_INDEX_CONNECT,
243 false => res.tr_index,
244 };
245 Self::handle_callback(context_map_lock.clone(), tr_index, (), |context, _res| {
246 context.on_message(res.clone())
247 })
248 }));
249 }
250 {
251 let context_map_lock = self.tr_context_map.clone();
252 let is_connecting_lock = self.is_connecting.clone();
253 delagate.on_error(Box::new(move |res| {
254 let is_connecting_locked = is_connecting_lock.read().unwrap();
255
256 let tr_index = match *is_connecting_locked {
258 true => TR_INDEX_CONNECT,
259 false => res.tr_index,
260 };
261 Self::handle_callback(context_map_lock.clone(), tr_index, (), |context, _res| {
262 context.on_error_response(res.clone())
263 })
264 }));
265 }
266 }
267
268 fn handle_callback<F, R>(
269 context_map_lock: Arc<RwLock<TrContextMap>>,
270 tr_index: i32,
271 res: R,
272 mut callback: F,
273 ) where
274 F: FnMut(&TrContext, R) -> bool,
275 {
276 let mut completed = false;
277 {
278 let context_map = context_map_lock.read().unwrap();
279 match context_map.get(&tr_index) {
280 Some(context) => {
281 completed = callback(context, res);
282 }
283 None => {
284 if tr_index != TR_INDEX_CONNECT {
285 error!("Context for tr_index {} is not found", tr_index);
286 }
287 }
288 }
289 }
290
291 if completed {
292 let mut context_map = context_map_lock.write().unwrap();
293 context_map.remove(&tr_index);
294 }
295 }
296
297 fn get_next_tr_index(&self) -> i32 {
298 let mut locked = self.next_tr_index.lock().unwrap();
299 let ret: i32 = *locked;
300 *locked += 1;
301 if *locked > MAX_TR_INDEX {
302 *locked = INITIAL_TR_INDEX;
303 }
304 ret
305 }
306
307 fn check_expired(context_map_lock: Arc<RwLock<TrContextMap>>) {
308 let mut expired_vec: Vec<Arc<TrContext>> = Vec::new();
309 {
310 let now = Instant::now();
311 let context_map = context_map_lock.read().unwrap();
312 for context in context_map.values() {
313 let elapsed = now.duration_since(context.request_timestamp);
314 if elapsed > DEFAULT_TIMEOUT {
315 expired_vec.push(context.clone());
316 }
317 }
318 }
319
320 if !expired_vec.is_empty() {
321 let mut context_map = context_map_lock.write().unwrap();
322 for expired in expired_vec {
323 expired.on_timeout();
324 context_map.remove(&expired.tr_index);
325 }
326 }
327 }
328}
329
330impl Drop for QvOpenApiAsyncClient {
331 fn drop(&mut self) {
332 {
333 let mut is_dropping = self.is_dropping.lock().unwrap();
334 *is_dropping = true;
335 }
336 {
337 let mut context_map = self.tr_context_map.write().unwrap();
338 for context in context_map.values() {
339 context.on_custom_error(QvOpenApiError::UnknownError);
340 }
341 context_map.clear();
342 }
343 }
344}