1use std::{
17 sync::{
18 Arc,
19 atomic::{AtomicU64, Ordering},
20 },
21 time::Duration,
22};
23
24use ahash::AHashMap;
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29 sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30 time,
31};
32
33use crate::{
34 common::{consts::INFLIGHT_MAX, enums::HyperliquidInfoRequestType},
35 http::{
36 error::{Error, Result},
37 models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38 },
39 websocket::messages::{
40 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41 OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42 },
43};
44
/// Pending-response entry kept by `PostRouter` for one registered post id.
#[derive(Debug)]
struct Waiter {
    /// One-shot sender on which the matching `PostResponse` is delivered.
    tx: oneshot::Sender<PostResponse>,
    /// Inflight permit owned by this entry. It is never read; dropping the entry
    /// drops the permit and hands it back to the semaphore.
    _permit: OwnedSemaphorePermit,
}
51
/// Correlates post responses with the callers waiting for them, keyed by post id.
#[derive(Debug)]
pub struct PostRouter {
    /// Waiters for ids that are registered and neither completed nor cancelled yet.
    inner: Mutex<AHashMap<u64, Waiter>>,
    /// Semaphore from which every registered waiter takes one owned permit.
    inflight: Arc<Semaphore>,
}
57
58impl Default for PostRouter {
59 fn default() -> Self {
60 Self {
61 inner: Mutex::new(AHashMap::new()),
62 inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
63 }
64 }
65}
66
67impl PostRouter {
68 pub fn new() -> Arc<Self> {
69 Arc::new(Self::default())
70 }
71
72 pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
74 let permit = self
76 .inflight
77 .clone()
78 .acquire_owned()
79 .await
80 .map_err(|_| Error::transport("post router semaphore closed"))?;
81
82 let (tx, rx) = oneshot::channel::<PostResponse>();
83 let mut map = self.inner.lock().await;
84 if map.contains_key(&id) {
85 return Err(Error::transport(format!("post id {id} already registered")));
86 }
87 map.insert(
88 id,
89 Waiter {
90 tx,
91 _permit: permit,
92 },
93 );
94 Ok(rx)
95 }
96
97 pub async fn complete(&self, resp: PostResponse) {
99 let id = resp.id;
100 let waiter = {
101 let mut map = self.inner.lock().await;
102 map.remove(&id)
103 };
104
105 if let Some(waiter) = waiter {
106 if waiter.tx.send(resp).is_err() {
107 log::warn!("Post waiter dropped before delivery: id={id}");
108 }
109 } else {
111 log::warn!("Post response with unknown id (late/duplicate?): id={id}");
112 }
113 }
114
115 pub async fn cancel(&self, id: u64) {
117 let _ = {
118 let mut map = self.inner.lock().await;
119 map.remove(&id)
120 };
121 }
123
124 pub async fn await_with_timeout(
126 &self,
127 id: u64,
128 rx: oneshot::Receiver<PostResponse>,
129 timeout: Duration,
130 ) -> Result<PostResponse> {
131 match time::timeout(timeout, rx).await {
132 Ok(Ok(resp)) => Ok(resp),
133 Ok(Err(_closed)) => {
134 self.cancel(id).await;
135 Err(Error::transport("post response channel closed"))
136 }
137 Err(_elapsed) => {
138 self.cancel(id).await;
139 Err(Error::Timeout)
140 }
141 }
142 }
143}
144
/// Hands out sequential post ids from a shared atomic counter.
#[derive(Debug)]
pub struct PostIds(AtomicU64);

impl PostIds {
    /// Creates a generator whose first issued id is `start`.
    pub fn new(start: u64) -> Self {
        let counter = AtomicU64::new(start);
        Self(counter)
    }

    /// Returns the current id and advances the counter by one (wrapping on overflow).
    pub fn next(&self) -> u64 {
        let Self(counter) = self;
        counter.fetch_add(1, Ordering::Relaxed)
    }
}
156
/// Queue a scheduled post is routed to by `PostBatcher::enqueue`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostLane {
    /// Lane selected by `lane_for_action` when every order of an action is an ALO limit order.
    Alo,
    /// Lane for every other post.
    Normal,
}
162
/// A post waiting in a `PostBatcher` lane until the lane's next flush.
#[derive(Debug)]
pub struct ScheduledPost {
    /// Id sent as the `id` of the resulting `HyperliquidWsRequest::Post`.
    pub id: u64,
    /// Request body sent as the `request` of the resulting post.
    pub request: PostRequest,
    /// Lane the post is queued on.
    pub lane: PostLane,
}
169
/// Front end of the two lane tasks that forward queued posts on a fixed tick.
#[derive(Debug)]
pub struct PostBatcher {
    /// Queue feeding the `ALO` lane task.
    tx_alo: mpsc::Sender<ScheduledPost>,
    /// Queue feeding the `NORMAL` lane task.
    tx_normal: mpsc::Sender<ScheduledPost>,
}
175
176impl PostBatcher {
177 pub fn new<F>(send_fn: F) -> Self
179 where
180 F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
181 {
182 let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
183 let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
184
185 get_runtime().spawn(Self::run_lane(
187 "ALO",
188 rx_alo,
189 Duration::from_millis(100),
190 send_fn.clone(),
191 ));
192
193 get_runtime().spawn(Self::run_lane(
195 "NORMAL",
196 rx_normal,
197 Duration::from_millis(50),
198 send_fn,
199 ));
200
201 Self { tx_alo, tx_normal }
202 }
203
204 async fn run_lane<F>(
205 lane_name: &'static str,
206 mut rx: mpsc::Receiver<ScheduledPost>,
207 tick: Duration,
208 mut send_fn: F,
209 ) where
210 F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
211 {
212 let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
213 let mut interval = time::interval(tick);
214 interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
215
216 loop {
217 tokio::select! {
218 maybe_item = rx.recv() => {
219 match maybe_item {
220 Some(item) => pend.push(item),
221 None => break, }
223 }
224 _ = interval.tick() => {
225 if pend.is_empty() { continue; }
226 let to_send = std::mem::take(&mut pend);
227 for item in to_send {
228 let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
229 if let Err(e) = send_fn(req).await {
230 log::error!("Failed to send post: lane={lane_name}, id={}, {e}", item.id);
231 }
232 }
233 }
234 }
235 }
236 log::info!("Post lane terminated: lane={lane_name}");
237 }
238
239 pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
240 match item.lane {
241 PostLane::Alo => self
242 .tx_alo
243 .send(item)
244 .await
245 .map_err(|_| Error::transport("ALO lane closed")),
246 PostLane::Normal => self
247 .tx_normal
248 .send(item)
249 .await
250 .map_err(|_| Error::transport("NORMAL lane closed")),
251 }
252 }
253}
254
255pub fn lane_for_action(action: &ActionRequest) -> PostLane {
257 match action {
258 ActionRequest::Order { orders, .. } => {
259 if orders.is_empty() {
260 return PostLane::Normal;
261 }
262 let all_alo = orders.iter().all(|o| {
263 matches!(
264 o.t,
265 OrderTypeRequest::Limit {
266 tif: TimeInForceRequest::Alo
267 }
268 )
269 });
270
271 if all_alo {
272 PostLane::Alo
273 } else {
274 PostLane::Normal
275 }
276 }
277 _ => PostLane::Normal,
278 }
279}
280
/// Grouping value attached to an order action; `Na` is the default.
#[derive(Debug, Clone, Copy, Default)]
pub enum Grouping {
    /// Rendered as `"na"`.
    #[default]
    Na,
    /// Rendered as `"normalTpsl"`.
    NormalTpsl,
    /// Rendered as `"positionTpsl"`.
    PositionTpsl,
}

impl Grouping {
    /// Returns the string form of this grouping used in the built action.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Grouping::Na => "na",
            Grouping::NormalTpsl => "normalTpsl",
            Grouping::PositionTpsl => "positionTpsl",
        }
    }
}
297
/// Owned inputs for one limit order, converted into an `OrderRequest` by
/// `OrderBuilder::push_limit_order`. The `Builder` derive generates
/// `LimitOrderParamsBuilder` for this struct.
#[derive(Debug, Clone, Builder)]
pub struct LimitOrderParams {
    /// Copied to `OrderRequest::a`.
    pub asset: u32,
    /// Copied to `OrderRequest::b`.
    pub is_buy: bool,
    /// Copied to `OrderRequest::p` (presumably the limit price as a decimal string).
    pub px: String,
    /// Copied to `OrderRequest::s` (presumably the size as a decimal string).
    pub sz: String,
    /// Copied to `OrderRequest::r`.
    pub reduce_only: bool,
    /// Time in force placed inside `OrderTypeRequest::Limit`.
    pub tif: TimeInForceRequest,
    /// Optional client order id, copied to `OrderRequest::c`.
    pub cloid: Option<String>,
}
309
/// Owned inputs for one trigger order, converted into an `OrderRequest` by
/// `OrderBuilder::push_trigger_order`. The `Builder` derive generates
/// `TriggerOrderParamsBuilder` for this struct.
#[derive(Debug, Clone, Builder)]
pub struct TriggerOrderParams {
    /// Copied to `OrderRequest::a`.
    pub asset: u32,
    /// Copied to `OrderRequest::b`.
    pub is_buy: bool,
    /// Copied to `OrderRequest::p`.
    pub px: String,
    /// Copied to `OrderRequest::s`.
    pub sz: String,
    /// Copied to `OrderRequest::r`.
    pub reduce_only: bool,
    /// `is_market` field of the resulting `OrderTypeRequest::Trigger`.
    pub is_market: bool,
    /// `trigger_px` field of the resulting `OrderTypeRequest::Trigger`.
    pub trigger_px: String,
    /// `tpsl` field of the resulting `OrderTypeRequest::Trigger`.
    pub tpsl: TpSlRequest,
    /// Optional client order id, copied to `OrderRequest::c`.
    pub cloid: Option<String>,
}
323
/// Accumulates order requests and a grouping, then produces an `ActionRequest::Order`.
#[derive(Debug, Default)]
pub struct OrderBuilder {
    /// Orders in the sequence they were pushed.
    orders: Vec<OrderRequest>,
    /// Grouping written into the built action; defaults to `Grouping::Na`.
    grouping: Grouping,
}
330
331impl OrderBuilder {
332 pub fn new() -> Self {
333 Self::default()
334 }
335
336 #[must_use]
337 pub fn grouping(mut self, g: Grouping) -> Self {
338 self.grouping = g;
339 self
340 }
341
342 #[allow(clippy::too_many_arguments)]
344 #[must_use]
345 pub fn push_limit(
346 self,
347 asset: u32,
348 is_buy: bool,
349 px: &(impl ToString + ?Sized),
350 sz: &(impl ToString + ?Sized),
351 reduce_only: bool,
352 tif: TimeInForceRequest,
353 cloid: Option<String>,
354 ) -> Self {
355 let params = LimitOrderParams {
356 asset,
357 is_buy,
358 px: px.to_string(),
359 sz: sz.to_string(),
360 reduce_only,
361 tif,
362 cloid,
363 };
364 self.push_limit_order(params)
365 }
366
367 #[must_use]
369 pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
370 self.orders.push(OrderRequest {
371 a: params.asset,
372 b: params.is_buy,
373 p: params.px,
374 s: params.sz,
375 r: params.reduce_only,
376 t: OrderTypeRequest::Limit { tif: params.tif },
377 c: params.cloid,
378 });
379 self
380 }
381
382 #[allow(clippy::too_many_arguments)]
384 #[must_use]
385 pub fn push_trigger(
386 self,
387 asset: u32,
388 is_buy: bool,
389 px: &(impl ToString + ?Sized),
390 sz: &(impl ToString + ?Sized),
391 reduce_only: bool,
392 is_market: bool,
393 trigger_px: &(impl ToString + ?Sized),
394 tpsl: TpSlRequest,
395 cloid: Option<String>,
396 ) -> Self {
397 let params = TriggerOrderParams {
398 asset,
399 is_buy,
400 px: px.to_string(),
401 sz: sz.to_string(),
402 reduce_only,
403 is_market,
404 trigger_px: trigger_px.to_string(),
405 tpsl,
406 cloid,
407 };
408 self.push_trigger_order(params)
409 }
410
411 #[must_use]
413 pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
414 self.orders.push(OrderRequest {
415 a: params.asset,
416 b: params.is_buy,
417 p: params.px,
418 s: params.sz,
419 r: params.reduce_only,
420 t: OrderTypeRequest::Trigger {
421 is_market: params.is_market,
422 trigger_px: params.trigger_px,
423 tpsl: params.tpsl,
424 },
425 c: params.cloid,
426 });
427 self
428 }
429 pub fn build(self) -> ActionRequest {
430 ActionRequest::Order {
431 orders: self.orders,
432 grouping: self.grouping.as_str().to_string(),
433 }
434 }
435
436 pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
453 Self::new().push_limit_order(params).build()
454 }
455
456 pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
475 Self::new().push_trigger_order(params).build()
476 }
477}
478
479pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
480 ActionRequest::Cancel {
481 cancels: cancels
482 .into_iter()
483 .map(|(a, o)| CancelRequest { a, o })
484 .collect(),
485 }
486}
487pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
488 ActionRequest::CancelByCloid {
489 cancels: vec![CancelByCloidRequest {
490 asset,
491 cloid: cloid.into(),
492 }],
493 }
494}
495pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
496 ActionRequest::Modify {
497 modifies: vec![ModifyRequest {
498 oid,
499 order: new_order,
500 }],
501 }
502}
503
504pub fn info_l2_book(coin: &str) -> PostRequest {
505 PostRequest::Info {
506 payload: serde_json::json!({"type": HyperliquidInfoRequestType::L2Book.as_str(), "coin": coin}),
507 }
508}
509
510pub fn info_all_mids() -> PostRequest {
511 PostRequest::Info {
512 payload: serde_json::json!({"type": HyperliquidInfoRequestType::AllMids.as_str()}),
513 }
514}
515
516pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
517 PostRequest::Info {
518 payload: serde_json::json!({"type": HyperliquidInfoRequestType::OrderStatus.as_str(), "user": user, "oid": oid}),
519 }
520}
521
522pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
523 let mut body =
524 serde_json::json!({"type": HyperliquidInfoRequestType::OpenOrders.as_str(), "user": user});
525
526 if let Some(fe) = frontend {
527 body["frontend"] = serde_json::json!(fe);
528 }
529 PostRequest::Info { payload: body }
530}
531
532pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
533 let mut body =
534 serde_json::json!({"type": HyperliquidInfoRequestType::UserFills.as_str(), "user": user});
535
536 if let Some(agg) = aggregate_by_time {
537 body["aggregateByTime"] = serde_json::json!(agg);
538 }
539 PostRequest::Info { payload: body }
540}
541
542pub fn info_user_rate_limit(user: &str) -> PostRequest {
543 PostRequest::Info {
544 payload: serde_json::json!({"type": HyperliquidInfoRequestType::UserRateLimit.as_str(), "user": user}),
545 }
546}
547
548pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
549 PostRequest::Info {
550 payload: serde_json::json!({"type": HyperliquidInfoRequestType::Candle.as_str(), "coin": coin, "interval": interval}),
551 }
552}
553
554pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
555 serde_json::from_value(payload.clone()).map_err(Error::Serde)
556}
557pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
558 serde_json::from_value(payload.clone()).map_err(Error::Serde)
559}
560pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
561 serde_json::from_value(payload.clone()).map_err(Error::Serde)
562}
563
/// Classification of an action response payload; string data borrows from the payload.
#[derive(Debug)]
pub enum ActionOutcome<'a> {
    /// The payload has an unsigned `oid` but not both fill fields.
    Resting {
        /// Value of the payload's `oid` field.
        oid: u64,
    },
    /// The payload has string `totalSz` and `avgPx` fields.
    Filled {
        /// Value of the payload's `totalSz` field.
        total_sz: &'a str,
        /// Value of the payload's `avgPx` field.
        avg_px: &'a str,
        /// Value of the payload's `oid` field, when present.
        oid: Option<u64>,
    },
    /// The payload has a string `error` or `message` field.
    Error {
        /// The `error` text, or the `message` text when `error` is absent.
        msg: &'a str,
    },
    /// None of the recognised fields were found; holds the payload itself.
    Unknown(&'a serde_json::Value),
}
580pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
581 if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
582 if let (Some(total_sz), Some(avg_px)) = (
583 payload.get("totalSz").and_then(|v| v.as_str()),
584 payload.get("avgPx").and_then(|v| v.as_str()),
585 ) {
586 return ActionOutcome::Filled {
587 total_sz,
588 avg_px,
589 oid: Some(oid),
590 };
591 }
592 return ActionOutcome::Resting { oid };
593 }
594
595 if let (Some(total_sz), Some(avg_px)) = (
596 payload.get("totalSz").and_then(|v| v.as_str()),
597 payload.get("avgPx").and_then(|v| v.as_str()),
598 ) {
599 return ActionOutcome::Filled {
600 total_sz,
601 avg_px,
602 oid: None,
603 };
604 }
605
606 if let Some(msg) = payload
607 .get("error")
608 .and_then(|v| v.as_str())
609 .or_else(|| payload.get("message").and_then(|v| v.as_str()))
610 {
611 return ActionOutcome::Error { msg };
612 }
613 ActionOutcome::Unknown(payload)
614}
615
616#[derive(Clone, Debug)]
617pub struct WsSender {
618 inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
619}
620
621impl WsSender {
622 pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
623 Self {
624 inner: Arc::new(tokio::sync::Mutex::new(tx)),
625 }
626 }
627
628 pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
629 let sender = self.inner.lock().await;
630 sender
631 .send(req)
632 .await
633 .map_err(|_| Error::transport("WebSocket sender closed"))
634 }
635}
636
637#[cfg(test)]
638mod tests {
639 use nautilus_common::testing::wait_until_async;
640 use rstest::rstest;
641 use tokio::{
642 sync::oneshot,
643 time::{Duration, timeout},
644 };
645
646 use super::*;
647 use crate::{
648 common::consts::INFLIGHT_MAX,
649 websocket::messages::{
650 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
651 OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
652 },
653 };
654
655 fn mk_limit_alo(asset: u32) -> OrderRequest {
656 OrderRequest {
657 a: asset,
658 b: true,
659 p: "1".to_string(),
660 s: "1".to_string(),
661 r: false,
662 t: OrderTypeRequest::Limit {
663 tif: TimeInForceRequest::Alo,
664 },
665 c: None,
666 }
667 }
668
669 fn mk_limit_gtc(asset: u32) -> OrderRequest {
670 OrderRequest {
671 a: asset,
672 b: true,
673 p: "1".to_string(),
674 s: "1".to_string(),
675 r: false,
676 t: OrderTypeRequest::Limit {
677 tif: TimeInForceRequest::Gtc,
679 },
680 c: None,
681 }
682 }
683
684 #[rstest]
685 #[tokio::test(flavor = "multi_thread")]
686 async fn register_duplicate_id_errors() {
687 let router = PostRouter::new();
688 let _rx = router.register(42).await.expect("first register OK");
689
690 let err = router.register(42).await.expect_err("duplicate must error");
691 let msg = err.to_string().to_lowercase();
692 assert!(
693 msg.contains("already") || msg.contains("duplicate"),
694 "unexpected error: {msg}"
695 );
696 }
697
698 #[rstest]
699 #[tokio::test(flavor = "multi_thread")]
700 async fn timeout_cancels_and_allows_reregister() {
701 let router = PostRouter::new();
702 let id = 7;
703
704 let rx = router.register(id).await.unwrap();
705 let err = router
707 .await_with_timeout(id, rx, Duration::from_millis(25))
708 .await
709 .expect_err("should timeout");
710 assert!(
711 err.to_string().to_lowercase().contains("timeout")
712 || err.to_string().to_lowercase().contains("closed"),
713 "unexpected error kind: {err}"
714 );
715
716 let _rx2 = router
718 .register(id)
719 .await
720 .expect("id should be reusable after timeout cancel");
721 }
722
723 #[rstest]
724 #[tokio::test(flavor = "multi_thread")]
725 async fn inflight_cap_blocks_then_unblocks() {
726 let router = PostRouter::new();
727
728 let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
730 for i in 0..INFLIGHT_MAX {
731 let rx = router.register(i as u64).await.unwrap();
732 rxs.push(rx); }
734
735 let router2 = Arc::clone(&router);
737 let (entered_tx, entered_rx) = oneshot::channel::<()>();
738 let (done_tx, done_rx) = oneshot::channel::<()>();
739 let (check_tx, check_rx) = oneshot::channel::<()>(); get_runtime().spawn(async move {
742 let _ = entered_tx.send(());
743 let _rx = router2.register(9_999_999).await.unwrap();
744 let _ = done_tx.send(());
745 });
746
747 entered_rx.await.unwrap();
749
750 get_runtime().spawn(async move {
752 if done_rx.await.is_ok() {
753 let _ = check_tx.send(());
754 }
755 });
756
757 assert!(
758 timeout(Duration::from_millis(50), check_rx).await.is_err(),
759 "should still be blocked while at cap"
760 );
761
762 router.cancel(0).await;
764
765 tokio::time::sleep(Duration::from_millis(100)).await;
767 }
768
769 #[rstest(
770 orders, expected,
771 case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
772 case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
773 case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
774 case::empty(vec![], PostLane::Normal),
775 )]
776 fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
777 let action = ActionRequest::Order {
778 orders,
779 grouping: "na".to_string(),
780 };
781 assert_eq!(lane_for_action(&action), expected);
782 }
783
784 #[rstest]
785 fn test_order_request_builder() {
786 let order = OrderRequestBuilder::default()
788 .a(0)
789 .b(true)
790 .p("40000.0".to_string())
791 .s("0.01".to_string())
792 .r(false)
793 .t(OrderTypeRequest::Limit {
794 tif: TimeInForceRequest::Gtc,
795 })
796 .c(Some("test-order-1".to_string()))
797 .build()
798 .expect("should build order");
799
800 assert_eq!(order.a, 0);
801 assert!(order.b);
802 assert_eq!(order.p, "40000.0");
803 assert_eq!(order.s, "0.01");
804 assert!(!order.r);
805 assert_eq!(order.c, Some("test-order-1".to_string()));
806 }
807
808 #[rstest]
809 fn test_limit_order_params_builder() {
810 let params = LimitOrderParamsBuilder::default()
812 .asset(0)
813 .is_buy(true)
814 .px("40000.0".to_string())
815 .sz("0.01".to_string())
816 .reduce_only(false)
817 .tif(TimeInForceRequest::Alo)
818 .cloid(Some("test-limit-1".to_string()))
819 .build()
820 .expect("should build limit params");
821
822 assert_eq!(params.asset, 0);
823 assert!(params.is_buy);
824 assert_eq!(params.px, "40000.0");
825 assert_eq!(params.sz, "0.01");
826 assert!(!params.reduce_only);
827 assert_eq!(params.cloid, Some("test-limit-1".to_string()));
828 }
829
830 #[rstest]
831 fn test_trigger_order_params_builder() {
832 let params = TriggerOrderParamsBuilder::default()
834 .asset(1)
835 .is_buy(false)
836 .px("39000.0".to_string())
837 .sz("0.02".to_string())
838 .reduce_only(false)
839 .is_market(true)
840 .trigger_px("39500.0".to_string())
841 .tpsl(TpSlRequest::Sl)
842 .cloid(Some("test-trigger-1".to_string()))
843 .build()
844 .expect("should build trigger params");
845
846 assert_eq!(params.asset, 1);
847 assert!(!params.is_buy);
848 assert_eq!(params.px, "39000.0");
849 assert!(params.is_market);
850 assert_eq!(params.trigger_px, "39500.0");
851 }
852
853 #[rstest]
854 fn test_order_builder_single_limit_convenience() {
855 let params = LimitOrderParamsBuilder::default()
857 .asset(0)
858 .is_buy(true)
859 .px("40000.0".to_string())
860 .sz("0.01".to_string())
861 .reduce_only(false)
862 .tif(TimeInForceRequest::Gtc)
863 .cloid(None)
864 .build()
865 .unwrap();
866
867 let action = OrderBuilder::single_limit_order(params);
868
869 match action {
870 ActionRequest::Order { orders, grouping } => {
871 assert_eq!(orders.len(), 1);
872 assert_eq!(orders[0].a, 0);
873 assert!(orders[0].b);
874 assert_eq!(grouping, "na");
875 }
876 _ => panic!("Expected ActionRequest::Order variant"),
877 }
878 }
879
880 #[rstest]
881 fn test_order_builder_single_trigger_convenience() {
882 let params = TriggerOrderParamsBuilder::default()
884 .asset(1)
885 .is_buy(false)
886 .px("39000.0".to_string())
887 .sz("0.02".to_string())
888 .reduce_only(false)
889 .is_market(true)
890 .trigger_px("39500.0".to_string())
891 .tpsl(TpSlRequest::Sl)
892 .cloid(Some("sl-order".to_string()))
893 .build()
894 .unwrap();
895
896 let action = OrderBuilder::single_trigger_order(params);
897
898 match action {
899 ActionRequest::Order { orders, grouping } => {
900 assert_eq!(orders.len(), 1);
901 assert_eq!(orders[0].a, 1);
902 assert_eq!(orders[0].c, Some("sl-order".to_string()));
903 assert_eq!(grouping, "na");
904 }
905 _ => panic!("Expected ActionRequest::Order variant"),
906 }
907 }
908
909 #[rstest]
910 fn test_order_builder_batch_orders() {
911 let params1 = LimitOrderParams {
913 asset: 0,
914 is_buy: true,
915 px: "40000.0".to_string(),
916 sz: "0.01".to_string(),
917 reduce_only: false,
918 tif: TimeInForceRequest::Gtc,
919 cloid: Some("order-1".to_string()),
920 };
921
922 let params2 = LimitOrderParams {
923 asset: 1,
924 is_buy: false,
925 px: "2000.0".to_string(),
926 sz: "0.5".to_string(),
927 reduce_only: false,
928 tif: TimeInForceRequest::Ioc,
929 cloid: Some("order-2".to_string()),
930 };
931
932 let action = OrderBuilder::new()
933 .grouping(Grouping::NormalTpsl)
934 .push_limit_order(params1)
935 .push_limit_order(params2)
936 .build();
937
938 match action {
939 ActionRequest::Order { orders, grouping } => {
940 assert_eq!(orders.len(), 2);
941 assert_eq!(orders[0].c, Some("order-1".to_string()));
942 assert_eq!(orders[1].c, Some("order-2".to_string()));
943 assert_eq!(grouping, "normalTpsl");
944 }
945 _ => panic!("Expected ActionRequest::Order variant"),
946 }
947 }
948
949 #[rstest]
950 fn test_action_request_constructors() {
951 let order1 = mk_limit_gtc(0);
953 let order2 = mk_limit_gtc(1);
954 let action = ActionRequest::order(vec![order1, order2], "na");
955
956 match action {
957 ActionRequest::Order { orders, grouping } => {
958 assert_eq!(orders.len(), 2);
959 assert_eq!(grouping, "na");
960 }
961 _ => panic!("Expected ActionRequest::Order variant"),
962 }
963
964 let cancels = vec![CancelRequest { a: 0, o: 12345 }];
966 let action = ActionRequest::cancel(cancels);
967 assert!(matches!(action, ActionRequest::Cancel { .. }));
968
969 let cancels = vec![CancelByCloidRequest {
971 asset: 0,
972 cloid: "order-1".to_string(),
973 }];
974 let action = ActionRequest::cancel_by_cloid(cancels);
975 assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
976 }
977
978 #[rstest]
979 #[tokio::test(flavor = "multi_thread")]
980 async fn batcher_sends_on_tick() {
981 let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
983 let sent_closure = sent.clone();
984
985 let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
986 let sent_inner = sent_closure.clone();
987 Box::pin(async move {
988 if let HyperliquidWsRequest::Post { id, .. } = req {
989 sent_inner.lock().await.push(id);
990 }
991 Ok(())
992 })
993 };
994
995 let batcher = PostBatcher::new(send_fn);
996
997 for id in 1..=5u64 {
999 batcher
1000 .enqueue(ScheduledPost {
1001 id,
1002 request: info_all_mids(),
1003 lane: PostLane::Normal,
1004 })
1005 .await
1006 .unwrap();
1007 }
1008
1009 let sent_check = sent.clone();
1011 wait_until_async(
1012 || {
1013 let sent_inner = sent_check.clone();
1014 async move { sent_inner.lock().await.len() == 5 }
1015 },
1016 Duration::from_secs(2),
1017 )
1018 .await;
1019
1020 let got = sent.lock().await.clone();
1021 assert_eq!(got, vec![1, 2, 3, 4, 5]);
1022 }
1023}