cfix/
market_client.rs

1use std::{
2    collections::{HashMap, HashSet},
3    sync::Arc,
4};
5
6use uuid::Uuid;
7
8use async_std::sync::{Mutex, RwLock};
9use async_std::task;
10
11use crate::{
12    fixapi::FixApi,
13    messages::MarketDataReq,
14    types::{
15        ConnectionHandler, DepthPrice, Error, Field, IncrementalRefresh, InternalMDResult,
16        MarketDataHandler, MarketType, SpotPrice,
17    },
18};
19
20#[derive(Debug, PartialEq, Clone, Eq)]
21enum RequestState {
22    Requested(String),
23    Accepted,
24    Rejected,
25}
26
27pub struct MarketClient {
28    internal: FixApi,
29
30    spot_req_states: Arc<Mutex<HashMap<u32, RequestState>>>,
31    depth_req_states: Arc<Mutex<HashMap<u32, RequestState>>>,
32
33    spot_market_data: Arc<Mutex<HashMap<u32, SpotPrice>>>,
34    depth_market_data: Arc<RwLock<HashMap<u32, HashMap<String, DepthPrice>>>>,
35    //
36    //
37    market_data_handler: Option<Arc<dyn MarketDataHandler + Send + Sync>>,
38}
39
40fn insert_entry_to(e: HashMap<Field, String>, depth_data: &mut HashMap<String, DepthPrice>) {
41    if e.len() < 4 {
42        return;
43    }
44    let eid = e.get(&Field::MDEntryID).unwrap();
45    depth_data.insert(
46        eid.clone(),
47        DepthPrice {
48            price_type: e.get(&Field::MDEntryType).unwrap().parse().unwrap(),
49            price: e.get(&Field::MDEntryPx).unwrap().parse::<f64>().unwrap(),
50            size: e.get(&Field::MDEntrySize).unwrap().parse::<f64>().unwrap(),
51        },
52    );
53}
54
55fn depth_data_from_entries(data: Vec<HashMap<Field, String>>) -> HashMap<String, DepthPrice> {
56    let mut depth_data = HashMap::new();
57    for e in data.into_iter() {
58        insert_entry_to(e, &mut depth_data);
59    }
60    depth_data
61}
62
63fn spot_price_from_market_data(data: Vec<HashMap<Field, String>>) -> SpotPrice {
64    let mut price = SpotPrice {
65        bid: 0f64,
66        ask: 0f64,
67    };
68
69    for i in 0..2 {
70        let value = data[i]
71            .get(&Field::MDEntryPx)
72            .unwrap()
73            .parse::<f64>()
74            .unwrap();
75
76        if data[i].get(&Field::MDEntryType).unwrap() == "0" {
77            price.bid = value;
78        } else {
79            price.ask = value;
80        }
81    }
82    price
83}
84
85impl MarketClient {
86    pub fn new(
87        host: String,
88        login: String,
89        password: String,
90        sender_comp_id: String,
91        heartbeat_interval: Option<u32>,
92    ) -> Self {
93        Self {
94            internal: FixApi::new(
95                crate::types::SubID::QUOTE,
96                host,
97                login,
98                password,
99                sender_comp_id,
100                heartbeat_interval,
101            ),
102
103            spot_req_states: Arc::new(Mutex::new(HashMap::new())),
104            spot_market_data: Arc::new(Mutex::new(HashMap::new())),
105
106            depth_req_states: Arc::new(Mutex::new(HashMap::new())),
107            depth_market_data: Arc::new(RwLock::new(HashMap::new())),
108            market_data_handler: None,
109        }
110    }
111    pub fn register_market_handler_arc<T: MarketDataHandler + Send + Sync + 'static>(
112        &mut self,
113        handler: Arc<T>,
114    ) {
115        self.market_data_handler = Some(handler);
116    }
117
118    pub fn register_market_handler<T: MarketDataHandler + Send + Sync + 'static>(
119        &mut self,
120        handler: T,
121    ) {
122        self.market_data_handler = Some(Arc::new(handler));
123    }
124
125    pub fn register_connection_handler_arc<T: ConnectionHandler + Send + Sync + 'static>(
126        &mut self,
127        handler: Arc<T>,
128    ) {
129        self.internal.register_connection_handler_arc(handler);
130    }
131
132    pub fn register_connection_handler<T: ConnectionHandler + Send + Sync + 'static>(
133        &mut self,
134        handler: T,
135    ) {
136        self.internal.register_connection_handler(handler);
137    }
138
139    fn register_internal_handler(&mut self) {
140        // clone
141        // let trigger = self.internal.trigger.clone();
142        let spot_req_states_clone = self.spot_req_states.clone();
143        let spot_market_data_clone = self.spot_market_data.clone();
144
145        let depth_req_states_clone = self.depth_req_states.clone();
146        let depth_market_data_clone = self.depth_market_data.clone();
147
148        let market_data_handler = self.market_data_handler.clone();
149
150        let market_callback = move |mdresult: InternalMDResult| {
151            // symbol_id is only valid for msg_type - 'W'
152
153            // let tx = trigger.clone();
154            let spot_req_states_clone = spot_req_states_clone.clone();
155            let spot_market_data_clone = spot_market_data_clone.clone();
156            let depth_req_states_clone = depth_req_states_clone.clone();
157            let depth_market_data_clone = depth_market_data_clone.clone();
158
159            let market_data_handler = market_data_handler.clone();
160
161            // let mtype = String::from(msg_type);
162            //
163            task::spawn(async move {
164                match mdresult {
165                    InternalMDResult::MD {
166                        msg_type,
167                        symbol_id,
168                        data,
169                    } => {
170                        match msg_type {
171                            'W' => {
172                                // check whether data is spot or depth
173                                if data.len() != 0 && !data[0].contains_key(&Field::MDEntryID) {
174                                    //spot
175                                    let requested_symbol = spot_req_states_clone
176                                        .lock()
177                                        .await
178                                        .get(&symbol_id)
179                                        .map(|v| match v {
180                                            RequestState::Requested(_) => true,
181                                            _ => false,
182                                        })
183                                        .unwrap_or(false);
184
185                                    if requested_symbol {
186                                        spot_req_states_clone
187                                            .lock()
188                                            .await
189                                            .insert(symbol_id, RequestState::Accepted);
190                                        // to handler
191                                        if let Some(handler) = &market_data_handler {
192                                            handler.on_accpeted_spot_subscription(symbol_id).await;
193                                        }
194                                    }
195
196                                    // update spot data
197                                    if data.len() >= 2 {
198                                        let prices = spot_price_from_market_data(data);
199
200                                        spot_market_data_clone
201                                            .lock()
202                                            .await
203                                            .insert(symbol_id, prices.clone());
204
205                                        // to handler
206                                        if let Some(handler) = &market_data_handler {
207                                            handler.on_price_of(symbol_id, prices).await;
208                                        }
209                                    }
210                                } else {
211                                    // depth
212                                    let requested_symbol = depth_req_states_clone
213                                        .lock()
214                                        .await
215                                        .get(&symbol_id)
216                                        .map(|v| match v {
217                                            RequestState::Requested(_) => true,
218                                            _ => false,
219                                        })
220                                        .unwrap_or(false);
221
222                                    if requested_symbol {
223                                        depth_req_states_clone
224                                            .lock()
225                                            .await
226                                            .insert(symbol_id, RequestState::Accepted);
227
228                                        if let Some(handler) = &market_data_handler {
229                                            handler.on_accpeted_depth_subscription(symbol_id).await;
230                                        }
231                                    }
232
233                                    {
234                                        let depth_data = depth_data_from_entries(data);
235
236                                        // FIXME which one should be first?
237                                        // to handler
238                                        if let Some(handler) = &market_data_handler {
239                                            handler
240                                                .on_market_depth_full_refresh(
241                                                    symbol_id,
242                                                    depth_data.clone(),
243                                                )
244                                                .await;
245                                        }
246
247                                        // update the depth data
248                                        depth_market_data_clone
249                                            .write()
250                                            .await
251                                            .insert(symbol_id, depth_data);
252                                    }
253                                }
254                            }
255                            'X' => {
256                                // ignore the symbol_id argument
257                                //
258                                // Market data incremental refresh
259                                if Some(&RequestState::Accepted)
260                                    == depth_req_states_clone.lock().await.get(&symbol_id)
261                                {
262                                    let mut incre_list = Vec::new();
263                                    for e in data.into_iter() {
264                                        let symbol =
265                                            e.get(&Field::Symbol).unwrap().parse::<u32>().unwrap();
266
267                                        match e.get(&Field::MDUpdateAction) {
268                                            Some(s) if s == "2" => {
269                                                // delete
270                                                incre_list.push(IncrementalRefresh::Delete {
271                                                    symbol_id: symbol,
272                                                    entry_id: e
273                                                        .get(&Field::MDEntryID)
274                                                        .unwrap()
275                                                        .clone(),
276                                                });
277                                            }
278                                            Some(s) if s == "0" => {
279                                                // new
280                                                let eid = e.get(&Field::MDEntryID).unwrap();
281                                                incre_list.push(IncrementalRefresh::New {
282                                                    symbol_id: symbol,
283                                                    entry_id: eid.clone(),
284                                                    data: DepthPrice {
285                                                        price_type: e
286                                                            .get(&Field::MDEntryType)
287                                                            .unwrap()
288                                                            .parse()
289                                                            .unwrap(),
290                                                        price: e
291                                                            .get(&Field::MDEntryPx)
292                                                            .unwrap()
293                                                            .parse::<f64>()
294                                                            .unwrap(),
295                                                        size: e
296                                                            .get(&Field::MDEntrySize)
297                                                            .unwrap()
298                                                            .parse::<f64>()
299                                                            .unwrap(),
300                                                    },
301                                                });
302                                            }
303                                            _ => {}
304                                        }
305                                    }
306
307                                    // FIXME which one should be first?
308                                    // to handler
309                                    if let Some(handler) = market_data_handler {
310                                        handler
311                                            .on_market_depth_incremental_refresh(incre_list.clone())
312                                            .await;
313                                    }
314
315                                    {
316                                        let mut depth_cont = depth_market_data_clone.write().await;
317                                        for incre in incre_list.into_iter() {
318                                            match incre {
319                                                IncrementalRefresh::New {
320                                                    symbol_id,
321                                                    entry_id,
322                                                    data,
323                                                } => {
324                                                    let s = depth_cont
325                                                        .entry(symbol_id)
326                                                        .or_insert(HashMap::new());
327                                                    s.insert(entry_id, data);
328                                                }
329                                                IncrementalRefresh::Delete {
330                                                    symbol_id,
331                                                    entry_id,
332                                                } => {
333                                                    let s = depth_cont
334                                                        .entry(symbol_id)
335                                                        .or_insert(HashMap::new());
336                                                    s.remove(&entry_id);
337                                                }
338                                            }
339                                        }
340                                    }
341                                    //
342                                }
343                            }
344                            _ => {}
345                        }
346                    }
347                    InternalMDResult::MDReject {
348                        symbol_id,
349                        md_req_id,
350                        err_msg,
351                    } => {
352                        let spot_requested = spot_req_states_clone
353                            .lock()
354                            .await
355                            .values()
356                            .filter(|s| match s {
357                                RequestState::Requested(value) => value == md_req_id.as_str(),
358                                _ => false,
359                            })
360                            .count()
361                            == 1;
362                        if spot_requested {
363                            // change the state
364                            spot_req_states_clone
365                                .lock()
366                                .await
367                                .insert(symbol_id, RequestState::Rejected);
368                            // notify
369                            if let Some(handler) = &market_data_handler {
370                                handler
371                                    .on_rejected_spot_subscription(symbol_id, err_msg.clone())
372                                    .await;
373                            }
374                        }
375
376                        let depth_requested = depth_req_states_clone
377                            .lock()
378                            .await
379                            .values()
380                            .filter(|s| match s {
381                                RequestState::Requested(value) => value == md_req_id.as_str(),
382                                _ => false,
383                            })
384                            .count()
385                            == 1;
386                        if depth_requested {
387                            // change the state
388                            depth_req_states_clone
389                                .lock()
390                                .await
391                                .insert(symbol_id, RequestState::Rejected);
392                            // notify
393                            if let Some(handler) = &market_data_handler {
394                                handler
395                                    .on_rejected_depth_subscription(symbol_id, err_msg.clone())
396                                    .await;
397                            }
398                        }
399                    }
400                }
401            });
402        };
403        self.internal.register_market_callback(market_callback);
404    }
405
406    /// Connects to a server
407    ///
408    /// This method first attempt to establish a connection. If the connection is succesful, then
409    /// it proceeds to logon directly.
410    pub async fn connect(&mut self) -> Result<(), Error> {
411        // set market handler
412        self.register_internal_handler();
413        self.spot_req_states.lock().await.clear();
414        self.depth_req_states.lock().await.clear();
415        self.spot_market_data.lock().await.clear();
416        self.depth_market_data.write().await.clear();
417
418        // connection
419        self.internal.connect().await?;
420        self.internal.logon(true).await
421    }
422
423    pub async fn disconnect(&mut self) -> Result<(), Error> {
424        self.internal.logout().await?;
425        self.internal.disconnect().await
426    }
427
428    pub fn is_connected(&self) -> bool {
429        self.internal.is_connected()
430    }
431
432    pub async fn spot_subscription_list(&self) -> HashSet<u32> {
433        self.spot_req_states
434            .lock()
435            .await
436            .iter()
437            .filter(|(_, v)| *v == &RequestState::Accepted)
438            .map(|(k, _)| *k)
439            .collect()
440    }
441
442    pub async fn depth_subscription_list(&self) -> HashSet<u32> {
443        self.depth_req_states
444            .lock()
445            .await
446            .iter()
447            .filter(|(_, v)| *v == &RequestState::Accepted)
448            .map(|(k, _)| *k)
449            .collect()
450    }
451
452    pub async fn price_of(&self, symbol_id: u32) -> Result<SpotPrice, Error> {
453        self.spot_market_data
454            .lock()
455            .await
456            .get(&symbol_id)
457            .map(|v| v.clone())
458            .ok_or(Error::NotSubscribed(symbol_id, MarketType::Spot))
459    }
460
461    pub async fn depth_data(&self, symbol_id: u32) -> Result<HashMap<String, DepthPrice>, Error> {
462        self.depth_market_data
463            .read()
464            .await
465            .get(&symbol_id)
466            .map(|v| v.clone())
467            .ok_or(Error::NotSubscribed(symbol_id, MarketType::Spot))
468    }
469
470    pub async fn subscribe_spot(&self, symbol_id: u32) -> Result<(), Error> {
471        // FIXME later
472        // .. code is too messy. is there a better way?
473
474        let mdreqid = Uuid::new_v4().to_string();
475        // check already subscribed?
476        if let Some(state) = self.spot_req_states.lock().await.get(&symbol_id) {
477            match state {
478                RequestState::Accepted => {
479                    return Err(Error::SubscribedAlready(symbol_id, MarketType::Spot));
480                }
481                RequestState::Requested(_) => {
482                    return Err(Error::RequestingSubscription(symbol_id, MarketType::Spot));
483                }
484                _ => {}
485            }
486        }
487
488        // add to requested symbol.
489        self.spot_req_states
490            .lock()
491            .await
492            .insert(symbol_id, RequestState::Requested(mdreqid.clone()));
493
494        // intialize the request and send req
495        let req = MarketDataReq::new(mdreqid, '1', 1, None, &['0', '1'], 1, symbol_id);
496        self.internal.send_message(req).await?;
497
498        Ok(())
499    }
500
501    pub async fn unsubscribe_spot(&self, symbol_id: u32) -> Result<(), Error> {
502        // if let Some(RequestState::Requested) =
503        let states = self
504            .spot_req_states
505            .lock()
506            .await
507            .get(&symbol_id)
508            .map(|v| v.clone());
509
510        match states {
511            Some(RequestState::Requested(_)) => {
512                return Err(Error::RequestingSubscription(symbol_id, MarketType::Spot));
513            }
514            Some(RequestState::Rejected) | None => {
515                return Err(Error::NotSubscribed(symbol_id, MarketType::Spot));
516            }
517            _ => {
518                self.spot_req_states.lock().await.remove(&symbol_id);
519                self.spot_market_data.lock().await.remove(&symbol_id);
520                let req = MarketDataReq::new("-1".into(), '2', 1, None, &['0', '1'], 1, symbol_id);
521                let _seq_num = self.internal.send_message(req).await?;
522
523                log::trace!("Unsubscribed spot for symbol({})", symbol_id);
524
525                Ok(())
526            }
527        }
528    }
529
530    pub async fn subscribe_depth(&self, symbol_id: u32) -> Result<(), Error> {
531        let mdreqid = Uuid::new_v4().to_string();
532        // check already subscribed?
533        if let Some(state) = self.depth_req_states.lock().await.get(&symbol_id) {
534            match state {
535                RequestState::Accepted => {
536                    return Err(Error::SubscribedAlready(symbol_id, MarketType::Depth));
537                }
538                RequestState::Requested(_) => {
539                    return Err(Error::RequestingSubscription(symbol_id, MarketType::Depth));
540                }
541                _ => {}
542            }
543        }
544
545        // add to requested symbol.
546        self.depth_req_states
547            .lock()
548            .await
549            .insert(symbol_id, RequestState::Requested(mdreqid.clone()));
550
551        // intialize the request and send req
552        let req = MarketDataReq::new(mdreqid, '1', 0, None, &['0', '1'], 1, symbol_id);
553        self.internal.send_message(req).await?;
554
555        Ok(())
556    }
557
558    pub async fn unsubscribe_depth(&self, symbol_id: u32) -> Result<(), Error> {
559        let states = self
560            .depth_req_states
561            .lock()
562            .await
563            .get(&symbol_id)
564            .map(|v| v.clone());
565
566        match states {
567            Some(RequestState::Requested(_)) => {
568                return Err(Error::RequestingSubscription(symbol_id, MarketType::Depth));
569            }
570            Some(RequestState::Rejected) | None => {
571                return Err(Error::NotSubscribed(symbol_id, MarketType::Depth));
572            }
573            _ => {
574                self.depth_req_states.lock().await.remove(&symbol_id);
575                self.depth_market_data.write().await.remove(&symbol_id);
576                let req = MarketDataReq::new("-1".into(), '2', 0, None, &['0', '1'], 1, symbol_id);
577                self.internal.send_message(req).await?;
578
579                log::trace!("Unsubscribed depth for symbol({})", symbol_id);
580
581                Ok(())
582            }
583        }
584    }
585}