Skip to main content

nautilus_hyperliquid/python/
websocket.rs

// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err};
20use nautilus_model::{
21    data::{BarType, Data, OrderBookDeltas_API},
22    identifiers::{AccountId, ClientOrderId, InstrumentId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use nautilus_network::websocket::TransportBackend;
26use pyo3::{conversion::IntoPyObjectExt, prelude::*};
27
28use crate::{
29    common::enums::HyperliquidEnvironment,
30    websocket::{
31        HyperliquidWebSocketClient,
32        messages::{ExecutionReport, NautilusWsMessage},
33    },
34};
35
36fn ws_data_to_pyobject(py: Python<'_>, data: Data) -> PyResult<Py<PyAny>> {
37    match data {
38        Data::Custom(custom) => Py::new(py, custom).map(|obj| obj.into_any()),
39        other => Ok(data_to_pycapsule(py, other)),
40    }
41}
42
43#[pymethods]
44#[pyo3_stub_gen::derive::gen_stub_pymethods]
45impl HyperliquidWebSocketClient {
46    /// Hyperliquid WebSocket client following the BitMEX pattern.
47    ///
48    /// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
49    /// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
50    #[new]
51    #[pyo3(signature = (url=None, environment=HyperliquidEnvironment::Mainnet, account_id=None, proxy_url=None))]
52    fn py_new(
53        url: Option<String>,
54        environment: HyperliquidEnvironment,
55        account_id: Option<String>,
56        proxy_url: Option<String>,
57    ) -> Self {
58        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
59        Self::new(
60            url,
61            environment,
62            account_id,
63            TransportBackend::default(),
64            proxy_url,
65        )
66    }
67
68    /// Returns the URL of this WebSocket client.
69    #[getter]
70    #[pyo3(name = "url")]
71    #[must_use]
72    pub fn py_url(&self) -> String {
73        self.url().to_string()
74    }
75
76    /// Returns true if the WebSocket is actively connected.
77    #[pyo3(name = "is_active")]
78    fn py_is_active(&self) -> bool {
79        self.is_active()
80    }
81
82    #[pyo3(name = "is_closed")]
83    fn py_is_closed(&self) -> bool {
84        !self.is_active()
85    }
86
87    /// Caches spot fill coin mappings for instrument lookup.
88    ///
89    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
90    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
91    /// This mapping allows the handler to look up instruments from spot fills.
92    #[pyo3(name = "cache_spot_fill_coins")]
93    fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
94        let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
95            .into_iter()
96            .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
97            .collect();
98        self.cache_spot_fill_coins(ahash_mapping);
99    }
100
101    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
102    ///
103    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
104    /// This mapping allows WebSocket order status and fill reports to be resolved back to
105    /// the original client_order_id.
106    ///
107    /// This writes directly to a shared cache that the handler reads from, avoiding any
108    /// race conditions between caching and WebSocket message processing.
109    #[pyo3(name = "cache_cloid_mapping")]
110    fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
111        self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
112    }
113
114    /// Removes a cloid mapping from the cache.
115    ///
116    /// Called on terminal order state. The cache is FIFO-bounded so missed
117    /// removals self-evict (see GH-3972 cancel-replace drain).
118    #[pyo3(name = "remove_cloid_mapping")]
119    fn py_remove_cloid_mapping(&self, cloid: &str) {
120        self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
121    }
122
123    /// Clears all cloid mappings from the cache.
124    ///
125    /// Useful for cleanup during reconnection or shutdown.
126    #[pyo3(name = "clear_cloid_cache")]
127    fn py_clear_cloid_cache(&self) {
128        self.clear_cloid_cache();
129    }
130
131    /// Returns the number of cloid mappings in the cache.
132    #[pyo3(name = "cloid_cache_len")]
133    fn py_cloid_cache_len(&self) -> usize {
134        self.cloid_cache_len()
135    }
136
137    /// Looks up a client_order_id by its cloid hash.
138    ///
139    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
140    #[pyo3(name = "get_cloid_mapping")]
141    fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
142        self.get_cloid_mapping(&ustr::Ustr::from(cloid))
143    }
144
145    /// Establishes WebSocket connection and spawns the message handler.
146    #[pyo3(name = "connect")]
147    #[expect(clippy::needless_pass_by_value)]
148    fn py_connect<'py>(
149        &self,
150        py: Python<'py>,
151        loop_: Py<PyAny>,
152        instruments: Vec<Py<PyAny>>,
153        callback: Py<PyAny>,
154    ) -> PyResult<Bound<'py, PyAny>> {
155        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
156
157        for inst in instruments {
158            let inst_any = pyobject_to_instrument_any(py, inst)?;
159            self.cache_instrument(inst_any);
160        }
161
162        let mut client = self.clone();
163
164        pyo3_async_runtimes::tokio::future_into_py(py, async move {
165            client.connect().await.map_err(to_pyruntime_err)?;
166
167            get_runtime().spawn(async move {
168                loop {
169                    let event = client.next_event().await;
170
171                    match event {
172                        Some(msg) => {
173                            log::trace!("Received WebSocket message: {msg:?}");
174
175                            match msg {
176                                NautilusWsMessage::Trades(trade_ticks) => {
177                                    Python::attach(|py| {
178                                        for tick in trade_ticks {
179                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
180                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
181                                        }
182                                    });
183                                }
184                                NautilusWsMessage::Quote(quote_tick) => {
185                                    Python::attach(|py| {
186                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
187                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
188                                    });
189                                }
190                                NautilusWsMessage::Deltas(deltas) => {
191                                    Python::attach(|py| {
192                                        let py_obj = data_to_pycapsule(
193                                            py,
194                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
195                                        );
196                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
197                                    });
198                                }
199                                NautilusWsMessage::Depth10(depth) => {
200                                    Python::attach(|py| {
201                                        let py_obj = data_to_pycapsule(py, Data::Depth10(depth));
202                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
203                                    });
204                                }
205                                NautilusWsMessage::Candle(bar) => {
206                                    Python::attach(|py| {
207                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
208                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
209                                    });
210                                }
211                                NautilusWsMessage::MarkPrice(mark_price) => {
212                                    Python::attach(|py| {
213                                        let py_obj = data_to_pycapsule(
214                                            py,
215                                            Data::MarkPriceUpdate(mark_price),
216                                        );
217                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
218                                    });
219                                }
220                                NautilusWsMessage::IndexPrice(index_price) => {
221                                    Python::attach(|py| {
222                                        let py_obj = data_to_pycapsule(
223                                            py,
224                                            Data::IndexPriceUpdate(index_price),
225                                        );
226                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
227                                    });
228                                }
229                                NautilusWsMessage::FundingRate(funding_rate) => {
230                                    Python::attach(|py| {
231                                        if let Ok(py_obj) = funding_rate.into_py_any(py) {
232                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
233                                        }
234                                    });
235                                }
236                                NautilusWsMessage::CustomData(data) => {
237                                    Python::attach(|py| match ws_data_to_pyobject(py, data) {
238                                        Ok(py_obj) => {
239                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
240                                        }
241                                        Err(e) => {
242                                            log::error!(
243                                                "Error converting CustomData to Python object: {e}"
244                                            );
245                                        }
246                                    });
247                                }
248                                NautilusWsMessage::ExecutionReports(reports) => {
249                                    Python::attach(|py| {
250                                        for report in reports {
251                                            match report {
252                                                ExecutionReport::Order(order_report) => {
253                                                    log::debug!(
254                                                        "Forwarding order status report: order_id={}, status={:?}",
255                                                        order_report.venue_order_id,
256                                                        order_report.order_status
257                                                    );
258
259                                                    match Py::new(py, order_report) {
260                                                        Ok(py_obj) => {
261                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
262                                                        }
263                                                        Err(e) => {
264                                                            log::error!("Error converting OrderStatusReport to Python: {e}");
265                                                        }
266                                                    }
267                                                }
268                                                ExecutionReport::Fill(fill_report) => {
269                                                    log::debug!(
270                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
271                                                        fill_report.trade_id,
272                                                        fill_report.order_side,
273                                                        fill_report.last_qty,
274                                                        fill_report.last_px
275                                                    );
276
277                                                    match Py::new(py, fill_report) {
278                                                        Ok(py_obj) => {
279                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
280                                                        }
281                                                        Err(e) => {
282                                                            log::error!("Error converting FillReport to Python: {e}");
283                                                        }
284                                                    }
285                                                }
286                                            }
287                                        }
288                                    });
289                                }
290                                _ => {
291                                    log::debug!("Unhandled message type: {msg:?}");
292                                }
293                            }
294                        }
295                        None => {
296                            log::debug!("WebSocket connection closed");
297                            break;
298                        }
299                    }
300                }
301            });
302
303            Ok(())
304        })
305    }
306
307    #[pyo3(name = "wait_until_active")]
308    fn py_wait_until_active<'py>(
309        &self,
310        py: Python<'py>,
311        timeout_secs: f64,
312    ) -> PyResult<Bound<'py, PyAny>> {
313        let client = self.clone();
314
315        pyo3_async_runtimes::tokio::future_into_py(py, async move {
316            let start = std::time::Instant::now();
317
318            loop {
319                if client.is_active() {
320                    return Ok(());
321                }
322
323                if start.elapsed().as_secs_f64() >= timeout_secs {
324                    return Err(to_pyruntime_err(format!(
325                        "WebSocket connection did not become active within {timeout_secs} seconds"
326                    )));
327                }
328
329                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
330            }
331        })
332    }
333
334    #[pyo3(name = "close")]
335    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
336        let mut client = self.clone();
337
338        pyo3_async_runtimes::tokio::future_into_py(py, async move {
339            if let Err(e) = client.disconnect().await {
340                log::error!("Error on close: {e}");
341            }
342            Ok(())
343        })
344    }
345
346    /// Subscribe to trades for an instrument.
347    #[pyo3(name = "subscribe_trades")]
348    fn py_subscribe_trades<'py>(
349        &self,
350        py: Python<'py>,
351        instrument_id: InstrumentId,
352    ) -> PyResult<Bound<'py, PyAny>> {
353        let client = self.clone();
354
355        pyo3_async_runtimes::tokio::future_into_py(py, async move {
356            client
357                .subscribe_trades(instrument_id)
358                .await
359                .map_err(to_pyruntime_err)?;
360            Ok(())
361        })
362    }
363
364    /// Unsubscribe from trades for an instrument.
365    #[pyo3(name = "unsubscribe_trades")]
366    fn py_unsubscribe_trades<'py>(
367        &self,
368        py: Python<'py>,
369        instrument_id: InstrumentId,
370    ) -> PyResult<Bound<'py, PyAny>> {
371        let client = self.clone();
372
373        pyo3_async_runtimes::tokio::future_into_py(py, async move {
374            client
375                .unsubscribe_trades(instrument_id)
376                .await
377                .map_err(to_pyruntime_err)?;
378            Ok(())
379        })
380    }
381
382    /// Subscribe to all mid prices across markets.
383    #[pyo3(name = "subscribe_all_mids")]
384    fn py_subscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
385        let client = self.clone();
386
387        pyo3_async_runtimes::tokio::future_into_py(py, async move {
388            client
389                .subscribe_all_mids()
390                .await
391                .map_err(to_pyruntime_err)?;
392            Ok(())
393        })
394    }
395
396    /// Subscribe to all mid prices across markets, optionally scoped to a specific dex.
397    #[pyo3(name = "subscribe_all_mids_with_dex")]
398    fn py_subscribe_all_mids_with_dex<'py>(
399        &self,
400        py: Python<'py>,
401        dex: Option<String>,
402    ) -> PyResult<Bound<'py, PyAny>> {
403        let client = self.clone();
404
405        pyo3_async_runtimes::tokio::future_into_py(py, async move {
406            client
407                .subscribe_all_mids_with_dex(dex.as_deref())
408                .await
409                .map_err(to_pyruntime_err)?;
410            Ok(())
411        })
412    }
413
414    /// Subscribe to L2 order book for an instrument.
415    #[pyo3(name = "subscribe_book")]
416    fn py_subscribe_book<'py>(
417        &self,
418        py: Python<'py>,
419        instrument_id: InstrumentId,
420    ) -> PyResult<Bound<'py, PyAny>> {
421        let client = self.clone();
422
423        pyo3_async_runtimes::tokio::future_into_py(py, async move {
424            client
425                .subscribe_book(instrument_id)
426                .await
427                .map_err(to_pyruntime_err)?;
428            Ok(())
429        })
430    }
431
432    /// Unsubscribe from L2 order book for an instrument.
433    #[pyo3(name = "unsubscribe_book")]
434    fn py_unsubscribe_book<'py>(
435        &self,
436        py: Python<'py>,
437        instrument_id: InstrumentId,
438    ) -> PyResult<Bound<'py, PyAny>> {
439        let client = self.clone();
440
441        pyo3_async_runtimes::tokio::future_into_py(py, async move {
442            client
443                .unsubscribe_book(instrument_id)
444                .await
445                .map_err(to_pyruntime_err)?;
446            Ok(())
447        })
448    }
449
450    #[pyo3(name = "subscribe_book_deltas")]
451    fn py_subscribe_book_deltas<'py>(
452        &self,
453        py: Python<'py>,
454        instrument_id: InstrumentId,
455        _book_type: u8,
456        _depth: u64,
457    ) -> PyResult<Bound<'py, PyAny>> {
458        let client = self.clone();
459
460        pyo3_async_runtimes::tokio::future_into_py(py, async move {
461            client
462                .subscribe_book(instrument_id)
463                .await
464                .map_err(to_pyruntime_err)?;
465            Ok(())
466        })
467    }
468
469    #[pyo3(name = "unsubscribe_book_deltas")]
470    fn py_unsubscribe_book_deltas<'py>(
471        &self,
472        py: Python<'py>,
473        instrument_id: InstrumentId,
474    ) -> PyResult<Bound<'py, PyAny>> {
475        let client = self.clone();
476
477        pyo3_async_runtimes::tokio::future_into_py(py, async move {
478            client
479                .unsubscribe_book(instrument_id)
480                .await
481                .map_err(to_pyruntime_err)?;
482            Ok(())
483        })
484    }
485
486    #[pyo3(name = "subscribe_book_snapshots")]
487    fn py_subscribe_book_snapshots<'py>(
488        &self,
489        py: Python<'py>,
490        instrument_id: InstrumentId,
491        _book_type: u8,
492        _depth: u64,
493    ) -> PyResult<Bound<'py, PyAny>> {
494        let client = self.clone();
495
496        pyo3_async_runtimes::tokio::future_into_py(py, async move {
497            client
498                .subscribe_book(instrument_id)
499                .await
500                .map_err(to_pyruntime_err)?;
501            Ok(())
502        })
503    }
504
505    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
506    #[pyo3(name = "subscribe_quotes")]
507    fn py_subscribe_quotes<'py>(
508        &self,
509        py: Python<'py>,
510        instrument_id: InstrumentId,
511    ) -> PyResult<Bound<'py, PyAny>> {
512        let client = self.clone();
513
514        pyo3_async_runtimes::tokio::future_into_py(py, async move {
515            client
516                .subscribe_quotes(instrument_id)
517                .await
518                .map_err(to_pyruntime_err)?;
519            Ok(())
520        })
521    }
522
523    /// Unsubscribe from quote ticks for an instrument.
524    #[pyo3(name = "unsubscribe_quotes")]
525    fn py_unsubscribe_quotes<'py>(
526        &self,
527        py: Python<'py>,
528        instrument_id: InstrumentId,
529    ) -> PyResult<Bound<'py, PyAny>> {
530        let client = self.clone();
531
532        pyo3_async_runtimes::tokio::future_into_py(py, async move {
533            client
534                .unsubscribe_quotes(instrument_id)
535                .await
536                .map_err(to_pyruntime_err)?;
537            Ok(())
538        })
539    }
540
541    /// Subscribe to candle/bar data for a specific coin and interval.
542    #[pyo3(name = "subscribe_bars")]
543    fn py_subscribe_bars<'py>(
544        &self,
545        py: Python<'py>,
546        bar_type: BarType,
547    ) -> PyResult<Bound<'py, PyAny>> {
548        let client = self.clone();
549
550        pyo3_async_runtimes::tokio::future_into_py(py, async move {
551            client
552                .subscribe_bars(bar_type)
553                .await
554                .map_err(to_pyruntime_err)?;
555            Ok(())
556        })
557    }
558
559    /// Unsubscribe from candle/bar data.
560    #[pyo3(name = "unsubscribe_bars")]
561    fn py_unsubscribe_bars<'py>(
562        &self,
563        py: Python<'py>,
564        bar_type: BarType,
565    ) -> PyResult<Bound<'py, PyAny>> {
566        let client = self.clone();
567
568        pyo3_async_runtimes::tokio::future_into_py(py, async move {
569            client
570                .unsubscribe_bars(bar_type)
571                .await
572                .map_err(to_pyruntime_err)?;
573            Ok(())
574        })
575    }
576
577    /// Unsubscribe from all mid prices across markets.
578    #[pyo3(name = "unsubscribe_all_mids")]
579    fn py_unsubscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
580        let client = self.clone();
581
582        pyo3_async_runtimes::tokio::future_into_py(py, async move {
583            client
584                .unsubscribe_all_mids()
585                .await
586                .map_err(to_pyruntime_err)?;
587            Ok(())
588        })
589    }
590
591    /// Unsubscribe from all mid prices across markets, optionally scoped to a specific dex.
592    #[pyo3(name = "unsubscribe_all_mids_with_dex")]
593    fn py_unsubscribe_all_mids_with_dex<'py>(
594        &self,
595        py: Python<'py>,
596        dex: Option<String>,
597    ) -> PyResult<Bound<'py, PyAny>> {
598        let client = self.clone();
599
600        pyo3_async_runtimes::tokio::future_into_py(py, async move {
601            client
602                .unsubscribe_all_mids_with_dex(dex.as_deref())
603                .await
604                .map_err(to_pyruntime_err)?;
605            Ok(())
606        })
607    }
608
609    /// Subscribe to order updates for a specific user address.
610    #[pyo3(name = "subscribe_order_updates")]
611    fn py_subscribe_order_updates<'py>(
612        &self,
613        py: Python<'py>,
614        user: String,
615    ) -> PyResult<Bound<'py, PyAny>> {
616        let client = self.clone();
617
618        pyo3_async_runtimes::tokio::future_into_py(py, async move {
619            client
620                .subscribe_order_updates(&user)
621                .await
622                .map_err(to_pyruntime_err)?;
623            Ok(())
624        })
625    }
626
627    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
628    #[pyo3(name = "subscribe_user_events")]
629    fn py_subscribe_user_events<'py>(
630        &self,
631        py: Python<'py>,
632        user: String,
633    ) -> PyResult<Bound<'py, PyAny>> {
634        let client = self.clone();
635
636        pyo3_async_runtimes::tokio::future_into_py(py, async move {
637            client
638                .subscribe_user_events(&user)
639                .await
640                .map_err(to_pyruntime_err)?;
641            Ok(())
642        })
643    }
644
645    /// Subscribe to user fills for a specific user address.
646    ///
647    /// Note: This channel is redundant with `userEvents` which already includes fills.
648    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
649    #[pyo3(name = "subscribe_user_fills")]
650    fn py_subscribe_user_fills<'py>(
651        &self,
652        py: Python<'py>,
653        user: String,
654    ) -> PyResult<Bound<'py, PyAny>> {
655        let client = self.clone();
656
657        pyo3_async_runtimes::tokio::future_into_py(py, async move {
658            client
659                .subscribe_user_fills(&user)
660                .await
661                .map_err(to_pyruntime_err)?;
662            Ok(())
663        })
664    }
665
666    /// Subscribe to mark price updates for an instrument.
667    #[pyo3(name = "subscribe_mark_prices")]
668    fn py_subscribe_mark_prices<'py>(
669        &self,
670        py: Python<'py>,
671        instrument_id: InstrumentId,
672    ) -> PyResult<Bound<'py, PyAny>> {
673        let client = self.clone();
674
675        pyo3_async_runtimes::tokio::future_into_py(py, async move {
676            client
677                .subscribe_mark_prices(instrument_id)
678                .await
679                .map_err(to_pyruntime_err)?;
680            Ok(())
681        })
682    }
683
684    /// Unsubscribe from mark price updates for an instrument.
685    #[pyo3(name = "unsubscribe_mark_prices")]
686    fn py_unsubscribe_mark_prices<'py>(
687        &self,
688        py: Python<'py>,
689        instrument_id: InstrumentId,
690    ) -> PyResult<Bound<'py, PyAny>> {
691        let client = self.clone();
692
693        pyo3_async_runtimes::tokio::future_into_py(py, async move {
694            client
695                .unsubscribe_mark_prices(instrument_id)
696                .await
697                .map_err(to_pyruntime_err)?;
698            Ok(())
699        })
700    }
701
702    /// Subscribe to index/oracle price updates for an instrument.
703    #[pyo3(name = "subscribe_index_prices")]
704    fn py_subscribe_index_prices<'py>(
705        &self,
706        py: Python<'py>,
707        instrument_id: InstrumentId,
708    ) -> PyResult<Bound<'py, PyAny>> {
709        let client = self.clone();
710
711        pyo3_async_runtimes::tokio::future_into_py(py, async move {
712            client
713                .subscribe_index_prices(instrument_id)
714                .await
715                .map_err(to_pyruntime_err)?;
716            Ok(())
717        })
718    }
719
720    /// Unsubscribe from index/oracle price updates for an instrument.
721    #[pyo3(name = "unsubscribe_index_prices")]
722    fn py_unsubscribe_index_prices<'py>(
723        &self,
724        py: Python<'py>,
725        instrument_id: InstrumentId,
726    ) -> PyResult<Bound<'py, PyAny>> {
727        let client = self.clone();
728
729        pyo3_async_runtimes::tokio::future_into_py(py, async move {
730            client
731                .unsubscribe_index_prices(instrument_id)
732                .await
733                .map_err(to_pyruntime_err)?;
734            Ok(())
735        })
736    }
737
738    /// Subscribe to funding rate updates for an instrument.
739    #[pyo3(name = "subscribe_funding_rates")]
740    fn py_subscribe_funding_rates<'py>(
741        &self,
742        py: Python<'py>,
743        instrument_id: InstrumentId,
744    ) -> PyResult<Bound<'py, PyAny>> {
745        let client = self.clone();
746
747        pyo3_async_runtimes::tokio::future_into_py(py, async move {
748            client
749                .subscribe_funding_rates(instrument_id)
750                .await
751                .map_err(to_pyruntime_err)?;
752            Ok(())
753        })
754    }
755
756    /// Unsubscribe from funding rate updates for an instrument.
757    #[pyo3(name = "unsubscribe_funding_rates")]
758    fn py_unsubscribe_funding_rates<'py>(
759        &self,
760        py: Python<'py>,
761        instrument_id: InstrumentId,
762    ) -> PyResult<Bound<'py, PyAny>> {
763        let client = self.clone();
764
765        pyo3_async_runtimes::tokio::future_into_py(py, async move {
766            client
767                .unsubscribe_funding_rates(instrument_id)
768                .await
769                .map_err(to_pyruntime_err)?;
770            Ok(())
771        })
772    }
773}