Skip to main content

nautilus_hyperliquid/websocket/
post.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::{
17    sync::{
18        Arc,
19        atomic::{AtomicU64, Ordering},
20    },
21    time::Duration,
22};
23
24use ahash::AHashMap;
25use derive_builder::Builder;
26use futures_util::future::BoxFuture;
27use nautilus_common::live::get_runtime;
28use tokio::{
29    sync::{Mutex, OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
30    time,
31};
32
33use crate::{
34    common::{consts::INFLIGHT_MAX, enums::HyperliquidInfoRequestType},
35    http::{
36        error::{Error, Result},
37        models::{HyperliquidFills, HyperliquidL2Book, HyperliquidOrderStatus},
38    },
39    websocket::messages::{
40        ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, ModifyRequest,
41        OrderRequest, OrderTypeRequest, PostRequest, PostResponse, TimeInForceRequest, TpSlRequest,
42    },
43};
44
45#[derive(Debug)]
46struct Waiter {
47    tx: oneshot::Sender<PostResponse>,
48    // When this is dropped, the permit is released, shrinking inflight
49    _permit: OwnedSemaphorePermit,
50}
51
52#[derive(Debug)]
53pub struct PostRouter {
54    inner: Mutex<AHashMap<u64, Waiter>>,
55    inflight: Arc<Semaphore>, // hard cap per HL docs (e.g., 100)
56}
57
58impl Default for PostRouter {
59    fn default() -> Self {
60        Self {
61            inner: Mutex::new(AHashMap::new()),
62            inflight: Arc::new(Semaphore::new(INFLIGHT_MAX)),
63        }
64    }
65}
66
67impl PostRouter {
68    pub fn new() -> Arc<Self> {
69        Arc::new(Self::default())
70    }
71
72    /// Registers interest in a post id, enforcing inflight cap.
73    pub async fn register(&self, id: u64) -> Result<oneshot::Receiver<PostResponse>> {
74        // Acquire and retain a permit per inflight call
75        let permit = self
76            .inflight
77            .clone()
78            .acquire_owned()
79            .await
80            .map_err(|_| Error::transport("post router semaphore closed"))?;
81
82        let (tx, rx) = oneshot::channel::<PostResponse>();
83        let mut map = self.inner.lock().await;
84        if map.contains_key(&id) {
85            return Err(Error::transport(format!("post id {id} already registered")));
86        }
87        map.insert(
88            id,
89            Waiter {
90                tx,
91                _permit: permit,
92            },
93        );
94        Ok(rx)
95    }
96
97    /// Completes a waiting caller when a response arrives (releases inflight via Waiter drop).
98    pub async fn complete(&self, resp: PostResponse) {
99        let id = resp.id;
100        let waiter = {
101            let mut map = self.inner.lock().await;
102            map.remove(&id)
103        };
104
105        if let Some(waiter) = waiter {
106            if waiter.tx.send(resp).is_err() {
107                log::warn!("Post waiter dropped before delivery: id={id}");
108            }
109            // waiter drops here → permit released
110        } else {
111            log::warn!("Post response with unknown id (late/duplicate?): id={id}");
112        }
113    }
114
115    /// Cancel a pending id (e.g., timeout); quietly succeed if id wasn't present.
116    pub async fn cancel(&self, id: u64) {
117        let _ = {
118            let mut map = self.inner.lock().await;
119            map.remove(&id)
120        };
121        // Waiter (and its permit) drop here if it existed
122    }
123
124    /// Await a response with timeout. On timeout or closed channel, cancels the id.
125    pub async fn await_with_timeout(
126        &self,
127        id: u64,
128        rx: oneshot::Receiver<PostResponse>,
129        timeout: Duration,
130    ) -> Result<PostResponse> {
131        match time::timeout(timeout, rx).await {
132            Ok(Ok(resp)) => Ok(resp),
133            Ok(Err(_closed)) => {
134                self.cancel(id).await;
135                Err(Error::transport("post response channel closed"))
136            }
137            Err(_elapsed) => {
138                self.cancel(id).await;
139                Err(Error::Timeout)
140            }
141        }
142    }
143}
144
145#[derive(Debug)]
146pub struct PostIds(AtomicU64);
147
148impl PostIds {
149    pub fn new(start: u64) -> Self {
150        Self(AtomicU64::new(start))
151    }
152    pub fn next(&self) -> u64 {
153        self.0.fetch_add(1, Ordering::Relaxed)
154    }
155}
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq)]
158pub enum PostLane {
159    Alo,    // Post-only orders
160    Normal, // IOC/GTC + info + anything else
161}
162
163#[derive(Debug)]
164pub struct ScheduledPost {
165    pub id: u64,
166    pub request: PostRequest,
167    pub lane: PostLane,
168}
169
170#[derive(Debug)]
171pub struct PostBatcher {
172    tx_alo: mpsc::Sender<ScheduledPost>,
173    tx_normal: mpsc::Sender<ScheduledPost>,
174}
175
176impl PostBatcher {
177    /// Spawns two lane tasks that batch-send scheduled posts via `send_fn`.
178    pub fn new<F>(send_fn: F) -> Self
179    where
180        F: Send + 'static + Clone + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
181    {
182        let (tx_alo, rx_alo) = mpsc::channel::<ScheduledPost>(1024);
183        let (tx_normal, rx_normal) = mpsc::channel::<ScheduledPost>(4096);
184
185        // ALO lane: batchy tick, low jitter
186        get_runtime().spawn(Self::run_lane(
187            "ALO",
188            rx_alo,
189            Duration::from_millis(100),
190            send_fn.clone(),
191        ));
192
193        // NORMAL lane: faster tick; adjust as needed
194        get_runtime().spawn(Self::run_lane(
195            "NORMAL",
196            rx_normal,
197            Duration::from_millis(50),
198            send_fn,
199        ));
200
201        Self { tx_alo, tx_normal }
202    }
203
204    async fn run_lane<F>(
205        lane_name: &'static str,
206        mut rx: mpsc::Receiver<ScheduledPost>,
207        tick: Duration,
208        mut send_fn: F,
209    ) where
210        F: Send + 'static + FnMut(HyperliquidWsRequest) -> BoxFuture<'static, Result<()>>,
211    {
212        let mut pend: Vec<ScheduledPost> = Vec::with_capacity(128);
213        let mut interval = time::interval(tick);
214        interval.set_missed_tick_behavior(time::MissedTickBehavior::Delay);
215
216        loop {
217            tokio::select! {
218                maybe_item = rx.recv() => {
219                    match maybe_item {
220                        Some(item) => pend.push(item),
221                        None => break, // sender dropped → terminate lane task
222                    }
223                }
224                _ = interval.tick() => {
225                    if pend.is_empty() { continue; }
226                    let to_send = std::mem::take(&mut pend);
227                    for item in to_send {
228                        let req = HyperliquidWsRequest::Post { id: item.id, request: item.request.clone() };
229                        if let Err(e) = send_fn(req).await {
230                            log::error!("Failed to send post: lane={lane_name}, id={}, {e}", item.id);
231                        }
232                    }
233                }
234            }
235        }
236        log::info!("Post lane terminated: lane={lane_name}");
237    }
238
239    pub async fn enqueue(&self, item: ScheduledPost) -> Result<()> {
240        match item.lane {
241            PostLane::Alo => self
242                .tx_alo
243                .send(item)
244                .await
245                .map_err(|_| Error::transport("ALO lane closed")),
246            PostLane::Normal => self
247                .tx_normal
248                .send(item)
249                .await
250                .map_err(|_| Error::transport("NORMAL lane closed")),
251        }
252    }
253}
254
255// Helpers to classify lane from an action
256pub fn lane_for_action(action: &ActionRequest) -> PostLane {
257    match action {
258        ActionRequest::Order { orders, .. } => {
259            if orders.is_empty() {
260                return PostLane::Normal;
261            }
262            let all_alo = orders.iter().all(|o| {
263                matches!(
264                    o.t,
265                    OrderTypeRequest::Limit {
266                        tif: TimeInForceRequest::Alo
267                    }
268                )
269            });
270
271            if all_alo {
272                PostLane::Alo
273            } else {
274                PostLane::Normal
275            }
276        }
277        _ => PostLane::Normal,
278    }
279}
280
281#[derive(Debug, Clone, Copy, Default)]
282pub enum Grouping {
283    #[default]
284    Na,
285    NormalTpsl,
286    PositionTpsl,
287}
288impl Grouping {
289    pub fn as_str(&self) -> &'static str {
290        match self {
291            Self::Na => "na",
292            Self::NormalTpsl => "normalTpsl",
293            Self::PositionTpsl => "positionTpsl",
294        }
295    }
296}
297
298/// Parameters for creating a limit order.
299#[derive(Debug, Clone, Builder)]
300pub struct LimitOrderParams {
301    pub asset: u32,
302    pub is_buy: bool,
303    pub px: String,
304    pub sz: String,
305    pub reduce_only: bool,
306    pub tif: TimeInForceRequest,
307    pub cloid: Option<String>,
308}
309
310/// Parameters for creating a trigger order.
311#[derive(Debug, Clone, Builder)]
312pub struct TriggerOrderParams {
313    pub asset: u32,
314    pub is_buy: bool,
315    pub px: String,
316    pub sz: String,
317    pub reduce_only: bool,
318    pub is_market: bool,
319    pub trigger_px: String,
320    pub tpsl: TpSlRequest,
321    pub cloid: Option<String>,
322}
323
324// ORDER builder (single or many)
325#[derive(Debug, Default)]
326pub struct OrderBuilder {
327    orders: Vec<OrderRequest>,
328    grouping: Grouping,
329}
330
331impl OrderBuilder {
332    pub fn new() -> Self {
333        Self::default()
334    }
335
336    #[must_use]
337    pub fn grouping(mut self, g: Grouping) -> Self {
338        self.grouping = g;
339        self
340    }
341
342    /// Create a limit order with individual parameters (legacy method)
343    #[allow(clippy::too_many_arguments)]
344    #[must_use]
345    pub fn push_limit(
346        self,
347        asset: u32,
348        is_buy: bool,
349        px: &(impl ToString + ?Sized),
350        sz: &(impl ToString + ?Sized),
351        reduce_only: bool,
352        tif: TimeInForceRequest,
353        cloid: Option<String>,
354    ) -> Self {
355        let params = LimitOrderParams {
356            asset,
357            is_buy,
358            px: px.to_string(),
359            sz: sz.to_string(),
360            reduce_only,
361            tif,
362            cloid,
363        };
364        self.push_limit_order(params)
365    }
366
367    /// Create a limit order using parameters struct
368    #[must_use]
369    pub fn push_limit_order(mut self, params: LimitOrderParams) -> Self {
370        self.orders.push(OrderRequest {
371            a: params.asset,
372            b: params.is_buy,
373            p: params.px,
374            s: params.sz,
375            r: params.reduce_only,
376            t: OrderTypeRequest::Limit { tif: params.tif },
377            c: params.cloid,
378        });
379        self
380    }
381
382    /// Create a trigger order with individual parameters (legacy method)
383    #[allow(clippy::too_many_arguments)]
384    #[must_use]
385    pub fn push_trigger(
386        self,
387        asset: u32,
388        is_buy: bool,
389        px: &(impl ToString + ?Sized),
390        sz: &(impl ToString + ?Sized),
391        reduce_only: bool,
392        is_market: bool,
393        trigger_px: &(impl ToString + ?Sized),
394        tpsl: TpSlRequest,
395        cloid: Option<String>,
396    ) -> Self {
397        let params = TriggerOrderParams {
398            asset,
399            is_buy,
400            px: px.to_string(),
401            sz: sz.to_string(),
402            reduce_only,
403            is_market,
404            trigger_px: trigger_px.to_string(),
405            tpsl,
406            cloid,
407        };
408        self.push_trigger_order(params)
409    }
410
411    /// Create a trigger order using parameters struct
412    #[must_use]
413    pub fn push_trigger_order(mut self, params: TriggerOrderParams) -> Self {
414        self.orders.push(OrderRequest {
415            a: params.asset,
416            b: params.is_buy,
417            p: params.px,
418            s: params.sz,
419            r: params.reduce_only,
420            t: OrderTypeRequest::Trigger {
421                is_market: params.is_market,
422                trigger_px: params.trigger_px,
423                tpsl: params.tpsl,
424            },
425            c: params.cloid,
426        });
427        self
428    }
429    pub fn build(self) -> ActionRequest {
430        ActionRequest::Order {
431            orders: self.orders,
432            grouping: self.grouping.as_str().to_string(),
433        }
434    }
435
436    /// Create a single limit order action directly (convenience method)
437    ///
438    /// # Example
439    /// ```ignore
440    /// let action = OrderBuilder::single_limit_order(
441    ///     LimitOrderParamsBuilder::default()
442    ///         .asset(0)
443    ///         .is_buy(true)
444    ///         .px("40000.0")
445    ///         .sz("0.01")
446    ///         .reduce_only(false)
447    ///         .tif(TimeInForceRequest::Gtc)
448    ///         .build()
449    ///         .unwrap()
450    /// );
451    /// ```
452    pub fn single_limit_order(params: LimitOrderParams) -> ActionRequest {
453        Self::new().push_limit_order(params).build()
454    }
455
456    /// Create a single trigger order action directly (convenience method)
457    ///
458    /// # Example
459    /// ```ignore
460    /// let action = OrderBuilder::single_trigger_order(
461    ///     TriggerOrderParamsBuilder::default()
462    ///         .asset(0)
463    ///         .is_buy(false)
464    ///         .px("39000.0")
465    ///         .sz("0.01")
466    ///         .reduce_only(false)
467    ///         .is_market(true)
468    ///         .trigger_px("39500.0")
469    ///         .tpsl(TpSlRequest::Sl)
470    ///         .build()
471    ///         .unwrap()
472    /// );
473    /// ```
474    pub fn single_trigger_order(params: TriggerOrderParams) -> ActionRequest {
475        Self::new().push_trigger_order(params).build()
476    }
477}
478
479pub fn cancel_many(cancels: Vec<(u32, u64)>) -> ActionRequest {
480    ActionRequest::Cancel {
481        cancels: cancels
482            .into_iter()
483            .map(|(a, o)| CancelRequest { a, o })
484            .collect(),
485    }
486}
487pub fn cancel_by_cloid(asset: u32, cloid: impl Into<String>) -> ActionRequest {
488    ActionRequest::CancelByCloid {
489        cancels: vec![CancelByCloidRequest {
490            asset,
491            cloid: cloid.into(),
492        }],
493    }
494}
495pub fn modify(oid: u64, new_order: OrderRequest) -> ActionRequest {
496    ActionRequest::Modify {
497        modifies: vec![ModifyRequest {
498            oid,
499            order: new_order,
500        }],
501    }
502}
503
504pub fn info_l2_book(coin: &str) -> PostRequest {
505    PostRequest::Info {
506        payload: serde_json::json!({"type": HyperliquidInfoRequestType::L2Book.as_str(), "coin": coin}),
507    }
508}
509
510pub fn info_all_mids() -> PostRequest {
511    PostRequest::Info {
512        payload: serde_json::json!({"type": HyperliquidInfoRequestType::AllMids.as_str()}),
513    }
514}
515
516pub fn info_order_status(user: &str, oid: u64) -> PostRequest {
517    PostRequest::Info {
518        payload: serde_json::json!({"type": HyperliquidInfoRequestType::OrderStatus.as_str(), "user": user, "oid": oid}),
519    }
520}
521
522pub fn info_open_orders(user: &str, frontend: Option<bool>) -> PostRequest {
523    let mut body =
524        serde_json::json!({"type": HyperliquidInfoRequestType::OpenOrders.as_str(), "user": user});
525
526    if let Some(fe) = frontend {
527        body["frontend"] = serde_json::json!(fe);
528    }
529    PostRequest::Info { payload: body }
530}
531
532pub fn info_user_fills(user: &str, aggregate_by_time: Option<bool>) -> PostRequest {
533    let mut body =
534        serde_json::json!({"type": HyperliquidInfoRequestType::UserFills.as_str(), "user": user});
535
536    if let Some(agg) = aggregate_by_time {
537        body["aggregateByTime"] = serde_json::json!(agg);
538    }
539    PostRequest::Info { payload: body }
540}
541
542pub fn info_user_rate_limit(user: &str) -> PostRequest {
543    PostRequest::Info {
544        payload: serde_json::json!({"type": HyperliquidInfoRequestType::UserRateLimit.as_str(), "user": user}),
545    }
546}
547
548pub fn info_candle(coin: &str, interval: &str) -> PostRequest {
549    PostRequest::Info {
550        payload: serde_json::json!({"type": HyperliquidInfoRequestType::Candle.as_str(), "coin": coin, "interval": interval}),
551    }
552}
553
554pub fn parse_l2_book(payload: &serde_json::Value) -> Result<HyperliquidL2Book> {
555    serde_json::from_value(payload.clone()).map_err(Error::Serde)
556}
557pub fn parse_user_fills(payload: &serde_json::Value) -> Result<HyperliquidFills> {
558    serde_json::from_value(payload.clone()).map_err(Error::Serde)
559}
560pub fn parse_order_status(payload: &serde_json::Value) -> Result<HyperliquidOrderStatus> {
561    serde_json::from_value(payload.clone()).map_err(Error::Serde)
562}
563
564/// Heuristic classification for action responses.
565#[derive(Debug)]
566pub enum ActionOutcome<'a> {
567    Resting {
568        oid: u64,
569    },
570    Filled {
571        total_sz: &'a str,
572        avg_px: &'a str,
573        oid: Option<u64>,
574    },
575    Error {
576        msg: &'a str,
577    },
578    Unknown(&'a serde_json::Value),
579}
580pub fn classify_action_payload(payload: &serde_json::Value) -> ActionOutcome<'_> {
581    if let Some(oid) = payload.get("oid").and_then(|v| v.as_u64()) {
582        if let (Some(total_sz), Some(avg_px)) = (
583            payload.get("totalSz").and_then(|v| v.as_str()),
584            payload.get("avgPx").and_then(|v| v.as_str()),
585        ) {
586            return ActionOutcome::Filled {
587                total_sz,
588                avg_px,
589                oid: Some(oid),
590            };
591        }
592        return ActionOutcome::Resting { oid };
593    }
594
595    if let (Some(total_sz), Some(avg_px)) = (
596        payload.get("totalSz").and_then(|v| v.as_str()),
597        payload.get("avgPx").and_then(|v| v.as_str()),
598    ) {
599        return ActionOutcome::Filled {
600            total_sz,
601            avg_px,
602            oid: None,
603        };
604    }
605
606    if let Some(msg) = payload
607        .get("error")
608        .and_then(|v| v.as_str())
609        .or_else(|| payload.get("message").and_then(|v| v.as_str()))
610    {
611        return ActionOutcome::Error { msg };
612    }
613    ActionOutcome::Unknown(payload)
614}
615
616#[derive(Clone, Debug)]
617pub struct WsSender {
618    inner: Arc<tokio::sync::Mutex<mpsc::Sender<HyperliquidWsRequest>>>,
619}
620
621impl WsSender {
622    pub fn new(tx: mpsc::Sender<HyperliquidWsRequest>) -> Self {
623        Self {
624            inner: Arc::new(tokio::sync::Mutex::new(tx)),
625        }
626    }
627
628    pub async fn send(&self, req: HyperliquidWsRequest) -> Result<()> {
629        let sender = self.inner.lock().await;
630        sender
631            .send(req)
632            .await
633            .map_err(|_| Error::transport("WebSocket sender closed"))
634    }
635}
636
637#[cfg(test)]
638mod tests {
639    use nautilus_common::testing::wait_until_async;
640    use rstest::rstest;
641    use tokio::{
642        sync::oneshot,
643        time::{Duration, timeout},
644    };
645
646    use super::*;
647    use crate::{
648        common::consts::INFLIGHT_MAX,
649        websocket::messages::{
650            ActionRequest, CancelByCloidRequest, CancelRequest, HyperliquidWsRequest, OrderRequest,
651            OrderRequestBuilder, OrderTypeRequest, TimeInForceRequest,
652        },
653    };
654
655    fn mk_limit_alo(asset: u32) -> OrderRequest {
656        OrderRequest {
657            a: asset,
658            b: true,
659            p: "1".to_string(),
660            s: "1".to_string(),
661            r: false,
662            t: OrderTypeRequest::Limit {
663                tif: TimeInForceRequest::Alo,
664            },
665            c: None,
666        }
667    }
668
669    fn mk_limit_gtc(asset: u32) -> OrderRequest {
670        OrderRequest {
671            a: asset,
672            b: true,
673            p: "1".to_string(),
674            s: "1".to_string(),
675            r: false,
676            t: OrderTypeRequest::Limit {
677                // any non-ALO TIF keeps it in the Normal lane
678                tif: TimeInForceRequest::Gtc,
679            },
680            c: None,
681        }
682    }
683
684    #[rstest]
685    #[tokio::test(flavor = "multi_thread")]
686    async fn register_duplicate_id_errors() {
687        let router = PostRouter::new();
688        let _rx = router.register(42).await.expect("first register OK");
689
690        let err = router.register(42).await.expect_err("duplicate must error");
691        let msg = err.to_string().to_lowercase();
692        assert!(
693            msg.contains("already") || msg.contains("duplicate"),
694            "unexpected error: {msg}"
695        );
696    }
697
698    #[rstest]
699    #[tokio::test(flavor = "multi_thread")]
700    async fn timeout_cancels_and_allows_reregister() {
701        let router = PostRouter::new();
702        let id = 7;
703
704        let rx = router.register(id).await.unwrap();
705        // No complete() → ensure we time out and the waiter is removed.
706        let err = router
707            .await_with_timeout(id, rx, Duration::from_millis(25))
708            .await
709            .expect_err("should timeout");
710        assert!(
711            err.to_string().to_lowercase().contains("timeout")
712                || err.to_string().to_lowercase().contains("closed"),
713            "unexpected error kind: {err}"
714        );
715
716        // After timeout, id should be reusable (cancel dropped the waiter & released the permit).
717        let _rx2 = router
718            .register(id)
719            .await
720            .expect("id should be reusable after timeout cancel");
721    }
722
723    #[rstest]
724    #[tokio::test(flavor = "multi_thread")]
725    async fn inflight_cap_blocks_then_unblocks() {
726        let router = PostRouter::new();
727
728        // Fill the inflight capacity.
729        let mut rxs = Vec::with_capacity(INFLIGHT_MAX);
730        for i in 0..INFLIGHT_MAX {
731            let rx = router.register(i as u64).await.unwrap();
732            rxs.push(rx); // keep waiters alive
733        }
734
735        // Next register should block until a permit is freed.
736        let router2 = Arc::clone(&router);
737        let (entered_tx, entered_rx) = oneshot::channel::<()>();
738        let (done_tx, done_rx) = oneshot::channel::<()>();
739        let (check_tx, check_rx) = oneshot::channel::<()>(); // separate channel for checking
740
741        get_runtime().spawn(async move {
742            let _ = entered_tx.send(());
743            let _rx = router2.register(9_999_999).await.unwrap();
744            let _ = done_tx.send(());
745        });
746
747        // Confirm the task is trying to register…
748        entered_rx.await.unwrap();
749
750        // …and that it doesn't complete yet (still blocked on permit).
751        get_runtime().spawn(async move {
752            if done_rx.await.is_ok() {
753                let _ = check_tx.send(());
754            }
755        });
756
757        assert!(
758            timeout(Duration::from_millis(50), check_rx).await.is_err(),
759            "should still be blocked while at cap"
760        );
761
762        // Free one permit by cancelling a waiter.
763        router.cancel(0).await;
764
765        // Wait for the blocked register to complete.
766        tokio::time::sleep(Duration::from_millis(100)).await;
767    }
768
769    #[rstest(
770        orders, expected,
771        case::all_alo(vec![mk_limit_alo(0), mk_limit_alo(1)], PostLane::Alo),
772        case::mixed_alo_gtc(vec![mk_limit_alo(0), mk_limit_gtc(1)], PostLane::Normal),
773        case::all_gtc(vec![mk_limit_gtc(0), mk_limit_gtc(1)], PostLane::Normal),
774        case::empty(vec![], PostLane::Normal),
775    )]
776    fn lane_classifier_cases(orders: Vec<OrderRequest>, expected: PostLane) {
777        let action = ActionRequest::Order {
778            orders,
779            grouping: "na".to_string(),
780        };
781        assert_eq!(lane_for_action(&action), expected);
782    }
783
784    #[rstest]
785    fn test_order_request_builder() {
786        // Test OrderRequestBuilder derived from #[derive(Builder)]
787        let order = OrderRequestBuilder::default()
788            .a(0)
789            .b(true)
790            .p("40000.0".to_string())
791            .s("0.01".to_string())
792            .r(false)
793            .t(OrderTypeRequest::Limit {
794                tif: TimeInForceRequest::Gtc,
795            })
796            .c(Some("test-order-1".to_string()))
797            .build()
798            .expect("should build order");
799
800        assert_eq!(order.a, 0);
801        assert!(order.b);
802        assert_eq!(order.p, "40000.0");
803        assert_eq!(order.s, "0.01");
804        assert!(!order.r);
805        assert_eq!(order.c, Some("test-order-1".to_string()));
806    }
807
808    #[rstest]
809    fn test_limit_order_params_builder() {
810        // Test LimitOrderParamsBuilder
811        let params = LimitOrderParamsBuilder::default()
812            .asset(0)
813            .is_buy(true)
814            .px("40000.0".to_string())
815            .sz("0.01".to_string())
816            .reduce_only(false)
817            .tif(TimeInForceRequest::Alo)
818            .cloid(Some("test-limit-1".to_string()))
819            .build()
820            .expect("should build limit params");
821
822        assert_eq!(params.asset, 0);
823        assert!(params.is_buy);
824        assert_eq!(params.px, "40000.0");
825        assert_eq!(params.sz, "0.01");
826        assert!(!params.reduce_only);
827        assert_eq!(params.cloid, Some("test-limit-1".to_string()));
828    }
829
830    #[rstest]
831    fn test_trigger_order_params_builder() {
832        // Test TriggerOrderParamsBuilder
833        let params = TriggerOrderParamsBuilder::default()
834            .asset(1)
835            .is_buy(false)
836            .px("39000.0".to_string())
837            .sz("0.02".to_string())
838            .reduce_only(false)
839            .is_market(true)
840            .trigger_px("39500.0".to_string())
841            .tpsl(TpSlRequest::Sl)
842            .cloid(Some("test-trigger-1".to_string()))
843            .build()
844            .expect("should build trigger params");
845
846        assert_eq!(params.asset, 1);
847        assert!(!params.is_buy);
848        assert_eq!(params.px, "39000.0");
849        assert!(params.is_market);
850        assert_eq!(params.trigger_px, "39500.0");
851    }
852
853    #[rstest]
854    fn test_order_builder_single_limit_convenience() {
855        // Test OrderBuilder::single_limit_order convenience method
856        let params = LimitOrderParamsBuilder::default()
857            .asset(0)
858            .is_buy(true)
859            .px("40000.0".to_string())
860            .sz("0.01".to_string())
861            .reduce_only(false)
862            .tif(TimeInForceRequest::Gtc)
863            .cloid(None)
864            .build()
865            .unwrap();
866
867        let action = OrderBuilder::single_limit_order(params);
868
869        match action {
870            ActionRequest::Order { orders, grouping } => {
871                assert_eq!(orders.len(), 1);
872                assert_eq!(orders[0].a, 0);
873                assert!(orders[0].b);
874                assert_eq!(grouping, "na");
875            }
876            _ => panic!("Expected ActionRequest::Order variant"),
877        }
878    }
879
880    #[rstest]
881    fn test_order_builder_single_trigger_convenience() {
882        // Test OrderBuilder::single_trigger_order convenience method
883        let params = TriggerOrderParamsBuilder::default()
884            .asset(1)
885            .is_buy(false)
886            .px("39000.0".to_string())
887            .sz("0.02".to_string())
888            .reduce_only(false)
889            .is_market(true)
890            .trigger_px("39500.0".to_string())
891            .tpsl(TpSlRequest::Sl)
892            .cloid(Some("sl-order".to_string()))
893            .build()
894            .unwrap();
895
896        let action = OrderBuilder::single_trigger_order(params);
897
898        match action {
899            ActionRequest::Order { orders, grouping } => {
900                assert_eq!(orders.len(), 1);
901                assert_eq!(orders[0].a, 1);
902                assert_eq!(orders[0].c, Some("sl-order".to_string()));
903                assert_eq!(grouping, "na");
904            }
905            _ => panic!("Expected ActionRequest::Order variant"),
906        }
907    }
908
909    #[rstest]
910    fn test_order_builder_batch_orders() {
911        // Test existing batch order functionality still works
912        let params1 = LimitOrderParams {
913            asset: 0,
914            is_buy: true,
915            px: "40000.0".to_string(),
916            sz: "0.01".to_string(),
917            reduce_only: false,
918            tif: TimeInForceRequest::Gtc,
919            cloid: Some("order-1".to_string()),
920        };
921
922        let params2 = LimitOrderParams {
923            asset: 1,
924            is_buy: false,
925            px: "2000.0".to_string(),
926            sz: "0.5".to_string(),
927            reduce_only: false,
928            tif: TimeInForceRequest::Ioc,
929            cloid: Some("order-2".to_string()),
930        };
931
932        let action = OrderBuilder::new()
933            .grouping(Grouping::NormalTpsl)
934            .push_limit_order(params1)
935            .push_limit_order(params2)
936            .build();
937
938        match action {
939            ActionRequest::Order { orders, grouping } => {
940                assert_eq!(orders.len(), 2);
941                assert_eq!(orders[0].c, Some("order-1".to_string()));
942                assert_eq!(orders[1].c, Some("order-2".to_string()));
943                assert_eq!(grouping, "normalTpsl");
944            }
945            _ => panic!("Expected ActionRequest::Order variant"),
946        }
947    }
948
949    #[rstest]
950    fn test_action_request_constructors() {
951        // Test ActionRequest::order() constructor
952        let order1 = mk_limit_gtc(0);
953        let order2 = mk_limit_gtc(1);
954        let action = ActionRequest::order(vec![order1, order2], "na");
955
956        match action {
957            ActionRequest::Order { orders, grouping } => {
958                assert_eq!(orders.len(), 2);
959                assert_eq!(grouping, "na");
960            }
961            _ => panic!("Expected ActionRequest::Order variant"),
962        }
963
964        // Test ActionRequest::cancel() constructor
965        let cancels = vec![CancelRequest { a: 0, o: 12345 }];
966        let action = ActionRequest::cancel(cancels);
967        assert!(matches!(action, ActionRequest::Cancel { .. }));
968
969        // Test ActionRequest::cancel_by_cloid() constructor
970        let cancels = vec![CancelByCloidRequest {
971            asset: 0,
972            cloid: "order-1".to_string(),
973        }];
974        let action = ActionRequest::cancel_by_cloid(cancels);
975        assert!(matches!(action, ActionRequest::CancelByCloid { .. }));
976    }
977
978    #[rstest]
979    #[tokio::test(flavor = "multi_thread")]
980    async fn batcher_sends_on_tick() {
981        // Capture sent ids to prove dispatch happened.
982        let sent: Arc<tokio::sync::Mutex<Vec<u64>>> = Arc::new(tokio::sync::Mutex::new(Vec::new()));
983        let sent_closure = sent.clone();
984
985        let send_fn = move |req: HyperliquidWsRequest| -> BoxFuture<'static, Result<()>> {
986            let sent_inner = sent_closure.clone();
987            Box::pin(async move {
988                if let HyperliquidWsRequest::Post { id, .. } = req {
989                    sent_inner.lock().await.push(id);
990                }
991                Ok(())
992            })
993        };
994
995        let batcher = PostBatcher::new(send_fn);
996
997        // Enqueue a handful of posts into the NORMAL lane; tick is ~50ms.
998        for id in 1..=5u64 {
999            batcher
1000                .enqueue(ScheduledPost {
1001                    id,
1002                    request: info_all_mids(),
1003                    lane: PostLane::Normal,
1004                })
1005                .await
1006                .unwrap();
1007        }
1008
1009        // Wait for all 5 posts to be sent
1010        let sent_check = sent.clone();
1011        wait_until_async(
1012            || {
1013                let sent_inner = sent_check.clone();
1014                async move { sent_inner.lock().await.len() == 5 }
1015            },
1016            Duration::from_secs(2),
1017        )
1018        .await;
1019
1020        let got = sent.lock().await.clone();
1021        assert_eq!(got, vec![1, 2, 3, 4, 5]);
1022    }
1023}