1use std::sync::{
19 Arc, RwLock,
20 atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27 clients::DataClient,
28 live::{runner::get_data_event_sender, runtime::get_runtime},
29 messages::{
30 DataEvent,
31 data::{
32 BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
33 InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
34 RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
35 SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
36 SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
37 SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
38 UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrumentStatus,
39 UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
40 },
41 },
42};
43use nautilus_core::{
44 MUTEX_POISONED,
45 datetime::datetime_to_unix_nanos,
46 time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49 data::{Data, FundingRateUpdate, OrderBookDeltas_API},
50 enums::BookType,
51 identifiers::{ClientId, InstrumentId, Venue},
52 instruments::{Instrument, InstrumentAny},
53};
54use tokio::{task::JoinHandle, time::Duration};
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58 common::{
59 consts::OKX_VENUE,
60 enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
61 parse::okx_instrument_type_from_symbol,
62 },
63 config::OKXDataClientConfig,
64 http::client::OKXHttpClient,
65 websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
66};
67
/// Data client for the OKX venue.
///
/// Combines an HTTP client (instrument and historical requests) with a public
/// websocket client and, when the configuration requires it, a business
/// websocket client. Everything the client produces is pushed through
/// `data_sender` as [`DataEvent`] values.
#[derive(Debug)]
pub struct OKXDataClient {
    /// Identifier reported by `client_id()`; also the fallback client ID on responses.
    client_id: ClientId,
    /// Configuration supplied at construction.
    config: OKXDataClientConfig,
    /// HTTP client used for instrument, book snapshot, trade, bar and funding rate requests.
    http_client: OKXHttpClient,
    /// Public websocket client; `new` always stores `Some`.
    ws_public: Option<OKXWebSocketClient>,
    /// Business websocket client; only built when `config.requires_business_ws()` is true.
    ws_business: Option<OKXWebSocketClient>,
    /// Set to `true` at the end of `connect` and to `false` by `stop`, `reset` and `disconnect`.
    is_connected: AtomicBool,
    /// Token watched by the websocket stream tasks spawned in `connect`.
    cancellation_token: CancellationToken,
    /// Join handles of the websocket stream tasks spawned in `connect`.
    tasks: Vec<JoinHandle<()>>,
    /// Channel on which data, instrument, status and response events are emitted.
    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
    /// Instrument cache keyed by instrument ID, shared with the stream and request tasks.
    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
    /// Book channel chosen per instrument on subscribe, so unsubscribe can target the same channel.
    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
    /// Clock obtained from `get_atomic_clock_realtime()`; stamps outgoing responses.
    clock: &'static AtomicTime,
}
83
84impl OKXDataClient {
85 pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
91 let clock = get_atomic_clock_realtime();
92 let data_sender = get_data_event_sender();
93
94 let http_client = if config.has_api_credentials() {
95 OKXHttpClient::with_credentials(
96 config.api_key.clone(),
97 config.api_secret.clone(),
98 config.api_passphrase.clone(),
99 config.base_url_http.clone(),
100 config.http_timeout_secs,
101 config.max_retries,
102 config.retry_delay_initial_ms,
103 config.retry_delay_max_ms,
104 config.is_demo,
105 config.http_proxy_url.clone(),
106 )?
107 } else {
108 OKXHttpClient::new(
109 config.base_url_http.clone(),
110 config.http_timeout_secs,
111 config.max_retries,
112 config.retry_delay_initial_ms,
113 config.retry_delay_max_ms,
114 config.is_demo,
115 config.http_proxy_url.clone(),
116 )?
117 };
118
119 let ws_public = OKXWebSocketClient::new(
120 Some(config.ws_public_url()),
121 None,
122 None,
123 None,
124 None,
125 Some(20), )
127 .context("failed to construct OKX public websocket client")?;
128
129 let ws_business = if config.requires_business_ws() {
130 Some(
131 OKXWebSocketClient::new(
132 Some(config.ws_business_url()),
133 config.api_key.clone(),
134 config.api_secret.clone(),
135 config.api_passphrase.clone(),
136 None,
137 Some(20), )
139 .context("failed to construct OKX business websocket client")?,
140 )
141 } else {
142 None
143 };
144
145 if let Some(vip_level) = config.vip_level {
146 ws_public.set_vip_level(vip_level);
147
148 if let Some(ref ws) = ws_business {
149 ws.set_vip_level(vip_level);
150 }
151 }
152
153 Ok(Self {
154 client_id,
155 config,
156 http_client,
157 ws_public: Some(ws_public),
158 ws_business,
159 is_connected: AtomicBool::new(false),
160 cancellation_token: CancellationToken::new(),
161 tasks: Vec::new(),
162 data_sender,
163 instruments: Arc::new(RwLock::new(AHashMap::new())),
164 book_channels: Arc::new(RwLock::new(AHashMap::new())),
165 clock,
166 })
167 }
168
    /// Returns the OKX venue (a copy of the `OKX_VENUE` constant).
    fn venue(&self) -> Venue {
        *OKX_VENUE
    }
172
173 fn vip_level(&self) -> Option<OKXVipLevel> {
174 self.ws_public.as_ref().map(|ws| ws.vip_level())
175 }
176
177 fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
178 self.ws_public
179 .as_ref()
180 .context("public websocket client not initialized")
181 }
182
183 fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
184 self.ws_business
185 .as_ref()
186 .context("business websocket client not available (credentials required)")
187 }
188
189 fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
190 if let Err(e) = sender.send(DataEvent::Data(data)) {
191 log::error!("Failed to emit data event: {e}");
192 }
193 }
194
195 fn spawn_ws<F>(&self, fut: F, context: &'static str)
196 where
197 F: Future<Output = anyhow::Result<()>> + Send + 'static,
198 {
199 get_runtime().spawn(async move {
200 if let Err(e) = fut.await {
201 log::error!("{context}: {e:?}");
202 }
203 });
204 }
205
206 fn handle_ws_message(
207 message: NautilusWsMessage,
208 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
209 instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
210 ) {
211 match message {
212 NautilusWsMessage::Data(payloads) => {
213 for data in payloads {
214 Self::send_data(data_sender, data);
215 }
216 }
217 NautilusWsMessage::Deltas(deltas) => {
218 Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
219 }
220 NautilusWsMessage::FundingRates(updates) => {
221 emit_funding_rates(data_sender, updates);
222 }
223 NautilusWsMessage::Instrument(instrument, status) => {
224 upsert_instrument(instruments, *instrument);
225
226 if let Some(status) = status
227 && let Err(e) = data_sender.send(DataEvent::InstrumentStatus(status))
228 {
229 log::error!("Failed to emit instrument status event: {e}");
230 }
231 }
232 NautilusWsMessage::InstrumentStatus(status) => {
233 if let Err(e) = data_sender.send(DataEvent::InstrumentStatus(status)) {
234 log::error!("Failed to emit instrument status event: {e}");
235 }
236 }
237 NautilusWsMessage::AccountUpdate(_)
238 | NautilusWsMessage::PositionUpdate(_)
239 | NautilusWsMessage::OrderAccepted(_)
240 | NautilusWsMessage::OrderCanceled(_)
241 | NautilusWsMessage::OrderExpired(_)
242 | NautilusWsMessage::OrderRejected(_)
243 | NautilusWsMessage::OrderCancelRejected(_)
244 | NautilusWsMessage::OrderModifyRejected(_)
245 | NautilusWsMessage::OrderTriggered(_)
246 | NautilusWsMessage::OrderUpdated(_)
247 | NautilusWsMessage::ExecutionReports(_) => {
248 log::debug!("Ignoring trading message on data client");
249 }
250 NautilusWsMessage::Error(e) => {
251 log::error!("OKX websocket error: {e:?}");
252 }
253 NautilusWsMessage::Raw(value) => {
254 log::debug!("Unhandled websocket payload: {value:?}");
255 }
256 NautilusWsMessage::Reconnected => {
257 log::info!("Websocket reconnected");
258 }
259 NautilusWsMessage::Authenticated => {
260 log::debug!("Websocket authenticated");
261 }
262 }
263 }
264}
265
266fn emit_funding_rates(
267 sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
268 updates: Vec<FundingRateUpdate>,
269) {
270 for update in updates {
271 if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
272 log::error!("Failed to emit funding rate event: {e}");
273 }
274 }
275}
276
277fn upsert_instrument(
278 cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
279 instrument: InstrumentAny,
280) {
281 let mut guard = cache.write().expect(MUTEX_POISONED);
282 guard.insert(instrument.id(), instrument);
283}
284
285fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
286 contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
287}
288
289fn contract_filter_with_config_types(
290 contract_types: Option<&Vec<OKXContractType>>,
291 instrument: &InstrumentAny,
292) -> bool {
293 match contract_types {
294 None => true,
295 Some(filter) if filter.is_empty() => true,
296 Some(filter) => {
297 let is_inverse = instrument.is_inverse();
298 (is_inverse && filter.contains(&OKXContractType::Inverse))
299 || (!is_inverse && filter.contains(&OKXContractType::Linear))
300 }
301 }
302}
303
304#[async_trait::async_trait(?Send)]
305impl DataClient for OKXDataClient {
    /// Returns the client ID this data client was constructed with.
    fn client_id(&self) -> ClientId {
        self.client_id
    }
309
    /// Returns the venue served by this client; always `Some`.
    ///
    /// The inner `self.venue()` call resolves to the inherent
    /// `OKXDataClient::venue` method (returning `Venue`), not to this trait
    /// method.
    fn venue(&self) -> Option<Venue> {
        Some(self.venue())
    }
313
314 fn start(&mut self) -> anyhow::Result<()> {
315 log::info!(
316 "Started: client_id={}, vip_level={:?}, instrument_types={:?}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
317 self.client_id,
318 self.vip_level(),
319 self.config.instrument_types,
320 self.config.is_demo,
321 self.config.http_proxy_url,
322 self.config.ws_proxy_url,
323 );
324 Ok(())
325 }
326
    /// Stops the client: cancels the current cancellation token (watched by
    /// the websocket stream tasks) and marks the client as not connected.
    ///
    /// The websocket clients themselves are not closed here; `disconnect`
    /// does that. Always returns `Ok(())`.
    fn stop(&mut self) -> anyhow::Result<()> {
        log::info!("Stopping {id}", id = self.client_id);
        self.cancellation_token.cancel();
        self.is_connected.store(false, Ordering::Relaxed);
        Ok(())
    }
333
334 fn reset(&mut self) -> anyhow::Result<()> {
335 log::debug!("Resetting {id}", id = self.client_id);
336 self.is_connected.store(false, Ordering::Relaxed);
337 self.cancellation_token = CancellationToken::new();
338 self.tasks.clear();
339 self.book_channels
340 .write()
341 .expect("book channel cache lock poisoned")
342 .clear();
343 Ok(())
344 }
345
    /// Disposes the client by delegating to `stop` after logging.
    fn dispose(&mut self) -> anyhow::Result<()> {
        log::debug!("Disposing {id}", id = self.client_id);
        self.stop()
    }
350
    /// Connects the client: loads instruments over HTTP, connects the
    /// websocket clients, and spawns one stream-processing task per websocket.
    ///
    /// Returns immediately when the client is already connected.
    ///
    /// # Errors
    ///
    /// Returns an error if an instrument request fails, if a websocket fails
    /// to connect or to become active, or if an instrument-type subscription
    /// on the public websocket fails. `is_connected` is only set once every
    /// step has succeeded.
    ///
    /// # Panics
    ///
    /// Panics if the instrument cache lock is poisoned.
    async fn connect(&mut self) -> anyhow::Result<()> {
        if self.is_connected() {
            return Ok(());
        }

        // Fresh token so tasks spawned below are not affected by an earlier cancel.
        self.cancellation_token = CancellationToken::new();

        // Fall back to Spot when no instrument types are configured.
        let instrument_types = if self.config.instrument_types.is_empty() {
            vec![OKXInstrumentType::Spot]
        } else {
            self.config.instrument_types.clone()
        };

        // Fetch, filter and cache instruments for every instrument type.
        let mut all_instruments = Vec::new();
        for inst_type in &instrument_types {
            let (mut fetched, _inst_id_codes) = self
                .http_client
                .request_instruments(*inst_type, None)
                .await
                .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;

            fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
            self.http_client.cache_instruments(fetched.clone());

            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
            for instrument in &fetched {
                guard.insert(instrument.id(), instrument.clone());
            }
            // Release the write lock before the next `.await`.
            drop(guard);

            all_instruments.extend(fetched);
        }

        // Publish every loaded instrument on the data channel.
        for instrument in all_instruments {
            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
                log::warn!("Failed to send instrument: {e}");
            }
        }

        if let Some(ref mut ws) = self.ws_public {
            // Hand the websocket client a snapshot of the instrument cache.
            let instruments: Vec<_> = self
                .instruments
                .read()
                .expect(MUTEX_POISONED)
                .values()
                .cloned()
                .collect();
            ws.cache_instruments(instruments);

            ws.connect()
                .await
                .context("failed to connect OKX public websocket")?;
            // NOTE(review): 10.0 is presumably a timeout in seconds; confirm against `wait_until_active`.
            ws.wait_until_active(10.0)
                .await
                .context("public websocket did not become active")?;

            let stream = ws.stream();
            let sender = self.data_sender.clone();
            let insts = self.instruments.clone();
            let cancel = self.cancellation_token.clone();
            // Forward stream messages until the token is cancelled. When the
            // stream yields `None` the `Some(..)` pattern does not match, so
            // only the cancellation branch can complete the `select!`.
            let handle = get_runtime().spawn(async move {
                pin_mut!(stream);
                loop {
                    tokio::select! {
                        Some(message) = stream.next() => {
                            Self::handle_ws_message(message, &sender, &insts);
                        }
                        () = cancel.cancelled() => {
                            log::debug!("Public websocket stream task cancelled");
                            break;
                        }
                    }
                }
            });
            self.tasks.push(handle);

            // Subscribe to instrument updates for each instrument type.
            for inst_type in &instrument_types {
                ws.subscribe_instruments(*inst_type)
                    .await
                    .with_context(|| {
                        format!("failed to subscribe to instrument type {inst_type:?}")
                    })?;
            }
        }

        if let Some(ref mut ws) = self.ws_business {
            // Same setup as the public websocket, minus the instrument subscriptions.
            let instruments: Vec<_> = self
                .instruments
                .read()
                .expect(MUTEX_POISONED)
                .values()
                .cloned()
                .collect();
            ws.cache_instruments(instruments);

            ws.connect()
                .await
                .context("failed to connect OKX business websocket")?;
            ws.wait_until_active(10.0)
                .await
                .context("business websocket did not become active")?;

            let stream = ws.stream();
            let sender = self.data_sender.clone();
            let insts = self.instruments.clone();
            let cancel = self.cancellation_token.clone();
            let handle = get_runtime().spawn(async move {
                pin_mut!(stream);
                loop {
                    tokio::select! {
                        Some(message) = stream.next() => {
                            Self::handle_ws_message(message, &sender, &insts);
                        }
                        () = cancel.cancelled() => {
                            log::debug!("Business websocket stream task cancelled");
                            break;
                        }
                    }
                }
            });
            self.tasks.push(handle);
        }

        self.is_connected.store(true, Ordering::Release);
        log::info!("Connected: client_id={}", self.client_id);
        Ok(())
    }
482
483 async fn disconnect(&mut self) -> anyhow::Result<()> {
484 if self.is_disconnected() {
485 return Ok(());
486 }
487
488 self.cancellation_token.cancel();
489
490 if let Some(ref ws) = self.ws_public
491 && let Err(e) = ws.unsubscribe_all().await
492 {
493 log::warn!("Failed to unsubscribe all from public websocket: {e:?}");
494 }
495
496 if let Some(ref ws) = self.ws_business
497 && let Err(e) = ws.unsubscribe_all().await
498 {
499 log::warn!("Failed to unsubscribe all from business websocket: {e:?}");
500 }
501
502 tokio::time::sleep(Duration::from_millis(500)).await;
504
505 if let Some(ref mut ws) = self.ws_public {
506 let _ = ws.close().await;
507 }
508
509 if let Some(ref mut ws) = self.ws_business {
510 let _ = ws.close().await;
511 }
512
513 let handles: Vec<_> = self.tasks.drain(..).collect();
514 for handle in handles {
515 if let Err(e) = handle.await {
516 log::error!("Error joining websocket task: {e}");
517 }
518 }
519
520 self.book_channels.write().expect(MUTEX_POISONED).clear();
521 self.is_connected.store(false, Ordering::Release);
522 log::info!("Disconnected: client_id={}", self.client_id);
523 Ok(())
524 }
525
    /// Returns the current value of the `is_connected` flag (relaxed load).
    fn is_connected(&self) -> bool {
        self.is_connected.load(Ordering::Relaxed)
    }
529
    /// Returns the negation of `is_connected()`.
    fn is_disconnected(&self) -> bool {
        !self.is_connected()
    }
533
534 fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
535 for inst_type in &self.config.instrument_types {
536 let ws = self.public_ws()?.clone();
537 let inst_type = *inst_type;
538
539 self.spawn_ws(
540 async move {
541 ws.subscribe_instruments(inst_type)
542 .await
543 .context("instruments subscription")?;
544 Ok(())
545 },
546 "subscribe_instruments",
547 );
548 }
549 Ok(())
550 }
551
552 fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
553 let instrument_id = cmd.instrument_id;
556 let ws = self.public_ws()?.clone();
557
558 self.spawn_ws(
559 async move {
560 ws.subscribe_instrument(instrument_id)
561 .await
562 .context("instrument type subscription")?;
563 Ok(())
564 },
565 "subscribe_instrument",
566 );
567 Ok(())
568 }
569
570 fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
571 if cmd.book_type != BookType::L2_MBP {
572 anyhow::bail!("OKX only supports L2_MBP order book deltas");
573 }
574
575 let depth = cmd.depth.map_or(0, |d| d.get());
576 if !matches!(depth, 0 | 50 | 400) {
577 anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
578 }
579
580 let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
581 let channel = match depth {
582 50 => {
583 if vip < OKXVipLevel::Vip4 {
584 anyhow::bail!(
585 "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
586 );
587 }
588 OKXBookChannel::Books50L2Tbt
589 }
590 0 | 400 => {
591 if vip >= OKXVipLevel::Vip5 {
592 OKXBookChannel::BookL2Tbt
593 } else {
594 OKXBookChannel::Book
595 }
596 }
597 _ => unreachable!(),
598 };
599
600 let instrument_id = cmd.instrument_id;
601 let ws = self.public_ws()?.clone();
602 let book_channels = Arc::clone(&self.book_channels);
603
604 self.spawn_ws(
605 async move {
606 match channel {
607 OKXBookChannel::Books50L2Tbt => ws
608 .subscribe_book50_l2_tbt(instrument_id)
609 .await
610 .context("books50-l2-tbt subscription")?,
611 OKXBookChannel::BookL2Tbt => ws
612 .subscribe_book_l2_tbt(instrument_id)
613 .await
614 .context("books-l2-tbt subscription")?,
615 OKXBookChannel::Book => ws
616 .subscribe_books_channel(instrument_id)
617 .await
618 .context("books subscription")?,
619 }
620 book_channels
621 .write()
622 .expect("book channel cache lock poisoned")
623 .insert(instrument_id, channel);
624 Ok(())
625 },
626 "order book delta subscription",
627 );
628
629 Ok(())
630 }
631
632 fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
633 let ws = self.public_ws()?.clone();
634 let instrument_id = cmd.instrument_id;
635
636 self.spawn_ws(
637 async move {
638 ws.subscribe_quotes(instrument_id)
639 .await
640 .context("quotes subscription")
641 },
642 "quote subscription",
643 );
644 Ok(())
645 }
646
647 fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
648 let ws = self.public_ws()?.clone();
649 let instrument_id = cmd.instrument_id;
650
651 self.spawn_ws(
652 async move {
653 ws.subscribe_trades(instrument_id, false)
654 .await
655 .context("trades subscription")
656 },
657 "trade subscription",
658 );
659 Ok(())
660 }
661
662 fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
663 let ws = self.public_ws()?.clone();
664 let instrument_id = cmd.instrument_id;
665
666 self.spawn_ws(
667 async move {
668 ws.subscribe_mark_prices(instrument_id)
669 .await
670 .context("mark price subscription")
671 },
672 "mark price subscription",
673 );
674 Ok(())
675 }
676
677 fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
678 let ws = self.public_ws()?.clone();
679 let instrument_id = cmd.instrument_id;
680
681 self.spawn_ws(
682 async move {
683 ws.subscribe_index_prices(instrument_id)
684 .await
685 .context("index price subscription")
686 },
687 "index price subscription",
688 );
689 Ok(())
690 }
691
692 fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
693 let ws = self.business_ws()?.clone();
694 let bar_type = cmd.bar_type;
695
696 self.spawn_ws(
697 async move {
698 ws.subscribe_bars(bar_type)
699 .await
700 .context("bars subscription")
701 },
702 "bar subscription",
703 );
704 Ok(())
705 }
706
707 fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
708 let ws = self.public_ws()?.clone();
709 let instrument_id = cmd.instrument_id;
710
711 self.spawn_ws(
712 async move {
713 ws.subscribe_funding_rates(instrument_id)
714 .await
715 .context("funding rate subscription")
716 },
717 "funding rate subscription",
718 );
719 Ok(())
720 }
721
722 fn subscribe_instrument_status(
723 &mut self,
724 cmd: &SubscribeInstrumentStatus,
725 ) -> anyhow::Result<()> {
726 let ws = self.public_ws()?.clone();
727 let instrument_id = cmd.instrument_id;
728
729 self.spawn_ws(
730 async move {
731 ws.subscribe_instrument(instrument_id)
732 .await
733 .context("instrument status subscription")
734 },
735 "instrument status subscription",
736 );
737 Ok(())
738 }
739
740 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
741 let ws = self.public_ws()?.clone();
742 let instrument_id = cmd.instrument_id;
743 let channel = self
744 .book_channels
745 .write()
746 .expect("book channel cache lock poisoned")
747 .remove(&instrument_id);
748
749 self.spawn_ws(
750 async move {
751 match channel {
752 Some(OKXBookChannel::Books50L2Tbt) => ws
753 .unsubscribe_book50_l2_tbt(instrument_id)
754 .await
755 .context("books50-l2-tbt unsubscribe")?,
756 Some(OKXBookChannel::BookL2Tbt) => ws
757 .unsubscribe_book_l2_tbt(instrument_id)
758 .await
759 .context("books-l2-tbt unsubscribe")?,
760 Some(OKXBookChannel::Book) => ws
761 .unsubscribe_book(instrument_id)
762 .await
763 .context("book unsubscribe")?,
764 None => {
765 log::warn!(
766 "Book channel not found for {instrument_id}; unsubscribing fallback channel"
767 );
768 ws.unsubscribe_book(instrument_id)
769 .await
770 .context("book fallback unsubscribe")?;
771 }
772 }
773 Ok(())
774 },
775 "order book unsubscribe",
776 );
777 Ok(())
778 }
779
780 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
781 let ws = self.public_ws()?.clone();
782 let instrument_id = cmd.instrument_id;
783
784 self.spawn_ws(
785 async move {
786 ws.unsubscribe_quotes(instrument_id)
787 .await
788 .context("quotes unsubscribe")
789 },
790 "quote unsubscribe",
791 );
792 Ok(())
793 }
794
795 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
796 let ws = self.public_ws()?.clone();
797 let instrument_id = cmd.instrument_id;
798
799 self.spawn_ws(
800 async move {
801 ws.unsubscribe_trades(instrument_id, false) .await
803 .context("trades unsubscribe")
804 },
805 "trade unsubscribe",
806 );
807 Ok(())
808 }
809
810 fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
811 let ws = self.public_ws()?.clone();
812 let instrument_id = cmd.instrument_id;
813
814 self.spawn_ws(
815 async move {
816 ws.unsubscribe_mark_prices(instrument_id)
817 .await
818 .context("mark price unsubscribe")
819 },
820 "mark price unsubscribe",
821 );
822 Ok(())
823 }
824
825 fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
826 let ws = self.public_ws()?.clone();
827 let instrument_id = cmd.instrument_id;
828
829 self.spawn_ws(
830 async move {
831 ws.unsubscribe_index_prices(instrument_id)
832 .await
833 .context("index price unsubscribe")
834 },
835 "index price unsubscribe",
836 );
837 Ok(())
838 }
839
840 fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
841 let ws = self.business_ws()?.clone();
842 let bar_type = cmd.bar_type;
843
844 self.spawn_ws(
845 async move {
846 ws.unsubscribe_bars(bar_type)
847 .await
848 .context("bars unsubscribe")
849 },
850 "bar unsubscribe",
851 );
852 Ok(())
853 }
854
855 fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
856 let ws = self.public_ws()?.clone();
857 let instrument_id = cmd.instrument_id;
858
859 self.spawn_ws(
860 async move {
861 ws.unsubscribe_funding_rates(instrument_id)
862 .await
863 .context("funding rate unsubscribe")
864 },
865 "funding rate unsubscribe",
866 );
867 Ok(())
868 }
869
870 fn unsubscribe_instrument_status(
871 &mut self,
872 cmd: &UnsubscribeInstrumentStatus,
873 ) -> anyhow::Result<()> {
874 let ws = self.public_ws()?.clone();
875 let instrument_id = cmd.instrument_id;
876
877 self.spawn_ws(
878 async move {
879 ws.unsubscribe_instrument(instrument_id)
880 .await
881 .context("instrument status unsubscription")
882 },
883 "instrument status unsubscription",
884 );
885 Ok(())
886 }
887
888 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
889 let http = self.http_client.clone();
890 let sender = self.data_sender.clone();
891 let instruments_cache = self.instruments.clone();
892 let request_id = request.request_id;
893 let client_id = request.client_id.unwrap_or(self.client_id);
894 let venue = self.venue();
895 let start = request.start;
896 let end = request.end;
897 let params = request.params;
898 let clock = self.clock;
899 let start_nanos = datetime_to_unix_nanos(start);
900 let end_nanos = datetime_to_unix_nanos(end);
901 let instrument_types = if self.config.instrument_types.is_empty() {
902 vec![OKXInstrumentType::Spot]
903 } else {
904 self.config.instrument_types.clone()
905 };
906 let contract_types = self.config.contract_types.clone();
907 let instrument_families = self.config.instrument_families.clone();
908
909 get_runtime().spawn(async move {
910 let mut all_instruments = Vec::new();
911
912 for inst_type in instrument_types {
913 let supports_family = matches!(
914 inst_type,
915 OKXInstrumentType::Futures
916 | OKXInstrumentType::Swap
917 | OKXInstrumentType::Option
918 );
919
920 let families = match (&instrument_families, inst_type, supports_family) {
921 (Some(families), OKXInstrumentType::Option, true) => families.clone(),
922 (Some(families), _, true) => families.clone(),
923 (None, OKXInstrumentType::Option, _) => {
924 log::warn!(
925 "Skipping OPTION type: instrument_families required but not configured"
926 );
927 continue;
928 }
929 _ => vec![],
930 };
931
932 if families.is_empty() {
933 match http.request_instruments(inst_type, None).await {
934 Ok((instruments, _inst_id_codes)) => {
935 for instrument in instruments {
936 if !contract_filter_with_config_types(
937 contract_types.as_ref(),
938 &instrument,
939 ) {
940 continue;
941 }
942
943 upsert_instrument(&instruments_cache, instrument.clone());
944 all_instruments.push(instrument);
945 }
946 }
947 Err(e) => {
948 log::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
949 }
950 }
951 } else {
952 for family in families {
953 match http
954 .request_instruments(inst_type, Some(family.clone()))
955 .await
956 {
957 Ok((instruments, _inst_id_codes)) => {
958 for instrument in instruments {
959 if !contract_filter_with_config_types(
960 contract_types.as_ref(),
961 &instrument,
962 ) {
963 continue;
964 }
965
966 upsert_instrument(&instruments_cache, instrument.clone());
967 all_instruments.push(instrument);
968 }
969 }
970 Err(e) => {
971 log::error!(
972 "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
973 );
974 }
975 }
976 }
977 }
978 }
979
980 let response = DataResponse::Instruments(InstrumentsResponse::new(
981 request_id,
982 client_id,
983 venue,
984 all_instruments,
985 start_nanos,
986 end_nanos,
987 clock.get_time_ns(),
988 params,
989 ));
990
991 if let Err(e) = sender.send(DataEvent::Response(response)) {
992 log::error!("Failed to send instruments response: {e}");
993 }
994 });
995
996 Ok(())
997 }
998
999 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1000 let http = self.http_client.clone();
1001 let sender = self.data_sender.clone();
1002 let instruments = self.instruments.clone();
1003 let instrument_id = request.instrument_id;
1004 let request_id = request.request_id;
1005 let client_id = request.client_id.unwrap_or(self.client_id);
1006 let start = request.start;
1007 let end = request.end;
1008 let params = request.params;
1009 let clock = self.clock;
1010 let start_nanos = datetime_to_unix_nanos(start);
1011 let end_nanos = datetime_to_unix_nanos(end);
1012 let instrument_types = if self.config.instrument_types.is_empty() {
1013 vec![OKXInstrumentType::Spot]
1014 } else {
1015 self.config.instrument_types.clone()
1016 };
1017 let contract_types = self.config.contract_types.clone();
1018
1019 get_runtime().spawn(async move {
1020 match http
1021 .request_instrument(instrument_id)
1022 .await
1023 .context("fetch instrument from API")
1024 {
1025 Ok(instrument) => {
1026 let inst_id = instrument.id();
1027 let symbol = inst_id.symbol.as_str();
1028 let inst_type = okx_instrument_type_from_symbol(symbol);
1029 if !instrument_types.contains(&inst_type) {
1030 log::error!(
1031 "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1032 );
1033 return;
1034 }
1035
1036 if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1037 log::error!(
1038 "Instrument {instrument_id} filtered out by contract_types config"
1039 );
1040 return;
1041 }
1042
1043 upsert_instrument(&instruments, instrument.clone());
1044
1045 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1046 request_id,
1047 client_id,
1048 instrument.id(),
1049 instrument,
1050 start_nanos,
1051 end_nanos,
1052 clock.get_time_ns(),
1053 params,
1054 )));
1055
1056 if let Err(e) = sender.send(DataEvent::Response(response)) {
1057 log::error!("Failed to send instrument response: {e}");
1058 }
1059 }
1060 Err(e) => log::error!("Instrument request failed: {e:?}"),
1061 }
1062 });
1063
1064 Ok(())
1065 }
1066
1067 fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1068 let http = self.http_client.clone();
1069 let sender = self.data_sender.clone();
1070 let instrument_id = request.instrument_id;
1071 let depth = request.depth.map(|n| n.get() as u32);
1072 let request_id = request.request_id;
1073 let client_id = request.client_id.unwrap_or(self.client_id);
1074 let params = request.params;
1075 let clock = self.clock;
1076
1077 get_runtime().spawn(async move {
1078 match http
1079 .request_book_snapshot(instrument_id, depth)
1080 .await
1081 .context("failed to request book snapshot from OKX")
1082 {
1083 Ok(book) => {
1084 let response = DataResponse::Book(BookResponse::new(
1085 request_id,
1086 client_id,
1087 instrument_id,
1088 book,
1089 None,
1090 None,
1091 clock.get_time_ns(),
1092 params,
1093 ));
1094
1095 if let Err(e) = sender.send(DataEvent::Response(response)) {
1096 log::error!("Failed to send book snapshot response: {e}");
1097 }
1098 }
1099 Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1100 }
1101 });
1102
1103 Ok(())
1104 }
1105
1106 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1107 let http = self.http_client.clone();
1108 let sender = self.data_sender.clone();
1109 let instrument_id = request.instrument_id;
1110 let start = request.start;
1111 let end = request.end;
1112 let limit = request.limit.map(|n| n.get() as u32);
1113 let request_id = request.request_id;
1114 let client_id = request.client_id.unwrap_or(self.client_id);
1115 let params = request.params;
1116 let clock = self.clock;
1117 let start_nanos = datetime_to_unix_nanos(start);
1118 let end_nanos = datetime_to_unix_nanos(end);
1119
1120 get_runtime().spawn(async move {
1121 match http
1122 .request_trades(instrument_id, start, end, limit)
1123 .await
1124 .context("failed to request trades from OKX")
1125 {
1126 Ok(trades) => {
1127 let response = DataResponse::Trades(TradesResponse::new(
1128 request_id,
1129 client_id,
1130 instrument_id,
1131 trades,
1132 start_nanos,
1133 end_nanos,
1134 clock.get_time_ns(),
1135 params,
1136 ));
1137
1138 if let Err(e) = sender.send(DataEvent::Response(response)) {
1139 log::error!("Failed to send trades response: {e}");
1140 }
1141 }
1142 Err(e) => log::error!("Trade request failed: {e:?}"),
1143 }
1144 });
1145
1146 Ok(())
1147 }
1148
1149 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1150 let http = self.http_client.clone();
1151 let sender = self.data_sender.clone();
1152 let bar_type = request.bar_type;
1153 let start = request.start;
1154 let end = request.end;
1155 let limit = request.limit.map(|n| n.get() as u32);
1156 let request_id = request.request_id;
1157 let client_id = request.client_id.unwrap_or(self.client_id);
1158 let params = request.params;
1159 let clock = self.clock;
1160 let start_nanos = datetime_to_unix_nanos(start);
1161 let end_nanos = datetime_to_unix_nanos(end);
1162
1163 get_runtime().spawn(async move {
1164 match http
1165 .request_bars(bar_type, start, end, limit)
1166 .await
1167 .context("failed to request bars from OKX")
1168 {
1169 Ok(bars) => {
1170 let response = DataResponse::Bars(BarsResponse::new(
1171 request_id,
1172 client_id,
1173 bar_type,
1174 bars,
1175 start_nanos,
1176 end_nanos,
1177 clock.get_time_ns(),
1178 params,
1179 ));
1180
1181 if let Err(e) = sender.send(DataEvent::Response(response)) {
1182 log::error!("Failed to send bars response: {e}");
1183 }
1184 }
1185 Err(e) => log::error!("Bar request failed: {e:?}"),
1186 }
1187 });
1188
1189 Ok(())
1190 }
1191
1192 fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1193 let http = self.http_client.clone();
1194 let sender = self.data_sender.clone();
1195 let instrument_id = request.instrument_id;
1196 let start = request.start;
1197 let end = request.end;
1198 let limit = request.limit.map(|n| n.get() as u32);
1199 let request_id = request.request_id;
1200 let client_id = request.client_id.unwrap_or(self.client_id);
1201 let params = request.params;
1202 let clock = self.clock;
1203 let start_nanos = datetime_to_unix_nanos(start);
1204 let end_nanos = datetime_to_unix_nanos(end);
1205
1206 get_runtime().spawn(async move {
1207 match http
1208 .request_funding_rates(instrument_id, start, end, limit)
1209 .await
1210 .context("failed to request funding rates from OKX")
1211 {
1212 Ok(funding_rates) => {
1213 let response = DataResponse::FundingRates(FundingRatesResponse::new(
1214 request_id,
1215 client_id,
1216 instrument_id,
1217 funding_rates,
1218 start_nanos,
1219 end_nanos,
1220 clock.get_time_ns(),
1221 params,
1222 ));
1223
1224 if let Err(e) = sender.send(DataEvent::Response(response)) {
1225 log::error!("Failed to send funding rates response: {e}");
1226 }
1227 }
1228 Err(e) => log::error!("Funding rates request failed: {e:?}"),
1229 }
1230 });
1231
1232 Ok(())
1233 }
1234}