1use std::{
2 collections::{HashMap, HashSet},
3 sync::Arc,
4};
5
6use uuid::Uuid;
7
8use async_std::sync::{Mutex, RwLock};
9use async_std::task;
10
11use crate::{
12 fixapi::FixApi,
13 messages::MarketDataReq,
14 types::{
15 ConnectionHandler, DepthPrice, Error, Field, IncrementalRefresh, InternalMDResult,
16 MarketDataHandler, MarketType, SpotPrice,
17 },
18};
19
20#[derive(Debug, PartialEq, Clone, Eq)]
21enum RequestState {
22 Requested(String),
23 Accepted,
24 Rejected,
25}
26
27pub struct MarketClient {
28 internal: FixApi,
29
30 spot_req_states: Arc<Mutex<HashMap<u32, RequestState>>>,
31 depth_req_states: Arc<Mutex<HashMap<u32, RequestState>>>,
32
33 spot_market_data: Arc<Mutex<HashMap<u32, SpotPrice>>>,
34 depth_market_data: Arc<RwLock<HashMap<u32, HashMap<String, DepthPrice>>>>,
35 market_data_handler: Option<Arc<dyn MarketDataHandler + Send + Sync>>,
38}
39
40fn insert_entry_to(e: HashMap<Field, String>, depth_data: &mut HashMap<String, DepthPrice>) {
41 if e.len() < 4 {
42 return;
43 }
44 let eid = e.get(&Field::MDEntryID).unwrap();
45 depth_data.insert(
46 eid.clone(),
47 DepthPrice {
48 price_type: e.get(&Field::MDEntryType).unwrap().parse().unwrap(),
49 price: e.get(&Field::MDEntryPx).unwrap().parse::<f64>().unwrap(),
50 size: e.get(&Field::MDEntrySize).unwrap().parse::<f64>().unwrap(),
51 },
52 );
53}
54
55fn depth_data_from_entries(data: Vec<HashMap<Field, String>>) -> HashMap<String, DepthPrice> {
56 let mut depth_data = HashMap::new();
57 for e in data.into_iter() {
58 insert_entry_to(e, &mut depth_data);
59 }
60 depth_data
61}
62
63fn spot_price_from_market_data(data: Vec<HashMap<Field, String>>) -> SpotPrice {
64 let mut price = SpotPrice {
65 bid: 0f64,
66 ask: 0f64,
67 };
68
69 for i in 0..2 {
70 let value = data[i]
71 .get(&Field::MDEntryPx)
72 .unwrap()
73 .parse::<f64>()
74 .unwrap();
75
76 if data[i].get(&Field::MDEntryType).unwrap() == "0" {
77 price.bid = value;
78 } else {
79 price.ask = value;
80 }
81 }
82 price
83}
84
85impl MarketClient {
86 pub fn new(
87 host: String,
88 login: String,
89 password: String,
90 sender_comp_id: String,
91 heartbeat_interval: Option<u32>,
92 ) -> Self {
93 Self {
94 internal: FixApi::new(
95 crate::types::SubID::QUOTE,
96 host,
97 login,
98 password,
99 sender_comp_id,
100 heartbeat_interval,
101 ),
102
103 spot_req_states: Arc::new(Mutex::new(HashMap::new())),
104 spot_market_data: Arc::new(Mutex::new(HashMap::new())),
105
106 depth_req_states: Arc::new(Mutex::new(HashMap::new())),
107 depth_market_data: Arc::new(RwLock::new(HashMap::new())),
108 market_data_handler: None,
109 }
110 }
111 pub fn register_market_handler_arc<T: MarketDataHandler + Send + Sync + 'static>(
112 &mut self,
113 handler: Arc<T>,
114 ) {
115 self.market_data_handler = Some(handler);
116 }
117
118 pub fn register_market_handler<T: MarketDataHandler + Send + Sync + 'static>(
119 &mut self,
120 handler: T,
121 ) {
122 self.market_data_handler = Some(Arc::new(handler));
123 }
124
125 pub fn register_connection_handler_arc<T: ConnectionHandler + Send + Sync + 'static>(
126 &mut self,
127 handler: Arc<T>,
128 ) {
129 self.internal.register_connection_handler_arc(handler);
130 }
131
132 pub fn register_connection_handler<T: ConnectionHandler + Send + Sync + 'static>(
133 &mut self,
134 handler: T,
135 ) {
136 self.internal.register_connection_handler(handler);
137 }
138
139 fn register_internal_handler(&mut self) {
140 let spot_req_states_clone = self.spot_req_states.clone();
143 let spot_market_data_clone = self.spot_market_data.clone();
144
145 let depth_req_states_clone = self.depth_req_states.clone();
146 let depth_market_data_clone = self.depth_market_data.clone();
147
148 let market_data_handler = self.market_data_handler.clone();
149
150 let market_callback = move |mdresult: InternalMDResult| {
151 let spot_req_states_clone = spot_req_states_clone.clone();
155 let spot_market_data_clone = spot_market_data_clone.clone();
156 let depth_req_states_clone = depth_req_states_clone.clone();
157 let depth_market_data_clone = depth_market_data_clone.clone();
158
159 let market_data_handler = market_data_handler.clone();
160
161 task::spawn(async move {
164 match mdresult {
165 InternalMDResult::MD {
166 msg_type,
167 symbol_id,
168 data,
169 } => {
170 match msg_type {
171 'W' => {
172 if data.len() != 0 && !data[0].contains_key(&Field::MDEntryID) {
174 let requested_symbol = spot_req_states_clone
176 .lock()
177 .await
178 .get(&symbol_id)
179 .map(|v| match v {
180 RequestState::Requested(_) => true,
181 _ => false,
182 })
183 .unwrap_or(false);
184
185 if requested_symbol {
186 spot_req_states_clone
187 .lock()
188 .await
189 .insert(symbol_id, RequestState::Accepted);
190 if let Some(handler) = &market_data_handler {
192 handler.on_accpeted_spot_subscription(symbol_id).await;
193 }
194 }
195
196 if data.len() >= 2 {
198 let prices = spot_price_from_market_data(data);
199
200 spot_market_data_clone
201 .lock()
202 .await
203 .insert(symbol_id, prices.clone());
204
205 if let Some(handler) = &market_data_handler {
207 handler.on_price_of(symbol_id, prices).await;
208 }
209 }
210 } else {
211 let requested_symbol = depth_req_states_clone
213 .lock()
214 .await
215 .get(&symbol_id)
216 .map(|v| match v {
217 RequestState::Requested(_) => true,
218 _ => false,
219 })
220 .unwrap_or(false);
221
222 if requested_symbol {
223 depth_req_states_clone
224 .lock()
225 .await
226 .insert(symbol_id, RequestState::Accepted);
227
228 if let Some(handler) = &market_data_handler {
229 handler.on_accpeted_depth_subscription(symbol_id).await;
230 }
231 }
232
233 {
234 let depth_data = depth_data_from_entries(data);
235
236 if let Some(handler) = &market_data_handler {
239 handler
240 .on_market_depth_full_refresh(
241 symbol_id,
242 depth_data.clone(),
243 )
244 .await;
245 }
246
247 depth_market_data_clone
249 .write()
250 .await
251 .insert(symbol_id, depth_data);
252 }
253 }
254 }
255 'X' => {
256 if Some(&RequestState::Accepted)
260 == depth_req_states_clone.lock().await.get(&symbol_id)
261 {
262 let mut incre_list = Vec::new();
263 for e in data.into_iter() {
264 let symbol =
265 e.get(&Field::Symbol).unwrap().parse::<u32>().unwrap();
266
267 match e.get(&Field::MDUpdateAction) {
268 Some(s) if s == "2" => {
269 incre_list.push(IncrementalRefresh::Delete {
271 symbol_id: symbol,
272 entry_id: e
273 .get(&Field::MDEntryID)
274 .unwrap()
275 .clone(),
276 });
277 }
278 Some(s) if s == "0" => {
279 let eid = e.get(&Field::MDEntryID).unwrap();
281 incre_list.push(IncrementalRefresh::New {
282 symbol_id: symbol,
283 entry_id: eid.clone(),
284 data: DepthPrice {
285 price_type: e
286 .get(&Field::MDEntryType)
287 .unwrap()
288 .parse()
289 .unwrap(),
290 price: e
291 .get(&Field::MDEntryPx)
292 .unwrap()
293 .parse::<f64>()
294 .unwrap(),
295 size: e
296 .get(&Field::MDEntrySize)
297 .unwrap()
298 .parse::<f64>()
299 .unwrap(),
300 },
301 });
302 }
303 _ => {}
304 }
305 }
306
307 if let Some(handler) = market_data_handler {
310 handler
311 .on_market_depth_incremental_refresh(incre_list.clone())
312 .await;
313 }
314
315 {
316 let mut depth_cont = depth_market_data_clone.write().await;
317 for incre in incre_list.into_iter() {
318 match incre {
319 IncrementalRefresh::New {
320 symbol_id,
321 entry_id,
322 data,
323 } => {
324 let s = depth_cont
325 .entry(symbol_id)
326 .or_insert(HashMap::new());
327 s.insert(entry_id, data);
328 }
329 IncrementalRefresh::Delete {
330 symbol_id,
331 entry_id,
332 } => {
333 let s = depth_cont
334 .entry(symbol_id)
335 .or_insert(HashMap::new());
336 s.remove(&entry_id);
337 }
338 }
339 }
340 }
341 }
343 }
344 _ => {}
345 }
346 }
347 InternalMDResult::MDReject {
348 symbol_id,
349 md_req_id,
350 err_msg,
351 } => {
352 let spot_requested = spot_req_states_clone
353 .lock()
354 .await
355 .values()
356 .filter(|s| match s {
357 RequestState::Requested(value) => value == md_req_id.as_str(),
358 _ => false,
359 })
360 .count()
361 == 1;
362 if spot_requested {
363 spot_req_states_clone
365 .lock()
366 .await
367 .insert(symbol_id, RequestState::Rejected);
368 if let Some(handler) = &market_data_handler {
370 handler
371 .on_rejected_spot_subscription(symbol_id, err_msg.clone())
372 .await;
373 }
374 }
375
376 let depth_requested = depth_req_states_clone
377 .lock()
378 .await
379 .values()
380 .filter(|s| match s {
381 RequestState::Requested(value) => value == md_req_id.as_str(),
382 _ => false,
383 })
384 .count()
385 == 1;
386 if depth_requested {
387 depth_req_states_clone
389 .lock()
390 .await
391 .insert(symbol_id, RequestState::Rejected);
392 if let Some(handler) = &market_data_handler {
394 handler
395 .on_rejected_depth_subscription(symbol_id, err_msg.clone())
396 .await;
397 }
398 }
399 }
400 }
401 });
402 };
403 self.internal.register_market_callback(market_callback);
404 }
405
406 pub async fn connect(&mut self) -> Result<(), Error> {
411 self.register_internal_handler();
413 self.spot_req_states.lock().await.clear();
414 self.depth_req_states.lock().await.clear();
415 self.spot_market_data.lock().await.clear();
416 self.depth_market_data.write().await.clear();
417
418 self.internal.connect().await?;
420 self.internal.logon(true).await
421 }
422
423 pub async fn disconnect(&mut self) -> Result<(), Error> {
424 self.internal.logout().await?;
425 self.internal.disconnect().await
426 }
427
428 pub fn is_connected(&self) -> bool {
429 self.internal.is_connected()
430 }
431
432 pub async fn spot_subscription_list(&self) -> HashSet<u32> {
433 self.spot_req_states
434 .lock()
435 .await
436 .iter()
437 .filter(|(_, v)| *v == &RequestState::Accepted)
438 .map(|(k, _)| *k)
439 .collect()
440 }
441
442 pub async fn depth_subscription_list(&self) -> HashSet<u32> {
443 self.depth_req_states
444 .lock()
445 .await
446 .iter()
447 .filter(|(_, v)| *v == &RequestState::Accepted)
448 .map(|(k, _)| *k)
449 .collect()
450 }
451
452 pub async fn price_of(&self, symbol_id: u32) -> Result<SpotPrice, Error> {
453 self.spot_market_data
454 .lock()
455 .await
456 .get(&symbol_id)
457 .map(|v| v.clone())
458 .ok_or(Error::NotSubscribed(symbol_id, MarketType::Spot))
459 }
460
461 pub async fn depth_data(&self, symbol_id: u32) -> Result<HashMap<String, DepthPrice>, Error> {
462 self.depth_market_data
463 .read()
464 .await
465 .get(&symbol_id)
466 .map(|v| v.clone())
467 .ok_or(Error::NotSubscribed(symbol_id, MarketType::Spot))
468 }
469
470 pub async fn subscribe_spot(&self, symbol_id: u32) -> Result<(), Error> {
471 let mdreqid = Uuid::new_v4().to_string();
475 if let Some(state) = self.spot_req_states.lock().await.get(&symbol_id) {
477 match state {
478 RequestState::Accepted => {
479 return Err(Error::SubscribedAlready(symbol_id, MarketType::Spot));
480 }
481 RequestState::Requested(_) => {
482 return Err(Error::RequestingSubscription(symbol_id, MarketType::Spot));
483 }
484 _ => {}
485 }
486 }
487
488 self.spot_req_states
490 .lock()
491 .await
492 .insert(symbol_id, RequestState::Requested(mdreqid.clone()));
493
494 let req = MarketDataReq::new(mdreqid, '1', 1, None, &['0', '1'], 1, symbol_id);
496 self.internal.send_message(req).await?;
497
498 Ok(())
499 }
500
501 pub async fn unsubscribe_spot(&self, symbol_id: u32) -> Result<(), Error> {
502 let states = self
504 .spot_req_states
505 .lock()
506 .await
507 .get(&symbol_id)
508 .map(|v| v.clone());
509
510 match states {
511 Some(RequestState::Requested(_)) => {
512 return Err(Error::RequestingSubscription(symbol_id, MarketType::Spot));
513 }
514 Some(RequestState::Rejected) | None => {
515 return Err(Error::NotSubscribed(symbol_id, MarketType::Spot));
516 }
517 _ => {
518 self.spot_req_states.lock().await.remove(&symbol_id);
519 self.spot_market_data.lock().await.remove(&symbol_id);
520 let req = MarketDataReq::new("-1".into(), '2', 1, None, &['0', '1'], 1, symbol_id);
521 let _seq_num = self.internal.send_message(req).await?;
522
523 log::trace!("Unsubscribed spot for symbol({})", symbol_id);
524
525 Ok(())
526 }
527 }
528 }
529
530 pub async fn subscribe_depth(&self, symbol_id: u32) -> Result<(), Error> {
531 let mdreqid = Uuid::new_v4().to_string();
532 if let Some(state) = self.depth_req_states.lock().await.get(&symbol_id) {
534 match state {
535 RequestState::Accepted => {
536 return Err(Error::SubscribedAlready(symbol_id, MarketType::Depth));
537 }
538 RequestState::Requested(_) => {
539 return Err(Error::RequestingSubscription(symbol_id, MarketType::Depth));
540 }
541 _ => {}
542 }
543 }
544
545 self.depth_req_states
547 .lock()
548 .await
549 .insert(symbol_id, RequestState::Requested(mdreqid.clone()));
550
551 let req = MarketDataReq::new(mdreqid, '1', 0, None, &['0', '1'], 1, symbol_id);
553 self.internal.send_message(req).await?;
554
555 Ok(())
556 }
557
558 pub async fn unsubscribe_depth(&self, symbol_id: u32) -> Result<(), Error> {
559 let states = self
560 .depth_req_states
561 .lock()
562 .await
563 .get(&symbol_id)
564 .map(|v| v.clone());
565
566 match states {
567 Some(RequestState::Requested(_)) => {
568 return Err(Error::RequestingSubscription(symbol_id, MarketType::Depth));
569 }
570 Some(RequestState::Rejected) | None => {
571 return Err(Error::NotSubscribed(symbol_id, MarketType::Depth));
572 }
573 _ => {
574 self.depth_req_states.lock().await.remove(&symbol_id);
575 self.depth_market_data.write().await.remove(&symbol_id);
576 let req = MarketDataReq::new("-1".into(), '2', 0, None, &['0', '1'], 1, symbol_id);
577 self.internal.send_message(req).await?;
578
579 log::trace!("Unsubscribed depth for symbol({})", symbol_id);
580
581 Ok(())
582 }
583 }
584 }
585}