1use nautilus_common::live::get_runtime;
19use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err};
20use nautilus_model::{
21 data::{BarType, Data, OrderBookDeltas_API},
22 identifiers::{AccountId, ClientOrderId, InstrumentId},
23 python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*};
26
27use crate::websocket::{
28 HyperliquidWebSocketClient,
29 messages::{ExecutionReport, NautilusWsMessage},
30};
31
32#[pymethods]
33#[pyo3_stub_gen::derive::gen_stub_pymethods]
34impl HyperliquidWebSocketClient {
35 #[new]
40 #[pyo3(signature = (url=None, testnet=false, account_id=None))]
41 fn py_new(url: Option<String>, testnet: bool, account_id: Option<String>) -> Self {
42 let account_id = account_id.map(|s| AccountId::from(s.as_str()));
43 Self::new(url, testnet, account_id)
44 }
45
46 #[getter]
48 #[pyo3(name = "url")]
49 #[must_use]
50 pub fn py_url(&self) -> String {
51 self.url().to_string()
52 }
53
54 #[pyo3(name = "is_active")]
56 fn py_is_active(&self) -> bool {
57 self.is_active()
58 }
59
60 #[pyo3(name = "is_closed")]
61 fn py_is_closed(&self) -> bool {
62 !self.is_active()
63 }
64
65 #[pyo3(name = "cache_spot_fill_coins")]
71 fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
72 let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
73 .into_iter()
74 .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
75 .collect();
76 self.cache_spot_fill_coins(ahash_mapping);
77 }
78
79 #[pyo3(name = "cache_cloid_mapping")]
88 fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
89 self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
90 }
91
92 #[pyo3(name = "remove_cloid_mapping")]
97 fn py_remove_cloid_mapping(&self, cloid: &str) {
98 self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
99 }
100
101 #[pyo3(name = "clear_cloid_cache")]
105 fn py_clear_cloid_cache(&self) {
106 self.clear_cloid_cache();
107 }
108
109 #[pyo3(name = "cloid_cache_len")]
111 fn py_cloid_cache_len(&self) -> usize {
112 self.cloid_cache_len()
113 }
114
115 #[pyo3(name = "get_cloid_mapping")]
119 fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
120 self.get_cloid_mapping(&ustr::Ustr::from(cloid))
121 }
122
123 #[pyo3(name = "connect")]
125 #[allow(clippy::needless_pass_by_value)]
126 fn py_connect<'py>(
127 &self,
128 py: Python<'py>,
129 loop_: Py<PyAny>,
130 instruments: Vec<Py<PyAny>>,
131 callback: Py<PyAny>,
132 ) -> PyResult<Bound<'py, PyAny>> {
133 let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
134
135 for inst in instruments {
136 let inst_any = pyobject_to_instrument_any(py, inst)?;
137 self.cache_instrument(inst_any);
138 }
139
140 let mut client = self.clone();
141
142 pyo3_async_runtimes::tokio::future_into_py(py, async move {
143 client.connect().await.map_err(to_pyruntime_err)?;
144
145 get_runtime().spawn(async move {
146 loop {
147 let event = client.next_event().await;
148
149 match event {
150 Some(msg) => {
151 log::trace!("Received WebSocket message: {msg:?}");
152
153 match msg {
154 NautilusWsMessage::Trades(trade_ticks) => {
155 Python::attach(|py| {
156 for tick in trade_ticks {
157 let py_obj = data_to_pycapsule(py, Data::Trade(tick));
158 call_python_threadsafe(py, &call_soon, &callback, py_obj);
159 }
160 });
161 }
162 NautilusWsMessage::Quote(quote_tick) => {
163 Python::attach(|py| {
164 let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
165 call_python_threadsafe(py, &call_soon, &callback, py_obj);
166 });
167 }
168 NautilusWsMessage::Deltas(deltas) => {
169 Python::attach(|py| {
170 let py_obj = data_to_pycapsule(
171 py,
172 Data::Deltas(OrderBookDeltas_API::new(deltas)),
173 );
174 call_python_threadsafe(py, &call_soon, &callback, py_obj);
175 });
176 }
177 NautilusWsMessage::Candle(bar) => {
178 Python::attach(|py| {
179 let py_obj = data_to_pycapsule(py, Data::Bar(bar));
180 call_python_threadsafe(py, &call_soon, &callback, py_obj);
181 });
182 }
183 NautilusWsMessage::MarkPrice(mark_price) => {
184 Python::attach(|py| {
185 let py_obj = data_to_pycapsule(
186 py,
187 Data::MarkPriceUpdate(mark_price),
188 );
189 call_python_threadsafe(py, &call_soon, &callback, py_obj);
190 });
191 }
192 NautilusWsMessage::IndexPrice(index_price) => {
193 Python::attach(|py| {
194 let py_obj = data_to_pycapsule(
195 py,
196 Data::IndexPriceUpdate(index_price),
197 );
198 call_python_threadsafe(py, &call_soon, &callback, py_obj);
199 });
200 }
201 NautilusWsMessage::FundingRate(funding_rate) => {
202 Python::attach(|py| {
203 if let Ok(py_obj) = funding_rate.into_py_any(py) {
204 call_python_threadsafe(py, &call_soon, &callback, py_obj);
205 }
206 });
207 }
208 NautilusWsMessage::ExecutionReports(reports) => {
209 Python::attach(|py| {
210 for report in reports {
211 match report {
212 ExecutionReport::Order(order_report) => {
213 log::debug!(
214 "Forwarding order status report: order_id={}, status={:?}",
215 order_report.venue_order_id,
216 order_report.order_status
217 );
218 match Py::new(py, order_report) {
219 Ok(py_obj) => {
220 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
221 }
222 Err(e) => {
223 log::error!("Error converting OrderStatusReport to Python: {e}");
224 }
225 }
226 }
227 ExecutionReport::Fill(fill_report) => {
228 log::debug!(
229 "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
230 fill_report.trade_id,
231 fill_report.order_side,
232 fill_report.last_qty,
233 fill_report.last_px
234 );
235 match Py::new(py, fill_report) {
236 Ok(py_obj) => {
237 call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
238 }
239 Err(e) => {
240 log::error!("Error converting FillReport to Python: {e}");
241 }
242 }
243 }
244 }
245 }
246 });
247 }
248 _ => {
249 log::debug!("Unhandled message type: {msg:?}");
250 }
251 }
252 }
253 None => {
254 log::debug!("WebSocket connection closed");
255 break;
256 }
257 }
258 }
259 });
260
261 Ok(())
262 })
263 }
264
265 #[pyo3(name = "wait_until_active")]
266 fn py_wait_until_active<'py>(
267 &self,
268 py: Python<'py>,
269 timeout_secs: f64,
270 ) -> PyResult<Bound<'py, PyAny>> {
271 let client = self.clone();
272
273 pyo3_async_runtimes::tokio::future_into_py(py, async move {
274 let start = std::time::Instant::now();
275 loop {
276 if client.is_active() {
277 return Ok(());
278 }
279
280 if start.elapsed().as_secs_f64() >= timeout_secs {
281 return Err(to_pyruntime_err(format!(
282 "WebSocket connection did not become active within {timeout_secs} seconds"
283 )));
284 }
285
286 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
287 }
288 })
289 }
290
291 #[pyo3(name = "close")]
292 fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
293 let mut client = self.clone();
294
295 pyo3_async_runtimes::tokio::future_into_py(py, async move {
296 if let Err(e) = client.disconnect().await {
297 log::error!("Error on close: {e}");
298 }
299 Ok(())
300 })
301 }
302
303 #[pyo3(name = "subscribe_trades")]
305 fn py_subscribe_trades<'py>(
306 &self,
307 py: Python<'py>,
308 instrument_id: InstrumentId,
309 ) -> PyResult<Bound<'py, PyAny>> {
310 let client = self.clone();
311
312 pyo3_async_runtimes::tokio::future_into_py(py, async move {
313 client
314 .subscribe_trades(instrument_id)
315 .await
316 .map_err(to_pyruntime_err)?;
317 Ok(())
318 })
319 }
320
321 #[pyo3(name = "unsubscribe_trades")]
323 fn py_unsubscribe_trades<'py>(
324 &self,
325 py: Python<'py>,
326 instrument_id: InstrumentId,
327 ) -> PyResult<Bound<'py, PyAny>> {
328 let client = self.clone();
329
330 pyo3_async_runtimes::tokio::future_into_py(py, async move {
331 client
332 .unsubscribe_trades(instrument_id)
333 .await
334 .map_err(to_pyruntime_err)?;
335 Ok(())
336 })
337 }
338
339 #[pyo3(name = "subscribe_book")]
341 fn py_subscribe_book<'py>(
342 &self,
343 py: Python<'py>,
344 instrument_id: InstrumentId,
345 ) -> PyResult<Bound<'py, PyAny>> {
346 let client = self.clone();
347
348 pyo3_async_runtimes::tokio::future_into_py(py, async move {
349 client
350 .subscribe_book(instrument_id)
351 .await
352 .map_err(to_pyruntime_err)?;
353 Ok(())
354 })
355 }
356
357 #[pyo3(name = "unsubscribe_book")]
359 fn py_unsubscribe_book<'py>(
360 &self,
361 py: Python<'py>,
362 instrument_id: InstrumentId,
363 ) -> PyResult<Bound<'py, PyAny>> {
364 let client = self.clone();
365
366 pyo3_async_runtimes::tokio::future_into_py(py, async move {
367 client
368 .unsubscribe_book(instrument_id)
369 .await
370 .map_err(to_pyruntime_err)?;
371 Ok(())
372 })
373 }
374
375 #[pyo3(name = "subscribe_book_deltas")]
376 fn py_subscribe_book_deltas<'py>(
377 &self,
378 py: Python<'py>,
379 instrument_id: InstrumentId,
380 _book_type: u8,
381 _depth: u64,
382 ) -> PyResult<Bound<'py, PyAny>> {
383 let client = self.clone();
384
385 pyo3_async_runtimes::tokio::future_into_py(py, async move {
386 client
387 .subscribe_book(instrument_id)
388 .await
389 .map_err(to_pyruntime_err)?;
390 Ok(())
391 })
392 }
393
394 #[pyo3(name = "unsubscribe_book_deltas")]
395 fn py_unsubscribe_book_deltas<'py>(
396 &self,
397 py: Python<'py>,
398 instrument_id: InstrumentId,
399 ) -> PyResult<Bound<'py, PyAny>> {
400 let client = self.clone();
401
402 pyo3_async_runtimes::tokio::future_into_py(py, async move {
403 client
404 .unsubscribe_book(instrument_id)
405 .await
406 .map_err(to_pyruntime_err)?;
407 Ok(())
408 })
409 }
410
411 #[pyo3(name = "subscribe_book_snapshots")]
412 fn py_subscribe_book_snapshots<'py>(
413 &self,
414 py: Python<'py>,
415 instrument_id: InstrumentId,
416 _book_type: u8,
417 _depth: u64,
418 ) -> PyResult<Bound<'py, PyAny>> {
419 let client = self.clone();
420
421 pyo3_async_runtimes::tokio::future_into_py(py, async move {
422 client
423 .subscribe_book(instrument_id)
424 .await
425 .map_err(to_pyruntime_err)?;
426 Ok(())
427 })
428 }
429
430 #[pyo3(name = "subscribe_quotes")]
432 fn py_subscribe_quotes<'py>(
433 &self,
434 py: Python<'py>,
435 instrument_id: InstrumentId,
436 ) -> PyResult<Bound<'py, PyAny>> {
437 let client = self.clone();
438
439 pyo3_async_runtimes::tokio::future_into_py(py, async move {
440 client
441 .subscribe_quotes(instrument_id)
442 .await
443 .map_err(to_pyruntime_err)?;
444 Ok(())
445 })
446 }
447
448 #[pyo3(name = "unsubscribe_quotes")]
450 fn py_unsubscribe_quotes<'py>(
451 &self,
452 py: Python<'py>,
453 instrument_id: InstrumentId,
454 ) -> PyResult<Bound<'py, PyAny>> {
455 let client = self.clone();
456
457 pyo3_async_runtimes::tokio::future_into_py(py, async move {
458 client
459 .unsubscribe_quotes(instrument_id)
460 .await
461 .map_err(to_pyruntime_err)?;
462 Ok(())
463 })
464 }
465
466 #[pyo3(name = "subscribe_bars")]
468 fn py_subscribe_bars<'py>(
469 &self,
470 py: Python<'py>,
471 bar_type: BarType,
472 ) -> PyResult<Bound<'py, PyAny>> {
473 let client = self.clone();
474
475 pyo3_async_runtimes::tokio::future_into_py(py, async move {
476 client
477 .subscribe_bars(bar_type)
478 .await
479 .map_err(to_pyruntime_err)?;
480 Ok(())
481 })
482 }
483
484 #[pyo3(name = "unsubscribe_bars")]
486 fn py_unsubscribe_bars<'py>(
487 &self,
488 py: Python<'py>,
489 bar_type: BarType,
490 ) -> PyResult<Bound<'py, PyAny>> {
491 let client = self.clone();
492
493 pyo3_async_runtimes::tokio::future_into_py(py, async move {
494 client
495 .unsubscribe_bars(bar_type)
496 .await
497 .map_err(to_pyruntime_err)?;
498 Ok(())
499 })
500 }
501
502 #[pyo3(name = "subscribe_order_updates")]
504 fn py_subscribe_order_updates<'py>(
505 &self,
506 py: Python<'py>,
507 user: String,
508 ) -> PyResult<Bound<'py, PyAny>> {
509 let client = self.clone();
510
511 pyo3_async_runtimes::tokio::future_into_py(py, async move {
512 client
513 .subscribe_order_updates(&user)
514 .await
515 .map_err(to_pyruntime_err)?;
516 Ok(())
517 })
518 }
519
520 #[pyo3(name = "subscribe_user_events")]
522 fn py_subscribe_user_events<'py>(
523 &self,
524 py: Python<'py>,
525 user: String,
526 ) -> PyResult<Bound<'py, PyAny>> {
527 let client = self.clone();
528
529 pyo3_async_runtimes::tokio::future_into_py(py, async move {
530 client
531 .subscribe_user_events(&user)
532 .await
533 .map_err(to_pyruntime_err)?;
534 Ok(())
535 })
536 }
537
538 #[pyo3(name = "subscribe_user_fills")]
543 fn py_subscribe_user_fills<'py>(
544 &self,
545 py: Python<'py>,
546 user: String,
547 ) -> PyResult<Bound<'py, PyAny>> {
548 let client = self.clone();
549
550 pyo3_async_runtimes::tokio::future_into_py(py, async move {
551 client
552 .subscribe_user_fills(&user)
553 .await
554 .map_err(to_pyruntime_err)?;
555 Ok(())
556 })
557 }
558
559 #[pyo3(name = "subscribe_mark_prices")]
561 fn py_subscribe_mark_prices<'py>(
562 &self,
563 py: Python<'py>,
564 instrument_id: InstrumentId,
565 ) -> PyResult<Bound<'py, PyAny>> {
566 let client = self.clone();
567
568 pyo3_async_runtimes::tokio::future_into_py(py, async move {
569 client
570 .subscribe_mark_prices(instrument_id)
571 .await
572 .map_err(to_pyruntime_err)?;
573 Ok(())
574 })
575 }
576
577 #[pyo3(name = "unsubscribe_mark_prices")]
579 fn py_unsubscribe_mark_prices<'py>(
580 &self,
581 py: Python<'py>,
582 instrument_id: InstrumentId,
583 ) -> PyResult<Bound<'py, PyAny>> {
584 let client = self.clone();
585
586 pyo3_async_runtimes::tokio::future_into_py(py, async move {
587 client
588 .unsubscribe_mark_prices(instrument_id)
589 .await
590 .map_err(to_pyruntime_err)?;
591 Ok(())
592 })
593 }
594
595 #[pyo3(name = "subscribe_index_prices")]
597 fn py_subscribe_index_prices<'py>(
598 &self,
599 py: Python<'py>,
600 instrument_id: InstrumentId,
601 ) -> PyResult<Bound<'py, PyAny>> {
602 let client = self.clone();
603
604 pyo3_async_runtimes::tokio::future_into_py(py, async move {
605 client
606 .subscribe_index_prices(instrument_id)
607 .await
608 .map_err(to_pyruntime_err)?;
609 Ok(())
610 })
611 }
612
613 #[pyo3(name = "unsubscribe_index_prices")]
615 fn py_unsubscribe_index_prices<'py>(
616 &self,
617 py: Python<'py>,
618 instrument_id: InstrumentId,
619 ) -> PyResult<Bound<'py, PyAny>> {
620 let client = self.clone();
621
622 pyo3_async_runtimes::tokio::future_into_py(py, async move {
623 client
624 .unsubscribe_index_prices(instrument_id)
625 .await
626 .map_err(to_pyruntime_err)?;
627 Ok(())
628 })
629 }
630
631 #[pyo3(name = "subscribe_funding_rates")]
633 fn py_subscribe_funding_rates<'py>(
634 &self,
635 py: Python<'py>,
636 instrument_id: InstrumentId,
637 ) -> PyResult<Bound<'py, PyAny>> {
638 let client = self.clone();
639
640 pyo3_async_runtimes::tokio::future_into_py(py, async move {
641 client
642 .subscribe_funding_rates(instrument_id)
643 .await
644 .map_err(to_pyruntime_err)?;
645 Ok(())
646 })
647 }
648
649 #[pyo3(name = "unsubscribe_funding_rates")]
651 fn py_unsubscribe_funding_rates<'py>(
652 &self,
653 py: Python<'py>,
654 instrument_id: InstrumentId,
655 ) -> PyResult<Bound<'py, PyAny>> {
656 let client = self.clone();
657
658 pyo3_async_runtimes::tokio::future_into_py(py, async move {
659 client
660 .unsubscribe_funding_rates(instrument_id)
661 .await
662 .map_err(to_pyruntime_err)?;
663 Ok(())
664 })
665 }
666}