1use std::str::FromStr;
19
20use anyhow::Context;
21use nautilus_core::{nanos::UnixNanos, uuid::UUID4};
22use nautilus_model::{
23 data::{
24 Bar, BarType, BookOrder, FundingRateUpdate, IndexPriceUpdate, MarkPriceUpdate,
25 OrderBookDelta, OrderBookDeltas, QuoteTick, TradeTick,
26 },
27 enums::{
28 AggressorSide, BookAction, LiquiditySide, OrderSide, OrderStatus, OrderType, RecordFlag,
29 TimeInForce,
30 },
31 identifiers::{AccountId, ClientOrderId, TradeId, VenueOrderId},
32 instruments::{Instrument, InstrumentAny},
33 reports::{FillReport, OrderStatusReport},
34 types::{Currency, Money, Price, Quantity},
35};
36use rust_decimal::{Decimal, prelude::FromPrimitive};
37
38use super::messages::{
39 CandleData, WsActiveAssetCtxData, WsBboData, WsBookData, WsFillData, WsOrderData, WsTradeData,
40};
41use crate::common::parse::{
42 is_conditional_order_data, make_fill_trade_id, millis_to_nanos, parse_trigger_order_type,
43};
44
45fn parse_price(
46 price_str: &str,
47 instrument: &InstrumentAny,
48 field_name: &str,
49) -> anyhow::Result<Price> {
50 let decimal = Decimal::from_str(price_str)
51 .with_context(|| format!("Failed to parse price from '{price_str}' for {field_name}"))?;
52
53 Price::from_decimal_dp(decimal, instrument.price_precision())
54 .with_context(|| format!("Failed to create price from '{price_str}' for {field_name}"))
55}
56
57fn parse_quantity(
58 quantity_str: &str,
59 instrument: &InstrumentAny,
60 field_name: &str,
61) -> anyhow::Result<Quantity> {
62 let decimal = Decimal::from_str(quantity_str).with_context(|| {
63 format!("Failed to parse quantity from '{quantity_str}' for {field_name}")
64 })?;
65
66 Quantity::from_decimal_dp(decimal.abs(), instrument.size_precision()).with_context(|| {
67 format!("Failed to create quantity from '{quantity_str}' for {field_name}")
68 })
69}
70
71pub fn parse_ws_trade_tick(
73 trade: &WsTradeData,
74 instrument: &InstrumentAny,
75 ts_init: UnixNanos,
76) -> anyhow::Result<TradeTick> {
77 let price = parse_price(&trade.px, instrument, "trade.px")?;
78 let size = parse_quantity(&trade.sz, instrument, "trade.sz")?;
79 let aggressor = AggressorSide::from(trade.side);
80 let trade_id = TradeId::new_checked(trade.tid.to_string())
81 .context("invalid trade identifier in Hyperliquid trade message")?;
82 let ts_event = millis_to_nanos(trade.time)?;
83
84 TradeTick::new_checked(
85 instrument.id(),
86 price,
87 size,
88 aggressor,
89 trade_id,
90 ts_event,
91 ts_init,
92 )
93 .context("failed to construct TradeTick from Hyperliquid trade message")
94}
95
96pub fn parse_ws_order_book_deltas(
98 book: &WsBookData,
99 instrument: &InstrumentAny,
100 ts_init: UnixNanos,
101) -> anyhow::Result<OrderBookDeltas> {
102 let ts_event = millis_to_nanos(book.time)?;
103 let mut deltas = Vec::new();
104
105 deltas.push(OrderBookDelta::clear(instrument.id(), 0, ts_event, ts_init));
107
108 for level in &book.levels[0] {
109 let price = parse_price(&level.px, instrument, "book.bid.px")?;
110 let size = parse_quantity(&level.sz, instrument, "book.bid.sz")?;
111
112 if !size.is_positive() {
113 continue;
114 }
115
116 let order = BookOrder::new(OrderSide::Buy, price, size, 0);
117
118 let delta = OrderBookDelta::new(
119 instrument.id(),
120 BookAction::Add,
121 order,
122 RecordFlag::F_LAST as u8,
123 0, ts_event,
125 ts_init,
126 );
127
128 deltas.push(delta);
129 }
130
131 for level in &book.levels[1] {
132 let price = parse_price(&level.px, instrument, "book.ask.px")?;
133 let size = parse_quantity(&level.sz, instrument, "book.ask.sz")?;
134
135 if !size.is_positive() {
136 continue;
137 }
138
139 let order = BookOrder::new(OrderSide::Sell, price, size, 0);
140
141 let delta = OrderBookDelta::new(
142 instrument.id(),
143 BookAction::Add,
144 order,
145 RecordFlag::F_LAST as u8,
146 0, ts_event,
148 ts_init,
149 );
150
151 deltas.push(delta);
152 }
153
154 Ok(OrderBookDeltas::new(instrument.id(), deltas))
155}
156
157pub fn parse_ws_quote_tick(
159 bbo: &WsBboData,
160 instrument: &InstrumentAny,
161 ts_init: UnixNanos,
162) -> anyhow::Result<QuoteTick> {
163 let bid_level = bbo.bbo[0]
164 .as_ref()
165 .context("BBO message missing bid level")?;
166 let ask_level = bbo.bbo[1]
167 .as_ref()
168 .context("BBO message missing ask level")?;
169
170 let bid_price = parse_price(&bid_level.px, instrument, "bbo.bid.px")?;
171 let ask_price = parse_price(&ask_level.px, instrument, "bbo.ask.px")?;
172 let bid_size = parse_quantity(&bid_level.sz, instrument, "bbo.bid.sz")?;
173 let ask_size = parse_quantity(&ask_level.sz, instrument, "bbo.ask.sz")?;
174
175 let ts_event = millis_to_nanos(bbo.time)?;
176
177 QuoteTick::new_checked(
178 instrument.id(),
179 bid_price,
180 ask_price,
181 bid_size,
182 ask_size,
183 ts_event,
184 ts_init,
185 )
186 .context("failed to construct QuoteTick from Hyperliquid BBO message")
187}
188
189pub fn parse_ws_candle(
191 candle: &CandleData,
192 instrument: &InstrumentAny,
193 bar_type: &BarType,
194 ts_init: UnixNanos,
195) -> anyhow::Result<Bar> {
196 let open = parse_price(&candle.o, instrument, "candle.o")?;
197 let high = parse_price(&candle.h, instrument, "candle.h")?;
198 let low = parse_price(&candle.l, instrument, "candle.l")?;
199 let close = parse_price(&candle.c, instrument, "candle.c")?;
200 let volume = parse_quantity(&candle.v, instrument, "candle.v")?;
201
202 let ts_event = millis_to_nanos(candle.t)?;
203
204 Ok(Bar::new(
205 *bar_type, open, high, low, close, volume, ts_event, ts_init,
206 ))
207}
208
209pub fn parse_ws_order_status_report(
214 order: &WsOrderData,
215 instrument: &InstrumentAny,
216 account_id: AccountId,
217 ts_init: UnixNanos,
218) -> anyhow::Result<OrderStatusReport> {
219 let instrument_id = instrument.id();
220 let venue_order_id = VenueOrderId::new(order.order.oid.to_string());
221 let order_side = OrderSide::from(order.order.side);
222
223 let order_type = if is_conditional_order_data(
225 order.order.trigger_px.as_deref(),
226 order.order.tpsl.as_ref(),
227 ) {
228 if let (Some(is_market), Some(tpsl)) = (order.order.is_market, order.order.tpsl.as_ref()) {
229 parse_trigger_order_type(is_market, tpsl)
230 } else {
231 OrderType::Limit }
233 } else {
234 OrderType::Limit };
236
237 let time_in_force = TimeInForce::Gtc;
238 let order_status = OrderStatus::from(order.status);
239
240 let orig_qty = parse_quantity(&order.order.orig_sz, instrument, "order.orig_sz")?;
242 let remaining_qty = parse_quantity(&order.order.sz, instrument, "order.sz")?;
243 let filled_qty = Quantity::from_raw(
244 orig_qty.raw.saturating_sub(remaining_qty.raw),
245 instrument.size_precision(),
246 );
247
248 let price = parse_price(&order.order.limit_px, instrument, "order.limitPx")?;
249
250 let ts_accepted = millis_to_nanos(order.order.timestamp)?;
251 let ts_last = millis_to_nanos(order.status_timestamp)?;
252
253 let mut report = OrderStatusReport::new(
254 account_id,
255 instrument_id,
256 None, venue_order_id,
258 order_side,
259 order_type,
260 time_in_force,
261 order_status,
262 orig_qty, filled_qty,
264 ts_accepted,
265 ts_last,
266 ts_init,
267 Some(UUID4::new()),
268 );
269
270 if let Some(ref cloid) = order.order.cloid {
271 report = report.with_client_order_id(ClientOrderId::new(cloid.as_str()));
272 }
273
274 report = report.with_price(price);
275
276 if let Some(ref trigger_px_str) = order.order.trigger_px {
277 let trigger_price = parse_price(trigger_px_str, instrument, "order.triggerPx")?;
278 report = report.with_trigger_price(trigger_price);
279 }
280
281 Ok(report)
282}
283
284pub fn parse_ws_fill_report(
288 fill: &WsFillData,
289 instrument: &InstrumentAny,
290 account_id: AccountId,
291 ts_init: UnixNanos,
292) -> anyhow::Result<FillReport> {
293 let instrument_id = instrument.id();
294 let venue_order_id = VenueOrderId::new(fill.oid.to_string());
295 let trade_id = make_fill_trade_id(
296 &fill.hash,
297 fill.oid,
298 &fill.px,
299 &fill.sz,
300 fill.time,
301 &fill.start_position,
302 );
303
304 let order_side = OrderSide::from(fill.side);
305 let last_qty = parse_quantity(&fill.sz, instrument, "fill.sz")?;
306 let last_px = parse_price(&fill.px, instrument, "fill.px")?;
307 let liquidity_side = if fill.crossed {
308 LiquiditySide::Taker
309 } else {
310 LiquiditySide::Maker
311 };
312
313 let fee_amount = Decimal::from_str(&fill.fee)
314 .with_context(|| format!("Failed to parse fee='{}' as decimal", fill.fee))?;
315
316 let commission_currency = Currency::from_str(fill.fee_token.as_str())
317 .with_context(|| format!("Unknown fee token '{}'", fill.fee_token))?;
318
319 let commission = Money::from_decimal(fee_amount, commission_currency)
320 .with_context(|| format!("Failed to create commission from fee='{}'", fill.fee))?;
321 let ts_event = millis_to_nanos(fill.time)?;
322
323 let client_order_id = None;
325
326 Ok(FillReport::new(
327 account_id,
328 instrument_id,
329 venue_order_id,
330 trade_id,
331 order_side,
332 last_qty,
333 last_px,
334 commission,
335 liquidity_side,
336 client_order_id,
337 None, ts_event,
339 ts_init,
340 None, ))
342}
343
344pub fn parse_ws_asset_context(
350 ctx: &WsActiveAssetCtxData,
351 instrument: &InstrumentAny,
352 ts_init: UnixNanos,
353) -> anyhow::Result<(
354 MarkPriceUpdate,
355 Option<IndexPriceUpdate>,
356 Option<FundingRateUpdate>,
357)> {
358 let instrument_id = instrument.id();
359
360 match ctx {
361 WsActiveAssetCtxData::Perp { coin: _, ctx } => {
362 let mark_px_f64 = ctx
363 .shared
364 .mark_px
365 .parse::<f64>()
366 .context("Failed to parse mark_px as f64")?;
367 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
368 let mark_price_update =
369 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
370
371 let oracle_px_f64 = ctx
372 .oracle_px
373 .parse::<f64>()
374 .context("Failed to parse oracle_px as f64")?;
375 let index_price = parse_f64_price(oracle_px_f64, instrument, "ctx.oracle_px")?;
376 let index_price_update =
377 IndexPriceUpdate::new(instrument_id, index_price, ts_init, ts_init);
378
379 let funding_f64 = ctx
380 .funding
381 .parse::<f64>()
382 .context("Failed to parse funding as f64")?;
383 let funding_rate_decimal = Decimal::from_f64(funding_f64)
384 .context("Failed to convert funding rate to Decimal")?;
385 let funding_rate_update = FundingRateUpdate::new(
386 instrument_id,
387 funding_rate_decimal,
388 None, ts_init,
390 ts_init,
391 );
392
393 Ok((
394 mark_price_update,
395 Some(index_price_update),
396 Some(funding_rate_update),
397 ))
398 }
399 WsActiveAssetCtxData::Spot { coin: _, ctx } => {
400 let mark_px_f64 = ctx
401 .shared
402 .mark_px
403 .parse::<f64>()
404 .context("Failed to parse mark_px as f64")?;
405 let mark_price = parse_f64_price(mark_px_f64, instrument, "ctx.mark_px")?;
406 let mark_price_update =
407 MarkPriceUpdate::new(instrument_id, mark_price, ts_init, ts_init);
408
409 Ok((mark_price_update, None, None))
410 }
411 }
412}
413
414fn parse_f64_price(
415 price: f64,
416 instrument: &InstrumentAny,
417 field_name: &str,
418) -> anyhow::Result<Price> {
419 if !price.is_finite() {
420 anyhow::bail!("Invalid price value for {field_name}: {price} (must be finite)");
421 }
422 Ok(Price::new(price, instrument.price_precision()))
423}
424
#[cfg(test)]
mod tests {
    use nautilus_model::{
        identifiers::{InstrumentId, Symbol, Venue},
        instruments::CryptoPerpetual,
        types::currency::Currency,
    };
    use rstest::rstest;
    use ustr::Ustr;

    use super::*;
    use crate::{
        common::enums::{
            HyperliquidFillDirection, HyperliquidOrderStatus as HyperliquidOrderStatusEnum,
            HyperliquidSide,
        },
        websocket::messages::{
            PerpsAssetCtx, SharedAssetCtx, SpotAssetCtx, WsBasicOrderData, WsBookData, WsLevelData,
        },
    };

    /// BTC-PERP instrument with price precision 2 and size precision 3.
    fn create_test_instrument() -> InstrumentAny {
        let symbol = Symbol::new("BTC-PERP");
        let instrument_id = InstrumentId::new(symbol, Venue::new("HYPERLIQUID"));

        let perpetual = CryptoPerpetual::new(
            instrument_id,
            Symbol::new("BTC-PERP"),
            Currency::from("BTC"),
            Currency::from("USDC"),
            Currency::from("USDC"),
            false,
            2,
            3,
            Price::from("0.01"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );

        InstrumentAny::CryptoPerpetual(perpetual)
    }

    /// Shared asset context used by both the perp and spot fixtures.
    fn create_test_shared_ctx() -> SharedAssetCtx {
        SharedAssetCtx {
            day_ntl_vlm: "1000000.0".to_string(),
            prev_day_px: "49000.0".to_string(),
            mark_px: "50000.0".to_string(),
            mid_px: Some("50001.0".to_string()),
            impact_pxs: Some(vec!["50000.0".to_string(), "50002.0".to_string()]),
            day_base_vlm: Some("100.0".to_string()),
        }
    }

    #[rstest]
    fn test_parse_ws_order_status_report_basic() {
        let instrument = create_test_instrument();
        let account_id = AccountId::new("HYPERLIQUID-001");
        let ts_init = UnixNanos::default();

        let basic_order = WsBasicOrderData {
            coin: Ustr::from("BTC"),
            side: HyperliquidSide::Buy,
            limit_px: "50000.0".to_string(),
            sz: "0.5".to_string(),
            oid: 12345,
            timestamp: 1704470400000,
            orig_sz: "1.0".to_string(),
            cloid: Some("test-order-1".to_string()),
            trigger_px: None,
            is_market: None,
            tpsl: None,
            trigger_activated: None,
            trailing_stop: None,
        };
        let order_data = WsOrderData {
            order: basic_order,
            status: HyperliquidOrderStatusEnum::Open,
            status_timestamp: 1704470400000,
        };

        let report = parse_ws_order_status_report(&order_data, &instrument, account_id, ts_init)
            .expect("order status report should parse");

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.order_type, OrderType::Limit);
        assert_eq!(report.order_status, OrderStatus::Accepted);
    }

    #[rstest]
    fn test_parse_ws_fill_report_basic() {
        let instrument = create_test_instrument();
        let account_id = AccountId::new("HYPERLIQUID-001");
        let ts_init = UnixNanos::default();

        let fill_data = WsFillData {
            coin: Ustr::from("BTC"),
            px: "50000.0".to_string(),
            sz: "0.1".to_string(),
            side: HyperliquidSide::Buy,
            time: 1704470400000,
            start_position: "0.0".to_string(),
            dir: HyperliquidFillDirection::OpenLong,
            closed_pnl: "0.0".to_string(),
            hash: "0xabc123".to_string(),
            oid: 12345,
            crossed: true,
            fee: "0.05".to_string(),
            tid: 98765,
            liquidation: None,
            fee_token: Ustr::from("USDC"),
            builder_fee: None,
            cloid: Some("0xd211f1c27288259290850338d22132a0".to_string()),
            twap_id: None,
        };

        let report = parse_ws_fill_report(&fill_data, &instrument, account_id, ts_init)
            .expect("fill report should parse");

        assert_eq!(report.order_side, OrderSide::Buy);
        assert_eq!(report.liquidity_side, LiquiditySide::Taker);
    }

    #[rstest]
    fn test_parse_ws_order_book_deltas_snapshot_behavior() {
        let instrument = create_test_instrument();
        let ts_init = UnixNanos::default();

        let bids = vec![WsLevelData {
            px: "50000.0".to_string(),
            sz: "1.0".to_string(),
            n: 1,
        }];
        let asks = vec![WsLevelData {
            px: "50001.0".to_string(),
            sz: "2.0".to_string(),
            n: 1,
        }];
        let book = WsBookData {
            coin: Ustr::from("BTC"),
            levels: [bids, asks],
            time: 1_704_470_400_000,
        };

        let deltas = parse_ws_order_book_deltas(&book, &instrument, ts_init).unwrap();

        // One clear followed by one add per side.
        assert_eq!(deltas.deltas.len(), 3);
        assert_eq!(deltas.deltas[0].action, BookAction::Clear);

        let bid_delta = &deltas.deltas[1];
        assert_eq!(bid_delta.action, BookAction::Add);
        assert_eq!(bid_delta.order.side, OrderSide::Buy);
        assert!(bid_delta.order.size.is_positive());
        assert_eq!(bid_delta.order.order_id, 0);

        let ask_delta = &deltas.deltas[2];
        assert_eq!(ask_delta.action, BookAction::Add);
        assert_eq!(ask_delta.order.side, OrderSide::Sell);
        assert!(ask_delta.order.size.is_positive());
        assert_eq!(ask_delta.order.order_id, 0);
    }

    #[rstest]
    fn test_parse_ws_asset_context_perp() {
        let instrument = create_test_instrument();
        let ts_init = UnixNanos::default();

        let ctx_data = WsActiveAssetCtxData::Perp {
            coin: Ustr::from("BTC"),
            ctx: PerpsAssetCtx {
                shared: create_test_shared_ctx(),
                funding: "0.0001".to_string(),
                open_interest: "100000.0".to_string(),
                oracle_px: "50005.0".to_string(),
                premium: Some("-0.0001".to_string()),
            },
        };

        let (mark_price, index_price, funding_rate) =
            parse_ws_asset_context(&ctx_data, &instrument, ts_init)
                .expect("perp asset context should parse");

        assert_eq!(mark_price.instrument_id, instrument.id());
        assert_eq!(mark_price.value.as_f64(), 50_000.0);

        let index = index_price.expect("perp context should yield an index price");
        assert_eq!(index.instrument_id, instrument.id());
        assert_eq!(index.value.as_f64(), 50_005.0);

        let funding = funding_rate.expect("perp context should yield a funding rate");
        assert_eq!(funding.instrument_id, instrument.id());
        assert_eq!(funding.rate.to_string(), "0.0001");
    }

    #[rstest]
    fn test_parse_ws_asset_context_spot() {
        let instrument = create_test_instrument();
        let ts_init = UnixNanos::default();

        let ctx_data = WsActiveAssetCtxData::Spot {
            coin: Ustr::from("BTC"),
            ctx: SpotAssetCtx {
                shared: create_test_shared_ctx(),
                circulating_supply: "19000000.0".to_string(),
            },
        };

        let (mark_price, index_price, funding_rate) =
            parse_ws_asset_context(&ctx_data, &instrument, ts_init)
                .expect("spot asset context should parse");

        assert_eq!(mark_price.instrument_id, instrument.id());
        assert_eq!(mark_price.value.as_f64(), 50_000.0);
        assert!(index_price.is_none());
        assert!(funding_rate.is_none());
    }
}