1use std::time::Duration;
19
20use nautilus_common::live::get_runtime;
21use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err, to_pyvalue_err};
22use nautilus_model::{
23 data::{BarType, Data, OrderBookDeltas_API},
24 enums::{OrderSide, OrderType, TimeInForce},
25 identifiers::{AccountId, ClientOrderId, InstrumentId, VenueOrderId},
26 orders::OrderAny,
27 python::{
28 data::data_to_pycapsule, instruments::pyobject_to_instrument_any,
29 orders::pyobject_to_order_any,
30 },
31 types::{Price, Quantity},
32};
33use nautilus_network::websocket::TransportBackend;
34use pyo3::{conversion::IntoPyObjectExt, prelude::*};
35
36use crate::{
37 common::enums::HyperliquidEnvironment,
38 http::client::HyperliquidHttpClient,
39 websocket::{
40 HyperliquidWebSocketClient,
41 messages::{ExecutionReport, NautilusWsMessage},
42 },
43};
44
45fn ws_data_to_pyobject(py: Python<'_>, data: Data) -> PyResult<Py<PyAny>> {
46 match data {
47 Data::Custom(custom) => Py::new(py, custom).map(|obj| obj.into_any()),
48 Data::OptionGreeks(greeks) => Py::new(py, greeks).map(|obj| obj.into_any()),
49 other => Ok(data_to_pycapsule(py, other)),
50 }
51}
52
53#[pymethods]
54#[pyo3_stub_gen::derive::gen_stub_pymethods]
55impl HyperliquidWebSocketClient {
56 #[new]
61 #[pyo3(signature = (url=None, environment=HyperliquidEnvironment::Mainnet, account_id=None, proxy_url=None))]
62 fn py_new(
63 url: Option<String>,
64 environment: HyperliquidEnvironment,
65 account_id: Option<String>,
66 proxy_url: Option<String>,
67 ) -> Self {
68 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
69 Self::new(
70 url,
71 environment,
72 account_id,
73 TransportBackend::default(),
74 proxy_url,
75 )
76 }
77
78 #[getter]
80 #[pyo3(name = "url")]
81 #[must_use]
82 pub fn py_url(&self) -> String {
83 self.url().to_string()
84 }
85
86 #[pyo3(name = "is_active")]
88 fn py_is_active(&self) -> bool {
89 self.is_active()
90 }
91
92 #[pyo3(name = "is_closed")]
93 fn py_is_closed(&self) -> bool {
94 !self.is_active()
95 }
96
97 #[pyo3(name = "set_post_timeout")]
99 fn py_set_post_timeout(&mut self, timeout_secs: u64) {
100 self.set_post_timeout(Duration::from_secs(timeout_secs));
101 }
102
103 #[pyo3(name = "submit_order", signature = (
108 signer,
109 instrument_id,
110 client_order_id,
111 order_side,
112 order_type,
113 quantity,
114 time_in_force,
115 price=None,
116 trigger_price=None,
117 post_only=false,
118 reduce_only=false,
119 ))]
120 #[expect(clippy::too_many_arguments)]
121 fn py_submit_order<'py>(
122 &self,
123 py: Python<'py>,
124 signer: &HyperliquidHttpClient,
125 instrument_id: InstrumentId,
126 client_order_id: ClientOrderId,
127 order_side: OrderSide,
128 order_type: OrderType,
129 quantity: Quantity,
130 time_in_force: TimeInForce,
131 price: Option<Price>,
132 trigger_price: Option<Price>,
133 post_only: bool,
134 reduce_only: bool,
135 ) -> PyResult<Bound<'py, PyAny>> {
136 let client = self.clone();
137 let signer = signer.clone();
138
139 pyo3_async_runtimes::tokio::future_into_py(py, async move {
140 client
141 .submit_order(
142 &signer,
143 instrument_id,
144 client_order_id,
145 order_side,
146 order_type,
147 quantity,
148 time_in_force,
149 price,
150 trigger_price,
151 post_only,
152 reduce_only,
153 )
154 .await
155 .map_err(to_pyvalue_err)?;
156 Ok(())
157 })
158 }
159
160 #[pyo3(name = "submit_orders")]
162 fn py_submit_orders<'py>(
163 &self,
164 py: Python<'py>,
165 signer: &HyperliquidHttpClient,
166 orders: Vec<Py<PyAny>>,
167 ) -> PyResult<Bound<'py, PyAny>> {
168 let client = self.clone();
169 let signer = signer.clone();
170
171 pyo3_async_runtimes::tokio::future_into_py(py, async move {
172 let order_anys: Vec<OrderAny> = Python::attach(|py| {
173 orders
174 .into_iter()
175 .map(|order| pyobject_to_order_any(py, order))
176 .collect::<PyResult<Vec<_>>>()
177 .map_err(to_pyvalue_err)
178 })?;
179 let order_refs: Vec<&OrderAny> = order_anys.iter().collect();
180
181 client
182 .submit_orders(&signer, &order_refs)
183 .await
184 .map_err(to_pyvalue_err)?;
185 Ok(())
186 })
187 }
188
189 #[pyo3(name = "cancel_order", signature = (
191 signer,
192 instrument_id,
193 client_order_id=None,
194 venue_order_id=None,
195 ))]
196 fn py_cancel_order<'py>(
197 &self,
198 py: Python<'py>,
199 signer: &HyperliquidHttpClient,
200 instrument_id: InstrumentId,
201 client_order_id: Option<ClientOrderId>,
202 venue_order_id: Option<VenueOrderId>,
203 ) -> PyResult<Bound<'py, PyAny>> {
204 let client = self.clone();
205 let signer = signer.clone();
206
207 pyo3_async_runtimes::tokio::future_into_py(py, async move {
208 client
209 .cancel_order(&signer, instrument_id, client_order_id, venue_order_id)
210 .await
211 .map_err(to_pyvalue_err)?;
212 Ok(())
213 })
214 }
215
216 #[pyo3(name = "cancel_orders")]
218 fn py_cancel_orders<'py>(
219 &self,
220 py: Python<'py>,
221 signer: &HyperliquidHttpClient,
222 cancels: Vec<(InstrumentId, ClientOrderId, Option<VenueOrderId>)>,
223 ) -> PyResult<Bound<'py, PyAny>> {
224 let client = self.clone();
225 let signer = signer.clone();
226
227 pyo3_async_runtimes::tokio::future_into_py(py, async move {
228 client
229 .cancel_orders(&signer, &cancels)
230 .await
231 .map_err(to_pyvalue_err)
232 })
233 }
234
235 #[pyo3(name = "modify_order")]
237 #[expect(clippy::too_many_arguments)]
238 fn py_modify_order<'py>(
239 &self,
240 py: Python<'py>,
241 signer: &HyperliquidHttpClient,
242 instrument_id: InstrumentId,
243 venue_order_id: VenueOrderId,
244 order_side: OrderSide,
245 order_type: OrderType,
246 price: Price,
247 quantity: Quantity,
248 trigger_price: Option<Price>,
249 reduce_only: bool,
250 post_only: bool,
251 time_in_force: TimeInForce,
252 client_order_id: Option<ClientOrderId>,
253 ) -> PyResult<Bound<'py, PyAny>> {
254 let client = self.clone();
255 let signer = signer.clone();
256
257 pyo3_async_runtimes::tokio::future_into_py(py, async move {
258 client
259 .modify_order(
260 &signer,
261 instrument_id,
262 venue_order_id,
263 order_side,
264 order_type,
265 price,
266 quantity,
267 trigger_price,
268 reduce_only,
269 post_only,
270 time_in_force,
271 client_order_id,
272 )
273 .await
274 .map_err(to_pyvalue_err)?;
275 Ok(())
276 })
277 }
278
279 #[pyo3(name = "cache_spot_fill_coins")]
285 fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
286 let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
287 .into_iter()
288 .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
289 .collect();
290 self.cache_spot_fill_coins(ahash_mapping);
291 }
292
293 #[pyo3(name = "cache_all_dex_asset_ctxs_instrument_ids")]
295 fn py_cache_all_dex_asset_ctxs_instrument_ids(
296 &self,
297 mapping: std::collections::HashMap<String, Vec<Option<InstrumentId>>>,
298 ) {
299 let ahash_mapping: ahash::AHashMap<ustr::Ustr, Vec<Option<InstrumentId>>> = mapping
300 .into_iter()
301 .map(|(dex, instrument_ids)| (ustr::Ustr::from(&dex), instrument_ids))
302 .collect();
303 self.cache_all_dex_asset_ctxs_instrument_ids(ahash_mapping);
304 }
305
306 #[pyo3(name = "cache_cloid_mapping")]
314 fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
315 self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
316 }
317
318 #[pyo3(name = "remove_cloid_mapping")]
323 fn py_remove_cloid_mapping(&self, cloid: &str) {
324 self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
325 }
326
327 #[pyo3(name = "clear_cloid_cache")]
331 fn py_clear_cloid_cache(&self) {
332 self.clear_cloid_cache();
333 }
334
335 #[pyo3(name = "cloid_cache_len")]
337 fn py_cloid_cache_len(&self) -> usize {
338 self.cloid_cache_len()
339 }
340
341 #[pyo3(name = "get_cloid_mapping")]
345 fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
346 self.get_cloid_mapping(&ustr::Ustr::from(cloid))
347 }
348
349 #[pyo3(name = "connect")]
351 #[expect(clippy::needless_pass_by_value)]
352 fn py_connect<'py>(
353 &self,
354 py: Python<'py>,
355 loop_: Py<PyAny>,
356 instruments: Vec<Py<PyAny>>,
357 callback: Py<PyAny>,
358 ) -> PyResult<Bound<'py, PyAny>> {
359 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
360
361 for inst in instruments {
362 let inst_any = pyobject_to_instrument_any(py, inst)?;
363 self.cache_instrument(inst_any);
364 }
365
366 let mut client = self.clone();
367
368 pyo3_async_runtimes::tokio::future_into_py(py, async move {
369 client.connect().await.map_err(to_pyruntime_err)?;
370
371 get_runtime().spawn(async move {
372 loop {
373 let event = client.next_event().await;
374
375 match event {
376 Some(msg) => {
377 log::trace!("Received WebSocket message: {msg:?}");
378
379 match msg {
380 NautilusWsMessage::Trades(trade_ticks) => {
381 Python::attach(|py| {
382 for tick in trade_ticks {
383 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
384 call_python_threadsafe(py, &call_soon, &callback, py_obj);
385 }
386 });
387 }
388 NautilusWsMessage::Quote(quote_tick) => {
389 Python::attach(|py| {
390 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
391 call_python_threadsafe(py, &call_soon, &callback, py_obj);
392 });
393 }
394 NautilusWsMessage::Deltas(deltas) => {
395 Python::attach(|py| {
396 let py_obj = data_to_pycapsule(
397 py,
398 Data::Deltas(OrderBookDeltas_API::new(deltas)),
399 );
400 call_python_threadsafe(py, &call_soon, &callback, py_obj);
401 });
402 }
403 NautilusWsMessage::Depth10(depth) => {
404 Python::attach(|py| {
405 let py_obj = data_to_pycapsule(py, Data::Depth10(depth));
406 call_python_threadsafe(py, &call_soon, &callback, py_obj);
407 });
408 }
409 NautilusWsMessage::Candle(bar) => {
410 Python::attach(|py| {
411 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
412 call_python_threadsafe(py, &call_soon, &callback, py_obj);
413 });
414 }
415 NautilusWsMessage::MarkPrice(mark_price) => {
416 Python::attach(|py| {
417 let py_obj = data_to_pycapsule(
418 py,
419 Data::MarkPriceUpdate(mark_price),
420 );
421 call_python_threadsafe(py, &call_soon, &callback, py_obj);
422 });
423 }
424 NautilusWsMessage::IndexPrice(index_price) => {
425 Python::attach(|py| {
426 let py_obj = data_to_pycapsule(
427 py,
428 Data::IndexPriceUpdate(index_price),
429 );
430 call_python_threadsafe(py, &call_soon, &callback, py_obj);
431 });
432 }
433 NautilusWsMessage::FundingRate(funding_rate) => {
434 Python::attach(|py| {
435 if let Ok(py_obj) = funding_rate.into_py_any(py) {
436 call_python_threadsafe(py, &call_soon, &callback, py_obj);
437 }
438 });
439 }
440 NautilusWsMessage::CustomData(data) => {
441 Python::attach(|py| match ws_data_to_pyobject(py, data) {
442 Ok(py_obj) => {
443 call_python_threadsafe(py, &call_soon, &callback, py_obj);
444 }
445 Err(e) => {
446 log::error!(
447 "Error converting CustomData to Python object: {e}"
448 );
449 }
450 });
451 }
452 NautilusWsMessage::ExecutionReports(reports) => {
453 Python::attach(|py| {
454 for report in reports {
455 match report {
456 ExecutionReport::Order(order_report) => {
457 log::debug!(
458 "Forwarding order status report: order_id={}, status={:?}",
459 order_report.venue_order_id,
460 order_report.order_status
461 );
462
463 match Py::new(py, order_report) {
464 Ok(py_obj) => {
465 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
466 }
467 Err(e) => {
468 log::error!("Error converting OrderStatusReport to Python: {e}");
469 }
470 }
471 }
472 ExecutionReport::Fill(fill_report) => {
473 log::debug!(
474 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
475 fill_report.trade_id,
476 fill_report.order_side,
477 fill_report.last_qty,
478 fill_report.last_px
479 );
480
481 match Py::new(py, fill_report) {
482 Ok(py_obj) => {
483 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
484 }
485 Err(e) => {
486 log::error!("Error converting FillReport to Python: {e}");
487 }
488 }
489 }
490 }
491 }
492 });
493 }
494 _ => {
495 log::debug!("Unhandled message type: {msg:?}");
496 }
497 }
498 }
499 None => {
500 log::debug!("WebSocket connection closed");
501 break;
502 }
503 }
504 }
505 });
506
507 Ok(())
508 })
509 }
510
511 #[pyo3(name = "wait_until_active")]
512 fn py_wait_until_active<'py>(
513 &self,
514 py: Python<'py>,
515 timeout_secs: f64,
516 ) -> PyResult<Bound<'py, PyAny>> {
517 let client = self.clone();
518
519 pyo3_async_runtimes::tokio::future_into_py(py, async move {
520 let start = std::time::Instant::now();
521
522 loop {
523 if client.is_active() {
524 return Ok(());
525 }
526
527 if start.elapsed().as_secs_f64() >= timeout_secs {
528 return Err(to_pyruntime_err(format!(
529 "WebSocket connection did not become active within {timeout_secs} seconds"
530 )));
531 }
532
533 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
534 }
535 })
536 }
537
538 #[pyo3(name = "close")]
539 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
540 let mut client = self.clone();
541
542 pyo3_async_runtimes::tokio::future_into_py(py, async move {
543 if let Err(e) = client.disconnect().await {
544 log::error!("Error on close: {e}");
545 }
546 Ok(())
547 })
548 }
549
550 #[pyo3(name = "subscribe_trades")]
552 fn py_subscribe_trades<'py>(
553 &self,
554 py: Python<'py>,
555 instrument_id: InstrumentId,
556 ) -> PyResult<Bound<'py, PyAny>> {
557 let client = self.clone();
558
559 pyo3_async_runtimes::tokio::future_into_py(py, async move {
560 client
561 .subscribe_trades(instrument_id)
562 .await
563 .map_err(to_pyruntime_err)?;
564 Ok(())
565 })
566 }
567
568 #[pyo3(name = "unsubscribe_trades")]
570 fn py_unsubscribe_trades<'py>(
571 &self,
572 py: Python<'py>,
573 instrument_id: InstrumentId,
574 ) -> PyResult<Bound<'py, PyAny>> {
575 let client = self.clone();
576
577 pyo3_async_runtimes::tokio::future_into_py(py, async move {
578 client
579 .unsubscribe_trades(instrument_id)
580 .await
581 .map_err(to_pyruntime_err)?;
582 Ok(())
583 })
584 }
585
586 #[pyo3(name = "subscribe_all_mids")]
588 fn py_subscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
589 let client = self.clone();
590
591 pyo3_async_runtimes::tokio::future_into_py(py, async move {
592 client
593 .subscribe_all_mids()
594 .await
595 .map_err(to_pyruntime_err)?;
596 Ok(())
597 })
598 }
599
600 #[pyo3(name = "subscribe_all_dexs_asset_ctxs")]
602 fn py_subscribe_all_dexs_asset_ctxs<'py>(
603 &self,
604 py: Python<'py>,
605 ) -> PyResult<Bound<'py, PyAny>> {
606 let client = self.clone();
607
608 pyo3_async_runtimes::tokio::future_into_py(py, async move {
609 client
610 .subscribe_all_dexs_asset_ctxs()
611 .await
612 .map_err(to_pyruntime_err)?;
613 Ok(())
614 })
615 }
616
617 #[pyo3(name = "subscribe_all_mids_with_dex")]
619 fn py_subscribe_all_mids_with_dex<'py>(
620 &self,
621 py: Python<'py>,
622 dex: Option<String>,
623 ) -> PyResult<Bound<'py, PyAny>> {
624 let client = self.clone();
625
626 pyo3_async_runtimes::tokio::future_into_py(py, async move {
627 client
628 .subscribe_all_mids_with_dex(dex.as_deref())
629 .await
630 .map_err(to_pyruntime_err)?;
631 Ok(())
632 })
633 }
634
635 #[pyo3(name = "subscribe_book")]
637 fn py_subscribe_book<'py>(
638 &self,
639 py: Python<'py>,
640 instrument_id: InstrumentId,
641 ) -> PyResult<Bound<'py, PyAny>> {
642 let client = self.clone();
643
644 pyo3_async_runtimes::tokio::future_into_py(py, async move {
645 client
646 .subscribe_book(instrument_id)
647 .await
648 .map_err(to_pyruntime_err)?;
649 Ok(())
650 })
651 }
652
653 #[pyo3(name = "unsubscribe_book")]
655 fn py_unsubscribe_book<'py>(
656 &self,
657 py: Python<'py>,
658 instrument_id: InstrumentId,
659 ) -> PyResult<Bound<'py, PyAny>> {
660 let client = self.clone();
661
662 pyo3_async_runtimes::tokio::future_into_py(py, async move {
663 client
664 .unsubscribe_book(instrument_id)
665 .await
666 .map_err(to_pyruntime_err)?;
667 Ok(())
668 })
669 }
670
671 #[pyo3(name = "subscribe_book_deltas")]
672 fn py_subscribe_book_deltas<'py>(
673 &self,
674 py: Python<'py>,
675 instrument_id: InstrumentId,
676 _book_type: u8,
677 _depth: u64,
678 ) -> PyResult<Bound<'py, PyAny>> {
679 let client = self.clone();
680
681 pyo3_async_runtimes::tokio::future_into_py(py, async move {
682 client
683 .subscribe_book(instrument_id)
684 .await
685 .map_err(to_pyruntime_err)?;
686 Ok(())
687 })
688 }
689
690 #[pyo3(name = "unsubscribe_book_deltas")]
691 fn py_unsubscribe_book_deltas<'py>(
692 &self,
693 py: Python<'py>,
694 instrument_id: InstrumentId,
695 ) -> PyResult<Bound<'py, PyAny>> {
696 let client = self.clone();
697
698 pyo3_async_runtimes::tokio::future_into_py(py, async move {
699 client
700 .unsubscribe_book(instrument_id)
701 .await
702 .map_err(to_pyruntime_err)?;
703 Ok(())
704 })
705 }
706
707 #[pyo3(name = "subscribe_book_snapshots")]
708 fn py_subscribe_book_snapshots<'py>(
709 &self,
710 py: Python<'py>,
711 instrument_id: InstrumentId,
712 _book_type: u8,
713 _depth: u64,
714 ) -> PyResult<Bound<'py, PyAny>> {
715 let client = self.clone();
716
717 pyo3_async_runtimes::tokio::future_into_py(py, async move {
718 client
719 .subscribe_book(instrument_id)
720 .await
721 .map_err(to_pyruntime_err)?;
722 Ok(())
723 })
724 }
725
726 #[pyo3(name = "subscribe_quotes")]
728 fn py_subscribe_quotes<'py>(
729 &self,
730 py: Python<'py>,
731 instrument_id: InstrumentId,
732 ) -> PyResult<Bound<'py, PyAny>> {
733 let client = self.clone();
734
735 pyo3_async_runtimes::tokio::future_into_py(py, async move {
736 client
737 .subscribe_quotes(instrument_id)
738 .await
739 .map_err(to_pyruntime_err)?;
740 Ok(())
741 })
742 }
743
744 #[pyo3(name = "unsubscribe_quotes")]
746 fn py_unsubscribe_quotes<'py>(
747 &self,
748 py: Python<'py>,
749 instrument_id: InstrumentId,
750 ) -> PyResult<Bound<'py, PyAny>> {
751 let client = self.clone();
752
753 pyo3_async_runtimes::tokio::future_into_py(py, async move {
754 client
755 .unsubscribe_quotes(instrument_id)
756 .await
757 .map_err(to_pyruntime_err)?;
758 Ok(())
759 })
760 }
761
762 #[pyo3(name = "subscribe_bars")]
764 fn py_subscribe_bars<'py>(
765 &self,
766 py: Python<'py>,
767 bar_type: BarType,
768 ) -> PyResult<Bound<'py, PyAny>> {
769 let client = self.clone();
770
771 pyo3_async_runtimes::tokio::future_into_py(py, async move {
772 client
773 .subscribe_bars(bar_type)
774 .await
775 .map_err(to_pyruntime_err)?;
776 Ok(())
777 })
778 }
779
780 #[pyo3(name = "unsubscribe_bars")]
782 fn py_unsubscribe_bars<'py>(
783 &self,
784 py: Python<'py>,
785 bar_type: BarType,
786 ) -> PyResult<Bound<'py, PyAny>> {
787 let client = self.clone();
788
789 pyo3_async_runtimes::tokio::future_into_py(py, async move {
790 client
791 .unsubscribe_bars(bar_type)
792 .await
793 .map_err(to_pyruntime_err)?;
794 Ok(())
795 })
796 }
797
798 #[pyo3(name = "unsubscribe_all_mids")]
800 fn py_unsubscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
801 let client = self.clone();
802
803 pyo3_async_runtimes::tokio::future_into_py(py, async move {
804 client
805 .unsubscribe_all_mids()
806 .await
807 .map_err(to_pyruntime_err)?;
808 Ok(())
809 })
810 }
811
812 #[pyo3(name = "unsubscribe_all_dexs_asset_ctxs")]
814 fn py_unsubscribe_all_dexs_asset_ctxs<'py>(
815 &self,
816 py: Python<'py>,
817 ) -> PyResult<Bound<'py, PyAny>> {
818 let client = self.clone();
819
820 pyo3_async_runtimes::tokio::future_into_py(py, async move {
821 client
822 .unsubscribe_all_dexs_asset_ctxs()
823 .await
824 .map_err(to_pyruntime_err)?;
825 Ok(())
826 })
827 }
828
829 #[pyo3(name = "unsubscribe_all_mids_with_dex")]
831 fn py_unsubscribe_all_mids_with_dex<'py>(
832 &self,
833 py: Python<'py>,
834 dex: Option<String>,
835 ) -> PyResult<Bound<'py, PyAny>> {
836 let client = self.clone();
837
838 pyo3_async_runtimes::tokio::future_into_py(py, async move {
839 client
840 .unsubscribe_all_mids_with_dex(dex.as_deref())
841 .await
842 .map_err(to_pyruntime_err)?;
843 Ok(())
844 })
845 }
846
847 #[pyo3(name = "subscribe_order_updates")]
849 fn py_subscribe_order_updates<'py>(
850 &self,
851 py: Python<'py>,
852 user: String,
853 ) -> PyResult<Bound<'py, PyAny>> {
854 let client = self.clone();
855
856 pyo3_async_runtimes::tokio::future_into_py(py, async move {
857 client
858 .subscribe_order_updates(&user)
859 .await
860 .map_err(to_pyruntime_err)?;
861 Ok(())
862 })
863 }
864
865 #[pyo3(name = "subscribe_user_events")]
867 fn py_subscribe_user_events<'py>(
868 &self,
869 py: Python<'py>,
870 user: String,
871 ) -> PyResult<Bound<'py, PyAny>> {
872 let client = self.clone();
873
874 pyo3_async_runtimes::tokio::future_into_py(py, async move {
875 client
876 .subscribe_user_events(&user)
877 .await
878 .map_err(to_pyruntime_err)?;
879 Ok(())
880 })
881 }
882
883 #[pyo3(name = "subscribe_user_fills")]
888 fn py_subscribe_user_fills<'py>(
889 &self,
890 py: Python<'py>,
891 user: String,
892 ) -> PyResult<Bound<'py, PyAny>> {
893 let client = self.clone();
894
895 pyo3_async_runtimes::tokio::future_into_py(py, async move {
896 client
897 .subscribe_user_fills(&user)
898 .await
899 .map_err(to_pyruntime_err)?;
900 Ok(())
901 })
902 }
903
904 #[pyo3(name = "subscribe_mark_prices")]
906 fn py_subscribe_mark_prices<'py>(
907 &self,
908 py: Python<'py>,
909 instrument_id: InstrumentId,
910 ) -> PyResult<Bound<'py, PyAny>> {
911 let client = self.clone();
912
913 pyo3_async_runtimes::tokio::future_into_py(py, async move {
914 client
915 .subscribe_mark_prices(instrument_id)
916 .await
917 .map_err(to_pyruntime_err)?;
918 Ok(())
919 })
920 }
921
922 #[pyo3(name = "unsubscribe_mark_prices")]
924 fn py_unsubscribe_mark_prices<'py>(
925 &self,
926 py: Python<'py>,
927 instrument_id: InstrumentId,
928 ) -> PyResult<Bound<'py, PyAny>> {
929 let client = self.clone();
930
931 pyo3_async_runtimes::tokio::future_into_py(py, async move {
932 client
933 .unsubscribe_mark_prices(instrument_id)
934 .await
935 .map_err(to_pyruntime_err)?;
936 Ok(())
937 })
938 }
939
940 #[pyo3(name = "subscribe_index_prices")]
942 fn py_subscribe_index_prices<'py>(
943 &self,
944 py: Python<'py>,
945 instrument_id: InstrumentId,
946 ) -> PyResult<Bound<'py, PyAny>> {
947 let client = self.clone();
948
949 pyo3_async_runtimes::tokio::future_into_py(py, async move {
950 client
951 .subscribe_index_prices(instrument_id)
952 .await
953 .map_err(to_pyruntime_err)?;
954 Ok(())
955 })
956 }
957
958 #[pyo3(name = "unsubscribe_index_prices")]
960 fn py_unsubscribe_index_prices<'py>(
961 &self,
962 py: Python<'py>,
963 instrument_id: InstrumentId,
964 ) -> PyResult<Bound<'py, PyAny>> {
965 let client = self.clone();
966
967 pyo3_async_runtimes::tokio::future_into_py(py, async move {
968 client
969 .unsubscribe_index_prices(instrument_id)
970 .await
971 .map_err(to_pyruntime_err)?;
972 Ok(())
973 })
974 }
975
976 #[pyo3(name = "subscribe_funding_rates")]
978 fn py_subscribe_funding_rates<'py>(
979 &self,
980 py: Python<'py>,
981 instrument_id: InstrumentId,
982 ) -> PyResult<Bound<'py, PyAny>> {
983 let client = self.clone();
984
985 pyo3_async_runtimes::tokio::future_into_py(py, async move {
986 client
987 .subscribe_funding_rates(instrument_id)
988 .await
989 .map_err(to_pyruntime_err)?;
990 Ok(())
991 })
992 }
993
994 #[pyo3(name = "subscribe_open_interest")]
996 fn py_subscribe_open_interest<'py>(
997 &self,
998 py: Python<'py>,
999 instrument_id: InstrumentId,
1000 ) -> PyResult<Bound<'py, PyAny>> {
1001 let client = self.clone();
1002
1003 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1004 client
1005 .subscribe_open_interest(instrument_id)
1006 .await
1007 .map_err(to_pyruntime_err)?;
1008 Ok(())
1009 })
1010 }
1011
1012 #[pyo3(name = "unsubscribe_funding_rates")]
1014 fn py_unsubscribe_funding_rates<'py>(
1015 &self,
1016 py: Python<'py>,
1017 instrument_id: InstrumentId,
1018 ) -> PyResult<Bound<'py, PyAny>> {
1019 let client = self.clone();
1020
1021 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1022 client
1023 .unsubscribe_funding_rates(instrument_id)
1024 .await
1025 .map_err(to_pyruntime_err)?;
1026 Ok(())
1027 })
1028 }
1029
1030 #[pyo3(name = "unsubscribe_open_interest")]
1032 fn py_unsubscribe_open_interest<'py>(
1033 &self,
1034 py: Python<'py>,
1035 instrument_id: InstrumentId,
1036 ) -> PyResult<Bound<'py, PyAny>> {
1037 let client = self.clone();
1038
1039 pyo3_async_runtimes::tokio::future_into_py(py, async move {
1040 client
1041 .unsubscribe_open_interest(instrument_id)
1042 .await
1043 .map_err(to_pyruntime_err)?;
1044 Ok(())
1045 })
1046 }
1047}