1use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Duration;
26
27use rust_decimal::Decimal;
28use tokio::sync::Mutex;
29
30use crate::error::KrakenError;
31use crate::rate_limit::{
32 KeyedRateLimiter, OrderTrackingInfo, RateLimitConfig, SlidingWindow, TradingRateLimiter,
33};
34use crate::spot::rest::private::{
35 AddOrderRequest, AddOrderResponse, AllocationStatus, CancelOrderRequest, CancelOrderResponse,
36 ClosedOrders, ClosedOrdersRequest, ConfirmationRefId, DepositAddress, DepositAddressesRequest,
37 DepositMethod, DepositMethodsRequest, DepositStatusRequest, DepositWithdrawStatusResponse,
38 EarnAllocationStatusRequest, EarnAllocateRequest, EarnAllocations, EarnAllocationsRequest,
39 EarnStrategies, EarnStrategiesRequest, ExtendedBalances, LedgersInfo, LedgersRequest,
40 OpenOrders, OpenOrdersRequest, OpenPositionsRequest, Order, Position, QueryOrdersRequest,
41 TradeBalance, TradeBalanceRequest, TradeVolume, TradeVolumeRequest, TradesHistory,
42 TradesHistoryRequest, WalletTransferRequest, WebSocketToken, WithdrawAddressesRequest,
43 WithdrawCancelRequest, WithdrawInfo, WithdrawInfoRequest, WithdrawMethod,
44 WithdrawMethodsRequest, WithdrawRequest, WithdrawStatusRequest, WithdrawalAddress,
45};
46use crate::spot::rest::public::{
47 AssetInfo, AssetInfoRequest, AssetPair, AssetPairsRequest, OhlcRequest, OhlcResponse, OrderBook,
48 OrderBookRequest, RecentSpreadsRequest, RecentSpreadsResponse, RecentTradesRequest,
49 RecentTradesResponse, ServerTime, SystemStatus, TickerInfo,
50};
51use crate::spot::rest::KrakenClient;
52use crate::types::VerificationTier;
53
54pub struct RateLimitedClient<C> {
74 inner: C,
75 config: RateLimitConfig,
76 public_limiter: Arc<Mutex<SlidingWindow>>,
78 private_limiter: Arc<Mutex<PrivateRateLimiter>>,
80 trading_limiter: Arc<Mutex<TradingRateLimiter>>,
82 orderbook_limiter: Arc<Mutex<KeyedRateLimiter<String>>>,
84}
85
86impl<C> RateLimitedClient<C> {
87 pub fn new(inner: C, config: RateLimitConfig) -> Self {
89 let (max_counter, decay_rate) = config.tier.rate_limit_params();
90
91 Self {
92 inner,
93 config: config.clone(),
94 public_limiter: Arc::new(Mutex::new(SlidingWindow::new(
96 Duration::from_secs(1),
97 1,
98 ))),
99 private_limiter: Arc::new(Mutex::new(PrivateRateLimiter::new(
100 max_counter,
101 decay_rate,
102 ))),
103 trading_limiter: Arc::new(Mutex::new(TradingRateLimiter::new(
104 max_counter,
105 decay_rate,
106 ))),
107 orderbook_limiter: Arc::new(Mutex::new(KeyedRateLimiter::new(
109 Duration::from_secs(1),
110 1,
111 ))),
112 }
113 }
114
115 pub fn with_tier(inner: C, tier: VerificationTier) -> Self {
117 Self::new(
118 inner,
119 RateLimitConfig {
120 tier,
121 enabled: true,
122 },
123 )
124 }
125
126 pub fn inner(&self) -> &C {
128 &self.inner
129 }
130
131 pub fn config(&self) -> &RateLimitConfig {
133 &self.config
134 }
135
136 pub fn set_enabled(&mut self, enabled: bool) {
138 self.config.enabled = enabled;
139 }
140
141 async fn wait_public(&self) -> Result<(), KrakenError> {
143 if !self.config.enabled {
144 return Ok(());
145 }
146
147 loop {
148 let mut limiter = self.public_limiter.lock().await;
149 match limiter.try_acquire() {
150 Ok(()) => return Ok(()),
151 Err(wait_time) => {
152 drop(limiter);
153 tokio::time::sleep(wait_time).await;
154 }
155 }
156 }
157 }
158
159 async fn wait_private(&self) -> Result<(), KrakenError> {
161 if !self.config.enabled {
162 return Ok(());
163 }
164
165 loop {
166 let mut limiter = self.private_limiter.lock().await;
167 match limiter.try_acquire() {
168 Ok(()) => return Ok(()),
169 Err(wait_time) => {
170 drop(limiter);
171 tokio::time::sleep(wait_time).await;
172 }
173 }
174 }
175 }
176
177 async fn wait_orderbook(&self, pair: &str) -> Result<(), KrakenError> {
179 if !self.config.enabled {
180 return Ok(());
181 }
182
183 loop {
184 let mut limiter = self.orderbook_limiter.lock().await;
185 match limiter.try_acquire(pair.to_string()) {
186 Ok(()) => return Ok(()),
187 Err(wait_time) => {
188 drop(limiter);
189 tokio::time::sleep(wait_time).await;
190 }
191 }
192 }
193 }
194
195 async fn wait_trading_order(
197 &self,
198 order_id: &str,
199 pair: &str,
200 ) -> Result<(), KrakenError> {
201 if !self.config.enabled {
202 return Ok(());
203 }
204
205 loop {
206 let mut limiter = self.trading_limiter.lock().await;
207 let info = OrderTrackingInfo::new(pair);
208 match limiter.try_place_order(order_id, info) {
209 Ok(()) => return Ok(()),
210 Err(wait_time) => {
211 drop(limiter);
212 tokio::time::sleep(wait_time).await;
213 }
214 }
215 }
216 }
217
218 async fn wait_trading_cancel(&self, order_id: &str) -> Result<(), KrakenError> {
220 if !self.config.enabled {
221 return Ok(());
222 }
223
224 loop {
225 let mut limiter = self.trading_limiter.lock().await;
226 match limiter.try_cancel_order(order_id) {
227 Ok(_penalty) => return Ok(()),
228 Err(wait_time) => {
229 drop(limiter);
230 tokio::time::sleep(wait_time).await;
231 }
232 }
233 }
234 }
235}
236
237impl<C: std::fmt::Debug> std::fmt::Debug for RateLimitedClient<C> {
238 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239 f.debug_struct("RateLimitedClient")
240 .field("inner", &self.inner)
241 .field("config", &self.config)
242 .finish()
243 }
244}
245
246impl<C: Clone> Clone for RateLimitedClient<C> {
247 fn clone(&self) -> Self {
248 Self {
249 inner: self.inner.clone(),
250 config: self.config.clone(),
251 public_limiter: self.public_limiter.clone(),
252 private_limiter: self.private_limiter.clone(),
253 trading_limiter: self.trading_limiter.clone(),
254 orderbook_limiter: self.orderbook_limiter.clone(),
255 }
256 }
257}
258
259#[derive(Debug)]
261struct PrivateRateLimiter {
262 counter: i64,
264 max_counter: i64,
266 decay_rate: i64,
268 last_update: std::time::Instant,
270}
271
272impl PrivateRateLimiter {
273 fn new(max_counter: u32, decay_rate_per_sec: f64) -> Self {
274 Self {
275 counter: 0,
276 max_counter: (max_counter as i64) * 100,
277 decay_rate: (decay_rate_per_sec * 100.0) as i64,
278 last_update: std::time::Instant::now(),
279 }
280 }
281
282 fn update(&mut self) {
283 let elapsed = self.last_update.elapsed();
284 let elapsed_secs = elapsed.as_secs_f64();
285 let decay = (elapsed_secs * self.decay_rate as f64) as i64;
286 self.counter = (self.counter - decay).max(0);
287 self.last_update = std::time::Instant::now();
288 }
289
290 fn try_acquire(&mut self) -> Result<(), Duration> {
291 self.update();
292
293 let cost = 100;
295
296 if self.counter + cost <= self.max_counter {
297 self.counter += cost;
298 Ok(())
299 } else {
300 let excess = self.counter + cost - self.max_counter;
301 let wait_secs = excess as f64 / self.decay_rate as f64;
302 Err(Duration::from_secs_f64(wait_secs))
303 }
304 }
305}
306
307
308impl<C: KrakenClient> KrakenClient for RateLimitedClient<C> {
312 async fn get_server_time(&self) -> Result<ServerTime, KrakenError> {
315 self.wait_public().await?;
316 self.inner.get_server_time().await
317 }
318
319 async fn get_system_status(&self) -> Result<SystemStatus, KrakenError> {
320 self.wait_public().await?;
321 self.inner.get_system_status().await
322 }
323
324 async fn get_assets(
325 &self,
326 request: Option<&AssetInfoRequest>,
327 ) -> Result<HashMap<String, AssetInfo>, KrakenError> {
328 self.wait_public().await?;
329 self.inner.get_assets(request).await
330 }
331
332 async fn get_asset_pairs(
333 &self,
334 request: Option<&AssetPairsRequest>,
335 ) -> Result<HashMap<String, AssetPair>, KrakenError> {
336 self.wait_public().await?;
337 self.inner.get_asset_pairs(request).await
338 }
339
340 async fn get_ticker(&self, pairs: &str) -> Result<HashMap<String, TickerInfo>, KrakenError> {
341 self.wait_public().await?;
342 self.inner.get_ticker(pairs).await
343 }
344
345 async fn get_ohlc(&self, request: &OhlcRequest) -> Result<OhlcResponse, KrakenError> {
346 self.wait_public().await?;
347 self.inner.get_ohlc(request).await
348 }
349
350 async fn get_order_book(
351 &self,
352 request: &OrderBookRequest,
353 ) -> Result<HashMap<String, OrderBook>, KrakenError> {
354 self.wait_orderbook(&request.pair).await?;
356 self.inner.get_order_book(request).await
357 }
358
359 async fn get_recent_trades(
360 &self,
361 request: &RecentTradesRequest,
362 ) -> Result<RecentTradesResponse, KrakenError> {
363 self.wait_public().await?;
364 self.inner.get_recent_trades(request).await
365 }
366
367 async fn get_recent_spreads(
368 &self,
369 request: &RecentSpreadsRequest,
370 ) -> Result<RecentSpreadsResponse, KrakenError> {
371 self.wait_public().await?;
372 self.inner.get_recent_spreads(request).await
373 }
374
375 async fn get_account_balance(&self) -> Result<HashMap<String, Decimal>, KrakenError> {
378 self.wait_private().await?;
379 self.inner.get_account_balance().await
380 }
381
382 async fn get_extended_balance(&self) -> Result<ExtendedBalances, KrakenError> {
383 self.wait_private().await?;
384 self.inner.get_extended_balance().await
385 }
386
387 async fn get_trade_balance(
388 &self,
389 request: Option<&TradeBalanceRequest>,
390 ) -> Result<TradeBalance, KrakenError> {
391 self.wait_private().await?;
392 self.inner.get_trade_balance(request).await
393 }
394
395 async fn get_open_orders(
396 &self,
397 request: Option<&OpenOrdersRequest>,
398 ) -> Result<OpenOrders, KrakenError> {
399 self.wait_private().await?;
400 self.inner.get_open_orders(request).await
401 }
402
403 async fn get_closed_orders(
404 &self,
405 request: Option<&ClosedOrdersRequest>,
406 ) -> Result<ClosedOrders, KrakenError> {
407 self.wait_private().await?;
408 self.inner.get_closed_orders(request).await
409 }
410
411 async fn query_orders(
412 &self,
413 request: &QueryOrdersRequest,
414 ) -> Result<HashMap<String, Order>, KrakenError> {
415 self.wait_private().await?;
416 self.inner.query_orders(request).await
417 }
418
419 async fn get_trades_history(
420 &self,
421 request: Option<&TradesHistoryRequest>,
422 ) -> Result<TradesHistory, KrakenError> {
423 self.wait_private().await?;
424 self.inner.get_trades_history(request).await
425 }
426
427 async fn get_open_positions(
428 &self,
429 request: Option<&OpenPositionsRequest>,
430 ) -> Result<HashMap<String, Position>, KrakenError> {
431 self.wait_private().await?;
432 self.inner.get_open_positions(request).await
433 }
434
435 async fn get_ledgers(
436 &self,
437 request: Option<&LedgersRequest>,
438 ) -> Result<LedgersInfo, KrakenError> {
439 self.wait_private().await?;
440 self.inner.get_ledgers(request).await
441 }
442
443 async fn get_trade_volume(
444 &self,
445 request: Option<&TradeVolumeRequest>,
446 ) -> Result<TradeVolume, KrakenError> {
447 self.wait_private().await?;
448 self.inner.get_trade_volume(request).await
449 }
450
451 async fn get_deposit_methods(
454 &self,
455 request: &DepositMethodsRequest,
456 ) -> Result<Vec<DepositMethod>, KrakenError> {
457 self.wait_private().await?;
458 self.inner.get_deposit_methods(request).await
459 }
460
461 async fn get_deposit_addresses(
462 &self,
463 request: &DepositAddressesRequest,
464 ) -> Result<Vec<DepositAddress>, KrakenError> {
465 self.wait_private().await?;
466 self.inner.get_deposit_addresses(request).await
467 }
468
469 async fn get_deposit_status(
470 &self,
471 request: Option<&DepositStatusRequest>,
472 ) -> Result<DepositWithdrawStatusResponse, KrakenError> {
473 self.wait_private().await?;
474 self.inner.get_deposit_status(request).await
475 }
476
477 async fn get_withdraw_methods(
478 &self,
479 request: Option<&WithdrawMethodsRequest>,
480 ) -> Result<Vec<WithdrawMethod>, KrakenError> {
481 self.wait_private().await?;
482 self.inner.get_withdraw_methods(request).await
483 }
484
485 async fn get_withdraw_addresses(
486 &self,
487 request: Option<&WithdrawAddressesRequest>,
488 ) -> Result<Vec<WithdrawalAddress>, KrakenError> {
489 self.wait_private().await?;
490 self.inner.get_withdraw_addresses(request).await
491 }
492
493 async fn get_withdraw_info(
494 &self,
495 request: &WithdrawInfoRequest,
496 ) -> Result<WithdrawInfo, KrakenError> {
497 self.wait_private().await?;
498 self.inner.get_withdraw_info(request).await
499 }
500
501 async fn withdraw_funds(
502 &self,
503 request: &WithdrawRequest,
504 ) -> Result<ConfirmationRefId, KrakenError> {
505 self.wait_private().await?;
506 self.inner.withdraw_funds(request).await
507 }
508
509 async fn get_withdraw_status(
510 &self,
511 request: Option<&WithdrawStatusRequest>,
512 ) -> Result<DepositWithdrawStatusResponse, KrakenError> {
513 self.wait_private().await?;
514 self.inner.get_withdraw_status(request).await
515 }
516
517 async fn withdraw_cancel(&self, request: &WithdrawCancelRequest) -> Result<bool, KrakenError> {
518 self.wait_private().await?;
519 self.inner.withdraw_cancel(request).await
520 }
521
522 async fn wallet_transfer(
523 &self,
524 request: &WalletTransferRequest,
525 ) -> Result<ConfirmationRefId, KrakenError> {
526 self.wait_private().await?;
527 self.inner.wallet_transfer(request).await
528 }
529
530 async fn earn_allocate(&self, request: &EarnAllocateRequest) -> Result<bool, KrakenError> {
533 self.wait_private().await?;
534 self.inner.earn_allocate(request).await
535 }
536
537 async fn earn_deallocate(&self, request: &EarnAllocateRequest) -> Result<bool, KrakenError> {
538 self.wait_private().await?;
539 self.inner.earn_deallocate(request).await
540 }
541
542 async fn get_earn_allocation_status(
543 &self,
544 request: &EarnAllocationStatusRequest,
545 ) -> Result<AllocationStatus, KrakenError> {
546 self.wait_private().await?;
547 self.inner.get_earn_allocation_status(request).await
548 }
549
550 async fn get_earn_deallocation_status(
551 &self,
552 request: &EarnAllocationStatusRequest,
553 ) -> Result<AllocationStatus, KrakenError> {
554 self.wait_private().await?;
555 self.inner.get_earn_deallocation_status(request).await
556 }
557
558 async fn list_earn_strategies(
559 &self,
560 request: Option<&EarnStrategiesRequest>,
561 ) -> Result<EarnStrategies, KrakenError> {
562 self.wait_private().await?;
563 self.inner.list_earn_strategies(request).await
564 }
565
566 async fn list_earn_allocations(
567 &self,
568 request: Option<&EarnAllocationsRequest>,
569 ) -> Result<EarnAllocations, KrakenError> {
570 self.wait_private().await?;
571 self.inner.list_earn_allocations(request).await
572 }
573
574 async fn add_order(&self, request: &AddOrderRequest) -> Result<AddOrderResponse, KrakenError> {
577 let temp_id = format!("pending_{}", std::time::SystemTime::now()
580 .duration_since(std::time::UNIX_EPOCH)
581 .unwrap_or_default()
582 .as_nanos());
583
584 self.wait_trading_order(&temp_id, &request.pair).await?;
585 let result = self.inner.add_order(request).await?;
586
587 if let Some(order_id) = result.txid.as_ref().and_then(|ids| ids.first()) {
589 let mut limiter = self.trading_limiter.lock().await;
590 limiter.track_order(order_id.to_string(), OrderTrackingInfo::new(&request.pair));
591 }
592
593 Ok(result)
594 }
595
596 async fn cancel_order(
597 &self,
598 request: &CancelOrderRequest,
599 ) -> Result<CancelOrderResponse, KrakenError> {
600 self.wait_trading_cancel(&request.txid).await?;
602 self.inner.cancel_order(request).await
603 }
604
605 async fn cancel_all_orders(&self) -> Result<CancelOrderResponse, KrakenError> {
606 self.wait_private().await?;
608 self.inner.cancel_all_orders().await
609 }
610
611 async fn get_websocket_token(&self) -> Result<WebSocketToken, KrakenError> {
614 self.wait_private().await?;
615 self.inner.get_websocket_token().await
616 }
617}
618
619#[cfg(test)]
620mod tests {
621 use super::*;
622
623 #[test]
624 fn test_private_rate_limiter_allows_initial_requests() {
625 let mut limiter = PrivateRateLimiter::new(20, 1.0);
626
627 for _ in 0..15 {
629 assert!(limiter.try_acquire().is_ok());
630 }
631 }
632
633 #[test]
634 fn test_private_rate_limiter_blocks_when_full() {
635 let mut limiter = PrivateRateLimiter::new(20, 1.0);
636
637 for _ in 0..20 {
639 limiter.try_acquire().ok();
640 }
641
642 assert!(limiter.try_acquire().is_err());
644 }
645
646 #[test]
647 fn test_private_rate_limiter_decay() {
648 let mut limiter = PrivateRateLimiter::new(20, 100.0); for _ in 0..10 {
652 limiter.try_acquire().ok();
653 }
654
655 std::thread::sleep(Duration::from_millis(150));
657
658 limiter.update();
660 assert!(limiter.counter < 1000); }
662}