Skip to main content

nautilus_okx/
data.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Live market data client implementation for the OKX adapter.
17
18use std::sync::{
19    Arc, RwLock,
20    atomic::{AtomicBool, Ordering},
21};
22
23use ahash::AHashMap;
24use anyhow::Context;
25use futures_util::{StreamExt, pin_mut};
26use nautilus_common::{
27    clients::DataClient,
28    live::{runner::get_data_event_sender, runtime::get_runtime},
29    messages::{
30        DataEvent,
31        data::{
32            BarsResponse, BookResponse, DataResponse, FundingRatesResponse, InstrumentResponse,
33            InstrumentsResponse, RequestBars, RequestBookSnapshot, RequestFundingRates,
34            RequestInstrument, RequestInstruments, RequestTrades, SubscribeBars,
35            SubscribeBookDeltas, SubscribeFundingRates, SubscribeIndexPrices, SubscribeInstrument,
36            SubscribeInstrumentStatus, SubscribeInstruments, SubscribeMarkPrices, SubscribeQuotes,
37            SubscribeTrades, TradesResponse, UnsubscribeBars, UnsubscribeBookDeltas,
38            UnsubscribeFundingRates, UnsubscribeIndexPrices, UnsubscribeInstrumentStatus,
39            UnsubscribeMarkPrices, UnsubscribeQuotes, UnsubscribeTrades,
40        },
41    },
42};
43use nautilus_core::{
44    MUTEX_POISONED,
45    datetime::datetime_to_unix_nanos,
46    time::{AtomicTime, get_atomic_clock_realtime},
47};
48use nautilus_model::{
49    data::{Data, FundingRateUpdate, OrderBookDeltas_API},
50    enums::BookType,
51    identifiers::{ClientId, InstrumentId, Venue},
52    instruments::{Instrument, InstrumentAny},
53};
54use tokio::{task::JoinHandle, time::Duration};
55use tokio_util::sync::CancellationToken;
56
57use crate::{
58    common::{
59        consts::OKX_VENUE,
60        enums::{OKXBookChannel, OKXContractType, OKXInstrumentType, OKXVipLevel},
61        parse::okx_instrument_type_from_symbol,
62    },
63    config::OKXDataClientConfig,
64    http::client::OKXHttpClient,
65    websocket::{client::OKXWebSocketClient, messages::NautilusWsMessage},
66};
67
68#[derive(Debug)]
69pub struct OKXDataClient {
70    client_id: ClientId,
71    config: OKXDataClientConfig,
72    http_client: OKXHttpClient,
73    ws_public: Option<OKXWebSocketClient>,
74    ws_business: Option<OKXWebSocketClient>,
75    is_connected: AtomicBool,
76    cancellation_token: CancellationToken,
77    tasks: Vec<JoinHandle<()>>,
78    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
79    instruments: Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
80    book_channels: Arc<RwLock<AHashMap<InstrumentId, OKXBookChannel>>>,
81    clock: &'static AtomicTime,
82}
83
84impl OKXDataClient {
85    /// Creates a new [`OKXDataClient`] instance.
86    ///
87    /// # Errors
88    ///
89    /// Returns an error if the client fails to initialize.
90    pub fn new(client_id: ClientId, config: OKXDataClientConfig) -> anyhow::Result<Self> {
91        let clock = get_atomic_clock_realtime();
92        let data_sender = get_data_event_sender();
93
94        let http_client = if config.has_api_credentials() {
95            OKXHttpClient::with_credentials(
96                config.api_key.clone(),
97                config.api_secret.clone(),
98                config.api_passphrase.clone(),
99                config.base_url_http.clone(),
100                config.http_timeout_secs,
101                config.max_retries,
102                config.retry_delay_initial_ms,
103                config.retry_delay_max_ms,
104                config.is_demo,
105                config.http_proxy_url.clone(),
106            )?
107        } else {
108            OKXHttpClient::new(
109                config.base_url_http.clone(),
110                config.http_timeout_secs,
111                config.max_retries,
112                config.retry_delay_initial_ms,
113                config.retry_delay_max_ms,
114                config.is_demo,
115                config.http_proxy_url.clone(),
116            )?
117        };
118
119        let ws_public = OKXWebSocketClient::new(
120            Some(config.ws_public_url()),
121            None,
122            None,
123            None,
124            None,
125            Some(20), // Heartbeat
126        )
127        .context("failed to construct OKX public websocket client")?;
128
129        let ws_business = if config.requires_business_ws() {
130            Some(
131                OKXWebSocketClient::new(
132                    Some(config.ws_business_url()),
133                    config.api_key.clone(),
134                    config.api_secret.clone(),
135                    config.api_passphrase.clone(),
136                    None,
137                    Some(20), // Heartbeat
138                )
139                .context("failed to construct OKX business websocket client")?,
140            )
141        } else {
142            None
143        };
144
145        if let Some(vip_level) = config.vip_level {
146            ws_public.set_vip_level(vip_level);
147
148            if let Some(ref ws) = ws_business {
149                ws.set_vip_level(vip_level);
150            }
151        }
152
153        Ok(Self {
154            client_id,
155            config,
156            http_client,
157            ws_public: Some(ws_public),
158            ws_business,
159            is_connected: AtomicBool::new(false),
160            cancellation_token: CancellationToken::new(),
161            tasks: Vec::new(),
162            data_sender,
163            instruments: Arc::new(RwLock::new(AHashMap::new())),
164            book_channels: Arc::new(RwLock::new(AHashMap::new())),
165            clock,
166        })
167    }
168
169    fn venue(&self) -> Venue {
170        *OKX_VENUE
171    }
172
173    fn vip_level(&self) -> Option<OKXVipLevel> {
174        self.ws_public.as_ref().map(|ws| ws.vip_level())
175    }
176
177    fn public_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
178        self.ws_public
179            .as_ref()
180            .context("public websocket client not initialized")
181    }
182
183    fn business_ws(&self) -> anyhow::Result<&OKXWebSocketClient> {
184        self.ws_business
185            .as_ref()
186            .context("business websocket client not available (credentials required)")
187    }
188
189    fn send_data(sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>, data: Data) {
190        if let Err(e) = sender.send(DataEvent::Data(data)) {
191            log::error!("Failed to emit data event: {e}");
192        }
193    }
194
195    fn spawn_ws<F>(&self, fut: F, context: &'static str)
196    where
197        F: Future<Output = anyhow::Result<()>> + Send + 'static,
198    {
199        get_runtime().spawn(async move {
200            if let Err(e) = fut.await {
201                log::error!("{context}: {e:?}");
202            }
203        });
204    }
205
206    fn handle_ws_message(
207        message: NautilusWsMessage,
208        data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
209        instruments: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
210    ) {
211        match message {
212            NautilusWsMessage::Data(payloads) => {
213                for data in payloads {
214                    Self::send_data(data_sender, data);
215                }
216            }
217            NautilusWsMessage::Deltas(deltas) => {
218                Self::send_data(data_sender, Data::Deltas(OrderBookDeltas_API::new(deltas)));
219            }
220            NautilusWsMessage::FundingRates(updates) => {
221                emit_funding_rates(data_sender, updates);
222            }
223            NautilusWsMessage::Instrument(instrument, status) => {
224                upsert_instrument(instruments, *instrument);
225
226                if let Some(status) = status
227                    && let Err(e) = data_sender.send(DataEvent::InstrumentStatus(status))
228                {
229                    log::error!("Failed to emit instrument status event: {e}");
230                }
231            }
232            NautilusWsMessage::InstrumentStatus(status) => {
233                if let Err(e) = data_sender.send(DataEvent::InstrumentStatus(status)) {
234                    log::error!("Failed to emit instrument status event: {e}");
235                }
236            }
237            NautilusWsMessage::AccountUpdate(_)
238            | NautilusWsMessage::PositionUpdate(_)
239            | NautilusWsMessage::OrderAccepted(_)
240            | NautilusWsMessage::OrderCanceled(_)
241            | NautilusWsMessage::OrderExpired(_)
242            | NautilusWsMessage::OrderRejected(_)
243            | NautilusWsMessage::OrderCancelRejected(_)
244            | NautilusWsMessage::OrderModifyRejected(_)
245            | NautilusWsMessage::OrderTriggered(_)
246            | NautilusWsMessage::OrderUpdated(_)
247            | NautilusWsMessage::ExecutionReports(_) => {
248                log::debug!("Ignoring trading message on data client");
249            }
250            NautilusWsMessage::Error(e) => {
251                log::error!("OKX websocket error: {e:?}");
252            }
253            NautilusWsMessage::Raw(value) => {
254                log::debug!("Unhandled websocket payload: {value:?}");
255            }
256            NautilusWsMessage::Reconnected => {
257                log::info!("Websocket reconnected");
258            }
259            NautilusWsMessage::Authenticated => {
260                log::debug!("Websocket authenticated");
261            }
262        }
263    }
264}
265
266fn emit_funding_rates(
267    sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
268    updates: Vec<FundingRateUpdate>,
269) {
270    for update in updates {
271        if let Err(e) = sender.send(DataEvent::FundingRate(update)) {
272            log::error!("Failed to emit funding rate event: {e}");
273        }
274    }
275}
276
277fn upsert_instrument(
278    cache: &Arc<RwLock<AHashMap<InstrumentId, InstrumentAny>>>,
279    instrument: InstrumentAny,
280) {
281    let mut guard = cache.write().expect(MUTEX_POISONED);
282    guard.insert(instrument.id(), instrument);
283}
284
285fn contract_filter_with_config(config: &OKXDataClientConfig, instrument: &InstrumentAny) -> bool {
286    contract_filter_with_config_types(config.contract_types.as_ref(), instrument)
287}
288
289fn contract_filter_with_config_types(
290    contract_types: Option<&Vec<OKXContractType>>,
291    instrument: &InstrumentAny,
292) -> bool {
293    match contract_types {
294        None => true,
295        Some(filter) if filter.is_empty() => true,
296        Some(filter) => {
297            let is_inverse = instrument.is_inverse();
298            (is_inverse && filter.contains(&OKXContractType::Inverse))
299                || (!is_inverse && filter.contains(&OKXContractType::Linear))
300        }
301    }
302}
303
304#[async_trait::async_trait(?Send)]
305impl DataClient for OKXDataClient {
306    fn client_id(&self) -> ClientId {
307        self.client_id
308    }
309
310    fn venue(&self) -> Option<Venue> {
311        Some(self.venue())
312    }
313
314    fn start(&mut self) -> anyhow::Result<()> {
315        log::info!(
316            "Started: client_id={}, vip_level={:?}, instrument_types={:?}, is_demo={}, http_proxy_url={:?}, ws_proxy_url={:?}",
317            self.client_id,
318            self.vip_level(),
319            self.config.instrument_types,
320            self.config.is_demo,
321            self.config.http_proxy_url,
322            self.config.ws_proxy_url,
323        );
324        Ok(())
325    }
326
327    fn stop(&mut self) -> anyhow::Result<()> {
328        log::info!("Stopping {id}", id = self.client_id);
329        self.cancellation_token.cancel();
330        self.is_connected.store(false, Ordering::Relaxed);
331        Ok(())
332    }
333
334    fn reset(&mut self) -> anyhow::Result<()> {
335        log::debug!("Resetting {id}", id = self.client_id);
336        self.is_connected.store(false, Ordering::Relaxed);
337        self.cancellation_token = CancellationToken::new();
338        self.tasks.clear();
339        self.book_channels
340            .write()
341            .expect("book channel cache lock poisoned")
342            .clear();
343        Ok(())
344    }
345
346    fn dispose(&mut self) -> anyhow::Result<()> {
347        log::debug!("Disposing {id}", id = self.client_id);
348        self.stop()
349    }
350
351    async fn connect(&mut self) -> anyhow::Result<()> {
352        if self.is_connected() {
353            return Ok(());
354        }
355
356        // Create fresh token so tasks from a previous connection cycle are not
357        // immediately cancelled (the old token may already be in cancelled state)
358        self.cancellation_token = CancellationToken::new();
359
360        let instrument_types = if self.config.instrument_types.is_empty() {
361            vec![OKXInstrumentType::Spot]
362        } else {
363            self.config.instrument_types.clone()
364        };
365
366        let mut all_instruments = Vec::new();
367        for inst_type in &instrument_types {
368            let (mut fetched, _inst_id_codes) = self
369                .http_client
370                .request_instruments(*inst_type, None)
371                .await
372                .with_context(|| format!("failed to request OKX instruments for {inst_type:?}"))?;
373
374            fetched.retain(|instrument| contract_filter_with_config(&self.config, instrument));
375            self.http_client.cache_instruments(fetched.clone());
376
377            let mut guard = self.instruments.write().expect(MUTEX_POISONED);
378            for instrument in &fetched {
379                guard.insert(instrument.id(), instrument.clone());
380            }
381            drop(guard);
382
383            all_instruments.extend(fetched);
384        }
385
386        for instrument in all_instruments {
387            if let Err(e) = self.data_sender.send(DataEvent::Instrument(instrument)) {
388                log::warn!("Failed to send instrument: {e}");
389            }
390        }
391
392        if let Some(ref mut ws) = self.ws_public {
393            // Cache instruments to websocket before connecting so handler has them
394            let instruments: Vec<_> = self
395                .instruments
396                .read()
397                .expect(MUTEX_POISONED)
398                .values()
399                .cloned()
400                .collect();
401            ws.cache_instruments(instruments);
402
403            ws.connect()
404                .await
405                .context("failed to connect OKX public websocket")?;
406            ws.wait_until_active(10.0)
407                .await
408                .context("public websocket did not become active")?;
409
410            let stream = ws.stream();
411            let sender = self.data_sender.clone();
412            let insts = self.instruments.clone();
413            let cancel = self.cancellation_token.clone();
414            let handle = get_runtime().spawn(async move {
415                pin_mut!(stream);
416                loop {
417                    tokio::select! {
418                        Some(message) = stream.next() => {
419                            Self::handle_ws_message(message, &sender, &insts);
420                        }
421                        () = cancel.cancelled() => {
422                            log::debug!("Public websocket stream task cancelled");
423                            break;
424                        }
425                    }
426                }
427            });
428            self.tasks.push(handle);
429
430            for inst_type in &instrument_types {
431                ws.subscribe_instruments(*inst_type)
432                    .await
433                    .with_context(|| {
434                        format!("failed to subscribe to instrument type {inst_type:?}")
435                    })?;
436            }
437        }
438
439        if let Some(ref mut ws) = self.ws_business {
440            // Cache instruments to websocket before connecting so handler has them
441            let instruments: Vec<_> = self
442                .instruments
443                .read()
444                .expect(MUTEX_POISONED)
445                .values()
446                .cloned()
447                .collect();
448            ws.cache_instruments(instruments);
449
450            ws.connect()
451                .await
452                .context("failed to connect OKX business websocket")?;
453            ws.wait_until_active(10.0)
454                .await
455                .context("business websocket did not become active")?;
456
457            let stream = ws.stream();
458            let sender = self.data_sender.clone();
459            let insts = self.instruments.clone();
460            let cancel = self.cancellation_token.clone();
461            let handle = get_runtime().spawn(async move {
462                pin_mut!(stream);
463                loop {
464                    tokio::select! {
465                        Some(message) = stream.next() => {
466                            Self::handle_ws_message(message, &sender, &insts);
467                        }
468                        () = cancel.cancelled() => {
469                            log::debug!("Business websocket stream task cancelled");
470                            break;
471                        }
472                    }
473                }
474            });
475            self.tasks.push(handle);
476        }
477
478        self.is_connected.store(true, Ordering::Release);
479        log::info!("Connected: client_id={}", self.client_id);
480        Ok(())
481    }
482
483    async fn disconnect(&mut self) -> anyhow::Result<()> {
484        if self.is_disconnected() {
485            return Ok(());
486        }
487
488        self.cancellation_token.cancel();
489
490        if let Some(ref ws) = self.ws_public
491            && let Err(e) = ws.unsubscribe_all().await
492        {
493            log::warn!("Failed to unsubscribe all from public websocket: {e:?}");
494        }
495
496        if let Some(ref ws) = self.ws_business
497            && let Err(e) = ws.unsubscribe_all().await
498        {
499            log::warn!("Failed to unsubscribe all from business websocket: {e:?}");
500        }
501
502        // Allow time for unsubscribe confirmations
503        tokio::time::sleep(Duration::from_millis(500)).await;
504
505        if let Some(ref mut ws) = self.ws_public {
506            let _ = ws.close().await;
507        }
508
509        if let Some(ref mut ws) = self.ws_business {
510            let _ = ws.close().await;
511        }
512
513        let handles: Vec<_> = self.tasks.drain(..).collect();
514        for handle in handles {
515            if let Err(e) = handle.await {
516                log::error!("Error joining websocket task: {e}");
517            }
518        }
519
520        self.book_channels.write().expect(MUTEX_POISONED).clear();
521        self.is_connected.store(false, Ordering::Release);
522        log::info!("Disconnected: client_id={}", self.client_id);
523        Ok(())
524    }
525
526    fn is_connected(&self) -> bool {
527        self.is_connected.load(Ordering::Relaxed)
528    }
529
530    fn is_disconnected(&self) -> bool {
531        !self.is_connected()
532    }
533
534    fn subscribe_instruments(&mut self, _cmd: &SubscribeInstruments) -> anyhow::Result<()> {
535        for inst_type in &self.config.instrument_types {
536            let ws = self.public_ws()?.clone();
537            let inst_type = *inst_type;
538
539            self.spawn_ws(
540                async move {
541                    ws.subscribe_instruments(inst_type)
542                        .await
543                        .context("instruments subscription")?;
544                    Ok(())
545                },
546                "subscribe_instruments",
547            );
548        }
549        Ok(())
550    }
551
552    fn subscribe_instrument(&mut self, cmd: &SubscribeInstrument) -> anyhow::Result<()> {
553        // OKX instruments channel doesn't support subscribing to individual instruments via instId
554        // Instead, subscribe to the instrument type if not already subscribed
555        let instrument_id = cmd.instrument_id;
556        let ws = self.public_ws()?.clone();
557
558        self.spawn_ws(
559            async move {
560                ws.subscribe_instrument(instrument_id)
561                    .await
562                    .context("instrument type subscription")?;
563                Ok(())
564            },
565            "subscribe_instrument",
566        );
567        Ok(())
568    }
569
570    fn subscribe_book_deltas(&mut self, cmd: &SubscribeBookDeltas) -> anyhow::Result<()> {
571        if cmd.book_type != BookType::L2_MBP {
572            anyhow::bail!("OKX only supports L2_MBP order book deltas");
573        }
574
575        let depth = cmd.depth.map_or(0, |d| d.get());
576        if !matches!(depth, 0 | 50 | 400) {
577            anyhow::bail!("invalid depth {depth}; valid values are 50 or 400");
578        }
579
580        let vip = self.vip_level().unwrap_or(OKXVipLevel::Vip0);
581        let channel = match depth {
582            50 => {
583                if vip < OKXVipLevel::Vip4 {
584                    anyhow::bail!(
585                        "VIP level {vip} insufficient for 50 depth subscription (requires VIP4)"
586                    );
587                }
588                OKXBookChannel::Books50L2Tbt
589            }
590            0 | 400 => {
591                if vip >= OKXVipLevel::Vip5 {
592                    OKXBookChannel::BookL2Tbt
593                } else {
594                    OKXBookChannel::Book
595                }
596            }
597            _ => unreachable!(),
598        };
599
600        let instrument_id = cmd.instrument_id;
601        let ws = self.public_ws()?.clone();
602        let book_channels = Arc::clone(&self.book_channels);
603
604        self.spawn_ws(
605            async move {
606                match channel {
607                    OKXBookChannel::Books50L2Tbt => ws
608                        .subscribe_book50_l2_tbt(instrument_id)
609                        .await
610                        .context("books50-l2-tbt subscription")?,
611                    OKXBookChannel::BookL2Tbt => ws
612                        .subscribe_book_l2_tbt(instrument_id)
613                        .await
614                        .context("books-l2-tbt subscription")?,
615                    OKXBookChannel::Book => ws
616                        .subscribe_books_channel(instrument_id)
617                        .await
618                        .context("books subscription")?,
619                }
620                book_channels
621                    .write()
622                    .expect("book channel cache lock poisoned")
623                    .insert(instrument_id, channel);
624                Ok(())
625            },
626            "order book delta subscription",
627        );
628
629        Ok(())
630    }
631
632    fn subscribe_quotes(&mut self, cmd: &SubscribeQuotes) -> anyhow::Result<()> {
633        let ws = self.public_ws()?.clone();
634        let instrument_id = cmd.instrument_id;
635
636        self.spawn_ws(
637            async move {
638                ws.subscribe_quotes(instrument_id)
639                    .await
640                    .context("quotes subscription")
641            },
642            "quote subscription",
643        );
644        Ok(())
645    }
646
647    fn subscribe_trades(&mut self, cmd: &SubscribeTrades) -> anyhow::Result<()> {
648        let ws = self.public_ws()?.clone();
649        let instrument_id = cmd.instrument_id;
650
651        self.spawn_ws(
652            async move {
653                ws.subscribe_trades(instrument_id, false)
654                    .await
655                    .context("trades subscription")
656            },
657            "trade subscription",
658        );
659        Ok(())
660    }
661
662    fn subscribe_mark_prices(&mut self, cmd: &SubscribeMarkPrices) -> anyhow::Result<()> {
663        let ws = self.public_ws()?.clone();
664        let instrument_id = cmd.instrument_id;
665
666        self.spawn_ws(
667            async move {
668                ws.subscribe_mark_prices(instrument_id)
669                    .await
670                    .context("mark price subscription")
671            },
672            "mark price subscription",
673        );
674        Ok(())
675    }
676
677    fn subscribe_index_prices(&mut self, cmd: &SubscribeIndexPrices) -> anyhow::Result<()> {
678        let ws = self.public_ws()?.clone();
679        let instrument_id = cmd.instrument_id;
680
681        self.spawn_ws(
682            async move {
683                ws.subscribe_index_prices(instrument_id)
684                    .await
685                    .context("index price subscription")
686            },
687            "index price subscription",
688        );
689        Ok(())
690    }
691
692    fn subscribe_bars(&mut self, cmd: &SubscribeBars) -> anyhow::Result<()> {
693        let ws = self.business_ws()?.clone();
694        let bar_type = cmd.bar_type;
695
696        self.spawn_ws(
697            async move {
698                ws.subscribe_bars(bar_type)
699                    .await
700                    .context("bars subscription")
701            },
702            "bar subscription",
703        );
704        Ok(())
705    }
706
707    fn subscribe_funding_rates(&mut self, cmd: &SubscribeFundingRates) -> anyhow::Result<()> {
708        let ws = self.public_ws()?.clone();
709        let instrument_id = cmd.instrument_id;
710
711        self.spawn_ws(
712            async move {
713                ws.subscribe_funding_rates(instrument_id)
714                    .await
715                    .context("funding rate subscription")
716            },
717            "funding rate subscription",
718        );
719        Ok(())
720    }
721
722    fn subscribe_instrument_status(
723        &mut self,
724        cmd: &SubscribeInstrumentStatus,
725    ) -> anyhow::Result<()> {
726        let ws = self.public_ws()?.clone();
727        let instrument_id = cmd.instrument_id;
728
729        self.spawn_ws(
730            async move {
731                ws.subscribe_instrument(instrument_id)
732                    .await
733                    .context("instrument status subscription")
734            },
735            "instrument status subscription",
736        );
737        Ok(())
738    }
739
740    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
741        let ws = self.public_ws()?.clone();
742        let instrument_id = cmd.instrument_id;
743        let channel = self
744            .book_channels
745            .write()
746            .expect("book channel cache lock poisoned")
747            .remove(&instrument_id);
748
749        self.spawn_ws(
750            async move {
751                match channel {
752                    Some(OKXBookChannel::Books50L2Tbt) => ws
753                        .unsubscribe_book50_l2_tbt(instrument_id)
754                        .await
755                        .context("books50-l2-tbt unsubscribe")?,
756                    Some(OKXBookChannel::BookL2Tbt) => ws
757                        .unsubscribe_book_l2_tbt(instrument_id)
758                        .await
759                        .context("books-l2-tbt unsubscribe")?,
760                    Some(OKXBookChannel::Book) => ws
761                        .unsubscribe_book(instrument_id)
762                        .await
763                        .context("book unsubscribe")?,
764                    None => {
765                        log::warn!(
766                            "Book channel not found for {instrument_id}; unsubscribing fallback channel"
767                        );
768                        ws.unsubscribe_book(instrument_id)
769                            .await
770                            .context("book fallback unsubscribe")?;
771                    }
772                }
773                Ok(())
774            },
775            "order book unsubscribe",
776        );
777        Ok(())
778    }
779
780    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
781        let ws = self.public_ws()?.clone();
782        let instrument_id = cmd.instrument_id;
783
784        self.spawn_ws(
785            async move {
786                ws.unsubscribe_quotes(instrument_id)
787                    .await
788                    .context("quotes unsubscribe")
789            },
790            "quote unsubscribe",
791        );
792        Ok(())
793    }
794
795    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
796        let ws = self.public_ws()?.clone();
797        let instrument_id = cmd.instrument_id;
798
799        self.spawn_ws(
800            async move {
801                ws.unsubscribe_trades(instrument_id, false) // TODO: Aggregated trades?
802                    .await
803                    .context("trades unsubscribe")
804            },
805            "trade unsubscribe",
806        );
807        Ok(())
808    }
809
810    fn unsubscribe_mark_prices(&mut self, cmd: &UnsubscribeMarkPrices) -> anyhow::Result<()> {
811        let ws = self.public_ws()?.clone();
812        let instrument_id = cmd.instrument_id;
813
814        self.spawn_ws(
815            async move {
816                ws.unsubscribe_mark_prices(instrument_id)
817                    .await
818                    .context("mark price unsubscribe")
819            },
820            "mark price unsubscribe",
821        );
822        Ok(())
823    }
824
825    fn unsubscribe_index_prices(&mut self, cmd: &UnsubscribeIndexPrices) -> anyhow::Result<()> {
826        let ws = self.public_ws()?.clone();
827        let instrument_id = cmd.instrument_id;
828
829        self.spawn_ws(
830            async move {
831                ws.unsubscribe_index_prices(instrument_id)
832                    .await
833                    .context("index price unsubscribe")
834            },
835            "index price unsubscribe",
836        );
837        Ok(())
838    }
839
840    fn unsubscribe_bars(&mut self, cmd: &UnsubscribeBars) -> anyhow::Result<()> {
841        let ws = self.business_ws()?.clone();
842        let bar_type = cmd.bar_type;
843
844        self.spawn_ws(
845            async move {
846                ws.unsubscribe_bars(bar_type)
847                    .await
848                    .context("bars unsubscribe")
849            },
850            "bar unsubscribe",
851        );
852        Ok(())
853    }
854
855    fn unsubscribe_funding_rates(&mut self, cmd: &UnsubscribeFundingRates) -> anyhow::Result<()> {
856        let ws = self.public_ws()?.clone();
857        let instrument_id = cmd.instrument_id;
858
859        self.spawn_ws(
860            async move {
861                ws.unsubscribe_funding_rates(instrument_id)
862                    .await
863                    .context("funding rate unsubscribe")
864            },
865            "funding rate unsubscribe",
866        );
867        Ok(())
868    }
869
870    fn unsubscribe_instrument_status(
871        &mut self,
872        cmd: &UnsubscribeInstrumentStatus,
873    ) -> anyhow::Result<()> {
874        let ws = self.public_ws()?.clone();
875        let instrument_id = cmd.instrument_id;
876
877        self.spawn_ws(
878            async move {
879                ws.unsubscribe_instrument(instrument_id)
880                    .await
881                    .context("instrument status unsubscription")
882            },
883            "instrument status unsubscription",
884        );
885        Ok(())
886    }
887
888    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
889        let http = self.http_client.clone();
890        let sender = self.data_sender.clone();
891        let instruments_cache = self.instruments.clone();
892        let request_id = request.request_id;
893        let client_id = request.client_id.unwrap_or(self.client_id);
894        let venue = self.venue();
895        let start = request.start;
896        let end = request.end;
897        let params = request.params;
898        let clock = self.clock;
899        let start_nanos = datetime_to_unix_nanos(start);
900        let end_nanos = datetime_to_unix_nanos(end);
901        let instrument_types = if self.config.instrument_types.is_empty() {
902            vec![OKXInstrumentType::Spot]
903        } else {
904            self.config.instrument_types.clone()
905        };
906        let contract_types = self.config.contract_types.clone();
907        let instrument_families = self.config.instrument_families.clone();
908
909        get_runtime().spawn(async move {
910            let mut all_instruments = Vec::new();
911
912            for inst_type in instrument_types {
913                let supports_family = matches!(
914                    inst_type,
915                    OKXInstrumentType::Futures
916                        | OKXInstrumentType::Swap
917                        | OKXInstrumentType::Option
918                );
919
920                let families = match (&instrument_families, inst_type, supports_family) {
921                    (Some(families), OKXInstrumentType::Option, true) => families.clone(),
922                    (Some(families), _, true) => families.clone(),
923                    (None, OKXInstrumentType::Option, _) => {
924                        log::warn!(
925                            "Skipping OPTION type: instrument_families required but not configured"
926                        );
927                        continue;
928                    }
929                    _ => vec![],
930                };
931
932                if families.is_empty() {
933                    match http.request_instruments(inst_type, None).await {
934                        Ok((instruments, _inst_id_codes)) => {
935                            for instrument in instruments {
936                                if !contract_filter_with_config_types(
937                                    contract_types.as_ref(),
938                                    &instrument,
939                                ) {
940                                    continue;
941                                }
942
943                                upsert_instrument(&instruments_cache, instrument.clone());
944                                all_instruments.push(instrument);
945                            }
946                        }
947                        Err(e) => {
948                            log::error!("Failed to fetch instruments for {inst_type:?}: {e:?}");
949                        }
950                    }
951                } else {
952                    for family in families {
953                        match http
954                            .request_instruments(inst_type, Some(family.clone()))
955                            .await
956                        {
957                            Ok((instruments, _inst_id_codes)) => {
958                                for instrument in instruments {
959                                    if !contract_filter_with_config_types(
960                                        contract_types.as_ref(),
961                                        &instrument,
962                                    ) {
963                                        continue;
964                                    }
965
966                                    upsert_instrument(&instruments_cache, instrument.clone());
967                                    all_instruments.push(instrument);
968                                }
969                            }
970                            Err(e) => {
971                                log::error!(
972                                    "Failed to fetch instruments for {inst_type:?} family {family}: {e:?}"
973                                );
974                            }
975                        }
976                    }
977                }
978            }
979
980            let response = DataResponse::Instruments(InstrumentsResponse::new(
981                request_id,
982                client_id,
983                venue,
984                all_instruments,
985                start_nanos,
986                end_nanos,
987                clock.get_time_ns(),
988                params,
989            ));
990
991            if let Err(e) = sender.send(DataEvent::Response(response)) {
992                log::error!("Failed to send instruments response: {e}");
993            }
994        });
995
996        Ok(())
997    }
998
999    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
1000        let http = self.http_client.clone();
1001        let sender = self.data_sender.clone();
1002        let instruments = self.instruments.clone();
1003        let instrument_id = request.instrument_id;
1004        let request_id = request.request_id;
1005        let client_id = request.client_id.unwrap_or(self.client_id);
1006        let start = request.start;
1007        let end = request.end;
1008        let params = request.params;
1009        let clock = self.clock;
1010        let start_nanos = datetime_to_unix_nanos(start);
1011        let end_nanos = datetime_to_unix_nanos(end);
1012        let instrument_types = if self.config.instrument_types.is_empty() {
1013            vec![OKXInstrumentType::Spot]
1014        } else {
1015            self.config.instrument_types.clone()
1016        };
1017        let contract_types = self.config.contract_types.clone();
1018
1019        get_runtime().spawn(async move {
1020            match http
1021                .request_instrument(instrument_id)
1022                .await
1023                .context("fetch instrument from API")
1024            {
1025                Ok(instrument) => {
1026                    let inst_id = instrument.id();
1027                    let symbol = inst_id.symbol.as_str();
1028                    let inst_type = okx_instrument_type_from_symbol(symbol);
1029                    if !instrument_types.contains(&inst_type) {
1030                        log::error!(
1031                            "Instrument {instrument_id} type {inst_type:?} not in configured types {instrument_types:?}"
1032                        );
1033                        return;
1034                    }
1035
1036                    if !contract_filter_with_config_types(contract_types.as_ref(), &instrument) {
1037                        log::error!(
1038                            "Instrument {instrument_id} filtered out by contract_types config"
1039                        );
1040                        return;
1041                    }
1042
1043                    upsert_instrument(&instruments, instrument.clone());
1044
1045                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
1046                        request_id,
1047                        client_id,
1048                        instrument.id(),
1049                        instrument,
1050                        start_nanos,
1051                        end_nanos,
1052                        clock.get_time_ns(),
1053                        params,
1054                    )));
1055
1056                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1057                        log::error!("Failed to send instrument response: {e}");
1058                    }
1059                }
1060                Err(e) => log::error!("Instrument request failed: {e:?}"),
1061            }
1062        });
1063
1064        Ok(())
1065    }
1066
1067    fn request_book_snapshot(&self, request: RequestBookSnapshot) -> anyhow::Result<()> {
1068        let http = self.http_client.clone();
1069        let sender = self.data_sender.clone();
1070        let instrument_id = request.instrument_id;
1071        let depth = request.depth.map(|n| n.get() as u32);
1072        let request_id = request.request_id;
1073        let client_id = request.client_id.unwrap_or(self.client_id);
1074        let params = request.params;
1075        let clock = self.clock;
1076
1077        get_runtime().spawn(async move {
1078            match http
1079                .request_book_snapshot(instrument_id, depth)
1080                .await
1081                .context("failed to request book snapshot from OKX")
1082            {
1083                Ok(book) => {
1084                    let response = DataResponse::Book(BookResponse::new(
1085                        request_id,
1086                        client_id,
1087                        instrument_id,
1088                        book,
1089                        None,
1090                        None,
1091                        clock.get_time_ns(),
1092                        params,
1093                    ));
1094
1095                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1096                        log::error!("Failed to send book snapshot response: {e}");
1097                    }
1098                }
1099                Err(e) => log::error!("Book snapshot request failed: {e:?}"),
1100            }
1101        });
1102
1103        Ok(())
1104    }
1105
1106    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
1107        let http = self.http_client.clone();
1108        let sender = self.data_sender.clone();
1109        let instrument_id = request.instrument_id;
1110        let start = request.start;
1111        let end = request.end;
1112        let limit = request.limit.map(|n| n.get() as u32);
1113        let request_id = request.request_id;
1114        let client_id = request.client_id.unwrap_or(self.client_id);
1115        let params = request.params;
1116        let clock = self.clock;
1117        let start_nanos = datetime_to_unix_nanos(start);
1118        let end_nanos = datetime_to_unix_nanos(end);
1119
1120        get_runtime().spawn(async move {
1121            match http
1122                .request_trades(instrument_id, start, end, limit)
1123                .await
1124                .context("failed to request trades from OKX")
1125            {
1126                Ok(trades) => {
1127                    let response = DataResponse::Trades(TradesResponse::new(
1128                        request_id,
1129                        client_id,
1130                        instrument_id,
1131                        trades,
1132                        start_nanos,
1133                        end_nanos,
1134                        clock.get_time_ns(),
1135                        params,
1136                    ));
1137
1138                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1139                        log::error!("Failed to send trades response: {e}");
1140                    }
1141                }
1142                Err(e) => log::error!("Trade request failed: {e:?}"),
1143            }
1144        });
1145
1146        Ok(())
1147    }
1148
1149    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
1150        let http = self.http_client.clone();
1151        let sender = self.data_sender.clone();
1152        let bar_type = request.bar_type;
1153        let start = request.start;
1154        let end = request.end;
1155        let limit = request.limit.map(|n| n.get() as u32);
1156        let request_id = request.request_id;
1157        let client_id = request.client_id.unwrap_or(self.client_id);
1158        let params = request.params;
1159        let clock = self.clock;
1160        let start_nanos = datetime_to_unix_nanos(start);
1161        let end_nanos = datetime_to_unix_nanos(end);
1162
1163        get_runtime().spawn(async move {
1164            match http
1165                .request_bars(bar_type, start, end, limit)
1166                .await
1167                .context("failed to request bars from OKX")
1168            {
1169                Ok(bars) => {
1170                    let response = DataResponse::Bars(BarsResponse::new(
1171                        request_id,
1172                        client_id,
1173                        bar_type,
1174                        bars,
1175                        start_nanos,
1176                        end_nanos,
1177                        clock.get_time_ns(),
1178                        params,
1179                    ));
1180
1181                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1182                        log::error!("Failed to send bars response: {e}");
1183                    }
1184                }
1185                Err(e) => log::error!("Bar request failed: {e:?}"),
1186            }
1187        });
1188
1189        Ok(())
1190    }
1191
1192    fn request_funding_rates(&self, request: RequestFundingRates) -> anyhow::Result<()> {
1193        let http = self.http_client.clone();
1194        let sender = self.data_sender.clone();
1195        let instrument_id = request.instrument_id;
1196        let start = request.start;
1197        let end = request.end;
1198        let limit = request.limit.map(|n| n.get() as u32);
1199        let request_id = request.request_id;
1200        let client_id = request.client_id.unwrap_or(self.client_id);
1201        let params = request.params;
1202        let clock = self.clock;
1203        let start_nanos = datetime_to_unix_nanos(start);
1204        let end_nanos = datetime_to_unix_nanos(end);
1205
1206        get_runtime().spawn(async move {
1207            match http
1208                .request_funding_rates(instrument_id, start, end, limit)
1209                .await
1210                .context("failed to request funding rates from OKX")
1211            {
1212                Ok(funding_rates) => {
1213                    let response = DataResponse::FundingRates(FundingRatesResponse::new(
1214                        request_id,
1215                        client_id,
1216                        instrument_id,
1217                        funding_rates,
1218                        start_nanos,
1219                        end_nanos,
1220                        clock.get_time_ns(),
1221                        params,
1222                    ));
1223
1224                    if let Err(e) = sender.send(DataEvent::Response(response)) {
1225                        log::error!("Failed to send funding rates response: {e}");
1226                    }
1227                }
1228                Err(e) => log::error!("Funding rates request failed: {e:?}"),
1229            }
1230        });
1231
1232        Ok(())
1233    }
1234}