Skip to main content

nautilus_bitmex/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the `BitMEX` WebSocket client.
17//!
18//! # Design Pattern: Clone and Share State
19//!
20//! The WebSocket client must be cloned for async operations because PyO3's `future_into_py`
21//! requires `'static` futures (cannot borrow from `self`). To ensure clones share the same
22//! connection state, key fields use `Arc<RwLock<T>>`:
23//!
24//! - `inner: Arc<RwLock<Option<WebSocketClient>>>` - The WebSocket connection.
25//!
26//! Without shared state, clones would be independent, causing:
27//! - Lost WebSocket messages.
28//! - Missing instrument data.
29//! - Connection state desynchronization.
30//!
31//! ## Connection Flow
32//!
33//! 1. Clone the client for async operation.
34//! 2. Connect and populate shared state on the clone.
35//! 3. Spawn stream handler as background task.
36//! 4. Return immediately (non-blocking).
37//!
38//! ## Important Notes
39//!
40//! - Never use `block_on()` - it blocks the runtime.
41//! - Always clone before async blocks for lifetime requirements.
42//! - `RwLock` is preferred over Mutex (many reads, few writes).
43
44use futures_util::StreamExt;
45use nautilus_common::live::get_runtime;
46use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err};
47use nautilus_model::{
48    data::bar::BarType,
49    identifiers::{AccountId, InstrumentId},
50    python::{
51        data::data_to_pycapsule,
52        instruments::{instrument_any_to_pyobject, pyobject_to_instrument_any},
53    },
54};
55use pyo3::{conversion::IntoPyObjectExt, prelude::*};
56
57use crate::websocket::{BitmexWebSocketClient, messages::NautilusWsMessage};
58
59#[pymethods]
60impl BitmexWebSocketClient {
61    #[new]
62    #[pyo3(signature = (url=None, api_key=None, api_secret=None, account_id=None, heartbeat=None, testnet=false))]
63    fn py_new(
64        url: Option<String>,
65        api_key: Option<String>,
66        api_secret: Option<String>,
67        account_id: Option<AccountId>,
68        heartbeat: Option<u64>,
69        testnet: bool,
70    ) -> PyResult<Self> {
71        Self::new_with_env(url, api_key, api_secret, account_id, heartbeat, testnet)
72            .map_err(to_pyvalue_err)
73    }
74
75    #[staticmethod]
76    #[pyo3(name = "from_env")]
77    fn py_from_env() -> PyResult<Self> {
78        Self::from_env().map_err(to_pyvalue_err)
79    }
80
81    #[getter]
82    #[pyo3(name = "url")]
83    #[must_use]
84    pub const fn py_url(&self) -> &str {
85        self.url()
86    }
87
88    #[getter]
89    #[pyo3(name = "api_key")]
90    #[must_use]
91    pub fn py_api_key(&self) -> Option<&str> {
92        self.api_key()
93    }
94
95    #[getter]
96    #[pyo3(name = "api_key_masked")]
97    #[must_use]
98    pub fn py_api_key_masked(&self) -> Option<String> {
99        self.api_key_masked()
100    }
101
102    #[pyo3(name = "is_active")]
103    fn py_is_active(&mut self) -> bool {
104        self.is_active()
105    }
106
107    #[pyo3(name = "is_closed")]
108    fn py_is_closed(&mut self) -> bool {
109        self.is_closed()
110    }
111
112    #[pyo3(name = "get_subscriptions")]
113    fn py_get_subscriptions(&self, instrument_id: InstrumentId) -> Vec<String> {
114        self.get_subscriptions(instrument_id)
115    }
116
117    #[pyo3(name = "set_account_id")]
118    pub fn py_set_account_id(&mut self, account_id: AccountId) {
119        self.set_account_id(account_id);
120    }
121
122    #[pyo3(name = "cache_instrument")]
123    fn py_cache_instrument(&self, py: Python, instrument: Py<PyAny>) -> PyResult<()> {
124        let inst_any = pyobject_to_instrument_any(py, instrument)?;
125        self.cache_instrument(inst_any);
126        Ok(())
127    }
128
129    #[pyo3(name = "connect")]
130    fn py_connect<'py>(
131        &mut self,
132        py: Python<'py>,
133        loop_: Py<PyAny>,
134        instruments: Vec<Py<PyAny>>,
135        callback: Py<PyAny>,
136    ) -> PyResult<Bound<'py, PyAny>> {
137        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
138
139        let mut instruments_any = Vec::new();
140        for inst in instruments {
141            let inst_any = pyobject_to_instrument_any(py, inst)?;
142            instruments_any.push(inst_any);
143        }
144
145        self.cache_instruments(instruments_any);
146
147        // We need to clone self to move into the async block,
148        // the clone will be connected and kept alive to maintain the handler.
149        let mut client = self.clone();
150
151        pyo3_async_runtimes::tokio::future_into_py(py, async move {
152            client.connect().await.map_err(to_pyruntime_err)?;
153
154            let stream = client.stream();
155
156            get_runtime().spawn(async move {
157                let _client = client; // Keep client alive for the entire duration
158                tokio::pin!(stream);
159
160                while let Some(msg) = stream.next().await {
161                    Python::attach(|py| match msg {
162                        NautilusWsMessage::Data(data_vec) => {
163                            for data in data_vec {
164                                let py_obj = data_to_pycapsule(py, data);
165                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
166                            }
167                        }
168                        NautilusWsMessage::Instruments(instruments) => {
169                            for instrument in instruments {
170                                if let Ok(py_obj) = instrument_any_to_pyobject(py, instrument) {
171                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
172                                }
173                            }
174                        }
175                        NautilusWsMessage::OrderStatusReports(reports) => {
176                            for report in reports {
177                                if let Ok(py_obj) = report.into_py_any(py) {
178                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
179                                }
180                            }
181                        }
182                        NautilusWsMessage::FillReports(reports) => {
183                            for report in reports {
184                                if let Ok(py_obj) = report.into_py_any(py) {
185                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
186                                }
187                            }
188                        }
189                        NautilusWsMessage::PositionStatusReports(reports) => {
190                            for report in reports {
191                                if let Ok(py_obj) = report.into_py_any(py) {
192                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
193                                }
194                            }
195                        }
196                        NautilusWsMessage::FundingRateUpdates(updates) => {
197                            for update in updates {
198                                if let Ok(py_obj) = update.into_py_any(py) {
199                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
200                                }
201                            }
202                        }
203                        NautilusWsMessage::AccountStates(states) => {
204                            for state in states {
205                                if let Ok(py_obj) = state.into_py_any(py) {
206                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
207                                }
208                            }
209                        }
210                        NautilusWsMessage::OrderUpdated(event) => {
211                            if let Ok(py_obj) = (*event).into_py_any(py) {
212                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
213                            }
214                        }
215                        NautilusWsMessage::OrderUpdates(events) => {
216                            for event in events {
217                                if let Ok(py_obj) = event.into_py_any(py) {
218                                    call_python_threadsafe(py, &call_soon, &callback, py_obj);
219                                }
220                            }
221                        }
222                        NautilusWsMessage::InstrumentStatus(status) => {
223                            if let Ok(py_obj) = status.into_py_any(py) {
224                                call_python_threadsafe(py, &call_soon, &callback, py_obj);
225                            }
226                        }
227                        NautilusWsMessage::Reconnected => {}
228                        NautilusWsMessage::Authenticated => {}
229                    });
230                }
231            });
232
233            Ok(())
234        })
235    }
236
237    #[pyo3(name = "wait_until_active")]
238    fn py_wait_until_active<'py>(
239        &self,
240        py: Python<'py>,
241        timeout_secs: f64,
242    ) -> PyResult<Bound<'py, PyAny>> {
243        let client = self.clone();
244
245        pyo3_async_runtimes::tokio::future_into_py(py, async move {
246            client
247                .wait_until_active(timeout_secs)
248                .await
249                .map_err(to_pyruntime_err)?;
250            Ok(())
251        })
252    }
253
254    #[pyo3(name = "close")]
255    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
256        let mut client = self.clone();
257
258        pyo3_async_runtimes::tokio::future_into_py(py, async move {
259            if let Err(e) = client.close().await {
260                log::error!("Error on close: {e}");
261            }
262            Ok(())
263        })
264    }
265
266    #[pyo3(name = "subscribe_instruments")]
267    fn py_subscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
268        let client = self.clone();
269
270        pyo3_async_runtimes::tokio::future_into_py(py, async move {
271            if let Err(e) = client.subscribe_instruments().await {
272                log::error!("Failed to subscribe to instruments: {e}");
273            }
274            Ok(())
275        })
276    }
277
278    #[pyo3(name = "subscribe_instrument")]
279    fn py_subscribe_instrument<'py>(
280        &self,
281        py: Python<'py>,
282        instrument_id: InstrumentId,
283    ) -> PyResult<Bound<'py, PyAny>> {
284        let client = self.clone();
285
286        pyo3_async_runtimes::tokio::future_into_py(py, async move {
287            if let Err(e) = client.subscribe_instrument(instrument_id).await {
288                log::error!("Failed to subscribe to instrument: {e}");
289            }
290            Ok(())
291        })
292    }
293
294    #[pyo3(name = "subscribe_book")]
295    fn py_subscribe_book<'py>(
296        &self,
297        py: Python<'py>,
298        instrument_id: InstrumentId,
299    ) -> PyResult<Bound<'py, PyAny>> {
300        let client = self.clone();
301
302        pyo3_async_runtimes::tokio::future_into_py(py, async move {
303            if let Err(e) = client.subscribe_book(instrument_id).await {
304                log::error!("Failed to subscribe to order book: {e}");
305            }
306            Ok(())
307        })
308    }
309
310    #[pyo3(name = "subscribe_book_25")]
311    fn py_subscribe_book_25<'py>(
312        &self,
313        py: Python<'py>,
314        instrument_id: InstrumentId,
315    ) -> PyResult<Bound<'py, PyAny>> {
316        let client = self.clone();
317
318        pyo3_async_runtimes::tokio::future_into_py(py, async move {
319            if let Err(e) = client.subscribe_book_25(instrument_id).await {
320                log::error!("Failed to subscribe to order book 25: {e}");
321            }
322            Ok(())
323        })
324    }
325
326    #[pyo3(name = "subscribe_book_depth10")]
327    fn py_subscribe_book_depth10<'py>(
328        &self,
329        py: Python<'py>,
330        instrument_id: InstrumentId,
331    ) -> PyResult<Bound<'py, PyAny>> {
332        let client = self.clone();
333
334        pyo3_async_runtimes::tokio::future_into_py(py, async move {
335            if let Err(e) = client.subscribe_book_depth10(instrument_id).await {
336                log::error!("Failed to subscribe to order book depth 10: {e}");
337            }
338            Ok(())
339        })
340    }
341
342    #[pyo3(name = "subscribe_quotes")]
343    fn py_subscribe_quotes<'py>(
344        &self,
345        py: Python<'py>,
346        instrument_id: InstrumentId,
347    ) -> PyResult<Bound<'py, PyAny>> {
348        let client = self.clone();
349
350        pyo3_async_runtimes::tokio::future_into_py(py, async move {
351            if let Err(e) = client.subscribe_quotes(instrument_id).await {
352                log::error!("Failed to subscribe to quotes: {e}");
353            }
354            Ok(())
355        })
356    }
357
358    #[pyo3(name = "subscribe_trades")]
359    fn py_subscribe_trades<'py>(
360        &self,
361        py: Python<'py>,
362        instrument_id: InstrumentId,
363    ) -> PyResult<Bound<'py, PyAny>> {
364        let client = self.clone();
365
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            if let Err(e) = client.subscribe_trades(instrument_id).await {
368                log::error!("Failed to subscribe to trades: {e}");
369            }
370            Ok(())
371        })
372    }
373
374    #[pyo3(name = "subscribe_mark_prices")]
375    fn py_subscribe_mark_prices<'py>(
376        &self,
377        py: Python<'py>,
378        instrument_id: InstrumentId,
379    ) -> PyResult<Bound<'py, PyAny>> {
380        let client = self.clone();
381
382        pyo3_async_runtimes::tokio::future_into_py(py, async move {
383            if let Err(e) = client.subscribe_mark_prices(instrument_id).await {
384                log::error!("Failed to subscribe to mark prices: {e}");
385            }
386            Ok(())
387        })
388    }
389
390    #[pyo3(name = "subscribe_index_prices")]
391    fn py_subscribe_index_prices<'py>(
392        &self,
393        py: Python<'py>,
394        instrument_id: InstrumentId,
395    ) -> PyResult<Bound<'py, PyAny>> {
396        let client = self.clone();
397
398        pyo3_async_runtimes::tokio::future_into_py(py, async move {
399            if let Err(e) = client.subscribe_index_prices(instrument_id).await {
400                log::error!("Failed to subscribe to index prices: {e}");
401            }
402            Ok(())
403        })
404    }
405
406    #[pyo3(name = "subscribe_funding_rates")]
407    fn py_subscribe_funding_rates<'py>(
408        &self,
409        py: Python<'py>,
410        instrument_id: InstrumentId,
411    ) -> PyResult<Bound<'py, PyAny>> {
412        let client = self.clone();
413
414        pyo3_async_runtimes::tokio::future_into_py(py, async move {
415            if let Err(e) = client.subscribe_funding_rates(instrument_id).await {
416                log::error!("Failed to subscribe to funding: {e}");
417            }
418            Ok(())
419        })
420    }
421
422    #[pyo3(name = "subscribe_bars")]
423    fn py_subscribe_bars<'py>(
424        &self,
425        py: Python<'py>,
426        bar_type: BarType,
427    ) -> PyResult<Bound<'py, PyAny>> {
428        let client = self.clone();
429
430        pyo3_async_runtimes::tokio::future_into_py(py, async move {
431            if let Err(e) = client.subscribe_bars(bar_type).await {
432                log::error!("Failed to subscribe to bars: {e}");
433            }
434            Ok(())
435        })
436    }
437
438    #[pyo3(name = "unsubscribe_instruments")]
439    fn py_unsubscribe_instruments<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
440        let client = self.clone();
441
442        pyo3_async_runtimes::tokio::future_into_py(py, async move {
443            if let Err(e) = client.unsubscribe_instruments().await {
444                log::error!("Failed to unsubscribe from instruments: {e}");
445            }
446            Ok(())
447        })
448    }
449
450    #[pyo3(name = "unsubscribe_instrument")]
451    fn py_unsubscribe_instrument<'py>(
452        &self,
453        py: Python<'py>,
454        instrument_id: InstrumentId,
455    ) -> PyResult<Bound<'py, PyAny>> {
456        let client = self.clone();
457
458        pyo3_async_runtimes::tokio::future_into_py(py, async move {
459            if let Err(e) = client.unsubscribe_instrument(instrument_id).await {
460                log::error!("Failed to unsubscribe from instrument: {e}");
461            }
462            Ok(())
463        })
464    }
465
466    #[pyo3(name = "unsubscribe_book")]
467    fn py_unsubscribe_book<'py>(
468        &self,
469        py: Python<'py>,
470        instrument_id: InstrumentId,
471    ) -> PyResult<Bound<'py, PyAny>> {
472        let client = self.clone();
473
474        pyo3_async_runtimes::tokio::future_into_py(py, async move {
475            if let Err(e) = client.unsubscribe_book(instrument_id).await {
476                log::error!("Failed to unsubscribe from order book: {e}");
477            }
478            Ok(())
479        })
480    }
481
482    #[pyo3(name = "unsubscribe_book_25")]
483    fn py_unsubscribe_book_25<'py>(
484        &self,
485        py: Python<'py>,
486        instrument_id: InstrumentId,
487    ) -> PyResult<Bound<'py, PyAny>> {
488        let client = self.clone();
489
490        pyo3_async_runtimes::tokio::future_into_py(py, async move {
491            if let Err(e) = client.unsubscribe_book_25(instrument_id).await {
492                log::error!("Failed to unsubscribe from order book 25: {e}");
493            }
494            Ok(())
495        })
496    }
497
498    #[pyo3(name = "unsubscribe_book_depth10")]
499    fn py_unsubscribe_book_depth10<'py>(
500        &self,
501        py: Python<'py>,
502        instrument_id: InstrumentId,
503    ) -> PyResult<Bound<'py, PyAny>> {
504        let client = self.clone();
505
506        pyo3_async_runtimes::tokio::future_into_py(py, async move {
507            if let Err(e) = client.unsubscribe_book_depth10(instrument_id).await {
508                log::error!("Failed to unsubscribe from order book depth 10: {e}");
509            }
510            Ok(())
511        })
512    }
513
514    #[pyo3(name = "unsubscribe_quotes")]
515    fn py_unsubscribe_quotes<'py>(
516        &self,
517        py: Python<'py>,
518        instrument_id: InstrumentId,
519    ) -> PyResult<Bound<'py, PyAny>> {
520        let client = self.clone();
521
522        pyo3_async_runtimes::tokio::future_into_py(py, async move {
523            if let Err(e) = client.unsubscribe_quotes(instrument_id).await {
524                log::error!("Failed to unsubscribe from quotes: {e}");
525            }
526            Ok(())
527        })
528    }
529
530    #[pyo3(name = "unsubscribe_trades")]
531    fn py_unsubscribe_trades<'py>(
532        &self,
533        py: Python<'py>,
534        instrument_id: InstrumentId,
535    ) -> PyResult<Bound<'py, PyAny>> {
536        let client = self.clone();
537
538        pyo3_async_runtimes::tokio::future_into_py(py, async move {
539            if let Err(e) = client.unsubscribe_trades(instrument_id).await {
540                log::error!("Failed to unsubscribe from trades: {e}");
541            }
542            Ok(())
543        })
544    }
545
546    #[pyo3(name = "unsubscribe_mark_prices")]
547    fn py_unsubscribe_mark_prices<'py>(
548        &self,
549        py: Python<'py>,
550        instrument_id: InstrumentId,
551    ) -> PyResult<Bound<'py, PyAny>> {
552        let client = self.clone();
553
554        pyo3_async_runtimes::tokio::future_into_py(py, async move {
555            if let Err(e) = client.unsubscribe_mark_prices(instrument_id).await {
556                log::error!("Failed to unsubscribe from mark prices: {e}");
557            }
558            Ok(())
559        })
560    }
561
562    #[pyo3(name = "unsubscribe_index_prices")]
563    fn py_unsubscribe_index_prices<'py>(
564        &self,
565        py: Python<'py>,
566        instrument_id: InstrumentId,
567    ) -> PyResult<Bound<'py, PyAny>> {
568        let client = self.clone();
569
570        pyo3_async_runtimes::tokio::future_into_py(py, async move {
571            if let Err(e) = client.unsubscribe_index_prices(instrument_id).await {
572                log::error!("Failed to unsubscribe from index prices: {e}");
573            }
574            Ok(())
575        })
576    }
577
578    #[pyo3(name = "unsubscribe_funding_rates")]
579    fn py_unsubscribe_funding_rates<'py>(
580        &self,
581        py: Python<'py>,
582        instrument_id: InstrumentId,
583    ) -> PyResult<Bound<'py, PyAny>> {
584        let client = self.clone();
585        pyo3_async_runtimes::tokio::future_into_py(py, async move {
586            if let Err(e) = client.unsubscribe_funding_rates(instrument_id).await {
587                log::error!("Failed to unsubscribe from funding rates: {e}");
588            }
589            Ok(())
590        })
591    }
592
593    #[pyo3(name = "unsubscribe_bars")]
594    fn py_unsubscribe_bars<'py>(
595        &self,
596        py: Python<'py>,
597        bar_type: BarType,
598    ) -> PyResult<Bound<'py, PyAny>> {
599        let client = self.clone();
600
601        pyo3_async_runtimes::tokio::future_into_py(py, async move {
602            if let Err(e) = client.unsubscribe_bars(bar_type).await {
603                log::error!("Failed to unsubscribe from bars: {e}");
604            }
605            Ok(())
606        })
607    }
608
609    #[pyo3(name = "subscribe_orders")]
610    fn py_subscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
611        let client = self.clone();
612
613        pyo3_async_runtimes::tokio::future_into_py(py, async move {
614            if let Err(e) = client.subscribe_orders().await {
615                log::error!("Failed to subscribe to orders: {e}");
616            }
617            Ok(())
618        })
619    }
620
621    #[pyo3(name = "subscribe_executions")]
622    fn py_subscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
623        let client = self.clone();
624
625        pyo3_async_runtimes::tokio::future_into_py(py, async move {
626            if let Err(e) = client.subscribe_executions().await {
627                log::error!("Failed to subscribe to executions: {e}");
628            }
629            Ok(())
630        })
631    }
632
633    #[pyo3(name = "subscribe_positions")]
634    fn py_subscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
635        let client = self.clone();
636
637        pyo3_async_runtimes::tokio::future_into_py(py, async move {
638            if let Err(e) = client.subscribe_positions().await {
639                log::error!("Failed to subscribe to positions: {e}");
640            }
641            Ok(())
642        })
643    }
644
645    #[pyo3(name = "subscribe_margin")]
646    fn py_subscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
647        let client = self.clone();
648
649        pyo3_async_runtimes::tokio::future_into_py(py, async move {
650            if let Err(e) = client.subscribe_margin().await {
651                log::error!("Failed to subscribe to margin: {e}");
652            }
653            Ok(())
654        })
655    }
656
657    #[pyo3(name = "subscribe_wallet")]
658    fn py_subscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
659        let client = self.clone();
660
661        pyo3_async_runtimes::tokio::future_into_py(py, async move {
662            if let Err(e) = client.subscribe_wallet().await {
663                log::error!("Failed to subscribe to wallet: {e}");
664            }
665            Ok(())
666        })
667    }
668
669    #[pyo3(name = "unsubscribe_orders")]
670    fn py_unsubscribe_orders<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
671        let client = self.clone();
672
673        pyo3_async_runtimes::tokio::future_into_py(py, async move {
674            if let Err(e) = client.unsubscribe_orders().await {
675                log::error!("Failed to unsubscribe from orders: {e}");
676            }
677            Ok(())
678        })
679    }
680
681    #[pyo3(name = "unsubscribe_executions")]
682    fn py_unsubscribe_executions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
683        let client = self.clone();
684
685        pyo3_async_runtimes::tokio::future_into_py(py, async move {
686            if let Err(e) = client.unsubscribe_executions().await {
687                log::error!("Failed to unsubscribe from executions: {e}");
688            }
689            Ok(())
690        })
691    }
692
693    #[pyo3(name = "unsubscribe_positions")]
694    fn py_unsubscribe_positions<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
695        let client = self.clone();
696
697        pyo3_async_runtimes::tokio::future_into_py(py, async move {
698            if let Err(e) = client.unsubscribe_positions().await {
699                log::error!("Failed to unsubscribe from positions: {e}");
700            }
701            Ok(())
702        })
703    }
704
705    #[pyo3(name = "unsubscribe_margin")]
706    fn py_unsubscribe_margin<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
707        let client = self.clone();
708
709        pyo3_async_runtimes::tokio::future_into_py(py, async move {
710            if let Err(e) = client.unsubscribe_margin().await {
711                log::error!("Failed to unsubscribe from margin: {e}");
712            }
713            Ok(())
714        })
715    }
716
717    #[pyo3(name = "unsubscribe_wallet")]
718    fn py_unsubscribe_wallet<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
719        let client = self.clone();
720
721        pyo3_async_runtimes::tokio::future_into_py(py, async move {
722            if let Err(e) = client.unsubscribe_wallet().await {
723                log::error!("Failed to unsubscribe from wallet: {e}");
724            }
725            Ok(())
726        })
727    }
728}