// hyperliquid_sdk_rs/providers/batcher.rs

//! Order batching for high-frequency trading strategies

use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{mpsc, Mutex};
use tokio::time::interval;
use uuid::Uuid;

use crate::errors::HyperliquidError;
use crate::types::requests::{CancelRequest, OrderRequest};
use crate::types::responses::ExchangeResponseStatus;
13
/// Boxed, type-erased future returned by the batch executors, so callers
/// can supply any `Send` future (e.g. the body of an async closure).
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
15
/// Order with metadata for batching.
///
/// `Clone` is derived because the batch loop hands a copy to the executor
/// while keeping the original to route the response back to the submitter.
#[derive(Clone)]
pub struct PendingOrder {
    /// The order payload to submit to the exchange.
    pub order: OrderRequest,
    // NOTE(review): nonce semantics (uniqueness/ordering) are enforced by
    // the caller, not visible in this file — confirm against the signer.
    pub nonce: u64,
    /// Client-side identifier, echoed in the matching [`OrderHandle`].
    pub id: Uuid,
    /// Channel on which the batch loop delivers the exchange response.
    pub response_tx:
        mpsc::UnboundedSender<Result<ExchangeResponseStatus, HyperliquidError>>,
}
25
/// Cancel with metadata for batching.
///
/// Mirrors [`PendingOrder`]: cloned into the executor, original retained
/// so the result can be sent back on `response_tx`.
#[derive(Clone)]
pub struct PendingCancel {
    /// The cancel payload to submit to the exchange.
    pub cancel: CancelRequest,
    // NOTE(review): nonce semantics are not visible in this file —
    // confirm against the signer.
    pub nonce: u64,
    /// Client-side identifier, echoed in the matching [`OrderHandle`].
    pub id: Uuid,
    /// Channel on which the batch loop delivers the exchange response.
    pub response_tx:
        mpsc::UnboundedSender<Result<ExchangeResponseStatus, HyperliquidError>>,
}
35
/// Order type classification for priority batching.
// NOTE(review): this enum is not referenced anywhere in this file — the
// batch loop partitions on `OrderRequest::is_alo()` directly. Confirm it
// is used elsewhere in the crate, or consider removing it.
#[derive(Debug, Clone, PartialEq)]
pub enum OrderPriority {
    /// Add Liquidity Only orders (highest priority per docs)
    ALO,
    /// Regular GTC/IOC orders
    Regular,
}
44
/// Handle returned when submitting to the batcher.
pub enum OrderHandle {
    /// Order queued in a batch; the receiver yields the exchange response
    /// once the background task flushes the batch. If the batcher shuts
    /// down first, the channel simply closes without a value.
    Pending {
        /// Client-side id matching the queued [`PendingOrder`] / [`PendingCancel`].
        id: Uuid,
        /// Receives at most one result for this submission.
        rx: mpsc::UnboundedReceiver<Result<ExchangeResponseStatus, HyperliquidError>>,
    },
    /// Order executed immediately (when batching disabled)
    Immediate(Result<ExchangeResponseStatus, HyperliquidError>),
}
55
/// Configuration for order batching.
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Interval between batch submissions
    pub interval: Duration,
    /// Maximum orders per batch
    pub max_batch_size: usize,
    /// Separate ALO orders into priority batches
    pub prioritize_alo: bool,
    /// Maximum time an order can wait in queue
    // NOTE(review): enforcement of this bound is not visible in this
    // file — confirm where (or whether) it is applied.
    pub max_wait_time: Duration,
}
68
69impl Default for BatchConfig {
70    fn default() -> Self {
71        Self {
72            interval: Duration::from_millis(100), // 0.1s as recommended
73            max_batch_size: 100,
74            prioritize_alo: true,
75            max_wait_time: Duration::from_millis(500),
76        }
77    }
78}
79
80/// Batches orders for efficient submission
81pub struct OrderBatcher {
82    /// Pending orders queue
83    pending_orders: Arc<Mutex<Vec<PendingOrder>>>,
84    /// Pending cancels queue
85    pending_cancels: Arc<Mutex<Vec<PendingCancel>>>,
86    /// Configuration
87    _config: BatchConfig,
88    /// Shutdown signal
89    shutdown_tx: mpsc::Sender<()>,
90}
91
92impl OrderBatcher {
93    /// Create a new order batcher
94    pub fn new(config: BatchConfig) -> (Self, BatcherHandle) {
95        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
96
97        let batcher = Self {
98            pending_orders: Arc::new(Mutex::new(Vec::new())),
99            pending_cancels: Arc::new(Mutex::new(Vec::new())),
100            _config: config,
101            shutdown_tx,
102        };
103
104        let handle = BatcherHandle {
105            pending_orders: batcher.pending_orders.clone(),
106            pending_cancels: batcher.pending_cancels.clone(),
107            shutdown_rx,
108        };
109
110        (batcher, handle)
111    }
112
113    /// Add an order to the batch queue
114    pub async fn add_order(&self, order: OrderRequest, nonce: u64) -> OrderHandle {
115        let id = Uuid::new_v4();
116        let (tx, rx) = mpsc::unbounded_channel();
117
118        let pending = PendingOrder {
119            order,
120            nonce,
121            id,
122            response_tx: tx,
123        };
124
125        self.pending_orders.lock().await.push(pending);
126
127        OrderHandle::Pending { id, rx }
128    }
129
130    /// Add a cancel to the batch queue
131    pub async fn add_cancel(&self, cancel: CancelRequest, nonce: u64) -> OrderHandle {
132        let id = Uuid::new_v4();
133        let (tx, rx) = mpsc::unbounded_channel();
134
135        let pending = PendingCancel {
136            cancel,
137            nonce,
138            id,
139            response_tx: tx,
140        };
141
142        self.pending_cancels.lock().await.push(pending);
143
144        OrderHandle::Pending { id, rx }
145    }
146
147    /// Shutdown the batcher
148    pub async fn shutdown(self) {
149        let _ = self.shutdown_tx.send(()).await;
150    }
151}
152
153/// Handle for the background batching task
154pub struct BatcherHandle {
155    pending_orders: Arc<Mutex<Vec<PendingOrder>>>,
156    pending_cancels: Arc<Mutex<Vec<PendingCancel>>>,
157    shutdown_rx: mpsc::Receiver<()>,
158}
159
160impl BatcherHandle {
161    /// Run the batching loop (should be spawned as a task)
162    pub async fn run<F, G>(mut self, mut order_executor: F, mut cancel_executor: G)
163    where
164        F: FnMut(
165                Vec<PendingOrder>,
166            )
167                -> BoxFuture<Vec<Result<ExchangeResponseStatus, HyperliquidError>>>
168            + Send,
169        G: FnMut(
170                Vec<PendingCancel>,
171            )
172                -> BoxFuture<Vec<Result<ExchangeResponseStatus, HyperliquidError>>>
173            + Send,
174    {
175        let mut interval = interval(Duration::from_millis(100)); // Fixed interval for now
176
177        loop {
178            tokio::select! {
179                _ = interval.tick() => {
180                    // Process orders
181                    let orders = {
182                        let mut pending = self.pending_orders.lock().await;
183                        std::mem::take(&mut *pending)
184                    };
185
186                    if !orders.is_empty() {
187                        // Separate ALO from regular orders
188                        let (alo_orders, regular_orders): (Vec<_>, Vec<_>) =
189                            orders.into_iter().partition(|o| {
190                                o.order.is_alo()
191                            });
192
193                        // Process ALO orders first (priority)
194                        if !alo_orders.is_empty() {
195                            let results = order_executor(alo_orders.clone()).await;
196                            for (order, result) in alo_orders.into_iter().zip(results) {
197                                let _ = order.response_tx.send(result);
198                            }
199                        }
200
201                        // Process regular orders
202                        if !regular_orders.is_empty() {
203                            let results = order_executor(regular_orders.clone()).await;
204                            for (order, result) in regular_orders.into_iter().zip(results) {
205                                let _ = order.response_tx.send(result);
206                            }
207                        }
208                    }
209
210                    // Process cancels
211                    let cancels = {
212                        let mut pending = self.pending_cancels.lock().await;
213                        std::mem::take(&mut *pending)
214                    };
215
216                    if !cancels.is_empty() {
217                        let results = cancel_executor(cancels.clone()).await;
218                        for (cancel, result) in cancels.into_iter().zip(results) {
219                            let _ = cancel.response_tx.send(result);
220                        }
221                    }
222                }
223
224                _ = self.shutdown_rx.recv() => {
225                    // Graceful shutdown
226                    break;
227                }
228            }
229        }
230    }
231}
232
233impl OrderRequest {
234    /// Check if this is an ALO order
235    pub fn is_alo(&self) -> bool {
236        match &self.order_type {
237            crate::types::requests::OrderType::Limit(limit) => {
238                limit.tif.to_lowercase() == "alo"
239            }
240            _ => false,
241        }
242    }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::requests::{Limit, OrderType};

    /// Submitting while batching is active should hand back a pending
    /// handle rather than an immediate result.
    #[tokio::test]
    async fn test_order_batching() {
        let (batcher, _handle) = OrderBatcher::new(BatchConfig::default());

        let request = OrderRequest {
            asset: 0,
            is_buy: true,
            limit_px: "50000".to_string(),
            sz: "0.1".to_string(),
            reduce_only: false,
            order_type: OrderType::Limit(Limit {
                tif: "Gtc".to_string(),
            }),
            cloid: None,
        };

        let submitted = batcher.add_order(request, 123456789).await;

        assert!(matches!(submitted, OrderHandle::Pending { .. }));
    }
}