Skip to main content

nautilus_hyperliquid/websocket/
parse.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Parsing helpers for Hyperliquid WebSocket payloads.
17
18use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23    data::{
24        Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25        OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
26    },
27    enums::{
28        AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, RecordFlag,
29        TimeInForce,
30    },
31    identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
32    instruments::{Instrument, InstrumentAny},
33    reports::{FillReport, OrderStatusReport},
34    types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::{Decimal, prelude::FromPrimitive};
37
38use super::messages::{
39    CandleData, WsActiveAssetCtxData, WsBboData, WsBookData, WsFillData, WsOrderData, WsTradeData,
40};
41use crate::common::parse::{
42    is_conditional_order_data, make_fill_trade_id, millis_to_nanos, parse_trigger_order_type,
43};
44
45fn parse_price(
46    price_str: &str,
47    instrument: &InstrumentAny,
48    field_name: &str,
49) -> anyhow::Result<Price> {
50    let decimal = Decimal::from_str(price_str)
51        .with_context(|| format!("Failed to parse price from '{price_str}' for {field_name}"))?;
52
53    Price::from_decimal_dp(decimal, instrument.price_precision())
54        .with_context(|| format!("Failed to create price from '{price_str}' for {field_name}"))
55}
56
57fn parse_quantity(
58    quantity_str: &str,
59    instrument: &InstrumentAny,
60    field_name: &str,
61) -> anyhow::Result<Quantity> {
62    let decimal = Decimal::from_str(quantity_str).with_context(|| {
63        format!("Failed to parse quantity from '{quantity_str}' for {field_name}")
64    })?;
65
66    Quantity::from_decimal_dp(decimal.abs(), instrument.size_precision()).with_context(|| {
67        format!("Failed to create quantity from '{quantity_str}' for {field_name}")
68    })
69}
70
71/// Parses a WebSocket trade frame into a [`TradeTick`].
72pub fn parse_ws_trade_tick(
73    trade: &WsTradeData,
74    instrument: &InstrumentAny,
75    ts_init: UnixNanos,
76) -> anyhow::Result<TradeTick> {
77    let price = parse_price(&trade.px, instrument, "trade.px")?;
78    let size = parse_quantity(&trade.sz, instrument, "trade.sz")?;
79    let aggressor = AggressorSide::from(trade.side);
80    let trade_id = TradeId::new_checked(trade.tid.to_string())
81        .context("invalid trade identifier in Hyperliquid trade message")?;
82    let ts_event = millis_to_nanos(trade.time)?;
83
84    TradeTick::new_checked(
85        instrument.id(),
86        price,
87        size,
88        aggressor,
89        trade_id,
90        ts_event,
91        ts_init,
92    )
93    .context("failed to construct TradeTick from Hyperliquid trade message")
94}
95
96/// Parses a WebSocket L2 order book message into [`OrderBookDeltas`].
97pub fn parse_ws_order_book_deltas(
98    book: &WsBookData,
99    instrument: &InstrumentAny,
100    ts_init: UnixNanos,
101) -> anyhow::Result<OrderBookDeltas> {
102    let ts_event = millis_to_nanos(book.time)?;
103    let mut deltas = Vec::new();
104
105    // Treat every book payload as a snapshot: clear existing depth and rebuild it
106    deltas.push(OrderBookDelta::clear(instrument.id(), 0, ts_event, ts_init));
107
108    for level in &book.levels[0] {
109        let price = parse_price(&level.px, instrument, "book.bid.px")?;
110        let size = parse_quantity(&level.sz, instrument, "book.bid.sz")?;
111
112        if !size.is_positive() {
113            continue;
114        }
115
116        let order = BookOrder::new(OrderSide::Buy, price, size, 0);
117
118        let delta = OrderBookDelta::new(
119            instrument.id(),
120            BookAction::Add,
121            order,
122            RecordFlag::F_LAST as u8,
123            0, // sequence
124            ts_event,
125            ts_init,
126        );
127
128        deltas.push(delta);
129    }
130
131    for level in &book.levels[1] {
132        let price = parse_price(&level.px, instrument, "book.ask.px")?;
133        let size = parse_quantity(&level.sz, instrument, "book.ask.sz")?;
134
135        if !size.is_positive() {
136            continue;
137        }
138
139        let order = BookOrder::new(OrderSide::Sell, price, size, 0);
140
141        let delta = OrderBookDelta::new(
142            instrument.id(),
143            BookAction::Add,
144            order,
145            RecordFlag::F_LAST as u8,
146            0, // sequence
147            ts_event,
148            ts_init,
149        );
150
151        deltas.push(delta);
152    }
153
154    Ok(OrderBookDeltas::new(instrument.id(), deltas))
155}
156
157/// Parses a WebSocket BBO (best bid/offer) message into a [`QuoteTick`].
158pub fn parse_ws_quote_tick(
159    bbo: &WsBboData,
160    instrument: &InstrumentAny,
161    ts_init: UnixNanos,
162) -> anyhow::Result<QuoteTick> {
163    let bid_level = bbo.bbo[0]
164        .as_ref()
165        .context("BBO message missing bid level")?;
166    let ask_level = bbo.bbo[1]
167        .as_ref()
168        .context("BBO message missing ask level")?;
169
170    let bid_price = parse_price(&bid_level.px, instrument, "bbo.bid.px")?;
171    let ask_price = parse_price(&ask_level.px, instrument, "bbo.ask.px")?;
172    let bid_size = parse_quantity(&bid_level.sz, instrument, "bbo.bid.sz")?;
173    let ask_size = parse_quantity(&ask_level.sz, instrument, "bbo.ask.sz")?;
174
175    let ts_event = millis_to_nanos(bbo.time)?;
176
177    QuoteTick::new_checked(
178        instrument.id(),
179        bid_price,
180        ask_price,
181        bid_size,
182        ask_size,
183        ts_event,
184        ts_init,
185    )
186    .context("failed to construct QuoteTick from Hyperliquid BBO message")
187}
188
189/// Parses a WebSocket candle message into a [`Bar`].
190pub fn parse_ws_candle(
191    candle: &CandleData,
192    instrument: &InstrumentAny,
193    bar_type: &BarType,
194    ts_init: UnixNanos,
195) -> anyhow::Result<Bar> {
196    let open = parse_price(&candle.o, instrument, "candle.o")?;
197    let high = parse_price(&candle.h, instrument, "candle.h")?;
198    let low = parse_price(&candle.l, instrument, "candle.l")?;
199    let close = parse_price(&candle.c, instrument, "candle.c")?;
200    let volume = parse_quantity(&candle.v, instrument, "candle.v")?;
201
202    let ts_event = millis_to_nanos(candle.t)?;
203
204    Ok(Bar::new(
205        *bar_type, open, high, low, close, volume, ts_event, ts_init,
206    ))
207}
208
209/// Parses a WebSocket order update message into an [`OrderStatusReport`].
210///
211/// This converts Hyperliquid order data from WebSocket into Nautilus order status reports.
212/// Handles both regular and conditional orders (stop/limit-if-touched).
213pub fn parse_ws_order_status_report(
214    order: &WsOrderData,
215    instrument: &InstrumentAny,
216    account_id: AccountId,
217    ts_init: UnixNanos,
218) -> anyhow::Result<OrderStatusReport> {
219    let instrument_id = instrument.id();
220    let venue_order_id = VenueOrderId::new(order.order.oid.to_string());
221    let order_side = OrderSide::from(order.order.side);
222
223    // Determine order type based on trigger info
224    let order_type = if is_conditional_order_data(
225        order.order.trigger_px.as_deref(),
226        order.order.tpsl.as_ref(),
227    ) {
228        if let (Some(is_market), Some(tpsl)) = (order.order.is_market, order.order.tpsl.as_ref()) {
229            parse_trigger_order_type(is_market, tpsl)
230        } else {
231            OrderType::Limit // fallback
232        }
233    } else {
234        OrderType::Limit // Regular limit order
235    };
236
237    let time_in_force = TimeInForce::Gtc;
238    let order_status = OrderStatus::from(order.status);
239
240    // orig_sz is the original order quantity, sz is the remaining quantity
241    let orig_qty = parse_quantity(&order.order.orig_sz, instrument, "order.orig_sz")?;
242    let remaining_qty = parse_quantity(&order.order.sz, instrument, "order.sz")?;
243    let filled_qty = Quantity::from_raw(
244        orig_qty.raw.saturating_sub(remaining_qty.raw),
245        instrument.size_precision(),
246    );
247
248    let price = parse_price(&order.order.limit_px, instrument, "order.limitPx")?;
249
250    let ts_accepted = millis_to_nanos(order.order.timestamp)?;
251    let ts_last = millis_to_nanos(order.status_timestamp)?;
252
253    let mut report = OrderStatusReport::new(
254        account_id,
255        instrument_id,
256        None, // venue_order_id_modified
257        venue_order_id,
258        order_side,
259        order_type,
260        time_in_force,
261        order_status,
262        orig_qty, // Use original quantity, not remaining
263        filled_qty,
264        ts_accepted,
265        ts_last,
266        ts_init,
267        Some(UUID4::new()),
268    );
269
270    if let Some(ref cloid) = order.order.cloid {
271        report = report.with_client_order_id(ClientOrderId::new(cloid.as_str()));
272    }
273
274    report = report.with_price(price);
275
276    if let Some(ref trigger_px_str) = order.order.trigger_px {
277        let trigger_price = parse_price(trigger_px_str, instrument, "order.triggerPx")?;
278        report = report.with_trigger_price(trigger_price);
279    }
280
281    Ok(report)
282}
283
284/// Parses a WebSocket fill message into a [`FillReport`].
285///
286/// This converts Hyperliquid fill data from WebSocket user events into Nautilus fill reports.
287pub fn parse_ws_fill_report(
288    fill: &WsFillData,
289    instrument: &InstrumentAny,
290    account_id: AccountId,
291    ts_init: UnixNanos,
292) -> anyhow::Result<FillReport> {
293    let instrument_id = instrument.id();
294    let venue_order_id = VenueOrderId::new(fill.oid.to_string());
295    let trade_id = make_fill_trade_id(
296        &fill.hash,
297        fill.oid,
298        &fill.px,
299        &fill.sz,
300        fill.time,
301        &fill.start_position,
302    );
303
304    let order_side = OrderSide::from(fill.side);
305    let last_qty = parse_quantity(&fill.sz, instrument, "fill.sz")?;
306    let last_px = parse_price(&fill.px, instrument, "fill.px")?;
307    let liquidity_side = if fill.crossed {
308        LiquiditySide::Taker
309    } else {
310        LiquiditySide::Maker
311    };
312
313    let fee_amount = Decimal::from_str(&fill.fee)
314        .with_context(|| format!("Failed to parse fee='{}' as decimal", fill.fee))?;
315
316    let commission_currency = Currency::from_str(fill.fee_token.as_str())
317        .with_context(|| format!("Unknown fee token '{}'", fill.fee_token))?;
318
319    let commission = Money::from_decimal(fee_amount, commission_currency)
320        .with_context(|| format!("Failed to create commission from fee='{}'", fill.fee))?;
321    let ts_event = millis_to_nanos(fill.time)?;
322
323    // No client order ID available in fill data directly
324    let client_order_id = None;
325
326    Ok(FillReport::new(
327        account_id,
328        instrument_id,
329        venue_order_id,
330        trade_id,
331        order_side,
332        last_qty,
333        last_px,
334        commission,
335        liquidity_side,
336        client_order_id,
337        None, // venue_position_id
338        ts_event,
339        ts_init,
340        None, // report_id
341    ))
342}
343
344/// Parses a WebSocket ActiveAssetCtx message into mark price, index price, and funding rate updates.
345///
346/// This converts Hyperliquid asset context data into Nautilus price and funding rate updates.
347/// Returns a tuple of (`MarkPriceUpdate`, `Option<IndexPriceUpdate>`, `Option<FundingRateUpdate>`).
348/// Index price and funding rate are only present for perpetual contracts.
349pub fn parse_ws_asset_context(
350    ctx: &WsActiveAssetCtxData,
351    instrument: &InstrumentAny,
352    ts_init: UnixNanos,
353) -> anyhow::Result<(
354    MarkPriceUpdate,
355    Option<IndexPriceUpdate>,
356    Option<FundingRateUpdate>,
357)> {
358    let instrument_id = instrument.id();
359
360    match ctx {
361        WsActiveAssetCtxData::Perp { coin: _, ctx } => {
362            let mark_px_f64 = ctx
363                .shared
364                .mark_px
365                .parse::<f64>()
366                .context("Failed to parse mark_px as f64")?;
367            let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
368            let mark_price_update =
369                MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
370
371            let oracle_px_f64 = ctx
372                .oracle_px
373                .parse::<f64>()
374                .context("Failed to parse oracle_px as f64")?;
375            let index_price = parse_f64_price(oracle_px_f64, instrument, "ctx.oracle_px")?;
376            let index_price_update =
377                IndexPriceUpdate::new(instrument_id, index_price, ts_init, ts_init);
378
379            let funding_f64 = ctx
380                .funding
381                .parse::<f64>()
382                .context("Failed to parse funding as f64")?;
383            let funding_rate_decimal = Decimal::from_f64(funding_f64)
384                .context("Failed to convert funding rate to Decimal")?;
385            let funding_rate_update = FundingRateUpdate::new(
386                instrument_id,
387                funding_rate_decimal,
388                None, // Hyperliquid doesn't provide next funding time in this message
389                ts_init,
390                ts_init,
391            );
392
393            Ok((
394                mark_price_update,
395                Some(index_price_update),
396                Some(funding_rate_update),
397            ))
398        }
399        WsActiveAssetCtxData::Spot { coin: _, ctx } => {
400            let mark_px_f64 = ctx
401                .shared
402                .mark_px
403                .parse::<f64>()
404                .context("Failed to parse mark_px as f64")?;
405            let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
406            let mark_price_update =
407                MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
408
409            Ok((mark_price_update, None, None))
410        }
411    }
412}
413
414fn parse_f64_price(
415    price: f64,
416    instrument: &InstrumentAny,
417    field_name: &str,
418) -> anyhow::Result<Price> {
419    if !price.is_finite() {
420        anyhow::bail!("Invalid price value for {field_name}: {price} (must be finite)");
421    }
422    Ok(Price::new(price, instrument.price_precision()))
423}
424
#[cfg(test)]
mod tests {
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::CryptoPerpetual,
        types::currency::Currency,
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{
            HyperliquidFillDirection, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
            HyperliquidSide,
        },
        websocket::messages::{
            PerpsAssetCtx, SharedAssetCtx, SpotAssetCtx, WsBasicOrderData, WsBookData, WsLevelData,
        },
    };

    /// BTC-PERP on HYPERLIQUID with price precision 2 and size precision 3.
    fn create_test_instrument() -> InstrumentAny {
        let symbol = Symbol::new("BTC-PERP");
        let instrument_id = InstrumentId::new(symbol, Venue::new("HYPERLIQUID"));

        let perpetual = CryptoPerpetual::new(
            instrument_id,
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false, // is_inverse
            2,     // price_precision
            3,     // size_precision
            Price::from("0.01"),
            Quantity::from("0.001"),
            None, // multiplier
            None, // lot_size
            None, // max_quantity
            None, // min_quantity
            None, // max_notional
            None, // min_notional
            None, // max_price
            None, // min_price
            None, // margin_init
            None, // margin_maint
            None, // maker_fee
            None, // taker_fee
            None, // info
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::CryptoPerpetual(perpetual)
    }

    /// Shared asset context section used by both the perp and spot fixtures.
    fn create_shared_asset_ctx() -> SharedAssetCtx {
        SharedAssetCtx {
            day_ntl_vlm: "1000000.0".to_string(),
            prev_day_px: "49000.0".to_string(),
            mark_px: "50000.0".to_string(),
            mid_px: Some("50001.0".to_string()),
            impact_pxs: Some(vec!["50000.0".to_string(), "50002.0".to_string()]),
            day_base_vlm: Some("100.0".to_string()),
        }
    }

    #[rstest]
    fn test_parse_ws_order_status_report_basic() {
        let instrument = create_test_instrument();
        let account_id = AccountId::new("HYPERLIQUID-001");
        let ts_init = UnixNanos::default();

        // Plain (non-conditional) buy order, half filled and still open.
        let basic_order = WsBasicOrderData {
            coin: Ustr::from("BTC"),
            side: HyperliquidSide::Buy,
            limit_px: "50000.0".to_string(),
            sz: "0.5".to_string(),
            oid: 12345,
            timestamp: 1704470400000,
            orig_sz: "1.0".to_string(),
            cloid: Some("test-order-1".to_string()),
            trigger_px: None,
            is_market: None,
            tpsl: None,
            trigger_activated: None,
            trailing_stop: None,
        };
        let order_data = WsOrderData {
            order: basic_order,
            status: HyperliquidOrderStatusEnum::Open,
            status_timestamp: 1704470400000,
        };

        let report = parse_ws_order_status_report(&order_data, &instrument, account_id, ts_init)
            .expect("order status report should parse");

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.order_status, OrderStatus::Accepted);
    }

    #[rstest]
    fn test_parse_ws_fill_report_basic() {
        let instrument = create_test_instrument();
        let account_id = AccountId::new("HYPERLIQUID-001");
        let ts_init = UnixNanos::default();

        // Crossed (taker) buy fill paying its fee in USDC.
        let fill_data = WsFillData {
            coin: Ustr::from("BTC"),
            px: "50000.0".to_string(),
            sz: "0.1".to_string(),
            side: HyperliquidSide::Buy,
            time: 1704470400000,
            start_position: "0.0".to_string(),
            dir: HyperliquidFillDirection::OpenLong,
            closed_pnl: "0.0".to_string(),
            hash: "0xabc123".to_string(),
            oid: 12345,
            crossed: true,
            fee: "0.05".to_string(),
            tid: 98765,
            liquidation: None,
            fee_token: Ustr::from("USDC"),
            builder_fee: None,
            cloid: Some("0xd211f1c27288259290850338d22132a0".to_string()),
            twap_id: None,
        };

        let report = parse_ws_fill_report(&fill_data, &instrument, account_id, ts_init)
            .expect("fill report should parse");

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
    }

    #[rstest]
    fn test_parse_ws_order_book_deltas_snapshot_behavior() {
        let instrument = create_test_instrument();
        let ts_init = UnixNanos::default();

        let bid_levels = vec![WsLevelData {
            px: "50000.0".to_string(),
            sz: "1.0".to_string(),
            n: 1,
        }];
        let ask_levels = vec![WsLevelData {
            px: "50001.0".to_string(),
            sz: "2.0".to_string(),
            n: 1,
        }];
        let book = WsBookData {
            coin: Ustr::from("BTC"),
            levels: [bid_levels, ask_levels],
            time: 1_704_470_400_000,
        };

        let parsed = parse_ws_order_book_deltas(&book, &instrument, ts_init).unwrap();

        // Expected layout: clear, then the bid, then the ask.
        assert_eq!(parsed.deltas.len(), 3);
        assert_eq!(parsed.deltas[0].action, BookAction::Clear);

        let bid = &parsed.deltas[1];
        assert_eq!(bid.action, BookAction::Add);
        assert_eq!(bid.order.side, OrderSide::Buy);
        assert!(bid.order.size.is_positive());
        assert_eq!(bid.order.order_id, 0);

        let ask = &parsed.deltas[2];
        assert_eq!(ask.action, BookAction::Add);
        assert_eq!(ask.order.side, OrderSide::Sell);
        assert!(ask.order.size.is_positive());
        assert_eq!(ask.order.order_id, 0);
    }

    #[rstest]
    fn test_parse_ws_asset_context_perp() {
        let instrument = create_test_instrument();
        let ts_init = UnixNanos::default();

        let ctx_data = WsActiveAssetCtxData::Perp {
            coin: Ustr::from("BTC"),
            ctx: PerpsAssetCtx {
                shared: create_shared_asset_ctx(),
                funding: "0.0001".to_string(),
                open_interest: "100000.0".to_string(),
                oracle_px: "50005.0".to_string(),
                premium: Some("-0.0001".to_string()),
            },
        };

        let (mark, index, funding) = parse_ws_asset_context(&ctx_data, &instrument, ts_init)
            .expect("perp asset context should parse");

        assert_eq!(mark.instrument_id, instrument.id());
        assert_eq!(mark.value.as_f64(), 50_000.0);

        let index = index.expect("perp context should yield an index price");
        assert_eq!(index.instrument_id, instrument.id());
        assert_eq!(index.value.as_f64(), 50_005.0);

        let funding = funding.expect("perp context should yield a funding rate");
        assert_eq!(funding.instrument_id, instrument.id());
        assert_eq!(funding.rate.to_string(), "0.0001");
    }

    #[rstest]
    fn test_parse_ws_asset_context_spot() {
        let instrument = create_test_instrument();
        let ts_init = UnixNanos::default();

        let ctx_data = WsActiveAssetCtxData::Spot {
            coin: Ustr::from("BTC"),
            ctx: SpotAssetCtx {
                shared: create_shared_asset_ctx(),
                circulating_supply: "19000000.0".to_string(),
            },
        };

        let (mark, index, funding) = parse_ws_asset_context(&ctx_data, &instrument, ts_init)
            .expect("spot asset context should parse");

        // Spot contexts only ever produce a mark price.
        assert_eq!(mark.instrument_id, instrument.id());
        assert_eq!(mark.value.as_f64(), 50_000.0);
        assert!(index.is_none());
        assert!(funding.is_none());
    }
}