1use std::{
17 sync::{
18 Arc,
19 atomic::{AtomicU64, Ordering},
20 },
21 time::Duration,
22};
23
24use ahash::AHashMap;
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29 sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30 time,
31};
32
33use crate::{
34 common::{consts::INFLIGHT_MAX, enums::HyperliquidInfoRequestType},
35 http::{
36 error::{Error, Result},
37 models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38 },
39 websocket::messages::{
40 ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41 OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42 },
43};
44
45#[derive(Debug)]
46struct Waiter {
47 tx: oneshot::Sender<PostResponse>,
48 _permit: OwnedSemaphorePermit,
50}
51
52#[derive(Debug)]
53pub struct PostRouter {
54 inner: Mutex<AHashMap<u64, Waiter>>,
55 inflight: Arc<Semaphore>, }
57
58impl Default for PostRouter {
59 fn default() -> Self {
60 Self {
61 inner: Mutex::new(AHashMap::new()),
62 inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
63 }
64 }
65}
66
67impl PostRouter {
68 pub fn new() -> Arc<Self> {
69 Arc::new(Self::default())
70 }
71
72 pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
74 let permit = self
76 .inflight
77 .clone()
78 .acquire_owned()
79 .await
80 .map_err(|_| Error::transport("post router semaphore closed"))?;
81
82 let (tx, rx) = oneshot::channel::<PostResponse>();
83 let mut map = self.inner.lock().await;
84 if map.contains_key(&id) {
85 return Err(Error::transport(format!("post id {id} already registered")));
86 }
87 map.insert(
88 id,
89 Waiter {
90 tx,
91 _permit: permit,
92 },
93 );
94 Ok(rx)
95 }
96
97 pub async fn complete(&self, resp: PostResponse) {
99 let id = resp.id;
100 let waiter = {
101 let mut map = self.inner.lock().await;
102 map.remove(&id)
103 };
104 if let Some(waiter) = waiter {
105 if waiter.tx.send(resp).is_err() {
106 log::warn!("Post waiter dropped before delivery: id={id}");
107 }
108 } else {
110 log::warn!("Post response with unknown id (late/duplicate?): id={id}");
111 }
112 }
113
114 pub async fn cancel(&self, id: u64) {
116 let _ = {
117 let mut map = self.inner.lock().await;
118 map.remove(&id)
119 };
120 }
122
123 pub async fn await_with_timeout(
125 &self,
126 id: u64,
127 rx: oneshot::Receiver<PostResponse>,
128 timeout: Duration,
129 ) -> Result<PostResponse> {
130 match time::timeout(timeout, rx).await {
131 Ok(Ok(resp)) => Ok(resp),
132 Ok(Err(_closed)) => {
133 self.cancel(id).await;
134 Err(Error::transport("post response channel closed"))
135 }
136 Err(_elapsed) => {
137 self.cancel(id).await;
138 Err(Error::Timeout)
139 }
140 }
141 }
142}
143
/// Thread-safe generator of sequential post ids.
#[derive(Debug)]
pub struct PostIds(AtomicU64);

impl PostIds {
    /// Creates a generator whose first id will be `start`.
    pub fn new(start: u64) -> Self {
        Self(AtomicU64::from(start))
    }

    /// Returns the current id and advances the counter by one.
    pub fn next(&self) -> u64 {
        let Self(counter) = self;
        counter.fetch_add(1, Ordering::Relaxed)
    }
}
155
/// Queue a scheduled post is routed through by the batcher.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PostLane {
    /// Lane chosen for order actions in which every order is an ALO limit order.
    Alo,
    /// Lane used for everything else.
    Normal,
}
161
162#[derive(Debug)]
163pub struct ScheduledPost {
164 pub id: u64,
165 pub request: PostRequest,
166 pub lane: PostLane,
167}
168
169#[derive(Debug)]
170pub struct PostBatcher {
171 tx_alo: mpsc::Sender<ScheduledPost>,
172 tx_normal: mpsc::Sender<ScheduledPost>,
173}
174
175impl PostBatcher {
176 pub fn new<F>(send_fn: F) -> Self
178 where
179 F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
180 {
181 let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
182 let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
183
184 get_runtime().spawn(Self::run_lane(
186 "ALO",
187 rx_alo,
188 Duration::from_millis(100),
189 send_fn.clone(),
190 ));
191
192 get_runtime().spawn(Self::run_lane(
194 "NORMAL",
195 rx_normal,
196 Duration::from_millis(50),
197 send_fn,
198 ));
199
200 Self { tx_alo, tx_normal }
201 }
202
203 async fn run_lane<F>(
204 lane_name: &'static str,
205 mut rx: mpsc::Receiver<ScheduledPost>,
206 tick: Duration,
207 mut send_fn: F,
208 ) where
209 F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
210 {
211 let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
212 let mut interval = time::interval(tick);
213 interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
214
215 loop {
216 tokio::select! {
217 maybe_item = rx.recv() => {
218 match maybe_item {
219 Some(item) => pend.push(item),
220 None => break, }
222 }
223 _ = interval.tick() => {
224 if pend.is_empty() { continue; }
225 let to_send = std::mem::take(&mut pend);
226 for item in to_send {
227 let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
228 if let Err(e) = send_fn(req).await {
229 log::error!("Failed to send post: lane={lane_name}, id={}, {e}", item.id);
230 }
231 }
232 }
233 }
234 }
235 log::info!("Post lane terminated: lane={lane_name}");
236 }
237
238 pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
239 match item.lane {
240 PostLane::Alo => self
241 .tx_alo
242 .send(item)
243 .await
244 .map_err(|_| Error::transport("ALO lane closed")),
245 PostLane::Normal => self
246 .tx_normal
247 .send(item)
248 .await
249 .map_err(|_| Error::transport("NORMAL lane closed")),
250 }
251 }
252}
253
254pub fn lane_for_action(action: &ActionRequest) -> PostLane {
256 match action {
257 ActionRequest::Order { orders, .. } => {
258 if orders.is_empty() {
259 return PostLane::Normal;
260 }
261 let all_alo = orders.iter().all(|o| {
262 matches!(
263 o.t,
264 OrderTypeRequest::Limit {
265 tif: TimeInForceRequest::Alo
266 }
267 )
268 });
269 if all_alo {
270 PostLane::Alo
271 } else {
272 PostLane::Normal
273 }
274 }
275 _ => PostLane::Normal,
276 }
277}
278
/// Order grouping value attached to an order action; defaults to [`Grouping::Na`].
#[derive(Debug, Clone, Copy, Default)]
pub enum Grouping {
    /// Rendered as `"na"`.
    #[default]
    Na,
    /// Rendered as `"normalTpsl"`.
    NormalTpsl,
    /// Rendered as `"positionTpsl"`.
    PositionTpsl,
}

impl Grouping {
    /// Returns the string form placed in the `grouping` field of an order action.
    pub fn as_str(&self) -> &'static str {
        match *self {
            Grouping::Na => "na",
            Grouping::NormalTpsl => "normalTpsl",
            Grouping::PositionTpsl => "positionTpsl",
        }
    }
}
295
296#[derive(Debug, Clone, Builder)]
298pub struct LimitOrderParams {
299 pub asset: u32,
300 pub is_buy: bool,
301 pub px: String,
302 pub sz: String,
303 pub reduce_only: bool,
304 pub tif: TimeInForceRequest,
305 pub cloid: Option<String>,
306}
307
308#[derive(Debug, Clone, Builder)]
310pub struct TriggerOrderParams {
311 pub asset: u32,
312 pub is_buy: bool,
313 pub px: String,
314 pub sz: String,
315 pub reduce_only: bool,
316 pub is_market: bool,
317 pub trigger_px: String,
318 pub tpsl: TpSlRequest,
319 pub cloid: Option<String>,
320}
321
322#[derive(Debug, Default)]
324pub struct OrderBuilder {
325 orders: Vec<OrderRequest>,
326 grouping: Grouping,
327}
328
329impl OrderBuilder {
330 pub fn new() -> Self {
331 Self::default()
332 }
333
334 #[must_use]
335 pub fn grouping(mut self, g: Grouping) -> Self {
336 self.grouping = g;
337 self
338 }
339
340 #[allow(clippy::too_many_arguments)]
342 #[must_use]
343 pub fn push_limit(
344 self,
345 asset: u32,
346 is_buy: bool,
347 px: impl ToString,
348 sz: impl ToString,
349 reduce_only: bool,
350 tif: TimeInForceRequest,
351 cloid: Option<String>,
352 ) -> Self {
353 let params = LimitOrderParams {
354 asset,
355 is_buy,
356 px: px.to_string(),
357 sz: sz.to_string(),
358 reduce_only,
359 tif,
360 cloid,
361 };
362 self.push_limit_order(params)
363 }
364
365 #[must_use]
367 pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
368 self.orders.push(OrderRequest {
369 a: params.asset,
370 b: params.is_buy,
371 p: params.px,
372 s: params.sz,
373 r: params.reduce_only,
374 t: OrderTypeRequest::Limit { tif: params.tif },
375 c: params.cloid,
376 });
377 self
378 }
379
380 #[allow(clippy::too_many_arguments)]
382 #[must_use]
383 pub fn push_trigger(
384 self,
385 asset: u32,
386 is_buy: bool,
387 px: impl ToString,
388 sz: impl ToString,
389 reduce_only: bool,
390 is_market: bool,
391 trigger_px: impl ToString,
392 tpsl: TpSlRequest,
393 cloid: Option<String>,
394 ) -> Self {
395 let params = TriggerOrderParams {
396 asset,
397 is_buy,
398 px: px.to_string(),
399 sz: sz.to_string(),
400 reduce_only,
401 is_market,
402 trigger_px: trigger_px.to_string(),
403 tpsl,
404 cloid,
405 };
406 self.push_trigger_order(params)
407 }
408
409 #[must_use]
411 pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
412 self.orders.push(OrderRequest {
413 a: params.asset,
414 b: params.is_buy,
415 p: params.px,
416 s: params.sz,
417 r: params.reduce_only,
418 t: OrderTypeRequest::Trigger {
419 is_market: params.is_market,
420 trigger_px: params.trigger_px,
421 tpsl: params.tpsl,
422 },
423 c: params.cloid,
424 });
425 self
426 }
427 pub fn build(self) -> ActionRequest {
428 ActionRequest::Order {
429 orders: self.orders,
430 grouping: self.grouping.as_str().to_string(),
431 }
432 }
433
434 pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
451 Self::new().push_limit_order(params).build()
452 }
453
454 pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
473 Self::new().push_trigger_order(params).build()
474 }
475}
476
477pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
478 ActionRequest::Cancel {
479 cancels: cancels
480 .into_iter()
481 .map(|(a, o)| CancelRequest { a, o })
482 .collect(),
483 }
484}
485pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
486 ActionRequest::CancelByCloid {
487 cancels: vec![CancelByCloidRequest {
488 asset,
489 cloid: cloid.into(),
490 }],
491 }
492}
493pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
494 ActionRequest::Modify {
495 modifies: vec![ModifyRequest {
496 oid,
497 order: new_order,
498 }],
499 }
500}
501
502pub fn info_l2_book(coin: &str) -> PostRequest {
503 PostRequest::Info {
504 payload: serde_json::json!({"type": HyperliquidInfoRequestType::L2Book.as_str(), "coin": coin}),
505 }
506}
507
508pub fn info_all_mids() -> PostRequest {
509 PostRequest::Info {
510 payload: serde_json::json!({"type": HyperliquidInfoRequestType::AllMids.as_str()}),
511 }
512}
513
514pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
515 PostRequest::Info {
516 payload: serde_json::json!({"type": HyperliquidInfoRequestType::OrderStatus.as_str(), "user": user, "oid": oid}),
517 }
518}
519
520pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
521 let mut body =
522 serde_json::json!({"type": HyperliquidInfoRequestType::OpenOrders.as_str(), "user": user});
523 if let Some(fe) = frontend {
524 body["frontend"] = serde_json::json!(fe);
525 }
526 PostRequest::Info { payload: body }
527}
528
529pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
530 let mut body =
531 serde_json::json!({"type": HyperliquidInfoRequestType::UserFills.as_str(), "user": user});
532 if let Some(agg) = aggregate_by_time {
533 body["aggregateByTime"] = serde_json::json!(agg);
534 }
535 PostRequest::Info { payload: body }
536}
537
538pub fn info_user_rate_limit(user: &str) -> PostRequest {
539 PostRequest::Info {
540 payload: serde_json::json!({"type": HyperliquidInfoRequestType::UserRateLimit.as_str(), "user": user}),
541 }
542}
543
544pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
545 PostRequest::Info {
546 payload: serde_json::json!({"type": HyperliquidInfoRequestType::Candle.as_str(), "coin": coin, "interval": interval}),
547 }
548}
549
550pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
551 serde_json::from_value(payload.clone()).map_err(Error::Serde)
552}
553pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
554 serde_json::from_value(payload.clone()).map_err(Error::Serde)
555}
556pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
557 serde_json::from_value(payload.clone()).map_err(Error::Serde)
558}
559
560#[derive(Debug)]
562pub enum ActionOutcome<'a> {
563 Resting {
564 oid: u64,
565 },
566 Filled {
567 total_sz: &'a str,
568 avg_px: &'a str,
569 oid: Option<u64>,
570 },
571 Error {
572 msg: &'a str,
573 },
574 Unknown(&'a serde_json::Value),
575}
576pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
577 if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
578 if let (Some(total_sz), Some(avg_px)) = (
579 payload.get("totalSz").and_then(|v| v.as_str()),
580 payload.get("avgPx").and_then(|v| v.as_str()),
581 ) {
582 return ActionOutcome::Filled {
583 total_sz,
584 avg_px,
585 oid: Some(oid),
586 };
587 }
588 return ActionOutcome::Resting { oid };
589 }
590 if let (Some(total_sz), Some(avg_px)) = (
591 payload.get("totalSz").and_then(|v| v.as_str()),
592 payload.get("avgPx").and_then(|v| v.as_str()),
593 ) {
594 return ActionOutcome::Filled {
595 total_sz,
596 avg_px,
597 oid: None,
598 };
599 }
600 if let Some(msg) = payload
601 .get("error")
602 .and_then(|v| v.as_str())
603 .or_else(|| payload.get("message").and_then(|v| v.as_str()))
604 {
605 return ActionOutcome::Error { msg };
606 }
607 ActionOutcome::Unknown(payload)
608}
609
610#[derive(Clone, Debug)]
611pub struct WsSender {
612 inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
613}
614
615impl WsSender {
616 pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
617 Self {
618 inner: Arc::new(tokio::sync::Mutex::new(tx)),
619 }
620 }
621
622 pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
623 let sender = self.inner.lock().await;
624 sender
625 .send(req)
626 .await
627 .map_err(|_| Error::transport("WebSocket sender closed"))
628 }
629}
630
#[cfg(test)]
mod tests {
    use nautilus_common::testing::wait_until_async;
    use rstest::rstest;
    use tokio::{
        sync::oneshot,
        time::{Duration, timeout},
    };

    use super::*;
    use crate::{
        common::consts::INFLIGHT_MAX,
        websocket::messages::{
            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
        },
    };

    /// Limit order fixture with ALO time-in-force.
    fn mk_limit_alo(asset: u32) -> OrderRequest {
        OrderRequest {
            a: asset,
            b: true,
            p: "1".to_string(),
            s: "1".to_string(),
            r: false,
            t: OrderTypeRequest::Limit {
                tif: TimeInForceRequest::Alo,
            },
            c: None,
        }
    }

    /// Limit order fixture with GTC time-in-force.
    fn mk_limit_gtc(asset: u32) -> OrderRequest {
        OrderRequest {
            a: asset,
            b: true,
            p: "1".to_string(),
            s: "1".to_string(),
            r: false,
            t: OrderTypeRequest::Limit {
                tif: TimeInForceRequest::Gtc,
            },
            c: None,
        }
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn register_duplicate_id_errors() {
        let router = PostRouter::new();
        let _rx = router.register(42).await.expect("first register OK");

        let err = router.register(42).await.expect_err("duplicate must error");
        let msg = err.to_string().to_lowercase();
        assert!(
            msg.contains("already") || msg.contains("duplicate"),
            "unexpected error: {msg}"
        );
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn timeout_cancels_and_allows_reregister() {
        let router = PostRouter::new();
        let id = 7;

        let rx = router.register(id).await.unwrap();
        let err = router
            .await_with_timeout(id, rx, Duration::from_millis(25))
            .await
            .expect_err("should timeout");
        assert!(
            err.to_string().to_lowercase().contains("timeout")
                || err.to_string().to_lowercase().contains("closed"),
            "unexpected error kind: {err}"
        );

        // The timeout path cancels the registration, so the id is free again.
        let _rx2 = router
            .register(id)
            .await
            .expect("id should be reusable after timeout cancel");
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn inflight_cap_blocks_then_unblocks() {
        let router = PostRouter::new();

        // Take every inflight permit.
        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
        for i in 0..INFLIGHT_MAX {
            let rx = router.register(i as u64).await.unwrap();
            rxs.push(rx);
        }

        // One more registration must wait for a permit.
        let router2 = Arc::clone(&router);
        let (entered_tx, entered_rx) = oneshot::channel::<()>();
        let (done_tx, mut done_rx) = oneshot::channel::<()>();
        get_runtime().spawn(async move {
            let _ = entered_tx.send(());
            let _rx = router2.register(9_999_999).await.unwrap();
            let _ = done_tx.send(());
        });

        entered_rx.await.unwrap();

        // Poll through `&mut` so the receiver survives the elapsed timeout and
        // can be awaited again below.
        assert!(
            timeout(Duration::from_millis(50), &mut done_rx)
                .await
                .is_err(),
            "should still be blocked while at cap"
        );

        // Free one permit; the blocked registration must now go through.
        router.cancel(0).await;

        let unblocked = timeout(Duration::from_secs(1), &mut done_rx).await;
        assert!(
            matches!(unblocked, Ok(Ok(()))),
            "register should complete once a slot is freed"
        );
    }

    #[rstest(
        orders, expected,
        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
        case::empty(vec![], PostLane::Normal),
    )]
    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
        let action = ActionRequest::Order {
            orders,
            grouping: "na".to_string(),
        };
        assert_eq!(lane_for_action(&action), expected);
    }

    #[rstest]
    fn test_order_request_builder() {
        let order = OrderRequestBuilder::default()
            .a(0)
            .b(true)
            .p("40000.0".to_string())
            .s("0.01".to_string())
            .r(false)
            .t(OrderTypeRequest::Limit {
                tif: TimeInForceRequest::Gtc,
            })
            .c(Some("test-order-1".to_string()))
            .build()
            .expect("should build order");

        assert_eq!(order.a, 0);
        assert!(order.b);
        assert_eq!(order.p, "40000.0");
        assert_eq!(order.s, "0.01");
        assert!(!order.r);
        assert_eq!(order.c, Some("test-order-1".to_string()));
    }

    #[rstest]
    fn test_limit_order_params_builder() {
        let params = LimitOrderParamsBuilder::default()
            .asset(0)
            .is_buy(true)
            .px("40000.0".to_string())
            .sz("0.01".to_string())
            .reduce_only(false)
            .tif(TimeInForceRequest::Alo)
            .cloid(Some("test-limit-1".to_string()))
            .build()
            .expect("should build limit params");

        assert_eq!(params.asset, 0);
        assert!(params.is_buy);
        assert_eq!(params.px, "40000.0");
        assert_eq!(params.sz, "0.01");
        assert!(!params.reduce_only);
        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
    }

    #[rstest]
    fn test_trigger_order_params_builder() {
        let params = TriggerOrderParamsBuilder::default()
            .asset(1)
            .is_buy(false)
            .px("39000.0".to_string())
            .sz("0.02".to_string())
            .reduce_only(false)
            .is_market(true)
            .trigger_px("39500.0".to_string())
            .tpsl(TpSlRequest::Sl)
            .cloid(Some("test-trigger-1".to_string()))
            .build()
            .expect("should build trigger params");

        assert_eq!(params.asset, 1);
        assert!(!params.is_buy);
        assert_eq!(params.px, "39000.0");
        assert!(params.is_market);
        assert_eq!(params.trigger_px, "39500.0");
    }

    #[rstest]
    fn test_order_builder_single_limit_convenience() {
        let params = LimitOrderParamsBuilder::default()
            .asset(0)
            .is_buy(true)
            .px("40000.0".to_string())
            .sz("0.01".to_string())
            .reduce_only(false)
            .tif(TimeInForceRequest::Gtc)
            .cloid(None)
            .build()
            .unwrap();

        let action = OrderBuilder::single_limit_order(params);

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 1);
                assert_eq!(orders[0].a, 0);
                assert!(orders[0].b);
                assert_eq!(grouping, "na");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }
    }

    #[rstest]
    fn test_order_builder_single_trigger_convenience() {
        let params = TriggerOrderParamsBuilder::default()
            .asset(1)
            .is_buy(false)
            .px("39000.0".to_string())
            .sz("0.02".to_string())
            .reduce_only(false)
            .is_market(true)
            .trigger_px("39500.0".to_string())
            .tpsl(TpSlRequest::Sl)
            .cloid(Some("sl-order".to_string()))
            .build()
            .unwrap();

        let action = OrderBuilder::single_trigger_order(params);

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 1);
                assert_eq!(orders[0].a, 1);
                assert_eq!(orders[0].c, Some("sl-order".to_string()));
                assert_eq!(grouping, "na");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }
    }

    #[rstest]
    fn test_order_builder_batch_orders() {
        let params1 = LimitOrderParams {
            asset: 0,
            is_buy: true,
            px: "40000.0".to_string(),
            sz: "0.01".to_string(),
            reduce_only: false,
            tif: TimeInForceRequest::Gtc,
            cloid: Some("order-1".to_string()),
        };

        let params2 = LimitOrderParams {
            asset: 1,
            is_buy: false,
            px: "2000.0".to_string(),
            sz: "0.5".to_string(),
            reduce_only: false,
            tif: TimeInForceRequest::Ioc,
            cloid: Some("order-2".to_string()),
        };

        let action = OrderBuilder::new()
            .grouping(Grouping::NormalTpsl)
            .push_limit_order(params1)
            .push_limit_order(params2)
            .build();

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 2);
                assert_eq!(orders[0].c, Some("order-1".to_string()));
                assert_eq!(orders[1].c, Some("order-2".to_string()));
                assert_eq!(grouping, "normalTpsl");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }
    }

    #[rstest]
    fn test_action_request_constructors() {
        // Order constructor.
        let order1 = mk_limit_gtc(0);
        let order2 = mk_limit_gtc(1);
        let action = ActionRequest::order(vec![order1, order2], "na");

        match action {
            ActionRequest::Order { orders, grouping } => {
                assert_eq!(orders.len(), 2);
                assert_eq!(grouping, "na");
            }
            _ => panic!("Expected ActionRequest::Order variant"),
        }

        // Cancel constructor.
        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
        let action = ActionRequest::cancel(cancels);
        assert!(matches!(action, ActionRequest::Cancel { .. }));

        // Cancel-by-cloid constructor.
        let cancels = vec![CancelByCloidRequest {
            asset: 0,
            cloid: "order-1".to_string(),
        }];
        let action = ActionRequest::cancel_by_cloid(cancels);
        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
    }

    #[rstest]
    #[tokio::test(flavor = "multi_thread")]
    async fn batcher_sends_on_tick() {
        // Records the ids handed to the send function, in call order.
        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
        let sent_closure = sent.clone();

        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
            let sent_inner = sent_closure.clone();
            Box::pin(async move {
                if let HyperliquidWsRequest::Post { id, .. } = req {
                    sent_inner.lock().await.push(id);
                }
                Ok(())
            })
        };

        let batcher = PostBatcher::new(send_fn);

        for id in 1..=5u64 {
            batcher
                .enqueue(ScheduledPost {
                    id,
                    request: info_all_mids(),
                    lane: PostLane::Normal,
                })
                .await
                .unwrap();
        }

        // Wait until all five posts have been passed to the send function.
        let sent_check = sent.clone();
        wait_until_async(
            || {
                let sent_inner = sent_check.clone();
                async move { sent_inner.lock().await.len() == 5 }
            },
            Duration::from_secs(2),
        )
        .await;

        let got = sent.lock().await.clone();
        assert_eq!(got, vec![1, 2, 3, 4, 5]);
    }
}