// nautilus_dydx/python/websocket.rs
// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for the dYdX WebSocket client.
17
18use std::{
19    sync::atomic::Ordering,
20    time::{Duration, Instant},
21};
22
23use dashmap::DashMap;
24use nautilus_common::live::get_runtime;
25use nautilus_core::{
26    UUID4,
27    python::{call_python_threadsafe, to_pyvalue_err},
28    time::get_atomic_clock_realtime,
29};
30use nautilus_model::{
31    data::{BarType, Data, OrderBookDeltas_API},
32    enums::AccountType,
33    events::AccountState,
34    identifiers::{AccountId, InstrumentId},
35    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
36    types::{AccountBalance, Currency, Money},
37};
38use nautilus_network::mode::ConnectionMode;
39use pyo3::{IntoPyObjectExt, prelude::*, types::PyDict};
40
41use crate::{
42    common::{credential::DydxCredential, enums::DydxCandleResolution, parse::extract_raw_symbol},
43    execution::types::OrderContext,
44    http::{client::DydxHttpClient, parse::parse_account_state},
45    python::encoder::PyDydxClientOrderIdEncoder,
46    websocket::{
47        client::DydxWebSocketClient,
48        enums::NautilusWsMessage,
49        handler::HandlerCommand,
50        parse::{parse_ws_fill_report, parse_ws_order_report, parse_ws_position_report},
51    },
52};
53
54#[pymethods]
55impl DydxWebSocketClient {
56    #[staticmethod]
57    #[pyo3(name = "new_public")]
58    fn py_new_public(url: String, heartbeat: Option<u64>) -> Self {
59        Self::new_public(url, heartbeat)
60    }
61
62    #[staticmethod]
63    #[pyo3(name = "new_private")]
64    fn py_new_private(
65        url: String,
66        private_key: String,
67        authenticator_ids: Vec<u64>,
68        account_id: AccountId,
69        heartbeat: Option<u64>,
70    ) -> PyResult<Self> {
71        let credential = DydxCredential::from_private_key(&private_key, authenticator_ids)
72            .map_err(to_pyvalue_err)?;
73        Ok(Self::new_private(url, credential, account_id, heartbeat))
74    }
75
76    #[pyo3(name = "is_connected")]
77    fn py_is_connected(&self) -> bool {
78        self.is_connected()
79    }
80
81    #[pyo3(name = "set_bars_timestamp_on_close")]
82    fn py_set_bars_timestamp_on_close(&mut self, value: bool) {
83        self.set_bars_timestamp_on_close(value);
84    }
85
86    #[pyo3(name = "set_account_id")]
87    fn py_set_account_id(&mut self, account_id: AccountId) {
88        self.set_account_id(account_id);
89    }
90
91    /// Share the HTTP client's instrument cache with this WebSocket client.
92    ///
93    /// The HTTP client's cache includes CLOB pair ID and market ticker indices
94    /// needed for parsing SubaccountsChannelData into typed execution events.
95    /// Must be called before `connect()`.
96    #[pyo3(name = "share_instrument_cache")]
97    fn py_share_instrument_cache(&mut self, http_client: &DydxHttpClient) {
98        self.set_instrument_cache(http_client.instrument_cache().clone());
99    }
100
101    #[pyo3(name = "account_id")]
102    fn py_account_id(&self) -> Option<AccountId> {
103        self.account_id()
104    }
105
106    /// Returns the shared client order ID encoder.
107    #[pyo3(name = "encoder")]
108    fn py_encoder(&self) -> PyDydxClientOrderIdEncoder {
109        PyDydxClientOrderIdEncoder::from_arc(self.encoder().clone())
110    }
111
112    #[getter]
113    fn py_url(&self) -> String {
114        self.url().to_string()
115    }
116
117    #[pyo3(name = "connect")]
118    fn py_connect<'py>(
119        &mut self,
120        py: Python<'py>,
121        loop_: Py<PyAny>,
122        instruments: Vec<Py<PyAny>>,
123        callback: Py<PyAny>,
124    ) -> PyResult<Bound<'py, PyAny>> {
125        let call_soon = loop_.getattr(py, "call_soon_threadsafe")?;
126
127        let mut instruments_any = Vec::new();
128        for inst in instruments {
129            let inst_any = pyobject_to_instrument_any(py, inst)?;
130            instruments_any.push(inst_any);
131        }
132
133        self.cache_instruments(instruments_any);
134
135        let mut client = self.clone();
136
137        pyo3_async_runtimes::tokio::future_into_py(py, async move {
138            client.connect().await.map_err(to_pyvalue_err)?;
139
140            if let Some(mut rx) = client.take_receiver() {
141                get_runtime().spawn(async move {
142                    let _client = client; // Keep client alive in spawned task
143                    let clock = get_atomic_clock_realtime();
144                    let order_contexts: DashMap<u32, OrderContext> = DashMap::new();
145                    let order_id_map: DashMap<String, (u32, u32)> = DashMap::new();
146
147                    while let Some(msg) = rx.recv().await {
148                        match msg {
149                            NautilusWsMessage::Data(items) => {
150                                Python::attach(|py| {
151                                    for data in items {
152                                        let py_obj = data_to_pycapsule(py, data);
153                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
154                                    }
155                                });
156                            }
157                            NautilusWsMessage::Deltas(deltas) => {
158                                Python::attach(|py| {
159                                    let data = Data::Deltas(OrderBookDeltas_API::new(*deltas));
160                                    let py_obj = data_to_pycapsule(py, data);
161                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
162                                });
163                            }
164                            NautilusWsMessage::BlockHeight { height, time } => {
165                                Python::attach(|py| {
166                                    let dict = PyDict::new(py);
167                                    let _ = dict.set_item("type", "block_height");
168                                    let _ = dict.set_item("height", height);
169                                    let _ = dict.set_item("time", time.to_rfc3339());
170                                    if let Ok(py_obj) = dict.into_py_any(py) {
171                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
172                                    }
173                                });
174                            }
175                            NautilusWsMessage::SubaccountSubscribed(data) => {
176                                let Some(account_id) = _client.account_id() else {
177                                    log::warn!("Cannot parse subaccount subscription: account_id not set");
178                                    continue;
179                                };
180
181                                let instrument_cache = _client.instrument_cache();
182                                let ts_init = clock.get_time_ns();
183
184                                let inst_map = instrument_cache.to_instrument_id_map();
185                                let oracle_map = instrument_cache.to_oracle_prices_map();
186
187                                if let Some(ref subaccount) = data.contents.subaccount {
188                                    match parse_account_state(
189                                        subaccount,
190                                        account_id,
191                                        &inst_map,
192                                        &oracle_map,
193                                        ts_init,
194                                        ts_init,
195                                    ) {
196                                        Ok(account_state) => {
197                                            Python::attach(|py| {
198                                                match account_state.into_py_any(py) {
199                                                    Ok(py_obj) => {
200                                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
201                                                    }
202                                                    Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
203                                                }
204                                            });
205                                        }
206                                    Err(e) => log::error!("Failed to parse account state: {e}"),
207                                }
208
209                                if let Some(ref positions) = subaccount.open_perpetual_positions {
210                                    for (market, ws_position) in positions {
211                                        match parse_ws_position_report(
212                                            ws_position,
213                                            instrument_cache,
214                                            account_id,
215                                            ts_init,
216                                        ) {
217                                            Ok(report) => {
218                                                Python::attach(|py| {
219                                                    match pyo3::Py::new(py, report) {
220                                                        Ok(py_obj) => {
221                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
222                                                        }
223                                                        Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
224                                                    }
225                                                });
226                                            }
227                                            Err(e) => log::error!("Failed to parse position for {market}: {e}"),
228                                        }
229                                    }
230                                }
231                                } else {
232                                    log::warn!("Subaccount subscription without initial state (new/empty subaccount)");
233
234                                    // Emit zero-balance account state so account gets registered
235                                    let currency = Currency::get_or_create_crypto_with_context("USDC", None);
236                                    let zero = Money::zero(currency);
237                                    let balance = AccountBalance::new_checked(zero, zero, zero)
238                                        .expect("zero balance should always be valid");
239                                    let account_state = AccountState::new(
240                                        account_id,
241                                        AccountType::Margin,
242                                        vec![balance],
243                                        vec![],
244                                        true,
245                                        UUID4::new(),
246                                        ts_init,
247                                        ts_init,
248                                        None,
249                                    );
250                                    Python::attach(|py| {
251                                        match account_state.into_py_any(py) {
252                                            Ok(py_obj) => {
253                                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
254                                            }
255                                            Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
256                                        }
257                                    });
258                                }
259                            }
260                            NautilusWsMessage::SubaccountsChannelData(data) => {
261                                let Some(account_id) = _client.account_id() else {
262                                    log::warn!("Cannot parse SubaccountsChannelData: account_id not set");
263                                    continue;
264                                };
265
266                                let instrument_cache = _client.instrument_cache();
267                                let encoder = _client.encoder();
268                                let ts_init = clock.get_time_ns();
269
270                                let mut terminal_orders: Vec<(u32, u32, String)> = Vec::new();
271
272                                // Phase 1: Parse orders and build order_id_map (needed for fill correlation)
273                                // but DON'T send order reports yet — fills must be sent first
274                                // to prevent reconciliation from inferring fills at the limit price.
275                                let mut pending_order_reports = Vec::new();
276
277                                if let Some(ref orders) = data.contents.orders {
278                                    for ws_order in orders {
279                                        // Build order_id → (client_id, client_metadata) for fill correlation
280                                        if let Ok(client_id_u32) = ws_order.client_id.parse::<u32>() {
281                                            let client_meta = ws_order.client_metadata
282                                                .as_ref()
283                                                .and_then(|s| s.parse::<u32>().ok())
284                                                .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
285                                            order_id_map.insert(ws_order.id.clone(), (client_id_u32, client_meta));
286                                        }
287
288                                        match parse_ws_order_report(
289                                            ws_order,
290                                            instrument_cache,
291                                            &order_contexts,
292                                            encoder,
293                                            account_id,
294                                            ts_init,
295                                        ) {
296                                            Ok(report) => {
297                                                if !report.order_status.is_open()
298                                                    && let Ok(cid) = ws_order.client_id.parse::<u32>()
299                                                {
300                                                    let meta = ws_order.client_metadata
301                                                        .as_ref()
302                                                        .and_then(|s| s.parse::<u32>().ok())
303                                                        .unwrap_or(crate::grpc::DEFAULT_RUST_CLIENT_METADATA);
304                                                    terminal_orders.push((cid, meta, ws_order.id.clone()));
305                                                }
306                                                pending_order_reports.push(report);
307                                            }
308                                            Err(e) => log::error!("Failed to parse WS order: {e}"),
309                                        }
310                                    }
311                                }
312
313                                // Phase 2: Send fills FIRST so reconciliation sees them before
314                                // the terminal order status (prevents inferred fills at limit price)
315                                if let Some(ref fills) = data.contents.fills {
316                                    for ws_fill in fills {
317                                        match parse_ws_fill_report(
318                                            ws_fill,
319                                            instrument_cache,
320                                            &order_id_map,
321                                            &order_contexts,
322                                            encoder,
323                                            account_id,
324                                            ts_init,
325                                        ) {
326                                            Ok(report) => {
327                                                Python::attach(|py| {
328                                                    match pyo3::Py::new(py, report) {
329                                                        Ok(py_obj) => {
330                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
331                                                        }
332                                                        Err(e) => log::error!("Failed to convert FillReport: {e}"),
333                                                    }
334                                                });
335                                            }
336                                            Err(e) => log::error!("Failed to parse WS fill: {e}"),
337                                        }
338                                    }
339                                }
340
341                                // Phase 3: Now send order status reports
342                                for report in pending_order_reports {
343                                    Python::attach(|py| {
344                                        match pyo3::Py::new(py, report) {
345                                            Ok(py_obj) => {
346                                                call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
347                                            }
348                                            Err(e) => log::error!("Failed to convert OrderStatusReport: {e}"),
349                                        }
350                                    });
351                                }
352
353                                // Deferred cleanup after fills are correlated
354                                for (client_id, client_metadata, order_id) in terminal_orders {
355                                    order_contexts.remove(&client_id);
356                                    encoder.remove(client_id, client_metadata);
357                                    order_id_map.remove(&order_id);
358                                }
359                            }
360                            NautilusWsMessage::MarkPrice(mark_price) => {
361                                Python::attach(|py| {
362                                    match mark_price.into_py_any(py) {
363                                        Ok(py_obj) => {
364                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
365                                        }
366                                        Err(e) => log::error!("Failed to convert MarkPriceUpdate to Python: {e}"),
367                                    }
368                                });
369                            }
370                            NautilusWsMessage::IndexPrice(index_price) => {
371                                Python::attach(|py| {
372                                    match index_price.into_py_any(py) {
373                                        Ok(py_obj) => {
374                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
375                                        }
376                                        Err(e) => log::error!("Failed to convert IndexPriceUpdate to Python: {e}"),
377                                    }
378                                });
379                            }
380                            NautilusWsMessage::FundingRate(funding_rate) => {
381                                Python::attach(|py| {
382                                    match funding_rate.into_py_any(py) {
383                                        Ok(py_obj) => {
384                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
385                                        }
386                                        Err(e) => log::error!("Failed to convert FundingRateUpdate to Python: {e}"),
387                                    }
388                                });
389                            }
390                            NautilusWsMessage::InstrumentStatus(status) => {
391                                Python::attach(|py| {
392                                    match status.into_py_any(py) {
393                                        Ok(py_obj) => {
394                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
395                                        }
396                                        Err(e) => log::error!("Failed to convert InstrumentStatus to Python: {e}"),
397                                    }
398                                });
399                            }
400                            NautilusWsMessage::Error(err) => {
401                                log::error!("dYdX WebSocket error: {err}");
402                            }
403                            NautilusWsMessage::Reconnected => {
404                                log::info!("dYdX WebSocket reconnected");
405                            }
406                            NautilusWsMessage::AccountState(state) => {
407                                Python::attach(|py| {
408                                    match state.into_py_any(py) {
409                                        Ok(py_obj) => {
410                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
411                                        }
412                                        Err(e) => log::error!("Failed to convert AccountState to Python: {e}"),
413                                    }
414                                });
415                            }
416                            NautilusWsMessage::Position(report) => {
417                                Python::attach(|py| {
418                                    match pyo3::Py::new(py, *report) {
419                                        Ok(py_obj) => {
420                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
421                                        }
422                                        Err(e) => log::error!("Failed to convert PositionStatusReport to Python: {e}"),
423                                    }
424                                });
425                            }
426                            NautilusWsMessage::Order(report) => {
427                                Python::attach(|py| {
428                                    match pyo3::Py::new(py, *report) {
429                                        Ok(py_obj) => {
430                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
431                                        }
432                                        Err(e) => log::error!("Failed to convert OrderStatusReport to Python: {e}"),
433                                    }
434                                });
435                            }
436                            NautilusWsMessage::Fill(report) => {
437                                Python::attach(|py| {
438                                    match pyo3::Py::new(py, *report) {
439                                        Ok(py_obj) => {
440                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
441                                        }
442                                        Err(e) => log::error!("Failed to convert FillReport to Python: {e}"),
443                                    }
444                                });
445                            }
446                            NautilusWsMessage::NewInstrumentDiscovered { ticker } => {
447                                log::info!("New instrument discovered via WebSocket: {ticker}");
448                                Python::attach(|py| {
449                                    let dict = PyDict::new(py);
450                                    let _ = dict.set_item("type", "new_instrument_discovered");
451                                    let _ = dict.set_item("ticker", &ticker);
452                                    if let Ok(py_obj) = dict.into_py_any(py) {
453                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
454                                    }
455                                });
456                            }
457                        }
458                    }
459                });
460            }
461
462            Ok(())
463        })
464    }
465
466    #[pyo3(name = "disconnect")]
467    fn py_disconnect<'py>(&mut self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
468        let mut client = self.clone();
469        pyo3_async_runtimes::tokio::future_into_py(py, async move {
470            client.disconnect().await.map_err(to_pyvalue_err)?;
471            Ok(())
472        })
473    }
474
475    #[pyo3(name = "wait_until_active")]
476    fn py_wait_until_active<'py>(
477        &self,
478        py: Python<'py>,
479        timeout_secs: f64,
480    ) -> PyResult<Bound<'py, PyAny>> {
481        let connection_mode = self.connection_mode_atomic();
482
483        pyo3_async_runtimes::tokio::future_into_py(py, async move {
484            let timeout = Duration::from_secs_f64(timeout_secs);
485            let start = Instant::now();
486
487            loop {
488                let mode = connection_mode.load();
489                let mode_u8 = mode.load(Ordering::Relaxed);
490                let is_connected = matches!(
491                    mode_u8,
492                    x if x == ConnectionMode::Active as u8 || x == ConnectionMode::Reconnect as u8
493                );
494
495                if is_connected {
496                    break;
497                }
498
499                if start.elapsed() > timeout {
500                    return Err(to_pyvalue_err(std::io::Error::new(
501                        std::io::ErrorKind::TimedOut,
502                        format!("Client did not become active within {timeout_secs}s"),
503                    )));
504                }
505                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
506            }
507
508            Ok(())
509        })
510    }
511
512    #[pyo3(name = "cache_instrument")]
513    fn py_cache_instrument(&self, instrument: Py<PyAny>, py: Python<'_>) -> PyResult<()> {
514        let inst_any = pyobject_to_instrument_any(py, instrument)?;
515        self.cache_instrument(inst_any);
516        Ok(())
517    }
518
519    #[pyo3(name = "cache_instruments")]
520    fn py_cache_instruments(&self, instruments: Vec<Py<PyAny>>, py: Python<'_>) -> PyResult<()> {
521        let mut instruments_any = Vec::new();
522        for inst in instruments {
523            let inst_any = pyobject_to_instrument_any(py, inst)?;
524            instruments_any.push(inst_any);
525        }
526        self.cache_instruments(instruments_any);
527        Ok(())
528    }
529
530    #[pyo3(name = "is_closed")]
531    fn py_is_closed(&self) -> bool {
532        !self.is_connected()
533    }
534
535    #[pyo3(name = "subscribe_trades")]
536    fn py_subscribe_trades<'py>(
537        &self,
538        py: Python<'py>,
539        instrument_id: InstrumentId,
540    ) -> PyResult<Bound<'py, PyAny>> {
541        let client = self.clone();
542        pyo3_async_runtimes::tokio::future_into_py(py, async move {
543            client
544                .subscribe_trades(instrument_id)
545                .await
546                .map_err(to_pyvalue_err)?;
547            Ok(())
548        })
549    }
550
551    #[pyo3(name = "unsubscribe_trades")]
552    fn py_unsubscribe_trades<'py>(
553        &self,
554        py: Python<'py>,
555        instrument_id: InstrumentId,
556    ) -> PyResult<Bound<'py, PyAny>> {
557        let client = self.clone();
558        pyo3_async_runtimes::tokio::future_into_py(py, async move {
559            client
560                .unsubscribe_trades(instrument_id)
561                .await
562                .map_err(to_pyvalue_err)?;
563            Ok(())
564        })
565    }
566
567    #[pyo3(name = "subscribe_orderbook")]
568    fn py_subscribe_orderbook<'py>(
569        &self,
570        py: Python<'py>,
571        instrument_id: InstrumentId,
572    ) -> PyResult<Bound<'py, PyAny>> {
573        let client = self.clone();
574        pyo3_async_runtimes::tokio::future_into_py(py, async move {
575            client
576                .subscribe_orderbook(instrument_id)
577                .await
578                .map_err(to_pyvalue_err)?;
579            Ok(())
580        })
581    }
582
583    #[pyo3(name = "unsubscribe_orderbook")]
584    fn py_unsubscribe_orderbook<'py>(
585        &self,
586        py: Python<'py>,
587        instrument_id: InstrumentId,
588    ) -> PyResult<Bound<'py, PyAny>> {
589        let client = self.clone();
590        pyo3_async_runtimes::tokio::future_into_py(py, async move {
591            client
592                .unsubscribe_orderbook(instrument_id)
593                .await
594                .map_err(to_pyvalue_err)?;
595            Ok(())
596        })
597    }
598
599    #[pyo3(name = "subscribe_bars")]
600    fn py_subscribe_bars<'py>(
601        &self,
602        py: Python<'py>,
603        bar_type: BarType,
604    ) -> PyResult<Bound<'py, PyAny>> {
605        let spec = bar_type.spec();
606        let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
607        let resolution = resolution.to_string();
608
609        let client = self.clone();
610        let instrument_id = bar_type.instrument_id();
611
612        // Build topic for bar type registration (e.g., "ETH-USD/1MIN")
613        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
614        let topic = format!("{ticker}/{resolution}");
615
616        pyo3_async_runtimes::tokio::future_into_py(py, async move {
617            // Register bar type in handler before subscribing
618            client
619                .send_command(HandlerCommand::RegisterBarType { topic, bar_type })
620                .map_err(to_pyvalue_err)?;
621
622            // Brief delay to ensure handler processes registration
623            tokio::time::sleep(Duration::from_millis(50)).await;
624
625            client
626                .subscribe_candles(instrument_id, &resolution)
627                .await
628                .map_err(to_pyvalue_err)?;
629            Ok(())
630        })
631    }
632
633    #[pyo3(name = "unsubscribe_bars")]
634    fn py_unsubscribe_bars<'py>(
635        &self,
636        py: Python<'py>,
637        bar_type: BarType,
638    ) -> PyResult<Bound<'py, PyAny>> {
639        let spec = bar_type.spec();
640        let resolution = DydxCandleResolution::from_bar_spec(&spec).map_err(to_pyvalue_err)?;
641        let resolution = resolution.to_string();
642
643        let client = self.clone();
644        let instrument_id = bar_type.instrument_id();
645
646        // Build topic for unregistration
647        let ticker = extract_raw_symbol(instrument_id.symbol.as_str());
648        let topic = format!("{ticker}/{resolution}");
649
650        pyo3_async_runtimes::tokio::future_into_py(py, async move {
651            client
652                .unsubscribe_candles(instrument_id, &resolution)
653                .await
654                .map_err(to_pyvalue_err)?;
655
656            // Unregister bar type after unsubscribing
657            client
658                .send_command(HandlerCommand::UnregisterBarType { topic })
659                .map_err(to_pyvalue_err)?;
660
661            Ok(())
662        })
663    }
664
665    #[pyo3(name = "subscribe_markets")]
666    fn py_subscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
667        let client = self.clone();
668        pyo3_async_runtimes::tokio::future_into_py(py, async move {
669            client.subscribe_markets().await.map_err(to_pyvalue_err)?;
670            Ok(())
671        })
672    }
673
674    #[pyo3(name = "unsubscribe_markets")]
675    fn py_unsubscribe_markets<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
676        let client = self.clone();
677        pyo3_async_runtimes::tokio::future_into_py(py, async move {
678            client.unsubscribe_markets().await.map_err(to_pyvalue_err)?;
679            Ok(())
680        })
681    }
682
683    #[pyo3(name = "subscribe_subaccount")]
684    fn py_subscribe_subaccount<'py>(
685        &self,
686        py: Python<'py>,
687        address: String,
688        subaccount_number: u32,
689    ) -> PyResult<Bound<'py, PyAny>> {
690        let client = self.clone();
691        pyo3_async_runtimes::tokio::future_into_py(py, async move {
692            client
693                .subscribe_subaccount(&address, subaccount_number)
694                .await
695                .map_err(to_pyvalue_err)?;
696            Ok(())
697        })
698    }
699
700    #[pyo3(name = "unsubscribe_subaccount")]
701    fn py_unsubscribe_subaccount<'py>(
702        &self,
703        py: Python<'py>,
704        address: String,
705        subaccount_number: u32,
706    ) -> PyResult<Bound<'py, PyAny>> {
707        let client = self.clone();
708        pyo3_async_runtimes::tokio::future_into_py(py, async move {
709            client
710                .unsubscribe_subaccount(&address, subaccount_number)
711                .await
712                .map_err(to_pyvalue_err)?;
713            Ok(())
714        })
715    }
716
717    #[pyo3(name = "subscribe_block_height")]
718    fn py_subscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
719        let client = self.clone();
720        pyo3_async_runtimes::tokio::future_into_py(py, async move {
721            client
722                .subscribe_block_height()
723                .await
724                .map_err(to_pyvalue_err)?;
725            Ok(())
726        })
727    }
728
729    #[pyo3(name = "unsubscribe_block_height")]
730    fn py_unsubscribe_block_height<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
731        let client = self.clone();
732        pyo3_async_runtimes::tokio::future_into_py(py, async move {
733            client
734                .unsubscribe_block_height()
735                .await
736                .map_err(to_pyvalue_err)?;
737            Ok(())
738        })
739    }
740}