Skip to main content

nautilus_hyperliquid/python/
websocket.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Hyperliquid WebSocket client.
17
18use nautilus_common::live::get_runtime;
19use nautilus_core::python::{call_python_threadsafe, to_pyruntime_err};
20use nautilus_model::{
21    data::{BarType, Data, OrderBookDeltas_API},
22    identifiers::{AccountId, ClientOrderId, InstrumentId},
23    python::{data::data_to_pycapsule, instruments::pyobject_to_instrument_any},
24};
25use pyo3::{conversion::IntoPyObjectExt, prelude::*};
26
27use crate::websocket::{
28    HyperliquidWebSocketClient,
29    messages::{ExecutionReport, NautilusWsMessage},
30};
31
32#[pymethods]
33#[pyo3_stub_gen::derive::gen_stub_pymethods]
34impl HyperliquidWebSocketClient {
35    /// Hyperliquid WebSocket client following the BitMEX pattern.
36    ///
37    /// Orchestrates WebSocket connection and subscriptions using a command-based architecture,
38    /// where the inner FeedHandler owns the WebSocketClient and handles all I/O.
39    #[new]
40    #[pyo3(signature = (url=None, testnet=false, account_id=None))]
41    fn py_new(url: Option<String>, testnet: bool, account_id: Option<String>) -> Self {
42        let account_id = account_id.map(|s| AccountId::from(s.as_str()));
43        Self::new(url, testnet, account_id)
44    }
45
46    /// Returns the URL of this WebSocket client.
47    #[getter]
48    #[pyo3(name = "url")]
49    #[must_use]
50    pub fn py_url(&self) -> String {
51        self.url().to_string()
52    }
53
54    /// Returns true if the WebSocket is actively connected.
55    #[pyo3(name = "is_active")]
56    fn py_is_active(&self) -> bool {
57        self.is_active()
58    }
59
60    #[pyo3(name = "is_closed")]
61    fn py_is_closed(&self) -> bool {
62        !self.is_active()
63    }
64
65    /// Caches spot fill coin mappings for instrument lookup.
66    ///
67    /// Hyperliquid WebSocket fills for spot use `@{pair_index}` format (e.g., `@107`),
68    /// while instruments are identified by full symbols (e.g., `HYPE-USDC-SPOT`).
69    /// This mapping allows the handler to look up instruments from spot fills.
70    #[pyo3(name = "cache_spot_fill_coins")]
71    fn py_cache_spot_fill_coins(&self, mapping: std::collections::HashMap<String, String>) {
72        let ahash_mapping: ahash::AHashMap<ustr::Ustr, ustr::Ustr> = mapping
73            .into_iter()
74            .map(|(k, v)| (ustr::Ustr::from(&k), ustr::Ustr::from(&v)))
75            .collect();
76        self.cache_spot_fill_coins(ahash_mapping);
77    }
78
79    /// Caches a cloid (hex hash) to client_order_id mapping for order/fill resolution.
80    ///
81    /// The cloid is a keccak256 hash of the client_order_id that Hyperliquid uses internally.
82    /// This mapping allows WebSocket order status and fill reports to be resolved back to
83    /// the original client_order_id.
84    ///
85    /// This writes directly to a shared cache that the handler reads from, avoiding any
86    /// race conditions between caching and WebSocket message processing.
87    #[pyo3(name = "cache_cloid_mapping")]
88    fn py_cache_cloid_mapping(&self, cloid: &str, client_order_id: ClientOrderId) {
89        self.cache_cloid_mapping(ustr::Ustr::from(cloid), client_order_id);
90    }
91
92    /// Removes a cloid mapping from the cache.
93    ///
94    /// Should be called when an order reaches a terminal state (filled, canceled, expired)
95    /// to prevent unbounded memory growth in long-running sessions.
96    #[pyo3(name = "remove_cloid_mapping")]
97    fn py_remove_cloid_mapping(&self, cloid: &str) {
98        self.remove_cloid_mapping(&ustr::Ustr::from(cloid));
99    }
100
101    /// Clears all cloid mappings from the cache.
102    ///
103    /// Useful for cleanup during reconnection or shutdown.
104    #[pyo3(name = "clear_cloid_cache")]
105    fn py_clear_cloid_cache(&self) {
106        self.clear_cloid_cache();
107    }
108
109    /// Returns the number of cloid mappings in the cache.
110    #[pyo3(name = "cloid_cache_len")]
111    fn py_cloid_cache_len(&self) -> usize {
112        self.cloid_cache_len()
113    }
114
115    /// Looks up a client_order_id by its cloid hash.
116    ///
117    /// Returns `Some(ClientOrderId)` if the mapping exists, `None` otherwise.
118    #[pyo3(name = "get_cloid_mapping")]
119    fn py_get_cloid_mapping(&self, cloid: &str) -> Option<ClientOrderId> {
120        self.get_cloid_mapping(&ustr::Ustr::from(cloid))
121    }
122
123    /// Establishes WebSocket connection and spawns the message handler.
124    #[pyo3(name = "connect")]
125    #[allow(clippy::needless_pass_by_value)]
126    fn py_connect<'py>(
127        &self,
128        py: Python<'py>,
129        loop_: Py<PyAny>,
130        instruments: Vec<Py<PyAny>>,
131        callback: Py<PyAny>,
132    ) -> PyResult<Bound<'py, PyAny>> {
133        let call_soon: Py<PyAny> = loop_.getattr(py, "call_soon_threadsafe")?;
134
135        for inst in instruments {
136            let inst_any = pyobject_to_instrument_any(py, inst)?;
137            self.cache_instrument(inst_any);
138        }
139
140        let mut client = self.clone();
141
142        pyo3_async_runtimes::tokio::future_into_py(py, async move {
143            client.connect().await.map_err(to_pyruntime_err)?;
144
145            get_runtime().spawn(async move {
146                loop {
147                    let event = client.next_event().await;
148
149                    match event {
150                        Some(msg) => {
151                            log::trace!("Received WebSocket message: {msg:?}");
152
153                            match msg {
154                                NautilusWsMessage::Trades(trade_ticks) => {
155                                    Python::attach(|py| {
156                                        for tick in trade_ticks {
157                                            let py_obj = data_to_pycapsule(py, Data::Trade(tick));
158                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
159                                        }
160                                    });
161                                }
162                                NautilusWsMessage::Quote(quote_tick) => {
163                                    Python::attach(|py| {
164                                        let py_obj = data_to_pycapsule(py, Data::Quote(quote_tick));
165                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
166                                    });
167                                }
168                                NautilusWsMessage::Deltas(deltas) => {
169                                    Python::attach(|py| {
170                                        let py_obj = data_to_pycapsule(
171                                            py,
172                                            Data::Deltas(OrderBookDeltas_API::new(deltas)),
173                                        );
174                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
175                                    });
176                                }
177                                NautilusWsMessage::Candle(bar) => {
178                                    Python::attach(|py| {
179                                        let py_obj = data_to_pycapsule(py, Data::Bar(bar));
180                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
181                                    });
182                                }
183                                NautilusWsMessage::MarkPrice(mark_price) => {
184                                    Python::attach(|py| {
185                                        let py_obj = data_to_pycapsule(
186                                            py,
187                                            Data::MarkPriceUpdate(mark_price),
188                                        );
189                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
190                                    });
191                                }
192                                NautilusWsMessage::IndexPrice(index_price) => {
193                                    Python::attach(|py| {
194                                        let py_obj = data_to_pycapsule(
195                                            py,
196                                            Data::IndexPriceUpdate(index_price),
197                                        );
198                                        call_python_threadsafe(py, &call_soon, &callback, py_obj);
199                                    });
200                                }
201                                NautilusWsMessage::FundingRate(funding_rate) => {
202                                    Python::attach(|py| {
203                                        if let Ok(py_obj) = funding_rate.into_py_any(py) {
204                                            call_python_threadsafe(py, &call_soon, &callback, py_obj);
205                                        }
206                                    });
207                                }
208                                NautilusWsMessage::ExecutionReports(reports) => {
209                                    Python::attach(|py| {
210                                        for report in reports {
211                                            match report {
212                                                ExecutionReport::Order(order_report) => {
213                                                    log::debug!(
214                                                        "Forwarding order status report: order_id={}, status={:?}",
215                                                        order_report.venue_order_id,
216                                                        order_report.order_status
217                                                    );
218                                                    match Py::new(py, order_report) {
219                                                        Ok(py_obj) => {
220                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
221                                                        }
222                                                        Err(e) => {
223                                                            log::error!("Error converting OrderStatusReport to Python: {e}");
224                                                        }
225                                                    }
226                                                }
227                                                ExecutionReport::Fill(fill_report) => {
228                                                    log::debug!(
229                                                        "Forwarding fill report: trade_id={}, side={:?}, qty={}, price={}",
230                                                        fill_report.trade_id,
231                                                        fill_report.order_side,
232                                                        fill_report.last_qty,
233                                                        fill_report.last_px
234                                                    );
235                                                    match Py::new(py, fill_report) {
236                                                        Ok(py_obj) => {
237                                                            call_python_threadsafe(py, &call_soon, &callback, py_obj.into_any());
238                                                        }
239                                                        Err(e) => {
240                                                            log::error!("Error converting FillReport to Python: {e}");
241                                                        }
242                                                    }
243                                                }
244                                            }
245                                        }
246                                    });
247                                }
248                                _ => {
249                                    log::debug!("Unhandled message type: {msg:?}");
250                                }
251                            }
252                        }
253                        None => {
254                            log::debug!("WebSocket connection closed");
255                            break;
256                        }
257                    }
258                }
259            });
260
261            Ok(())
262        })
263    }
264
265    #[pyo3(name = "wait_until_active")]
266    fn py_wait_until_active<'py>(
267        &self,
268        py: Python<'py>,
269        timeout_secs: f64,
270    ) -> PyResult<Bound<'py, PyAny>> {
271        let client = self.clone();
272
273        pyo3_async_runtimes::tokio::future_into_py(py, async move {
274            let start = std::time::Instant::now();
275            loop {
276                if client.is_active() {
277                    return Ok(());
278                }
279
280                if start.elapsed().as_secs_f64() >= timeout_secs {
281                    return Err(to_pyruntime_err(format!(
282                        "WebSocket connection did not become active within {timeout_secs} seconds"
283                    )));
284                }
285
286                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
287            }
288        })
289    }
290
291    #[pyo3(name = "close")]
292    fn py_close<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
293        let mut client = self.clone();
294
295        pyo3_async_runtimes::tokio::future_into_py(py, async move {
296            if let Err(e) = client.disconnect().await {
297                log::error!("Error on close: {e}");
298            }
299            Ok(())
300        })
301    }
302
303    /// Subscribe to trades for an instrument.
304    #[pyo3(name = "subscribe_trades")]
305    fn py_subscribe_trades<'py>(
306        &self,
307        py: Python<'py>,
308        instrument_id: InstrumentId,
309    ) -> PyResult<Bound<'py, PyAny>> {
310        let client = self.clone();
311
312        pyo3_async_runtimes::tokio::future_into_py(py, async move {
313            client
314                .subscribe_trades(instrument_id)
315                .await
316                .map_err(to_pyruntime_err)?;
317            Ok(())
318        })
319    }
320
321    /// Unsubscribe from trades for an instrument.
322    #[pyo3(name = "unsubscribe_trades")]
323    fn py_unsubscribe_trades<'py>(
324        &self,
325        py: Python<'py>,
326        instrument_id: InstrumentId,
327    ) -> PyResult<Bound<'py, PyAny>> {
328        let client = self.clone();
329
330        pyo3_async_runtimes::tokio::future_into_py(py, async move {
331            client
332                .unsubscribe_trades(instrument_id)
333                .await
334                .map_err(to_pyruntime_err)?;
335            Ok(())
336        })
337    }
338
339    /// Subscribe to L2 order book for an instrument.
340    #[pyo3(name = "subscribe_book")]
341    fn py_subscribe_book<'py>(
342        &self,
343        py: Python<'py>,
344        instrument_id: InstrumentId,
345    ) -> PyResult<Bound<'py, PyAny>> {
346        let client = self.clone();
347
348        pyo3_async_runtimes::tokio::future_into_py(py, async move {
349            client
350                .subscribe_book(instrument_id)
351                .await
352                .map_err(to_pyruntime_err)?;
353            Ok(())
354        })
355    }
356
357    /// Unsubscribe from L2 order book for an instrument.
358    #[pyo3(name = "unsubscribe_book")]
359    fn py_unsubscribe_book<'py>(
360        &self,
361        py: Python<'py>,
362        instrument_id: InstrumentId,
363    ) -> PyResult<Bound<'py, PyAny>> {
364        let client = self.clone();
365
366        pyo3_async_runtimes::tokio::future_into_py(py, async move {
367            client
368                .unsubscribe_book(instrument_id)
369                .await
370                .map_err(to_pyruntime_err)?;
371            Ok(())
372        })
373    }
374
375    #[pyo3(name = "subscribe_book_deltas")]
376    fn py_subscribe_book_deltas<'py>(
377        &self,
378        py: Python<'py>,
379        instrument_id: InstrumentId,
380        _book_type: u8,
381        _depth: u64,
382    ) -> PyResult<Bound<'py, PyAny>> {
383        let client = self.clone();
384
385        pyo3_async_runtimes::tokio::future_into_py(py, async move {
386            client
387                .subscribe_book(instrument_id)
388                .await
389                .map_err(to_pyruntime_err)?;
390            Ok(())
391        })
392    }
393
394    #[pyo3(name = "unsubscribe_book_deltas")]
395    fn py_unsubscribe_book_deltas<'py>(
396        &self,
397        py: Python<'py>,
398        instrument_id: InstrumentId,
399    ) -> PyResult<Bound<'py, PyAny>> {
400        let client = self.clone();
401
402        pyo3_async_runtimes::tokio::future_into_py(py, async move {
403            client
404                .unsubscribe_book(instrument_id)
405                .await
406                .map_err(to_pyruntime_err)?;
407            Ok(())
408        })
409    }
410
411    #[pyo3(name = "subscribe_book_snapshots")]
412    fn py_subscribe_book_snapshots<'py>(
413        &self,
414        py: Python<'py>,
415        instrument_id: InstrumentId,
416        _book_type: u8,
417        _depth: u64,
418    ) -> PyResult<Bound<'py, PyAny>> {
419        let client = self.clone();
420
421        pyo3_async_runtimes::tokio::future_into_py(py, async move {
422            client
423                .subscribe_book(instrument_id)
424                .await
425                .map_err(to_pyruntime_err)?;
426            Ok(())
427        })
428    }
429
430    /// Subscribe to best bid/offer (BBO) quotes for an instrument.
431    #[pyo3(name = "subscribe_quotes")]
432    fn py_subscribe_quotes<'py>(
433        &self,
434        py: Python<'py>,
435        instrument_id: InstrumentId,
436    ) -> PyResult<Bound<'py, PyAny>> {
437        let client = self.clone();
438
439        pyo3_async_runtimes::tokio::future_into_py(py, async move {
440            client
441                .subscribe_quotes(instrument_id)
442                .await
443                .map_err(to_pyruntime_err)?;
444            Ok(())
445        })
446    }
447
448    /// Unsubscribe from quote ticks for an instrument.
449    #[pyo3(name = "unsubscribe_quotes")]
450    fn py_unsubscribe_quotes<'py>(
451        &self,
452        py: Python<'py>,
453        instrument_id: InstrumentId,
454    ) -> PyResult<Bound<'py, PyAny>> {
455        let client = self.clone();
456
457        pyo3_async_runtimes::tokio::future_into_py(py, async move {
458            client
459                .unsubscribe_quotes(instrument_id)
460                .await
461                .map_err(to_pyruntime_err)?;
462            Ok(())
463        })
464    }
465
466    /// Subscribe to candle/bar data for a specific coin and interval.
467    #[pyo3(name = "subscribe_bars")]
468    fn py_subscribe_bars<'py>(
469        &self,
470        py: Python<'py>,
471        bar_type: BarType,
472    ) -> PyResult<Bound<'py, PyAny>> {
473        let client = self.clone();
474
475        pyo3_async_runtimes::tokio::future_into_py(py, async move {
476            client
477                .subscribe_bars(bar_type)
478                .await
479                .map_err(to_pyruntime_err)?;
480            Ok(())
481        })
482    }
483
484    /// Unsubscribe from candle/bar data.
485    #[pyo3(name = "unsubscribe_bars")]
486    fn py_unsubscribe_bars<'py>(
487        &self,
488        py: Python<'py>,
489        bar_type: BarType,
490    ) -> PyResult<Bound<'py, PyAny>> {
491        let client = self.clone();
492
493        pyo3_async_runtimes::tokio::future_into_py(py, async move {
494            client
495                .unsubscribe_bars(bar_type)
496                .await
497                .map_err(to_pyruntime_err)?;
498            Ok(())
499        })
500    }
501
502    /// Subscribe to order updates for a specific user address.
503    #[pyo3(name = "subscribe_order_updates")]
504    fn py_subscribe_order_updates<'py>(
505        &self,
506        py: Python<'py>,
507        user: String,
508    ) -> PyResult<Bound<'py, PyAny>> {
509        let client = self.clone();
510
511        pyo3_async_runtimes::tokio::future_into_py(py, async move {
512            client
513                .subscribe_order_updates(&user)
514                .await
515                .map_err(to_pyruntime_err)?;
516            Ok(())
517        })
518    }
519
520    /// Subscribe to user events (fills, funding, liquidations) for a specific user address.
521    #[pyo3(name = "subscribe_user_events")]
522    fn py_subscribe_user_events<'py>(
523        &self,
524        py: Python<'py>,
525        user: String,
526    ) -> PyResult<Bound<'py, PyAny>> {
527        let client = self.clone();
528
529        pyo3_async_runtimes::tokio::future_into_py(py, async move {
530            client
531                .subscribe_user_events(&user)
532                .await
533                .map_err(to_pyruntime_err)?;
534            Ok(())
535        })
536    }
537
538    /// Subscribe to user fills for a specific user address.
539    ///
540    /// Note: This channel is redundant with `userEvents` which already includes fills.
541    /// Prefer using `subscribe_user_events` or `subscribe_all_user_channels` instead.
542    #[pyo3(name = "subscribe_user_fills")]
543    fn py_subscribe_user_fills<'py>(
544        &self,
545        py: Python<'py>,
546        user: String,
547    ) -> PyResult<Bound<'py, PyAny>> {
548        let client = self.clone();
549
550        pyo3_async_runtimes::tokio::future_into_py(py, async move {
551            client
552                .subscribe_user_fills(&user)
553                .await
554                .map_err(to_pyruntime_err)?;
555            Ok(())
556        })
557    }
558
559    /// Subscribe to mark price updates for an instrument.
560    #[pyo3(name = "subscribe_mark_prices")]
561    fn py_subscribe_mark_prices<'py>(
562        &self,
563        py: Python<'py>,
564        instrument_id: InstrumentId,
565    ) -> PyResult<Bound<'py, PyAny>> {
566        let client = self.clone();
567
568        pyo3_async_runtimes::tokio::future_into_py(py, async move {
569            client
570                .subscribe_mark_prices(instrument_id)
571                .await
572                .map_err(to_pyruntime_err)?;
573            Ok(())
574        })
575    }
576
577    /// Unsubscribe from mark price updates for an instrument.
578    #[pyo3(name = "unsubscribe_mark_prices")]
579    fn py_unsubscribe_mark_prices<'py>(
580        &self,
581        py: Python<'py>,
582        instrument_id: InstrumentId,
583    ) -> PyResult<Bound<'py, PyAny>> {
584        let client = self.clone();
585
586        pyo3_async_runtimes::tokio::future_into_py(py, async move {
587            client
588                .unsubscribe_mark_prices(instrument_id)
589                .await
590                .map_err(to_pyruntime_err)?;
591            Ok(())
592        })
593    }
594
595    /// Subscribe to index/oracle price updates for an instrument.
596    #[pyo3(name = "subscribe_index_prices")]
597    fn py_subscribe_index_prices<'py>(
598        &self,
599        py: Python<'py>,
600        instrument_id: InstrumentId,
601    ) -> PyResult<Bound<'py, PyAny>> {
602        let client = self.clone();
603
604        pyo3_async_runtimes::tokio::future_into_py(py, async move {
605            client
606                .subscribe_index_prices(instrument_id)
607                .await
608                .map_err(to_pyruntime_err)?;
609            Ok(())
610        })
611    }
612
613    /// Unsubscribe from index/oracle price updates for an instrument.
614    #[pyo3(name = "unsubscribe_index_prices")]
615    fn py_unsubscribe_index_prices<'py>(
616        &self,
617        py: Python<'py>,
618        instrument_id: InstrumentId,
619    ) -> PyResult<Bound<'py, PyAny>> {
620        let client = self.clone();
621
622        pyo3_async_runtimes::tokio::future_into_py(py, async move {
623            client
624                .unsubscribe_index_prices(instrument_id)
625                .await
626                .map_err(to_pyruntime_err)?;
627            Ok(())
628        })
629    }
630
631    /// Subscribe to funding rate updates for an instrument.
632    #[pyo3(name = "subscribe_funding_rates")]
633    fn py_subscribe_funding_rates<'py>(
634        &self,
635        py: Python<'py>,
636        instrument_id: InstrumentId,
637    ) -> PyResult<Bound<'py, PyAny>> {
638        let client = self.clone();
639
640        pyo3_async_runtimes::tokio::future_into_py(py, async move {
641            client
642                .subscribe_funding_rates(instrument_id)
643                .await
644                .map_err(to_pyruntime_err)?;
645            Ok(())
646        })
647    }
648
649    /// Unsubscribe from funding rate updates for an instrument.
650    #[pyo3(name = "unsubscribe_funding_rates")]
651    fn py_unsubscribe_funding_rates<'py>(
652        &self,
653        py: Python<'py>,
654        instrument_id: InstrumentId,
655    ) -> PyResult<Bound<'py, PyAny>> {
656        let client = self.clone();
657
658        pyo3_async_runtimes::tokio::future_into_py(py, async move {
659            client
660                .unsubscribe_funding_rates(instrument_id)
661                .await
662                .map_err(to_pyruntime_err)?;
663            Ok(())
664        })
665    }
666}