Skip to main content

nautilus_hyperliquid/websocket/
post.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{
18        Arc,
19        atomic::{AtomicU64, Ordering},
20    },
21    time::Duration,
22};
23
24use ahash::AHashMap;
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29    sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30    time,
31};
32
33use crate::{
34    common::{consts::INFLIGHT_MAX, enums::HyperliquidInfoRequestType},
35    http::{
36        error::{Error, Result},
37        models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38    },
39    websocket::messages::{
40        ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41        OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42    },
43};
44
45#[derive(Debug)]
46struct Waiter {
47    tx: oneshot::Sender<PostResponse>,
48    // When this is dropped, the permit is released, shrinking inflight
49    _permit: OwnedSemaphorePermit,
50}
51
52#[derive(Debug)]
53pub struct PostRouter {
54    inner: Mutex<AHashMap<u64, Waiter>>,
55    inflight: Arc<Semaphore>, // hard cap per HL docs (e.g., 100)
56}
57
58impl Default for PostRouter {
59    fn default() -> Self {
60        Self {
61            inner: Mutex::new(AHashMap::new()),
62            inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
63        }
64    }
65}
66
67impl PostRouter {
68    pub fn new() -> Arc<Self> {
69        Arc::new(Self::default())
70    }
71
72    /// Registers interest in a post id, enforcing inflight cap.
73    pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
74        // Acquire and retain a permit per inflight call
75        let permit = self
76            .inflight
77            .clone()
78            .acquire_owned()
79            .await
80            .map_err(|_| Error::transport("post router semaphore closed"))?;
81
82        let (tx, rx) = oneshot::channel::<PostResponse>();
83        let mut map = self.inner.lock().await;
84        if map.contains_key(&id) {
85            return Err(Error::transport(format!("post id {id} already registered")));
86        }
87        map.insert(
88            id,
89            Waiter {
90                tx,
91                _permit: permit,
92            },
93        );
94        Ok(rx)
95    }
96
97    /// Completes a waiting caller when a response arrives (releases inflight via Waiter drop).
98    pub async fn complete(&self, resp: PostResponse) {
99        let id = resp.id;
100        let waiter = {
101            let mut map = self.inner.lock().await;
102            map.remove(&id)
103        };
104        if let Some(waiter) = waiter {
105            if waiter.tx.send(resp).is_err() {
106                log::warn!("Post waiter dropped before delivery: id={id}");
107            }
108            // waiter drops here → permit released
109        } else {
110            log::warn!("Post response with unknown id (late/duplicate?): id={id}");
111        }
112    }
113
114    /// Cancel a pending id (e.g., timeout); quietly succeed if id wasn't present.
115    pub async fn cancel(&self, id: u64) {
116        let _ = {
117            let mut map = self.inner.lock().await;
118            map.remove(&id)
119        };
120        // Waiter (and its permit) drop here if it existed
121    }
122
123    /// Await a response with timeout. On timeout or closed channel, cancels the id.
124    pub async fn await_with_timeout(
125        &self,
126        id: u64,
127        rx: oneshot::Receiver<PostResponse>,
128        timeout: Duration,
129    ) -> Result<PostResponse> {
130        match time::timeout(timeout, rx).await {
131            Ok(Ok(resp)) => Ok(resp),
132            Ok(Err(_closed)) => {
133                self.cancel(id).await;
134                Err(Error::transport("post response channel closed"))
135            }
136            Err(_elapsed) => {
137                self.cancel(id).await;
138                Err(Error::Timeout)
139            }
140        }
141    }
142}
143
144#[derive(Debug)]
145pub struct PostIds(AtomicU64);
146
147impl PostIds {
148    pub fn new(start: u64) -> Self {
149        Self(AtomicU64::new(start))
150    }
151    pub fn next(&self) -> u64 {
152        self.0.fetch_add(1, Ordering::Relaxed)
153    }
154}
155
156#[derive(Debug, Clone, Copy, PartialEq, Eq)]
157pub enum PostLane {
158    Alo,    // Post-only orders
159    Normal, // IOC/GTC + info + anything else
160}
161
162#[derive(Debug)]
163pub struct ScheduledPost {
164    pub id: u64,
165    pub request: PostRequest,
166    pub lane: PostLane,
167}
168
169#[derive(Debug)]
170pub struct PostBatcher {
171    tx_alo: mpsc::Sender<ScheduledPost>,
172    tx_normal: mpsc::Sender<ScheduledPost>,
173}
174
175impl PostBatcher {
176    /// Spawns two lane tasks that batch-send scheduled posts via `send_fn`.
177    pub fn new<F>(send_fn: F) -> Self
178    where
179        F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
180    {
181        let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
182        let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
183
184        // ALO lane: batchy tick, low jitter
185        get_runtime().spawn(Self::run_lane(
186            "ALO",
187            rx_alo,
188            Duration::from_millis(100),
189            send_fn.clone(),
190        ));
191
192        // NORMAL lane: faster tick; adjust as needed
193        get_runtime().spawn(Self::run_lane(
194            "NORMAL",
195            rx_normal,
196            Duration::from_millis(50),
197            send_fn,
198        ));
199
200        Self { tx_alo, tx_normal }
201    }
202
203    async fn run_lane<F>(
204        lane_name: &'static str,
205        mut rx: mpsc::Receiver<ScheduledPost>,
206        tick: Duration,
207        mut send_fn: F,
208    ) where
209        F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
210    {
211        let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
212        let mut interval = time::interval(tick);
213        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
214
215        loop {
216            tokio::select! {
217                maybe_item = rx.recv() => {
218                    match maybe_item {
219                        Some(item) => pend.push(item),
220                        None => break, // sender dropped → terminate lane task
221                    }
222                }
223                _ = interval.tick() => {
224                    if pend.is_empty() { continue; }
225                    let to_send = std::mem::take(&mut pend);
226                    for item in to_send {
227                        let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
228                        if let Err(e) = send_fn(req).await {
229                            log::error!("Failed to send post: lane={lane_name}, id={}, {e}", item.id);
230                        }
231                    }
232                }
233            }
234        }
235        log::info!("Post lane terminated: lane={lane_name}");
236    }
237
238    pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
239        match item.lane {
240            PostLane::Alo => self
241                .tx_alo
242                .send(item)
243                .await
244                .map_err(|_| Error::transport("ALO lane closed")),
245            PostLane::Normal => self
246                .tx_normal
247                .send(item)
248                .await
249                .map_err(|_| Error::transport("NORMAL lane closed")),
250        }
251    }
252}
253
254// Helpers to classify lane from an action
255pub fn lane_for_action(action: &ActionRequest) -> PostLane {
256    match action {
257        ActionRequest::Order { orders, .. } => {
258            if orders.is_empty() {
259                return PostLane::Normal;
260            }
261            let all_alo = orders.iter().all(|o| {
262                matches!(
263                    o.t,
264                    OrderTypeRequest::Limit {
265                        tif: TimeInForceRequest::Alo
266                    }
267                )
268            });
269            if all_alo {
270                PostLane::Alo
271            } else {
272                PostLane::Normal
273            }
274        }
275        _ => PostLane::Normal,
276    }
277}
278
279#[derive(Debug, Clone, Copy, Default)]
280pub enum Grouping {
281    #[default]
282    Na,
283    NormalTpsl,
284    PositionTpsl,
285}
286impl Grouping {
287    pub fn as_str(&self) -> &'static str {
288        match self {
289            Self::Na => "na",
290            Self::NormalTpsl => "normalTpsl",
291            Self::PositionTpsl => "positionTpsl",
292        }
293    }
294}
295
296/// Parameters for creating a limit order.
297#[derive(Debug, Clone, Builder)]
298pub struct LimitOrderParams {
299    pub asset: u32,
300    pub is_buy: bool,
301    pub px: String,
302    pub sz: String,
303    pub reduce_only: bool,
304    pub tif: TimeInForceRequest,
305    pub cloid: Option<String>,
306}
307
308/// Parameters for creating a trigger order.
309#[derive(Debug, Clone, Builder)]
310pub struct TriggerOrderParams {
311    pub asset: u32,
312    pub is_buy: bool,
313    pub px: String,
314    pub sz: String,
315    pub reduce_only: bool,
316    pub is_market: bool,
317    pub trigger_px: String,
318    pub tpsl: TpSlRequest,
319    pub cloid: Option<String>,
320}
321
322// ORDER builder (single or many)
323#[derive(Debug, Default)]
324pub struct OrderBuilder {
325    orders: Vec<OrderRequest>,
326    grouping: Grouping,
327}
328
329impl OrderBuilder {
330    pub fn new() -> Self {
331        Self::default()
332    }
333
334    #[must_use]
335    pub fn grouping(mut self, g: Grouping) -> Self {
336        self.grouping = g;
337        self
338    }
339
340    /// Create a limit order with individual parameters (legacy method)
341    #[allow(clippy::too_many_arguments)]
342    #[must_use]
343    pub fn push_limit(
344        self,
345        asset: u32,
346        is_buy: bool,
347        px: impl ToString,
348        sz: impl ToString,
349        reduce_only: bool,
350        tif: TimeInForceRequest,
351        cloid: Option<String>,
352    ) -> Self {
353        let params = LimitOrderParams {
354            asset,
355            is_buy,
356            px: px.to_string(),
357            sz: sz.to_string(),
358            reduce_only,
359            tif,
360            cloid,
361        };
362        self.push_limit_order(params)
363    }
364
365    /// Create a limit order using parameters struct
366    #[must_use]
367    pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
368        self.orders.push(OrderRequest {
369            a: params.asset,
370            b: params.is_buy,
371            p: params.px,
372            s: params.sz,
373            r: params.reduce_only,
374            t: OrderTypeRequest::Limit { tif: params.tif },
375            c: params.cloid,
376        });
377        self
378    }
379
380    /// Create a trigger order with individual parameters (legacy method)
381    #[allow(clippy::too_many_arguments)]
382    #[must_use]
383    pub fn push_trigger(
384        self,
385        asset: u32,
386        is_buy: bool,
387        px: impl ToString,
388        sz: impl ToString,
389        reduce_only: bool,
390        is_market: bool,
391        trigger_px: impl ToString,
392        tpsl: TpSlRequest,
393        cloid: Option<String>,
394    ) -> Self {
395        let params = TriggerOrderParams {
396            asset,
397            is_buy,
398            px: px.to_string(),
399            sz: sz.to_string(),
400            reduce_only,
401            is_market,
402            trigger_px: trigger_px.to_string(),
403            tpsl,
404            cloid,
405        };
406        self.push_trigger_order(params)
407    }
408
409    /// Create a trigger order using parameters struct
410    #[must_use]
411    pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
412        self.orders.push(OrderRequest {
413            a: params.asset,
414            b: params.is_buy,
415            p: params.px,
416            s: params.sz,
417            r: params.reduce_only,
418            t: OrderTypeRequest::Trigger {
419                is_market: params.is_market,
420                trigger_px: params.trigger_px,
421                tpsl: params.tpsl,
422            },
423            c: params.cloid,
424        });
425        self
426    }
427    pub fn build(self) -> ActionRequest {
428        ActionRequest::Order {
429            orders: self.orders,
430            grouping: self.grouping.as_str().to_string(),
431        }
432    }
433
434    /// Create a single limit order action directly (convenience method)
435    ///
436    /// # Example
437    /// ```ignore
438    /// let action = OrderBuilder::single_limit_order(
439    ///     LimitOrderParamsBuilder::default()
440    ///         .asset(0)
441    ///         .is_buy(true)
442    ///         .px("40000.0")
443    ///         .sz("0.01")
444    ///         .reduce_only(false)
445    ///         .tif(TimeInForceRequest::Gtc)
446    ///         .build()
447    ///         .unwrap()
448    /// );
449    /// ```
450    pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
451        Self::new().push_limit_order(params).build()
452    }
453
454    /// Create a single trigger order action directly (convenience method)
455    ///
456    /// # Example
457    /// ```ignore
458    /// let action = OrderBuilder::single_trigger_order(
459    ///     TriggerOrderParamsBuilder::default()
460    ///         .asset(0)
461    ///         .is_buy(false)
462    ///         .px("39000.0")
463    ///         .sz("0.01")
464    ///         .reduce_only(false)
465    ///         .is_market(true)
466    ///         .trigger_px("39500.0")
467    ///         .tpsl(TpSlRequest::Sl)
468    ///         .build()
469    ///         .unwrap()
470    /// );
471    /// ```
472    pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
473        Self::new().push_trigger_order(params).build()
474    }
475}
476
477pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
478    ActionRequest::Cancel {
479        cancels: cancels
480            .into_iter()
481            .map(|(a, o)| CancelRequest { a, o })
482            .collect(),
483    }
484}
485pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
486    ActionRequest::CancelByCloid {
487        cancels: vec![CancelByCloidRequest {
488            asset,
489            cloid: cloid.into(),
490        }],
491    }
492}
493pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
494    ActionRequest::Modify {
495        modifies: vec![ModifyRequest {
496            oid,
497            order: new_order,
498        }],
499    }
500}
501
502pub fn info_l2_book(coin: &str) -> PostRequest {
503    PostRequest::Info {
504        payload: serde_json::json!({"type": HyperliquidInfoRequestType::L2Book.as_str(), "coin": coin}),
505    }
506}
507
508pub fn info_all_mids() -> PostRequest {
509    PostRequest::Info {
510        payload: serde_json::json!({"type": HyperliquidInfoRequestType::AllMids.as_str()}),
511    }
512}
513
514pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
515    PostRequest::Info {
516        payload: serde_json::json!({"type": HyperliquidInfoRequestType::OrderStatus.as_str(), "user": user, "oid": oid}),
517    }
518}
519
520pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
521    let mut body =
522        serde_json::json!({"type": HyperliquidInfoRequestType::OpenOrders.as_str(), "user": user});
523    if let Some(fe) = frontend {
524        body["frontend"] = serde_json::json!(fe);
525    }
526    PostRequest::Info { payload: body }
527}
528
529pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
530    let mut body =
531        serde_json::json!({"type": HyperliquidInfoRequestType::UserFills.as_str(), "user": user});
532    if let Some(agg) = aggregate_by_time {
533        body["aggregateByTime"] = serde_json::json!(agg);
534    }
535    PostRequest::Info { payload: body }
536}
537
538pub fn info_user_rate_limit(user: &str) -> PostRequest {
539    PostRequest::Info {
540        payload: serde_json::json!({"type": HyperliquidInfoRequestType::UserRateLimit.as_str(), "user": user}),
541    }
542}
543
544pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
545    PostRequest::Info {
546        payload: serde_json::json!({"type": HyperliquidInfoRequestType::Candle.as_str(), "coin": coin, "interval": interval}),
547    }
548}
549
550pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
551    serde_json::from_value(payload.clone()).map_err(Error::Serde)
552}
553pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
554    serde_json::from_value(payload.clone()).map_err(Error::Serde)
555}
556pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
557    serde_json::from_value(payload.clone()).map_err(Error::Serde)
558}
559
560/// Heuristic classification for action responses.
561#[derive(Debug)]
562pub enum ActionOutcome<'a> {
563    Resting {
564        oid: u64,
565    },
566    Filled {
567        total_sz: &'a str,
568        avg_px: &'a str,
569        oid: Option<u64>,
570    },
571    Error {
572        msg: &'a str,
573    },
574    Unknown(&'a serde_json::Value),
575}
576pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
577    if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
578        if let (Some(total_sz), Some(avg_px)) = (
579            payload.get("totalSz").and_then(|v| v.as_str()),
580            payload.get("avgPx").and_then(|v| v.as_str()),
581        ) {
582            return ActionOutcome::Filled {
583                total_sz,
584                avg_px,
585                oid: Some(oid),
586            };
587        }
588        return ActionOutcome::Resting { oid };
589    }
590    if let (Some(total_sz), Some(avg_px)) = (
591        payload.get("totalSz").and_then(|v| v.as_str()),
592        payload.get("avgPx").and_then(|v| v.as_str()),
593    ) {
594        return ActionOutcome::Filled {
595            total_sz,
596            avg_px,
597            oid: None,
598        };
599    }
600    if let Some(msg) = payload
601        .get("error")
602        .and_then(|v| v.as_str())
603        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
604    {
605        return ActionOutcome::Error { msg };
606    }
607    ActionOutcome::Unknown(payload)
608}
609
610#[derive(Clone, Debug)]
611pub struct WsSender {
612    inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
613}
614
615impl WsSender {
616    pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
617        Self {
618            inner: Arc::new(tokio::sync::Mutex::new(tx)),
619        }
620    }
621
622    pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
623        let sender = self.inner.lock().await;
624        sender
625            .send(req)
626            .await
627            .map_err(|_| Error::transport("WebSocket sender closed"))
628    }
629}
630
631#[cfg(test)]
632mod tests {
633    use nautilus_common::testing::wait_until_async;
634    use rstest::rstest;
635    use tokio::{
636        sync::oneshot,
637        time::{Duration, timeout},
638    };
639
640    use super::*;
641    use crate::{
642        common::consts::INFLIGHT_MAX,
643        websocket::messages::{
644            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
645            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
646        },
647    };
648
649    fn mk_limit_alo(asset: u32) -> OrderRequest {
650        OrderRequest {
651            a: asset,
652            b: true,
653            p: "1".to_string(),
654            s: "1".to_string(),
655            r: false,
656            t: OrderTypeRequest::Limit {
657                tif: TimeInForceRequest::Alo,
658            },
659            c: None,
660        }
661    }
662
663    fn mk_limit_gtc(asset: u32) -> OrderRequest {
664        OrderRequest {
665            a: asset,
666            b: true,
667            p: "1".to_string(),
668            s: "1".to_string(),
669            r: false,
670            t: OrderTypeRequest::Limit {
671                // any non-ALO TIF keeps it in the Normal lane
672                tif: TimeInForceRequest::Gtc,
673            },
674            c: None,
675        }
676    }
677
678    #[rstest]
679    #[tokio::test(flavor = "multi_thread")]
680    async fn register_duplicate_id_errors() {
681        let router = PostRouter::new();
682        let _rx = router.register(42).await.expect("first register OK");
683
684        let err = router.register(42).await.expect_err("duplicate must error");
685        let msg = err.to_string().to_lowercase();
686        assert!(
687            msg.contains("already") || msg.contains("duplicate"),
688            "unexpected error: {msg}"
689        );
690    }
691
692    #[rstest]
693    #[tokio::test(flavor = "multi_thread")]
694    async fn timeout_cancels_and_allows_reregister() {
695        let router = PostRouter::new();
696        let id = 7;
697
698        let rx = router.register(id).await.unwrap();
699        // No complete() → ensure we time out and the waiter is removed.
700        let err = router
701            .await_with_timeout(id, rx, Duration::from_millis(25))
702            .await
703            .expect_err("should timeout");
704        assert!(
705            err.to_string().to_lowercase().contains("timeout")
706                || err.to_string().to_lowercase().contains("closed"),
707            "unexpected error kind: {err}"
708        );
709
710        // After timeout, id should be reusable (cancel dropped the waiter & released the permit).
711        let _rx2 = router
712            .register(id)
713            .await
714            .expect("id should be reusable after timeout cancel");
715    }
716
717    #[rstest]
718    #[tokio::test(flavor = "multi_thread")]
719    async fn inflight_cap_blocks_then_unblocks() {
720        let router = PostRouter::new();
721
722        // Fill the inflight capacity.
723        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
724        for i in 0..INFLIGHT_MAX {
725            let rx = router.register(i as u64).await.unwrap();
726            rxs.push(rx); // keep waiters alive
727        }
728
729        // Next register should block until a permit is freed.
730        let router2 = Arc::clone(&router);
731        let (entered_tx, entered_rx) = oneshot::channel::<()>();
732        let (done_tx, done_rx) = oneshot::channel::<()>();
733        let (check_tx, check_rx) = oneshot::channel::<()>(); // separate channel for checking
734
735        get_runtime().spawn(async move {
736            let _ = entered_tx.send(());
737            let _rx = router2.register(9_999_999).await.unwrap();
738            let _ = done_tx.send(());
739        });
740
741        // Confirm the task is trying to register…
742        entered_rx.await.unwrap();
743
744        // …and that it doesn't complete yet (still blocked on permit).
745        get_runtime().spawn(async move {
746            if done_rx.await.is_ok() {
747                let _ = check_tx.send(());
748            }
749        });
750
751        assert!(
752            timeout(Duration::from_millis(50), check_rx).await.is_err(),
753            "should still be blocked while at cap"
754        );
755
756        // Free one permit by cancelling a waiter.
757        router.cancel(0).await;
758
759        // Wait for the blocked register to complete.
760        tokio::time::sleep(Duration::from_millis(100)).await;
761    }
762
763    #[rstest(
764        orders, expected,
765        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
766        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
767        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
768        case::empty(vec![], PostLane::Normal),
769    )]
770    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
771        let action = ActionRequest::Order {
772            orders,
773            grouping: "na".to_string(),
774        };
775        assert_eq!(lane_for_action(&action), expected);
776    }
777
778    #[rstest]
779    fn test_order_request_builder() {
780        // Test OrderRequestBuilder derived from #[derive(Builder)]
781        let order = OrderRequestBuilder::default()
782            .a(0)
783            .b(true)
784            .p("40000.0".to_string())
785            .s("0.01".to_string())
786            .r(false)
787            .t(OrderTypeRequest::Limit {
788                tif: TimeInForceRequest::Gtc,
789            })
790            .c(Some("test-order-1".to_string()))
791            .build()
792            .expect("should build order");
793
794        assert_eq!(order.a, 0);
795        assert!(order.b);
796        assert_eq!(order.p, "40000.0");
797        assert_eq!(order.s, "0.01");
798        assert!(!order.r);
799        assert_eq!(order.c, Some("test-order-1".to_string()));
800    }
801
802    #[rstest]
803    fn test_limit_order_params_builder() {
804        // Test LimitOrderParamsBuilder
805        let params = LimitOrderParamsBuilder::default()
806            .asset(0)
807            .is_buy(true)
808            .px("40000.0".to_string())
809            .sz("0.01".to_string())
810            .reduce_only(false)
811            .tif(TimeInForceRequest::Alo)
812            .cloid(Some("test-limit-1".to_string()))
813            .build()
814            .expect("should build limit params");
815
816        assert_eq!(params.asset, 0);
817        assert!(params.is_buy);
818        assert_eq!(params.px, "40000.0");
819        assert_eq!(params.sz, "0.01");
820        assert!(!params.reduce_only);
821        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
822    }
823
824    #[rstest]
825    fn test_trigger_order_params_builder() {
826        // Test TriggerOrderParamsBuilder
827        let params = TriggerOrderParamsBuilder::default()
828            .asset(1)
829            .is_buy(false)
830            .px("39000.0".to_string())
831            .sz("0.02".to_string())
832            .reduce_only(false)
833            .is_market(true)
834            .trigger_px("39500.0".to_string())
835            .tpsl(TpSlRequest::Sl)
836            .cloid(Some("test-trigger-1".to_string()))
837            .build()
838            .expect("should build trigger params");
839
840        assert_eq!(params.asset, 1);
841        assert!(!params.is_buy);
842        assert_eq!(params.px, "39000.0");
843        assert!(params.is_market);
844        assert_eq!(params.trigger_px, "39500.0");
845    }
846
847    #[rstest]
848    fn test_order_builder_single_limit_convenience() {
849        // Test OrderBuilder::single_limit_order convenience method
850        let params = LimitOrderParamsBuilder::default()
851            .asset(0)
852            .is_buy(true)
853            .px("40000.0".to_string())
854            .sz("0.01".to_string())
855            .reduce_only(false)
856            .tif(TimeInForceRequest::Gtc)
857            .cloid(None)
858            .build()
859            .unwrap();
860
861        let action = OrderBuilder::single_limit_order(params);
862
863        match action {
864            ActionRequest::Order { orders, grouping } => {
865                assert_eq!(orders.len(), 1);
866                assert_eq!(orders[0].a, 0);
867                assert!(orders[0].b);
868                assert_eq!(grouping, "na");
869            }
870            _ => panic!("Expected ActionRequest::Order variant"),
871        }
872    }
873
874    #[rstest]
875    fn test_order_builder_single_trigger_convenience() {
876        // Test OrderBuilder::single_trigger_order convenience method
877        let params = TriggerOrderParamsBuilder::default()
878            .asset(1)
879            .is_buy(false)
880            .px("39000.0".to_string())
881            .sz("0.02".to_string())
882            .reduce_only(false)
883            .is_market(true)
884            .trigger_px("39500.0".to_string())
885            .tpsl(TpSlRequest::Sl)
886            .cloid(Some("sl-order".to_string()))
887            .build()
888            .unwrap();
889
890        let action = OrderBuilder::single_trigger_order(params);
891
892        match action {
893            ActionRequest::Order { orders, grouping } => {
894                assert_eq!(orders.len(), 1);
895                assert_eq!(orders[0].a, 1);
896                assert_eq!(orders[0].c, Some("sl-order".to_string()));
897                assert_eq!(grouping, "na");
898            }
899            _ => panic!("Expected ActionRequest::Order variant"),
900        }
901    }
902
903    #[rstest]
904    fn test_order_builder_batch_orders() {
905        // Test existing batch order functionality still works
906        let params1 = LimitOrderParams {
907            asset: 0,
908            is_buy: true,
909            px: "40000.0".to_string(),
910            sz: "0.01".to_string(),
911            reduce_only: false,
912            tif: TimeInForceRequest::Gtc,
913            cloid: Some("order-1".to_string()),
914        };
915
916        let params2 = LimitOrderParams {
917            asset: 1,
918            is_buy: false,
919            px: "2000.0".to_string(),
920            sz: "0.5".to_string(),
921            reduce_only: false,
922            tif: TimeInForceRequest::Ioc,
923            cloid: Some("order-2".to_string()),
924        };
925
926        let action = OrderBuilder::new()
927            .grouping(Grouping::NormalTpsl)
928            .push_limit_order(params1)
929            .push_limit_order(params2)
930            .build();
931
932        match action {
933            ActionRequest::Order { orders, grouping } => {
934                assert_eq!(orders.len(), 2);
935                assert_eq!(orders[0].c, Some("order-1".to_string()));
936                assert_eq!(orders[1].c, Some("order-2".to_string()));
937                assert_eq!(grouping, "normalTpsl");
938            }
939            _ => panic!("Expected ActionRequest::Order variant"),
940        }
941    }
942
943    #[rstest]
944    fn test_action_request_constructors() {
945        // Test ActionRequest::order() constructor
946        let order1 = mk_limit_gtc(0);
947        let order2 = mk_limit_gtc(1);
948        let action = ActionRequest::order(vec![order1, order2], "na");
949
950        match action {
951            ActionRequest::Order { orders, grouping } => {
952                assert_eq!(orders.len(), 2);
953                assert_eq!(grouping, "na");
954            }
955            _ => panic!("Expected ActionRequest::Order variant"),
956        }
957
958        // Test ActionRequest::cancel() constructor
959        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
960        let action = ActionRequest::cancel(cancels);
961        assert!(matches!(action, ActionRequest::Cancel { .. }));
962
963        // Test ActionRequest::cancel_by_cloid() constructor
964        let cancels = vec![CancelByCloidRequest {
965            asset: 0,
966            cloid: "order-1".to_string(),
967        }];
968        let action = ActionRequest::cancel_by_cloid(cancels);
969        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
970    }
971
972    #[rstest]
973    #[tokio::test(flavor = "multi_thread")]
974    async fn batcher_sends_on_tick() {
975        // Capture sent ids to prove dispatch happened.
976        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
977        let sent_closure = sent.clone();
978
979        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
980            let sent_inner = sent_closure.clone();
981            Box::pin(async move {
982                if let HyperliquidWsRequest::Post { id, .. } = req {
983                    sent_inner.lock().await.push(id);
984                }
985                Ok(())
986            })
987        };
988
989        let batcher = PostBatcher::new(send_fn);
990
991        // Enqueue a handful of posts into the NORMAL lane; tick is ~50ms.
992        for id in 1..=5u64 {
993            batcher
994                .enqueue(ScheduledPost {
995                    id,
996                    request: info_all_mids(),
997                    lane: PostLane::Normal,
998                })
999                .await
1000                .unwrap();
1001        }
1002
1003        // Wait for all 5 posts to be sent
1004        let sent_check = sent.clone();
1005        wait_until_async(
1006            || {
1007                let sent_inner = sent_check.clone();
1008                async move { sent_inner.lock().await.len() == 5 }
1009            },
1010            Duration::from_secs(2),
1011        )
1012        .await;
1013
1014        let got = sent.lock().await.clone();
1015        assert_eq!(got, vec![1, 2, 3, 4, 5]);
1016    }
1017}