// hyperliquid_sdk_rs/providers/batcher.rs
use crate::errors::HyperliquidError;
use crate::types::requests::{CancelRequest, OrderRequest};
use crate::types::responses::ExchangeResponseStatus;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{mpsc, Mutex};
use tokio::time::interval;
use uuid::Uuid;
13
/// A pinned, heap-allocated, `Send + 'static` future — the return type of the
/// executor callbacks passed to [`BatcherHandle::run`].
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
15
/// An order queued for the next batch flush.
#[derive(Clone)]
pub struct PendingOrder {
    /// The order to be submitted to the exchange.
    pub order: OrderRequest,
    /// Nonce captured at enqueue time.
    /// NOTE(review): not read by the visible batching loop — presumably
    /// consumed by the executor when signing/submitting; confirm.
    pub nonce: u64,
    /// Unique id correlating this request with the handle returned to the caller.
    pub id: Uuid,
    /// Channel on which the batch worker delivers this order's result.
    pub response_tx:
        mpsc::UnboundedSender<Result<ExchangeResponseStatus, HyperliquidError>>,
}
25
/// A cancel request queued for the next batch flush.
#[derive(Clone)]
pub struct PendingCancel {
    /// The cancel request to be submitted to the exchange.
    pub cancel: CancelRequest,
    /// Nonce captured at enqueue time.
    /// NOTE(review): not read by the visible batching loop — presumably
    /// consumed by the executor when signing/submitting; confirm.
    pub nonce: u64,
    /// Unique id correlating this request with the handle returned to the caller.
    pub id: Uuid,
    /// Channel on which the batch worker delivers this cancel's result.
    pub response_tx:
        mpsc::UnboundedSender<Result<ExchangeResponseStatus, HyperliquidError>>,
}
35
/// Scheduling priority class for batched orders.
///
/// NOTE(review): this enum is not referenced by the batching loop in this
/// file — the ALO/regular split there goes through `OrderRequest::is_alo` —
/// confirm it is used elsewhere before removing.
// Fieldless enum: derive the full comparison/copy set so it can be used as a
// map key or passed by value without cloning. All additions are
// backward-compatible (existing `Clone`/`PartialEq` callers are unaffected).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum OrderPriority {
    /// Add-Liquidity-Only (post-only) orders.
    ALO,
    /// All other orders.
    Regular,
}
44
/// Handle returned to the caller when an order or cancel is enqueued.
pub enum OrderHandle {
    /// The request was queued; await `rx` for the batched exchange response.
    Pending {
        /// Id matching the queued `PendingOrder`/`PendingCancel`.
        id: Uuid,
        /// Receives the result once the batch worker flushes the request.
        rx: mpsc::UnboundedReceiver<Result<ExchangeResponseStatus, HyperliquidError>>,
    },
    /// A result that is already available.
    /// NOTE(review): never constructed in this file — presumably used by a
    /// non-batched submission path elsewhere; confirm.
    Immediate(Result<ExchangeResponseStatus, HyperliquidError>),
}
55
/// Tuning knobs for the order batcher.
#[derive(Clone, Debug)]
pub struct BatchConfig {
    /// Time between batch flushes.
    /// NOTE(review): the visible `BatcherHandle::run` loop hardcodes 100 ms
    /// and never reads this field — confirm it should be wired through.
    pub interval: Duration,
    /// Maximum requests per flush.
    /// NOTE(review): not enforced by the visible run loop — confirm.
    pub max_batch_size: usize,
    /// Whether ALO (post-only) orders are submitted ahead of regular ones.
    /// NOTE(review): the visible run loop always partitions ALO-first
    /// regardless of this flag — confirm.
    pub prioritize_alo: bool,
    /// Upper bound a request may wait before being flushed.
    /// NOTE(review): unused in this file — confirm.
    pub max_wait_time: Duration,
}
68
69impl Default for BatchConfig {
70 fn default() -> Self {
71 Self {
72 interval: Duration::from_millis(100), max_batch_size: 100,
74 prioritize_alo: true,
75 max_wait_time: Duration::from_millis(500),
76 }
77 }
78}
79
/// Client-side half of the batcher: queues orders and cancels that the
/// paired [`BatcherHandle`] worker flushes periodically.
pub struct OrderBatcher {
    /// Orders awaiting the next flush; shared with the worker handle.
    pending_orders: Arc<Mutex<Vec<PendingOrder>>>,
    /// Cancels awaiting the next flush; shared with the worker handle.
    pending_cancels: Arc<Mutex<Vec<PendingCancel>>>,
    /// Batch configuration.
    /// NOTE(review): stored but never read in this file; the worker loop
    /// hardcodes its own interval — confirm whether config should be wired
    /// through to `BatcherHandle`.
    _config: BatchConfig,
    /// Signals the worker loop to stop.
    shutdown_tx: mpsc::Sender<()>,
}
91
92impl OrderBatcher {
93 pub fn new(config: BatchConfig) -> (Self, BatcherHandle) {
95 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
96
97 let batcher = Self {
98 pending_orders: Arc::new(Mutex::new(Vec::new())),
99 pending_cancels: Arc::new(Mutex::new(Vec::new())),
100 _config: config,
101 shutdown_tx,
102 };
103
104 let handle = BatcherHandle {
105 pending_orders: batcher.pending_orders.clone(),
106 pending_cancels: batcher.pending_cancels.clone(),
107 shutdown_rx,
108 };
109
110 (batcher, handle)
111 }
112
113 pub async fn add_order(&self, order: OrderRequest, nonce: u64) -> OrderHandle {
115 let id = Uuid::new_v4();
116 let (tx, rx) = mpsc::unbounded_channel();
117
118 let pending = PendingOrder {
119 order,
120 nonce,
121 id,
122 response_tx: tx,
123 };
124
125 self.pending_orders.lock().await.push(pending);
126
127 OrderHandle::Pending { id, rx }
128 }
129
130 pub async fn add_cancel(&self, cancel: CancelRequest, nonce: u64) -> OrderHandle {
132 let id = Uuid::new_v4();
133 let (tx, rx) = mpsc::unbounded_channel();
134
135 let pending = PendingCancel {
136 cancel,
137 nonce,
138 id,
139 response_tx: tx,
140 };
141
142 self.pending_cancels.lock().await.push(pending);
143
144 OrderHandle::Pending { id, rx }
145 }
146
147 pub async fn shutdown(self) {
149 let _ = self.shutdown_tx.send(()).await;
150 }
151}
152
/// Worker-side half of the batcher: owns the shutdown receiver and shares the
/// pending queues with [`OrderBatcher`]. Drive it with [`BatcherHandle::run`].
pub struct BatcherHandle {
    /// Orders awaiting flush; shared with the `OrderBatcher` that queued them.
    pending_orders: Arc<Mutex<Vec<PendingOrder>>>,
    /// Cancels awaiting flush; shared with the `OrderBatcher` that queued them.
    pending_cancels: Arc<Mutex<Vec<PendingCancel>>>,
    /// Receives the shutdown signal from `OrderBatcher::shutdown`.
    shutdown_rx: mpsc::Receiver<()>,
}
159
impl BatcherHandle {
    /// Worker loop: on every tick, drains the queued orders (ALO first, then
    /// regular) and cancels, invokes the supplied executors on each group,
    /// and routes every result back over its request's response channel.
    /// Exits when the shutdown signal arrives.
    ///
    /// NOTE(review): the tick period is hardcoded to 100 ms and ignores
    /// `BatchConfig::interval` (and `prioritize_alo` / `max_batch_size`) —
    /// confirm whether the config should be threaded into this handle.
    ///
    /// NOTE(review): results are paired with requests positionally via `zip`;
    /// if an executor returns fewer results than requests, the extras' senders
    /// are dropped and those callers observe a closed channel — confirm the
    /// executors guarantee one result per request, in order.
    pub async fn run<F, G>(mut self, mut order_executor: F, mut cancel_executor: G)
    where
        F: FnMut(
                Vec<PendingOrder>,
            )
                -> BoxFuture<Vec<Result<ExchangeResponseStatus, HyperliquidError>>>
            + Send,
        G: FnMut(
                Vec<PendingCancel>,
            )
                -> BoxFuture<Vec<Result<ExchangeResponseStatus, HyperliquidError>>>
            + Send,
    {
        let mut interval = interval(Duration::from_millis(100));
        loop {
            tokio::select! {
                _ = interval.tick() => {
                    // Swap the queue out under the lock so the lock is not
                    // held across the executor awaits below.
                    let orders = {
                        let mut pending = self.pending_orders.lock().await;
                        std::mem::take(&mut *pending)
                    };

                    if !orders.is_empty() {
                        // Split post-only (ALO) orders from the rest so they
                        // are submitted first.
                        let (alo_orders, regular_orders): (Vec<_>, Vec<_>) =
                            orders.into_iter().partition(|o| {
                                o.order.is_alo()
                            });

                        if !alo_orders.is_empty() {
                            // Clone because the executor consumes the Vec but
                            // we still need each request's response_tx to
                            // route results afterwards.
                            let results = order_executor(alo_orders.clone()).await;
                            for (order, result) in alo_orders.into_iter().zip(results) {
                                // Receiver may have been dropped; ignore send errors.
                                let _ = order.response_tx.send(result);
                            }
                        }

                        if !regular_orders.is_empty() {
                            let results = order_executor(regular_orders.clone()).await;
                            for (order, result) in regular_orders.into_iter().zip(results) {
                                let _ = order.response_tx.send(result);
                            }
                        }
                    }

                    // Same drain-then-execute pattern for cancels.
                    let cancels = {
                        let mut pending = self.pending_cancels.lock().await;
                        std::mem::take(&mut *pending)
                    };

                    if !cancels.is_empty() {
                        let results = cancel_executor(cancels.clone()).await;
                        for (cancel, result) in cancels.into_iter().zip(results) {
                            let _ = cancel.response_tx.send(result);
                        }
                    }
                }

                _ = self.shutdown_rx.recv() => {
                    break;
                }
            }
        }
    }
}
232
233impl OrderRequest {
234 pub fn is_alo(&self) -> bool {
236 match &self.order_type {
237 crate::types::requests::OrderType::Limit(limit) => {
238 limit.tif.to_lowercase() == "alo"
239 }
240 _ => false,
241 }
242 }
243}
244
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::requests::{Limit, OrderType};

    /// Enqueueing an order should return a `Pending` handle (the batch worker
    /// is not running here, so the order just sits in the queue).
    #[tokio::test]
    async fn test_order_batching() {
        let (batcher, _worker) = OrderBatcher::new(BatchConfig::default());

        let request = OrderRequest {
            asset: 0,
            is_buy: true,
            limit_px: "50000".to_string(),
            sz: "0.1".to_string(),
            reduce_only: false,
            order_type: OrderType::Limit(Limit {
                tif: "Gtc".to_string(),
            }),
            cloid: None,
        };

        let pending = batcher.add_order(request, 123456789).await;

        assert!(matches!(pending, OrderHandle::Pending { .. }));
    }
}