1use nautilus_common::live::get_runtime;
19use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err};
20use nautilus_model::{
21 data::{BarType, Data, OrderBookDeltas_API},
22 identifiers::{AccountId, ClientOrderId, InstrumentId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use nautilus_network::websocket::TransportBackend;
26use pyo3::{conversion::IntoPyObjectExt, prelude::*};
27
28use crate::{
29 common::enums::HyperliquidEnvironment,
30 websocket::{
31 HyperliquidWebSocketClient,
32 messages::{ExecutionReport, NautilusWsMessage},
33 },
34};
35
36fn ws_data_to_pyobject(py: Python<'_>, data: Data) -> PyResult<Py<PyAny>> {
37 match data {
38 Data::Custom(custom) => Py::new(py, custom).map(|obj| obj.into_any()),
39 other => Ok(data_to_pycapsule(py, other)),
40 }
41}
42
43#[pymethods]
44#[pyo3_stub_gen::derive::gen_stub_pymethods]
45impl HyperliquidWebSocketClient {
46 #[new]
51 #[pyo3(signature = (url=None, environment=HyperliquidEnvironment::Mainnet, account_id=None, proxy_url=None))]
52 fn py_new(
53 url: Option<String>,
54 environment: HyperliquidEnvironment,
55 account_id: Option<String>,
56 proxy_url: Option<String>,
57 ) -> Self {
58 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
59 Self::new(
60 url,
61 environment,
62 account_id,
63 TransportBackend::default(),
64 proxy_url,
65 )
66 }
67
68 #[getter]
70 #[pyo3(name = "url")]
71 #[must_use]
72 pub fn py_url(&self) -> String {
73 self.url().to_string()
74 }
75
76 #[pyo3(name = "is_active")]
78 fn py_is_active(&self) -> bool {
79 self.is_active()
80 }
81
82 #[pyo3(name = "is_closed")]
83 fn py_is_closed(&self) -> bool {
84 !self.is_active()
85 }
86
87 #[pyo3(name = "cache_spot_fill_coins")]
93 fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
94 let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
95 .into_iter()
96 .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
97 .collect();
98 self.cache_spot_fill_coins(ahash_mapping);
99 }
100
101 #[pyo3(name = "cache_cloid_mapping")]
110 fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
111 self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
112 }
113
114 #[pyo3(name = "remove_cloid_mapping")]
119 fn py_remove_cloid_mapping(&self, cloid: &str) {
120 self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
121 }
122
123 #[pyo3(name = "clear_cloid_cache")]
127 fn py_clear_cloid_cache(&self) {
128 self.clear_cloid_cache();
129 }
130
131 #[pyo3(name = "cloid_cache_len")]
133 fn py_cloid_cache_len(&self) -> usize {
134 self.cloid_cache_len()
135 }
136
137 #[pyo3(name = "get_cloid_mapping")]
141 fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
142 self.get_cloid_mapping(&ustr::Ustr::from(cloid))
143 }
144
145 #[pyo3(name = "connect")]
147 #[expect(clippy::needless_pass_by_value)]
148 fn py_connect<'py>(
149 &self,
150 py: Python<'py>,
151 loop_: Py<PyAny>,
152 instruments: Vec<Py<PyAny>>,
153 callback: Py<PyAny>,
154 ) -> PyResult<Bound<'py, PyAny>> {
155 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
156
157 for inst in instruments {
158 let inst_any = pyobject_to_instrument_any(py, inst)?;
159 self.cache_instrument(inst_any);
160 }
161
162 let mut client = self.clone();
163
164 pyo3_async_runtimes::tokio::future_into_py(py, async move {
165 client.connect().await.map_err(to_pyruntime_err)?;
166
167 get_runtime().spawn(async move {
168 loop {
169 let event = client.next_event().await;
170
171 match event {
172 Some(msg) => {
173 log::trace!("Received WebSocket message: {msg:?}");
174
175 match msg {
176 NautilusWsMessage::Trades(trade_ticks) => {
177 Python::attach(|py| {
178 for tick in trade_ticks {
179 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
180 call_python_threadsafe(py, &call_soon, &callback, py_obj);
181 }
182 });
183 }
184 NautilusWsMessage::Quote(quote_tick) => {
185 Python::attach(|py| {
186 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
187 call_python_threadsafe(py, &call_soon, &callback, py_obj);
188 });
189 }
190 NautilusWsMessage::Deltas(deltas) => {
191 Python::attach(|py| {
192 let py_obj = data_to_pycapsule(
193 py,
194 Data::Deltas(OrderBookDeltas_API::new(deltas)),
195 );
196 call_python_threadsafe(py, &call_soon, &callback, py_obj);
197 });
198 }
199 NautilusWsMessage::Depth10(depth) => {
200 Python::attach(|py| {
201 let py_obj = data_to_pycapsule(py, Data::Depth10(depth));
202 call_python_threadsafe(py, &call_soon, &callback, py_obj);
203 });
204 }
205 NautilusWsMessage::Candle(bar) => {
206 Python::attach(|py| {
207 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
208 call_python_threadsafe(py, &call_soon, &callback, py_obj);
209 });
210 }
211 NautilusWsMessage::MarkPrice(mark_price) => {
212 Python::attach(|py| {
213 let py_obj = data_to_pycapsule(
214 py,
215 Data::MarkPriceUpdate(mark_price),
216 );
217 call_python_threadsafe(py, &call_soon, &callback, py_obj);
218 });
219 }
220 NautilusWsMessage::IndexPrice(index_price) => {
221 Python::attach(|py| {
222 let py_obj = data_to_pycapsule(
223 py,
224 Data::IndexPriceUpdate(index_price),
225 );
226 call_python_threadsafe(py, &call_soon, &callback, py_obj);
227 });
228 }
229 NautilusWsMessage::FundingRate(funding_rate) => {
230 Python::attach(|py| {
231 if let Ok(py_obj) = funding_rate.into_py_any(py) {
232 call_python_threadsafe(py, &call_soon, &callback, py_obj);
233 }
234 });
235 }
236 NautilusWsMessage::CustomData(data) => {
237 Python::attach(|py| match ws_data_to_pyobject(py, data) {
238 Ok(py_obj) => {
239 call_python_threadsafe(py, &call_soon, &callback, py_obj);
240 }
241 Err(e) => {
242 log::error!(
243 "Error converting CustomData to Python object: {e}"
244 );
245 }
246 });
247 }
248 NautilusWsMessage::ExecutionReports(reports) => {
249 Python::attach(|py| {
250 for report in reports {
251 match report {
252 ExecutionReport::Order(order_report) => {
253 log::debug!(
254 "Forwarding order status report: order_id={}, status={:?}",
255 order_report.venue_order_id,
256 order_report.order_status
257 );
258
259 match Py::new(py, order_report) {
260 Ok(py_obj) => {
261 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
262 }
263 Err(e) => {
264 log::error!("Error converting OrderStatusReport to Python: {e}");
265 }
266 }
267 }
268 ExecutionReport::Fill(fill_report) => {
269 log::debug!(
270 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
271 fill_report.trade_id,
272 fill_report.order_side,
273 fill_report.last_qty,
274 fill_report.last_px
275 );
276
277 match Py::new(py, fill_report) {
278 Ok(py_obj) => {
279 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
280 }
281 Err(e) => {
282 log::error!("Error converting FillReport to Python: {e}");
283 }
284 }
285 }
286 }
287 }
288 });
289 }
290 _ => {
291 log::debug!("Unhandled message type: {msg:?}");
292 }
293 }
294 }
295 None => {
296 log::debug!("WebSocket connection closed");
297 break;
298 }
299 }
300 }
301 });
302
303 Ok(())
304 })
305 }
306
307 #[pyo3(name = "wait_until_active")]
308 fn py_wait_until_active<'py>(
309 &self,
310 py: Python<'py>,
311 timeout_secs: f64,
312 ) -> PyResult<Bound<'py, PyAny>> {
313 let client = self.clone();
314
315 pyo3_async_runtimes::tokio::future_into_py(py, async move {
316 let start = std::time::Instant::now();
317
318 loop {
319 if client.is_active() {
320 return Ok(());
321 }
322
323 if start.elapsed().as_secs_f64() >= timeout_secs {
324 return Err(to_pyruntime_err(format!(
325 "WebSocket connection did not become active within {timeout_secs} seconds"
326 )));
327 }
328
329 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
330 }
331 })
332 }
333
334 #[pyo3(name = "close")]
335 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
336 let mut client = self.clone();
337
338 pyo3_async_runtimes::tokio::future_into_py(py, async move {
339 if let Err(e) = client.disconnect().await {
340 log::error!("Error on close: {e}");
341 }
342 Ok(())
343 })
344 }
345
346 #[pyo3(name = "subscribe_trades")]
348 fn py_subscribe_trades<'py>(
349 &self,
350 py: Python<'py>,
351 instrument_id: InstrumentId,
352 ) -> PyResult<Bound<'py, PyAny>> {
353 let client = self.clone();
354
355 pyo3_async_runtimes::tokio::future_into_py(py, async move {
356 client
357 .subscribe_trades(instrument_id)
358 .await
359 .map_err(to_pyruntime_err)?;
360 Ok(())
361 })
362 }
363
364 #[pyo3(name = "unsubscribe_trades")]
366 fn py_unsubscribe_trades<'py>(
367 &self,
368 py: Python<'py>,
369 instrument_id: InstrumentId,
370 ) -> PyResult<Bound<'py, PyAny>> {
371 let client = self.clone();
372
373 pyo3_async_runtimes::tokio::future_into_py(py, async move {
374 client
375 .unsubscribe_trades(instrument_id)
376 .await
377 .map_err(to_pyruntime_err)?;
378 Ok(())
379 })
380 }
381
382 #[pyo3(name = "subscribe_all_mids")]
384 fn py_subscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
385 let client = self.clone();
386
387 pyo3_async_runtimes::tokio::future_into_py(py, async move {
388 client
389 .subscribe_all_mids()
390 .await
391 .map_err(to_pyruntime_err)?;
392 Ok(())
393 })
394 }
395
396 #[pyo3(name = "subscribe_all_mids_with_dex")]
398 fn py_subscribe_all_mids_with_dex<'py>(
399 &self,
400 py: Python<'py>,
401 dex: Option<String>,
402 ) -> PyResult<Bound<'py, PyAny>> {
403 let client = self.clone();
404
405 pyo3_async_runtimes::tokio::future_into_py(py, async move {
406 client
407 .subscribe_all_mids_with_dex(dex.as_deref())
408 .await
409 .map_err(to_pyruntime_err)?;
410 Ok(())
411 })
412 }
413
414 #[pyo3(name = "subscribe_book")]
416 fn py_subscribe_book<'py>(
417 &self,
418 py: Python<'py>,
419 instrument_id: InstrumentId,
420 ) -> PyResult<Bound<'py, PyAny>> {
421 let client = self.clone();
422
423 pyo3_async_runtimes::tokio::future_into_py(py, async move {
424 client
425 .subscribe_book(instrument_id)
426 .await
427 .map_err(to_pyruntime_err)?;
428 Ok(())
429 })
430 }
431
432 #[pyo3(name = "unsubscribe_book")]
434 fn py_unsubscribe_book<'py>(
435 &self,
436 py: Python<'py>,
437 instrument_id: InstrumentId,
438 ) -> PyResult<Bound<'py, PyAny>> {
439 let client = self.clone();
440
441 pyo3_async_runtimes::tokio::future_into_py(py, async move {
442 client
443 .unsubscribe_book(instrument_id)
444 .await
445 .map_err(to_pyruntime_err)?;
446 Ok(())
447 })
448 }
449
450 #[pyo3(name = "subscribe_book_deltas")]
451 fn py_subscribe_book_deltas<'py>(
452 &self,
453 py: Python<'py>,
454 instrument_id: InstrumentId,
455 _book_type: u8,
456 _depth: u64,
457 ) -> PyResult<Bound<'py, PyAny>> {
458 let client = self.clone();
459
460 pyo3_async_runtimes::tokio::future_into_py(py, async move {
461 client
462 .subscribe_book(instrument_id)
463 .await
464 .map_err(to_pyruntime_err)?;
465 Ok(())
466 })
467 }
468
469 #[pyo3(name = "unsubscribe_book_deltas")]
470 fn py_unsubscribe_book_deltas<'py>(
471 &self,
472 py: Python<'py>,
473 instrument_id: InstrumentId,
474 ) -> PyResult<Bound<'py, PyAny>> {
475 let client = self.clone();
476
477 pyo3_async_runtimes::tokio::future_into_py(py, async move {
478 client
479 .unsubscribe_book(instrument_id)
480 .await
481 .map_err(to_pyruntime_err)?;
482 Ok(())
483 })
484 }
485
486 #[pyo3(name = "subscribe_book_snapshots")]
487 fn py_subscribe_book_snapshots<'py>(
488 &self,
489 py: Python<'py>,
490 instrument_id: InstrumentId,
491 _book_type: u8,
492 _depth: u64,
493 ) -> PyResult<Bound<'py, PyAny>> {
494 let client = self.clone();
495
496 pyo3_async_runtimes::tokio::future_into_py(py, async move {
497 client
498 .subscribe_book(instrument_id)
499 .await
500 .map_err(to_pyruntime_err)?;
501 Ok(())
502 })
503 }
504
505 #[pyo3(name = "subscribe_quotes")]
507 fn py_subscribe_quotes<'py>(
508 &self,
509 py: Python<'py>,
510 instrument_id: InstrumentId,
511 ) -> PyResult<Bound<'py, PyAny>> {
512 let client = self.clone();
513
514 pyo3_async_runtimes::tokio::future_into_py(py, async move {
515 client
516 .subscribe_quotes(instrument_id)
517 .await
518 .map_err(to_pyruntime_err)?;
519 Ok(())
520 })
521 }
522
523 #[pyo3(name = "unsubscribe_quotes")]
525 fn py_unsubscribe_quotes<'py>(
526 &self,
527 py: Python<'py>,
528 instrument_id: InstrumentId,
529 ) -> PyResult<Bound<'py, PyAny>> {
530 let client = self.clone();
531
532 pyo3_async_runtimes::tokio::future_into_py(py, async move {
533 client
534 .unsubscribe_quotes(instrument_id)
535 .await
536 .map_err(to_pyruntime_err)?;
537 Ok(())
538 })
539 }
540
541 #[pyo3(name = "subscribe_bars")]
543 fn py_subscribe_bars<'py>(
544 &self,
545 py: Python<'py>,
546 bar_type: BarType,
547 ) -> PyResult<Bound<'py, PyAny>> {
548 let client = self.clone();
549
550 pyo3_async_runtimes::tokio::future_into_py(py, async move {
551 client
552 .subscribe_bars(bar_type)
553 .await
554 .map_err(to_pyruntime_err)?;
555 Ok(())
556 })
557 }
558
559 #[pyo3(name = "unsubscribe_bars")]
561 fn py_unsubscribe_bars<'py>(
562 &self,
563 py: Python<'py>,
564 bar_type: BarType,
565 ) -> PyResult<Bound<'py, PyAny>> {
566 let client = self.clone();
567
568 pyo3_async_runtimes::tokio::future_into_py(py, async move {
569 client
570 .unsubscribe_bars(bar_type)
571 .await
572 .map_err(to_pyruntime_err)?;
573 Ok(())
574 })
575 }
576
577 #[pyo3(name = "unsubscribe_all_mids")]
579 fn py_unsubscribe_all_mids<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
580 let client = self.clone();
581
582 pyo3_async_runtimes::tokio::future_into_py(py, async move {
583 client
584 .unsubscribe_all_mids()
585 .await
586 .map_err(to_pyruntime_err)?;
587 Ok(())
588 })
589 }
590
591 #[pyo3(name = "unsubscribe_all_mids_with_dex")]
593 fn py_unsubscribe_all_mids_with_dex<'py>(
594 &self,
595 py: Python<'py>,
596 dex: Option<String>,
597 ) -> PyResult<Bound<'py, PyAny>> {
598 let client = self.clone();
599
600 pyo3_async_runtimes::tokio::future_into_py(py, async move {
601 client
602 .unsubscribe_all_mids_with_dex(dex.as_deref())
603 .await
604 .map_err(to_pyruntime_err)?;
605 Ok(())
606 })
607 }
608
609 #[pyo3(name = "subscribe_order_updates")]
611 fn py_subscribe_order_updates<'py>(
612 &self,
613 py: Python<'py>,
614 user: String,
615 ) -> PyResult<Bound<'py, PyAny>> {
616 let client = self.clone();
617
618 pyo3_async_runtimes::tokio::future_into_py(py, async move {
619 client
620 .subscribe_order_updates(&user)
621 .await
622 .map_err(to_pyruntime_err)?;
623 Ok(())
624 })
625 }
626
627 #[pyo3(name = "subscribe_user_events")]
629 fn py_subscribe_user_events<'py>(
630 &self,
631 py: Python<'py>,
632 user: String,
633 ) -> PyResult<Bound<'py, PyAny>> {
634 let client = self.clone();
635
636 pyo3_async_runtimes::tokio::future_into_py(py, async move {
637 client
638 .subscribe_user_events(&user)
639 .await
640 .map_err(to_pyruntime_err)?;
641 Ok(())
642 })
643 }
644
645 #[pyo3(name = "subscribe_user_fills")]
650 fn py_subscribe_user_fills<'py>(
651 &self,
652 py: Python<'py>,
653 user: String,
654 ) -> PyResult<Bound<'py, PyAny>> {
655 let client = self.clone();
656
657 pyo3_async_runtimes::tokio::future_into_py(py, async move {
658 client
659 .subscribe_user_fills(&user)
660 .await
661 .map_err(to_pyruntime_err)?;
662 Ok(())
663 })
664 }
665
666 #[pyo3(name = "subscribe_mark_prices")]
668 fn py_subscribe_mark_prices<'py>(
669 &self,
670 py: Python<'py>,
671 instrument_id: InstrumentId,
672 ) -> PyResult<Bound<'py, PyAny>> {
673 let client = self.clone();
674
675 pyo3_async_runtimes::tokio::future_into_py(py, async move {
676 client
677 .subscribe_mark_prices(instrument_id)
678 .await
679 .map_err(to_pyruntime_err)?;
680 Ok(())
681 })
682 }
683
684 #[pyo3(name = "unsubscribe_mark_prices")]
686 fn py_unsubscribe_mark_prices<'py>(
687 &self,
688 py: Python<'py>,
689 instrument_id: InstrumentId,
690 ) -> PyResult<Bound<'py, PyAny>> {
691 let client = self.clone();
692
693 pyo3_async_runtimes::tokio::future_into_py(py, async move {
694 client
695 .unsubscribe_mark_prices(instrument_id)
696 .await
697 .map_err(to_pyruntime_err)?;
698 Ok(())
699 })
700 }
701
702 #[pyo3(name = "subscribe_index_prices")]
704 fn py_subscribe_index_prices<'py>(
705 &self,
706 py: Python<'py>,
707 instrument_id: InstrumentId,
708 ) -> PyResult<Bound<'py, PyAny>> {
709 let client = self.clone();
710
711 pyo3_async_runtimes::tokio::future_into_py(py, async move {
712 client
713 .subscribe_index_prices(instrument_id)
714 .await
715 .map_err(to_pyruntime_err)?;
716 Ok(())
717 })
718 }
719
720 #[pyo3(name = "unsubscribe_index_prices")]
722 fn py_unsubscribe_index_prices<'py>(
723 &self,
724 py: Python<'py>,
725 instrument_id: InstrumentId,
726 ) -> PyResult<Bound<'py, PyAny>> {
727 let client = self.clone();
728
729 pyo3_async_runtimes::tokio::future_into_py(py, async move {
730 client
731 .unsubscribe_index_prices(instrument_id)
732 .await
733 .map_err(to_pyruntime_err)?;
734 Ok(())
735 })
736 }
737
738 #[pyo3(name = "subscribe_funding_rates")]
740 fn py_subscribe_funding_rates<'py>(
741 &self,
742 py: Python<'py>,
743 instrument_id: InstrumentId,
744 ) -> PyResult<Bound<'py, PyAny>> {
745 let client = self.clone();
746
747 pyo3_async_runtimes::tokio::future_into_py(py, async move {
748 client
749 .subscribe_funding_rates(instrument_id)
750 .await
751 .map_err(to_pyruntime_err)?;
752 Ok(())
753 })
754 }
755
756 #[pyo3(name = "unsubscribe_funding_rates")]
758 fn py_unsubscribe_funding_rates<'py>(
759 &self,
760 py: Python<'py>,
761 instrument_id: InstrumentId,
762 ) -> PyResult<Bound<'py, PyAny>> {
763 let client = self.clone();
764
765 pyo3_async_runtimes::tokio::future_into_py(py, async move {
766 client
767 .unsubscribe_funding_rates(instrument_id)
768 .await
769 .map_err(to_pyruntime_err)?;
770 Ok(())
771 })
772 }
773}