Skip to main content

nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use std::time::Duration;
19
20use nautilus_common::live::get_runtime;
21use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err};
22use nautilus_model::{
23    data::{BarType, Data, OrderBookDeltas_API},
24    enums::{OrderSide, OrderType, TimeInForce},
25    identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
26    orders::OrderAny,
27    python::{
28        data::data_to_pycapsule, instruments::pyobject_to_instrument_any,
29        orders::pyobject_to_order_any,
30    },
31    types::{Price, Quantity},
32};
33use nautilus_network::websocket::TransportBackend;
34use pyo3::{conversion::IntoPyObjectExt, prelude::*};
35
36use crate::{
37    common::enums::HyperliquidEnvironment,
38    http::client::HyperliquidHttpClient,
39    websocket::{
40        HyperliquidWebSocketClient,
41        messages::{ExecutionReport, NautilusWsMessage},
42    },
43};
44
45fn ws_data_to_pyobject(py: Python<'_>, data: Data) -> PyResult<Py<PyAny>> {
46    match data {
47        Data::Custom(custom) => Py::new(py, custom).map(|obj| obj.into_any()),
48        Data::OptionGreeks(greeks) => Py::new(py, greeks).map(|obj| obj.into_any()),
49        other => Ok(data_to_pycapsule(py, other)),
50    }
51}
52
53#[pymethods]
54#[pyo3_stub_gen::derive::gen_stub_pymethods]
55impl HyperliquidWebSocketClient {
56    /// Hyperliquid WebSocket client following the BitMEX pattern.
57    ///
58    /// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
59    /// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
60    #[new]
61    #[pyo3(signature = (url=None, environment=HyperliquidEnvironment::Mainnet, account_id=None, proxy_url=None))]
62    fn py_new(
63        url: Option<String>,
64        environment: HyperliquidEnvironment,
65        account_id: Option<String>,
66        proxy_url: Option<String>,
67    ) -> Self {
68        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
69        Self::new(
70            url,
71            environment,
72            account_id,
73            TransportBackend::default(),
74            proxy_url,
75        )
76    }
77
78    /// Returns the URL of this WebSocket client.
79    #[getter]
80    #[pyo3(name = "url")]
81    #[must_use]
82    pub fn py_url(&self) -> String {
83        self.url().to_string()
84    }
85
86    /// Returns true if the WebSocket is actively connected.
87    #[pyo3(name = "is_active")]
88    fn py_is_active(&self) -> bool {
89        self.is_active()
90    }
91
92    #[pyo3(name = "is_closed")]
93    fn py_is_closed(&self) -> bool {
94        !self.is_active()
95    }
96
97    /// Sets the timeout for WebSocket post trading requests.
98    #[pyo3(name = "set_post_timeout")]
99    fn py_set_post_timeout(&mut self, timeout_secs: u64) {
100        self.set_post_timeout(Duration::from_secs(timeout_secs));
101    }
102
103    /// Submit an order through the Hyperliquid WebSocket post API.
104    ///
105    /// The HTTP client supplies signing credentials, builder attribution, and
106    /// cached instrument metadata. The action itself is sent over WebSocket.
107    #[pyo3(name = "submit_order", signature = (
108        signer,
109        instrument_id,
110        client_order_id,
111        order_side,
112        order_type,
113        quantity,
114        time_in_force,
115        price=None,
116        trigger_price=None,
117        post_only=false,
118        reduce_only=false,
119    ))]
120    #[expect(clippy::too_many_arguments)]
121    fn py_submit_order<'py>(
122        &self,
123        py: Python<'py>,
124        signer: &HyperliquidHttpClient,
125        instrument_id: InstrumentId,
126        client_order_id: ClientOrderId,
127        order_side: OrderSide,
128        order_type: OrderType,
129        quantity: Quantity,
130        time_in_force: TimeInForce,
131        price: Option<Price>,
132        trigger_price: Option<Price>,
133        post_only: bool,
134        reduce_only: bool,
135    ) -> PyResult<Bound<'py, PyAny>> {
136        let client = self.clone();
137        let signer = signer.clone();
138
139        pyo3_async_runtimes::tokio::future_into_py(py, async move {
140            client
141                .submit_order(
142                    &signer,
143                    instrument_id,
144                    client_order_id,
145                    order_side,
146                    order_type,
147                    quantity,
148                    time_in_force,
149                    price,
150                    trigger_price,
151                    post_only,
152                    reduce_only,
153                )
154                .await
155                .map_err(to_pyvalue_err)?;
156            Ok(())
157        })
158    }
159
160    /// Submit multiple orders through the Hyperliquid WebSocket post API.
161    #[pyo3(name = "submit_orders")]
162    fn py_submit_orders<'py>(
163        &self,
164        py: Python<'py>,
165        signer: &HyperliquidHttpClient,
166        orders: Vec<Py<PyAny>>,
167    ) -> PyResult<Bound<'py, PyAny>> {
168        let client = self.clone();
169        let signer = signer.clone();
170
171        pyo3_async_runtimes::tokio::future_into_py(py, async move {
172            let order_anys: Vec<OrderAny> = Python::attach(|py| {
173                orders
174                    .into_iter()
175                    .map(|order| pyobject_to_order_any(py, order))
176                    .collect::<PyResult<Vec<_>>>()
177                    .map_err(to_pyvalue_err)
178            })?;
179            let order_refs: Vec<&OrderAny> = order_anys.iter().collect();
180
181            client
182                .submit_orders(&signer, &order_refs)
183                .await
184                .map_err(to_pyvalue_err)?;
185            Ok(())
186        })
187    }
188
189    /// Cancel an order through the Hyperliquid WebSocket post API.
190    #[pyo3(name = "cancel_order", signature = (
191        signer,
192        instrument_id,
193        client_order_id=None,
194        venue_order_id=None,
195    ))]
196    fn py_cancel_order<'py>(
197        &self,
198        py: Python<'py>,
199        signer: &HyperliquidHttpClient,
200        instrument_id: InstrumentId,
201        client_order_id: Option<ClientOrderId>,
202        venue_order_id: Option<VenueOrderId>,
203    ) -> PyResult<Bound<'py, PyAny>> {
204        let client = self.clone();
205        let signer = signer.clone();
206
207        pyo3_async_runtimes::tokio::future_into_py(py, async move {
208            client
209                .cancel_order(&signer, instrument_id, client_order_id, venue_order_id)
210                .await
211                .map_err(to_pyvalue_err)?;
212            Ok(())
213        })
214    }
215
216    /// Cancel multiple orders through one Hyperliquid WebSocket post action.
217    #[pyo3(name = "cancel_orders")]
218    fn py_cancel_orders<'py>(
219        &self,
220        py: Python<'py>,
221        signer: &HyperliquidHttpClient,
222        cancels: Vec<(InstrumentId, ClientOrderId, Option<VenueOrderId>)>,
223    ) -> PyResult<Bound<'py, PyAny>> {
224        let client = self.clone();
225        let signer = signer.clone();
226
227        pyo3_async_runtimes::tokio::future_into_py(py, async move {
228            client
229                .cancel_orders(&signer, &cancels)
230                .await
231                .map_err(to_pyvalue_err)
232        })
233    }
234
235    /// Modify an order through the Hyperliquid WebSocket post API.
236    #[pyo3(name = "modify_order")]
237    #[expect(clippy::too_many_arguments)]
238    fn py_modify_order<'py>(
239        &self,
240        py: Python<'py>,
241        signer: &HyperliquidHttpClient,
242        instrument_id: InstrumentId,
243        venue_order_id: VenueOrderId,
244        order_side: OrderSide,
245        order_type: OrderType,
246        price: Price,
247        quantity: Quantity,
248        trigger_price: Option<Price>,
249        reduce_only: bool,
250        post_only: bool,
251        time_in_force: TimeInForce,
252        client_order_id: Option<ClientOrderId>,
253    ) -> PyResult<Bound<'py, PyAny>> {
254        let client = self.clone();
255        let signer = signer.clone();
256
257        pyo3_async_runtimes::tokio::future_into_py(py, async move {
258            client
259                .modify_order(
260                    &signer,
261                    instrument_id,
262                    venue_order_id,
263                    order_side,
264                    order_type,
265                    price,
266                    quantity,
267                    trigger_price,
268                    reduce_only,
269                    post_only,
270                    time_in_force,
271                    client_order_id,
272                )
273                .await
274                .map_err(to_pyvalue_err)?;
275            Ok(())
276        })
277    }
278
279    /// Caches spot fill coin mappings for instrument lookup.
280    ///
281    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
282    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
283    /// This mapping allows the handler to look up instruments from spot fills.
284    #[pyo3(name = "cache_spot_fill_coins")]
285    fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
286        let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
287            .into_iter()
288            .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
289            .collect();
290        self.cache_spot_fill_coins(ahash_mapping);
291    }
292
293    /// Cache the ordered instrument IDs required to normalize `allDexsAssetCtxs`.
294    #[pyo3(name = "cache_all_dex_asset_ctxs_instrument_ids")]
295    fn py_cache_all_dex_asset_ctxs_instrument_ids(
296        &self,
297        mapping: std::collections::HashMap<String, Vec<Option<InstrumentId>>>,
298    ) {
299        let ahash_mapping: ahash::AHashMap<ustr::Ustr, Vec<Option<InstrumentId>>> = mapping
300            .into_iter()
301            .map(|(dex, instrument_ids)| (ustr::Ustr::from(&dex), instrument_ids))
302            .collect();
303        self.cache_all_dex_asset_ctxs_instrument_ids(ahash_mapping);
304    }
305
306    /// Caches a venue CLOID to client_order_id mapping for order/fill resolution.
307    ///
308    /// This mapping allows WebSocket order status and fill reports to be resolved back to
309    /// the original client_order_id.
310    ///
311    /// This writes directly to a shared cache that the handler reads from, avoiding any
312    /// race conditions between caching and WebSocket message processing.
313    #[pyo3(name = "cache_cloid_mapping")]
314    fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
315        self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
316    }
317
318    /// Removes a cloid mapping from the cache.
319    ///
320    /// Called on terminal order state. The cache is FIFO-bounded so missed
321    /// removals self-evict (see GH-3972 cancel-replace drain).
322    #[pyo3(name = "remove_cloid_mapping")]
323    fn py_remove_cloid_mapping(&self, cloid: &str) {
324        self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
325    }
326
327    /// Clears all cloid mappings from the cache.
328    ///
329    /// Useful for cleanup during reconnection or shutdown.
330    #[pyo3(name = "clear_cloid_cache")]
331    fn py_clear_cloid_cache(&self) {
332        self.clear_cloid_cache();
333    }
334
335    /// Returns the number of cloid mappings in the cache.
336    #[pyo3(name = "cloid_cache_len")]
337    fn py_cloid_cache_len(&self) -> usize {
338        self.cloid_cache_len()
339    }
340
341    /// Looks up a client_order_id by its venue CLOID.
342    ///
343    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
344    #[pyo3(name = "get_cloid_mapping")]
345    fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
346        self.get_cloid_mapping(&ustr::Ustr::from(cloid))
347    }
348
349    /// Establishes WebSocket connection and spawns the message handler.
350    #[pyo3(name = "connect")]
351    #[expect(clippy::needless_pass_by_value)]
352    fn py_connect<'py>(
353        &self,
354        py: Python<'py>,
355        loop_: Py<PyAny>,
356        instruments: Vec<Py<PyAny>>,
357        callback: Py<PyAny>,
358    ) -> PyResult<Bound<'py, PyAny>> {
359        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
360
361        for inst in instruments {
362            let inst_any = pyobject_to_instrument_any(py, inst)?;
363            self.cache_instrument(inst_any);
364        }
365
366        let mut client = self.clone();
367
368        pyo3_async_runtimes::tokio::future_into_py(py, async move {
369            client.connect().await.map_err(to_pyruntime_err)?;
370
371            get_runtime().spawn(async move {
372                loop {
373                    let event = client.next_event().await;
374
375                    match event {
376                        Some(msg) => {
377                            log::trace!("Received WebSocket message: {msg:?}");
378
379                            match msg {
380                                NautilusWsMessage::Trades(trade_ticks) => {
381                                    Python::attach(|py| {
382                                        for tick in trade_ticks {
383                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
384                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
385                                        }
386                                    });
387                                }
388                                NautilusWsMessage::Quote(quote_tick) => {
389                                    Python::attach(|py| {
390                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
391                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
392                                    });
393                                }
394                                NautilusWsMessage::Deltas(deltas) => {
395                                    Python::attach(|py| {
396                                        let py_obj = data_to_pycapsule(
397                                            py,
398                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
399                                        );
400                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
401                                    });
402                                }
403                                NautilusWsMessage::Depth10(depth) => {
404                                    Python::attach(|py| {
405                                        let py_obj = data_to_pycapsule(py, Data::Depth10(depth));
406                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
407                                    });
408                                }
409                                NautilusWsMessage::Candle(bar) => {
410                                    Python::attach(|py| {
411                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
412                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
413                                    });
414                                }
415                                NautilusWsMessage::MarkPrice(mark_price) => {
416                                    Python::attach(|py| {
417                                        let py_obj = data_to_pycapsule(
418                                            py,
419                                            Data::MarkPriceUpdate(mark_price),
420                                        );
421                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
422                                    });
423                                }
424                                NautilusWsMessage::IndexPrice(index_price) => {
425                                    Python::attach(|py| {
426                                        let py_obj = data_to_pycapsule(
427                                            py,
428                                            Data::IndexPriceUpdate(index_price),
429                                        );
430                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
431                                    });
432                                }
433                                NautilusWsMessage::FundingRate(funding_rate) => {
434                                    Python::attach(|py| {
435                                        if let Ok(py_obj) = funding_rate.into_py_any(py) {
436                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
437                                        }
438                                    });
439                                }
440                                NautilusWsMessage::CustomData(data) => {
441                                    Python::attach(|py| match ws_data_to_pyobject(py, data) {
442                                        Ok(py_obj) => {
443                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
444                                        }
445                                        Err(e) => {
446                                            log::error!(
447                                                "Error converting CustomData to Python object: {e}"
448                                            );
449                                        }
450                                    });
451                                }
452                                NautilusWsMessage::ExecutionReports(reports) => {
453                                    Python::attach(|py| {
454                                        for report in reports {
455                                            match report {
456                                                ExecutionReport::Order(order_report) => {
457                                                    log::debug!(
458                                                        "Forwarding order status report: order_id={}, status={:?}",
459                                                        order_report.venue_order_id,
460                                                        order_report.order_status
461                                                    );
462
463                                                    match Py::new(py, order_report) {
464                                                        Ok(py_obj) => {
465                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
466                                                        }
467                                                        Err(e) => {
468                                                            log::error!("Error converting OrderStatusReport to Python: {e}");
469                                                        }
470                                                    }
471                                                }
472                                                ExecutionReport::Fill(fill_report) => {
473                                                    log::debug!(
474                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
475                                                        fill_report.trade_id,
476                                                        fill_report.order_side,
477                                                        fill_report.last_qty,
478                                                        fill_report.last_px
479                                                    );
480
481                                                    match Py::new(py, fill_report) {
482                                                        Ok(py_obj) => {
483                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
484                                                        }
485                                                        Err(e) => {
486                                                            log::error!("Error converting FillReport to Python: {e}");
487                                                        }
488                                                    }
489                                                }
490                                            }
491                                        }
492                                    });
493                                }
494                                _ => {
495                                    log::debug!("Unhandled message type: {msg:?}");
496                                }
497                            }
498                        }
499                        None => {
500                            log::debug!("WebSocket connection closed");
501                            break;
502                        }
503                    }
504                }
505            });
506
507            Ok(())
508        })
509    }
510
511    #[pyo3(name = "wait_until_active")]
512    fn py_wait_until_active<'py>(
513        &self,
514        py: Python<'py>,
515        timeout_secs: f64,
516    ) -> PyResult<Bound<'py, PyAny>> {
517        let client = self.clone();
518
519        pyo3_async_runtimes::tokio::future_into_py(py, async move {
520            let start = std::time::Instant::now();
521
522            loop {
523                if client.is_active() {
524                    return Ok(());
525                }
526
527                if start.elapsed().as_secs_f64() >= timeout_secs {
528                    return Err(to_pyruntime_err(format!(
529                        "WebSocket connection did not become active within {timeout_secs} seconds"
530                    )));
531                }
532
533                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
534            }
535        })
536    }
537
538    #[pyo3(name = "close")]
539    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
540        let mut client = self.clone();
541
542        pyo3_async_runtimes::tokio::future_into_py(py, async move {
543            if let Err(e) = client.disconnect().await {
544                log::error!("Error on close: {e}");
545            }
546            Ok(())
547        })
548    }
549
550    /// Subscribe to trades for an instrument.
551    #[pyo3(name = "subscribe_trades")]
552    fn py_subscribe_trades<'py>(
553        &self,
554        py: Python<'py>,
555        instrument_id: InstrumentId,
556    ) -> PyResult<Bound<'py, PyAny>> {
557        let client = self.clone();
558
559        pyo3_async_runtimes::tokio::future_into_py(py, async move {
560            client
561                .subscribe_trades(instrument_id)
562                .await
563                .map_err(to_pyruntime_err)?;
564            Ok(())
565        })
566    }
567
568    /// Unsubscribe from trades for an instrument.
569    #[pyo3(name = "unsubscribe_trades")]
570    fn py_unsubscribe_trades<'py>(
571        &self,
572        py: Python<'py>,
573        instrument_id: InstrumentId,
574    ) -> PyResult<Bound<'py, PyAny>> {
575        let client = self.clone();
576
577        pyo3_async_runtimes::tokio::future_into_py(py, async move {
578            client
579                .unsubscribe_trades(instrument_id)
580                .await
581                .map_err(to_pyruntime_err)?;
582            Ok(())
583        })
584    }
585
586    /// Subscribe to all mid prices across markets.
587    #[pyo3(name = "subscribe_all_mids")]
588    fn py_subscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
589        let client = self.clone();
590
591        pyo3_async_runtimes::tokio::future_into_py(py, async move {
592            client
593                .subscribe_all_mids()
594                .await
595                .map_err(to_pyruntime_err)?;
596            Ok(())
597        })
598    }
599
600    /// Subscribe to aggregate asset contexts across all perp dexes.
601    #[pyo3(name = "subscribe_all_dexs_asset_ctxs")]
602    fn py_subscribe_all_dexs_asset_ctxs<'py>(
603        &self,
604        py: Python<'py>,
605    ) -> PyResult<Bound<'py, PyAny>> {
606        let client = self.clone();
607
608        pyo3_async_runtimes::tokio::future_into_py(py, async move {
609            client
610                .subscribe_all_dexs_asset_ctxs()
611                .await
612                .map_err(to_pyruntime_err)?;
613            Ok(())
614        })
615    }
616
617    /// Subscribe to all mid prices across markets, optionally scoped to a specific dex.
618    #[pyo3(name = "subscribe_all_mids_with_dex")]
619    fn py_subscribe_all_mids_with_dex<'py>(
620        &self,
621        py: Python<'py>,
622        dex: Option<String>,
623    ) -> PyResult<Bound<'py, PyAny>> {
624        let client = self.clone();
625
626        pyo3_async_runtimes::tokio::future_into_py(py, async move {
627            client
628                .subscribe_all_mids_with_dex(dex.as_deref())
629                .await
630                .map_err(to_pyruntime_err)?;
631            Ok(())
632        })
633    }
634
635    /// Subscribe to L2 order book for an instrument.
636    #[pyo3(name = "subscribe_book")]
637    fn py_subscribe_book<'py>(
638        &self,
639        py: Python<'py>,
640        instrument_id: InstrumentId,
641    ) -> PyResult<Bound<'py, PyAny>> {
642        let client = self.clone();
643
644        pyo3_async_runtimes::tokio::future_into_py(py, async move {
645            client
646                .subscribe_book(instrument_id)
647                .await
648                .map_err(to_pyruntime_err)?;
649            Ok(())
650        })
651    }
652
653    /// Unsubscribe from L2 order book for an instrument.
654    #[pyo3(name = "unsubscribe_book")]
655    fn py_unsubscribe_book<'py>(
656        &self,
657        py: Python<'py>,
658        instrument_id: InstrumentId,
659    ) -> PyResult<Bound<'py, PyAny>> {
660        let client = self.clone();
661
662        pyo3_async_runtimes::tokio::future_into_py(py, async move {
663            client
664                .unsubscribe_book(instrument_id)
665                .await
666                .map_err(to_pyruntime_err)?;
667            Ok(())
668        })
669    }
670
671    #[pyo3(name = "subscribe_book_deltas")]
672    fn py_subscribe_book_deltas<'py>(
673        &self,
674        py: Python<'py>,
675        instrument_id: InstrumentId,
676        _book_type: u8,
677        _depth: u64,
678    ) -> PyResult<Bound<'py, PyAny>> {
679        let client = self.clone();
680
681        pyo3_async_runtimes::tokio::future_into_py(py, async move {
682            client
683                .subscribe_book(instrument_id)
684                .await
685                .map_err(to_pyruntime_err)?;
686            Ok(())
687        })
688    }
689
690    #[pyo3(name = "unsubscribe_book_deltas")]
691    fn py_unsubscribe_book_deltas<'py>(
692        &self,
693        py: Python<'py>,
694        instrument_id: InstrumentId,
695    ) -> PyResult<Bound<'py, PyAny>> {
696        let client = self.clone();
697
698        pyo3_async_runtimes::tokio::future_into_py(py, async move {
699            client
700                .unsubscribe_book(instrument_id)
701                .await
702                .map_err(to_pyruntime_err)?;
703            Ok(())
704        })
705    }
706
707    #[pyo3(name = "subscribe_book_snapshots")]
708    fn py_subscribe_book_snapshots<'py>(
709        &self,
710        py: Python<'py>,
711        instrument_id: InstrumentId,
712        _book_type: u8,
713        _depth: u64,
714    ) -> PyResult<Bound<'py, PyAny>> {
715        let client = self.clone();
716
717        pyo3_async_runtimes::tokio::future_into_py(py, async move {
718            client
719                .subscribe_book(instrument_id)
720                .await
721                .map_err(to_pyruntime_err)?;
722            Ok(())
723        })
724    }
725
726    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
727    #[pyo3(name = "subscribe_quotes")]
728    fn py_subscribe_quotes<'py>(
729        &self,
730        py: Python<'py>,
731        instrument_id: InstrumentId,
732    ) -> PyResult<Bound<'py, PyAny>> {
733        let client = self.clone();
734
735        pyo3_async_runtimes::tokio::future_into_py(py, async move {
736            client
737                .subscribe_quotes(instrument_id)
738                .await
739                .map_err(to_pyruntime_err)?;
740            Ok(())
741        })
742    }
743
744    /// Unsubscribe from quote ticks for an instrument.
745    #[pyo3(name = "unsubscribe_quotes")]
746    fn py_unsubscribe_quotes<'py>(
747        &self,
748        py: Python<'py>,
749        instrument_id: InstrumentId,
750    ) -> PyResult<Bound<'py, PyAny>> {
751        let client = self.clone();
752
753        pyo3_async_runtimes::tokio::future_into_py(py, async move {
754            client
755                .unsubscribe_quotes(instrument_id)
756                .await
757                .map_err(to_pyruntime_err)?;
758            Ok(())
759        })
760    }
761
762    /// Subscribe to candle/bar data for a specific coin and interval.
763    #[pyo3(name = "subscribe_bars")]
764    fn py_subscribe_bars<'py>(
765        &self,
766        py: Python<'py>,
767        bar_type: BarType,
768    ) -> PyResult<Bound<'py, PyAny>> {
769        let client = self.clone();
770
771        pyo3_async_runtimes::tokio::future_into_py(py, async move {
772            client
773                .subscribe_bars(bar_type)
774                .await
775                .map_err(to_pyruntime_err)?;
776            Ok(())
777        })
778    }
779
780    /// Stop receiving candle/bar data for the given bar type.
781    ///
782    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
783    #[pyo3(name = "unsubscribe_bars")]
784    fn py_unsubscribe_bars<'py>(
785        &self,
786        py: Python<'py>,
787        bar_type: BarType,
788    ) -> PyResult<Bound<'py, PyAny>> {
789        // The `async move` block takes ownership, so give it its own clone.
790        let ws = self.clone();
791        pyo3_async_runtimes::tokio::future_into_py(py, async move {
792            let outcome = ws.unsubscribe_bars(bar_type).await;
793            outcome.map_err(to_pyruntime_err)?;
794            Ok(())
795        })
796    }
797
798    /// Stop receiving mid prices for all markets.
799    ///
800    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
801    #[pyo3(name = "unsubscribe_all_mids")]
802    fn py_unsubscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
803        // The `async move` block takes ownership, so give it its own clone.
804        let ws = self.clone();
805        pyo3_async_runtimes::tokio::future_into_py(py, async move {
806            let outcome = ws.unsubscribe_all_mids().await;
807            outcome.map_err(to_pyruntime_err)?;
808            Ok(())
809        })
810    }
811
812    /// Stop receiving the aggregate asset contexts across all perp dexes.
813    ///
814    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
815    #[pyo3(name = "unsubscribe_all_dexs_asset_ctxs")]
816    fn py_unsubscribe_all_dexs_asset_ctxs<'py>(
817        &self,
818        py: Python<'py>,
819    ) -> PyResult<Bound<'py, PyAny>> {
820        // The `async move` block takes ownership, so give it its own clone.
821        let ws = self.clone();
822        pyo3_async_runtimes::tokio::future_into_py(py, async move {
823            let outcome = ws.unsubscribe_all_dexs_asset_ctxs().await;
824            outcome.map_err(to_pyruntime_err)?;
825            Ok(())
826        })
827    }
828
829    /// Stop receiving mid prices for all markets, optionally scoped to one dex.
830    ///
831    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
832    #[pyo3(name = "unsubscribe_all_mids_with_dex")]
833    fn py_unsubscribe_all_mids_with_dex<'py>(
834        &self,
835        py: Python<'py>,
836        dex: Option<String>,
837    ) -> PyResult<Bound<'py, PyAny>> {
838        // The `async move` block takes ownership, so give it its own clone.
839        let ws = self.clone();
840        pyo3_async_runtimes::tokio::future_into_py(py, async move {
841            let outcome = ws.unsubscribe_all_mids_with_dex(dex.as_deref()).await;
842            outcome.map_err(to_pyruntime_err)?;
843            Ok(())
844        })
845    }
846
847    /// Start receiving order updates for the given user address.
848    ///
849    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
850    #[pyo3(name = "subscribe_order_updates")]
851    fn py_subscribe_order_updates<'py>(
852        &self,
853        py: Python<'py>,
854        user: String,
855    ) -> PyResult<Bound<'py, PyAny>> {
856        // The `async move` block takes ownership, so give it its own clone.
857        let ws = self.clone();
858        pyo3_async_runtimes::tokio::future_into_py(py, async move {
859            let outcome = ws.subscribe_order_updates(&user).await;
860            outcome.map_err(to_pyruntime_err)?;
861            Ok(())
862        })
863    }
864
865    /// Start receiving user events (fills, funding, liquidations) for a user address.
866    ///
867    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
868    #[pyo3(name = "subscribe_user_events")]
869    fn py_subscribe_user_events<'py>(
870        &self,
871        py: Python<'py>,
872        user: String,
873    ) -> PyResult<Bound<'py, PyAny>> {
874        // The `async move` block takes ownership, so give it its own clone.
875        let ws = self.clone();
876        pyo3_async_runtimes::tokio::future_into_py(py, async move {
877            let outcome = ws.subscribe_user_events(&user).await;
878            outcome.map_err(to_pyruntime_err)?;
879            Ok(())
880        })
881    }
882
883    /// Start receiving user fills for the given user address.
884    ///
885    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
886    ///
887    /// Note: redundant with the `userEvents` channel, which already includes fills.
888    /// Prefer `subscribe_user_events` or `subscribe_all_user_channels` instead.
889    #[pyo3(name = "subscribe_user_fills")]
890    fn py_subscribe_user_fills<'py>(
891        &self,
892        py: Python<'py>,
893        user: String,
894    ) -> PyResult<Bound<'py, PyAny>> {
895        // The `async move` block takes ownership, so give it its own clone.
896        let ws = self.clone();
897        pyo3_async_runtimes::tokio::future_into_py(py, async move {
898            let outcome = ws.subscribe_user_fills(&user).await;
899            outcome.map_err(to_pyruntime_err)?;
900            Ok(())
901        })
902    }
903
904    /// Start receiving mark price updates for the given instrument.
905    ///
906    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
907    #[pyo3(name = "subscribe_mark_prices")]
908    fn py_subscribe_mark_prices<'py>(
909        &self,
910        py: Python<'py>,
911        instrument_id: InstrumentId,
912    ) -> PyResult<Bound<'py, PyAny>> {
913        // The `async move` block takes ownership, so give it its own clone.
914        let ws = self.clone();
915        pyo3_async_runtimes::tokio::future_into_py(py, async move {
916            let outcome = ws.subscribe_mark_prices(instrument_id).await;
917            outcome.map_err(to_pyruntime_err)?;
918            Ok(())
919        })
920    }
921
922    /// Stop receiving mark price updates for the given instrument.
923    ///
924    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
925    #[pyo3(name = "unsubscribe_mark_prices")]
926    fn py_unsubscribe_mark_prices<'py>(
927        &self,
928        py: Python<'py>,
929        instrument_id: InstrumentId,
930    ) -> PyResult<Bound<'py, PyAny>> {
931        // The `async move` block takes ownership, so give it its own clone.
932        let ws = self.clone();
933        pyo3_async_runtimes::tokio::future_into_py(py, async move {
934            let outcome = ws.unsubscribe_mark_prices(instrument_id).await;
935            outcome.map_err(to_pyruntime_err)?;
936            Ok(())
937        })
938    }
939
940    /// Start receiving index/oracle price updates for the given instrument.
941    ///
942    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
943    #[pyo3(name = "subscribe_index_prices")]
944    fn py_subscribe_index_prices<'py>(
945        &self,
946        py: Python<'py>,
947        instrument_id: InstrumentId,
948    ) -> PyResult<Bound<'py, PyAny>> {
949        // The `async move` block takes ownership, so give it its own clone.
950        let ws = self.clone();
951        pyo3_async_runtimes::tokio::future_into_py(py, async move {
952            let outcome = ws.subscribe_index_prices(instrument_id).await;
953            outcome.map_err(to_pyruntime_err)?;
954            Ok(())
955        })
956    }
957
958    /// Stop receiving index/oracle price updates for the given instrument.
959    ///
960    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
961    #[pyo3(name = "unsubscribe_index_prices")]
962    fn py_unsubscribe_index_prices<'py>(
963        &self,
964        py: Python<'py>,
965        instrument_id: InstrumentId,
966    ) -> PyResult<Bound<'py, PyAny>> {
967        // The `async move` block takes ownership, so give it its own clone.
968        let ws = self.clone();
969        pyo3_async_runtimes::tokio::future_into_py(py, async move {
970            let outcome = ws.unsubscribe_index_prices(instrument_id).await;
971            outcome.map_err(to_pyruntime_err)?;
972            Ok(())
973        })
974    }
975
976    /// Start receiving funding rate updates for the given instrument.
977    ///
978    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
979    #[pyo3(name = "subscribe_funding_rates")]
980    fn py_subscribe_funding_rates<'py>(
981        &self,
982        py: Python<'py>,
983        instrument_id: InstrumentId,
984    ) -> PyResult<Bound<'py, PyAny>> {
985        // The `async move` block takes ownership, so give it its own clone.
986        let ws = self.clone();
987        pyo3_async_runtimes::tokio::future_into_py(py, async move {
988            let outcome = ws.subscribe_funding_rates(instrument_id).await;
989            outcome.map_err(to_pyruntime_err)?;
990            Ok(())
991        })
992    }
993
994    /// Start receiving open interest updates for the given instrument.
995    ///
996    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
997    #[pyo3(name = "subscribe_open_interest")]
998    fn py_subscribe_open_interest<'py>(
999        &self,
1000        py: Python<'py>,
1001        instrument_id: InstrumentId,
1002    ) -> PyResult<Bound<'py, PyAny>> {
1003        // The `async move` block takes ownership, so give it its own clone.
1004        let ws = self.clone();
1005        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1006            let outcome = ws.subscribe_open_interest(instrument_id).await;
1007            outcome.map_err(to_pyruntime_err)?;
1008            Ok(())
1009        })
1010    }
1011
1012    /// Stop receiving funding rate updates for the given instrument.
1013    ///
1014    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
1015    #[pyo3(name = "unsubscribe_funding_rates")]
1016    fn py_unsubscribe_funding_rates<'py>(
1017        &self,
1018        py: Python<'py>,
1019        instrument_id: InstrumentId,
1020    ) -> PyResult<Bound<'py, PyAny>> {
1021        // The `async move` block takes ownership, so give it its own clone.
1022        let ws = self.clone();
1023        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1024            let outcome = ws.unsubscribe_funding_rates(instrument_id).await;
1025            outcome.map_err(to_pyruntime_err)?;
1026            Ok(())
1027        })
1028    }
1029
1030    /// Stop receiving open interest updates for the given instrument.
1031    ///
1032    /// Returns a Python awaitable; client errors go through `to_pyruntime_err`.
1033    #[pyo3(name = "unsubscribe_open_interest")]
1034    fn py_unsubscribe_open_interest<'py>(
1035        &self,
1036        py: Python<'py>,
1037        instrument_id: InstrumentId,
1038    ) -> PyResult<Bound<'py, PyAny>> {
1039        // The `async move` block takes ownership, so give it its own clone.
1040        let ws = self.clone();
1041        pyo3_async_runtimes::tokio::future_into_py(py, async move {
1042            let outcome = ws.unsubscribe_open_interest(instrument_id).await;
1043            outcome.map_err(to_pyruntime_err)?;
1044            Ok(())
1045        })
1046    }
1047}