Skip to main content

kraken_api_client/rate_limit/
client.rs

1//! Rate-limited REST client wrapper.
2//!
3//! Provides a wrapper around any [`KrakenClient`] implementation that automatically
4//! handles rate limiting based on Kraken's tier-based rate limit system.
5//!
6//! # Example
7//!
8//! ```rust,ignore
9//! use kraken_api_client::spot::rest::SpotRestClient;
10//! use kraken_api_client::rate_limit::{RateLimitedClient, RateLimitConfig};
11//! use kraken_api_client::types::VerificationTier;
12//!
13//! let client = SpotRestClient::new();
14//! let rate_limited = RateLimitedClient::new(client, RateLimitConfig {
15//!     tier: VerificationTier::Intermediate,
16//!     enabled: true,
17//! });
18//!
19//! // All requests will be automatically rate limited
20//! let time = rate_limited.get_server_time().await?;
21//! ```
22
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::time::Duration;
26
27use rust_decimal::Decimal;
28use tokio::sync::Mutex;
29
30use crate::error::KrakenError;
31use crate::rate_limit::{
32    KeyedRateLimiter, OrderTrackingInfo, RateLimitConfig, SlidingWindow, TradingRateLimiter,
33};
34use crate::spot::rest::private::{
35    AddOrderRequest, AddOrderResponse, AllocationStatus, CancelOrderRequest, CancelOrderResponse,
36    ClosedOrders, ClosedOrdersRequest, ConfirmationRefId, DepositAddress, DepositAddressesRequest,
37    DepositMethod, DepositMethodsRequest, DepositStatusRequest, DepositWithdrawStatusResponse,
38    EarnAllocationStatusRequest, EarnAllocateRequest, EarnAllocations, EarnAllocationsRequest,
39    EarnStrategies, EarnStrategiesRequest, ExtendedBalances, LedgersInfo, LedgersRequest,
40    OpenOrders, OpenOrdersRequest, OpenPositionsRequest, Order, Position, QueryOrdersRequest,
41    TradeBalance, TradeBalanceRequest, TradeVolume, TradeVolumeRequest, TradesHistory,
42    TradesHistoryRequest, WalletTransferRequest, WebSocketToken, WithdrawAddressesRequest,
43    WithdrawCancelRequest, WithdrawInfo, WithdrawInfoRequest, WithdrawMethod,
44    WithdrawMethodsRequest, WithdrawRequest, WithdrawStatusRequest, WithdrawalAddress,
45};
46use crate::spot::rest::public::{
47    AssetInfo, AssetInfoRequest, AssetPair, AssetPairsRequest, OhlcRequest, OhlcResponse, OrderBook,
48    OrderBookRequest, RecentSpreadsRequest, RecentSpreadsResponse, RecentTradesRequest,
49    RecentTradesResponse, ServerTime, SystemStatus, TickerInfo,
50};
51use crate::spot::rest::KrakenClient;
52use crate::types::VerificationTier;
53
54/// A rate-limited wrapper around any [`KrakenClient`] implementation.
55///
56/// This wrapper automatically handles:
57/// - Public endpoint rate limits (sliding window)
58/// - Private endpoint rate limits (token bucket, tier-based)
59/// - Trading rate limits with order lifetime penalties
60///
61/// # Example
62///
63/// ```rust,ignore
64/// use kraken_api_client::spot::rest::SpotRestClient;
65/// use kraken_api_client::rate_limit::{RateLimitedClient, RateLimitConfig};
66///
67/// let client = SpotRestClient::new();
68/// let rate_limited = RateLimitedClient::new(client, RateLimitConfig::default());
69///
70/// // Requests are automatically rate limited
71/// let time = rate_limited.get_server_time().await?;
72/// ```
73pub struct RateLimitedClient<C> {
74    inner: C,
75    config: RateLimitConfig,
76    /// Public endpoint rate limiter (sliding window)
77    public_limiter: Arc<Mutex<SlidingWindow>>,
78    /// Private endpoint rate limiter (token bucket)
79    private_limiter: Arc<Mutex<PrivateRateLimiter>>,
80    /// Trading rate limiter with order penalties
81    trading_limiter: Arc<Mutex<TradingRateLimiter>>,
82    /// Per-pair rate limiter for order book requests
83    orderbook_limiter: Arc<Mutex<KeyedRateLimiter<String>>>,
84}
85
86impl<C> RateLimitedClient<C> {
87    /// Create a new rate-limited client wrapper.
88    pub fn new(inner: C, config: RateLimitConfig) -> Self {
89        let (max_counter, decay_rate) = config.tier.rate_limit_params();
90
91        Self {
92            inner,
93            config: config.clone(),
94            // Public: 1 request per second per endpoint
95            public_limiter: Arc::new(Mutex::new(SlidingWindow::new(
96                Duration::from_secs(1),
97                1,
98            ))),
99            private_limiter: Arc::new(Mutex::new(PrivateRateLimiter::new(
100                max_counter,
101                decay_rate,
102            ))),
103            trading_limiter: Arc::new(Mutex::new(TradingRateLimiter::new(
104                max_counter,
105                decay_rate,
106            ))),
107            // Order book: 1 request per second per pair
108            orderbook_limiter: Arc::new(Mutex::new(KeyedRateLimiter::new(
109                Duration::from_secs(1),
110                1,
111            ))),
112        }
113    }
114
115    /// Create a new rate-limited client with a specific verification tier.
116    pub fn with_tier(inner: C, tier: VerificationTier) -> Self {
117        Self::new(
118            inner,
119            RateLimitConfig {
120                tier,
121                enabled: true,
122            },
123        )
124    }
125
126    /// Get a reference to the inner client.
127    pub fn inner(&self) -> &C {
128        &self.inner
129    }
130
131    /// Get the current configuration.
132    pub fn config(&self) -> &RateLimitConfig {
133        &self.config
134    }
135
136    /// Enable or disable rate limiting.
137    pub fn set_enabled(&mut self, enabled: bool) {
138        self.config.enabled = enabled;
139    }
140
141    /// Wait for the public rate limiter.
142    async fn wait_public(&self) -> Result<(), KrakenError> {
143        if !self.config.enabled {
144            return Ok(());
145        }
146
147        loop {
148            let mut limiter = self.public_limiter.lock().await;
149            match limiter.try_acquire() {
150                Ok(()) => return Ok(()),
151                Err(wait_time) => {
152                    drop(limiter);
153                    tokio::time::sleep(wait_time).await;
154                }
155            }
156        }
157    }
158
159    /// Wait for the private rate limiter.
160    async fn wait_private(&self) -> Result<(), KrakenError> {
161        if !self.config.enabled {
162            return Ok(());
163        }
164
165        loop {
166            let mut limiter = self.private_limiter.lock().await;
167            match limiter.try_acquire() {
168                Ok(()) => return Ok(()),
169                Err(wait_time) => {
170                    drop(limiter);
171                    tokio::time::sleep(wait_time).await;
172                }
173            }
174        }
175    }
176
177    /// Wait for the order book rate limiter (per-pair).
178    async fn wait_orderbook(&self, pair: &str) -> Result<(), KrakenError> {
179        if !self.config.enabled {
180            return Ok(());
181        }
182
183        loop {
184            let mut limiter = self.orderbook_limiter.lock().await;
185            match limiter.try_acquire(pair.to_string()) {
186                Ok(()) => return Ok(()),
187                Err(wait_time) => {
188                    drop(limiter);
189                    tokio::time::sleep(wait_time).await;
190                }
191            }
192        }
193    }
194
195    /// Wait for the trading rate limiter (order placement).
196    async fn wait_trading_order(
197        &self,
198        order_id: &str,
199        pair: &str,
200    ) -> Result<(), KrakenError> {
201        if !self.config.enabled {
202            return Ok(());
203        }
204
205        loop {
206            let mut limiter = self.trading_limiter.lock().await;
207            let info = OrderTrackingInfo::new(pair);
208            match limiter.try_place_order(order_id, info) {
209                Ok(()) => return Ok(()),
210                Err(wait_time) => {
211                    drop(limiter);
212                    tokio::time::sleep(wait_time).await;
213                }
214            }
215        }
216    }
217
218    /// Wait for the trading rate limiter (order cancellation).
219    async fn wait_trading_cancel(&self, order_id: &str) -> Result<(), KrakenError> {
220        if !self.config.enabled {
221            return Ok(());
222        }
223
224        loop {
225            let mut limiter = self.trading_limiter.lock().await;
226            match limiter.try_cancel_order(order_id) {
227                Ok(_penalty) => return Ok(()),
228                Err(wait_time) => {
229                    drop(limiter);
230                    tokio::time::sleep(wait_time).await;
231                }
232            }
233        }
234    }
235}
236
237impl<C: std::fmt::Debug> std::fmt::Debug for RateLimitedClient<C> {
238    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
239        f.debug_struct("RateLimitedClient")
240            .field("inner", &self.inner)
241            .field("config", &self.config)
242            .finish()
243    }
244}
245
246impl<C: Clone> Clone for RateLimitedClient<C> {
247    fn clone(&self) -> Self {
248        Self {
249            inner: self.inner.clone(),
250            config: self.config.clone(),
251            public_limiter: self.public_limiter.clone(),
252            private_limiter: self.private_limiter.clone(),
253            trading_limiter: self.trading_limiter.clone(),
254            orderbook_limiter: self.orderbook_limiter.clone(),
255        }
256    }
257}
258
/// Private endpoint rate limiter.
///
/// Keeps a counter that grows by a fixed cost per granted request and decays
/// linearly with elapsed time. A request is granted only if adding its cost
/// keeps the counter at or below `max_counter`.
///
/// All amounts are stored multiplied by 100 so that fractional decay rates
/// (for example 0.33 per second) can be represented with integers.
#[derive(Debug)]
struct PrivateRateLimiter {
    /// Current counter (scaled 100x for precision)
    counter: i64,
    /// Maximum counter (scaled 100x)
    max_counter: i64,
    /// Decay rate per second (scaled 100x)
    decay_rate: i64,
    /// Point in time up to which decay has already been applied to `counter`
    last_update: std::time::Instant,
}

impl PrivateRateLimiter {
    /// Cost of one request in scaled units (1 point).
    const COST: i64 = 100;

    /// Creates an empty limiter.
    ///
    /// `max_counter` is the capacity in points and `decay_rate_per_sec` the
    /// number of points that drain per second. Both are scaled by 100
    /// internally; rates below 0.01 per second therefore truncate to zero.
    fn new(max_counter: u32, decay_rate_per_sec: f64) -> Self {
        Self {
            counter: 0,
            max_counter: i64::from(max_counter) * 100,
            decay_rate: (decay_rate_per_sec * 100.0) as i64,
            last_update: std::time::Instant::now(),
        }
    }

    /// Applies the decay accumulated since `last_update` to the counter.
    ///
    /// Decay is applied in whole scaled units. `last_update` is advanced only
    /// by the time those whole units account for, so the fractional remainder
    /// is carried into the next call. Resetting `last_update` to "now" on
    /// every call would discard that remainder, and a caller polling faster
    /// than one unit of decay would never see the counter go down.
    fn update(&mut self) {
        let now = std::time::Instant::now();

        if self.decay_rate <= 0 {
            // Nothing can decay; just keep the timestamp current.
            self.last_update = now;
            return;
        }

        let rate = self.decay_rate as f64;
        let elapsed_secs = now.duration_since(self.last_update).as_secs_f64();
        let decay = (elapsed_secs * rate) as i64;

        if decay <= 0 {
            // Less than one unit has accrued: leave `last_update` alone so the
            // elapsed time keeps accumulating.
            return;
        }

        if decay >= self.counter {
            // Fully drained; there is no remainder worth carrying.
            self.counter = 0;
            self.last_update = now;
        } else {
            self.counter -= decay;
            let consumed = Duration::from_secs_f64(decay as f64 / rate);
            // `min` guards against float rounding nudging the timestamp past `now`.
            self.last_update = (self.last_update + consumed).min(now);
        }
    }

    /// Tries to reserve capacity for one request.
    ///
    /// Returns `Ok(())` and charges the cost when there is room. Otherwise
    /// returns `Err(wait)`, where `wait` is how long the counter needs to
    /// decay before the request would fit. If the decay rate is zero the
    /// counter can never drain, so `Duration::MAX` is returned instead of
    /// dividing by zero (which would make `Duration::from_secs_f64` panic).
    fn try_acquire(&mut self) -> Result<(), Duration> {
        self.update();

        // Most private endpoints cost 1 point
        let projected = self.counter + Self::COST;
        if projected <= self.max_counter {
            self.counter = projected;
            return Ok(());
        }

        if self.decay_rate <= 0 {
            return Err(Duration::MAX);
        }

        let excess = projected - self.max_counter;
        let wait_secs = excess as f64 / self.decay_rate as f64;
        Err(Duration::from_secs_f64(wait_secs))
    }
}
306
307
308// KrakenClient Trait Implementation
309
310
311impl<C: KrakenClient> KrakenClient for RateLimitedClient<C> {
312    // ========== Public Endpoints ==========
313
314    async fn get_server_time(&self) -> Result<ServerTime, KrakenError> {
315        self.wait_public().await?;
316        self.inner.get_server_time().await
317    }
318
319    async fn get_system_status(&self) -> Result<SystemStatus, KrakenError> {
320        self.wait_public().await?;
321        self.inner.get_system_status().await
322    }
323
324    async fn get_assets(
325        &self,
326        request: Option<&AssetInfoRequest>,
327    ) -> Result<HashMap<String, AssetInfo>, KrakenError> {
328        self.wait_public().await?;
329        self.inner.get_assets(request).await
330    }
331
332    async fn get_asset_pairs(
333        &self,
334        request: Option<&AssetPairsRequest>,
335    ) -> Result<HashMap<String, AssetPair>, KrakenError> {
336        self.wait_public().await?;
337        self.inner.get_asset_pairs(request).await
338    }
339
340    async fn get_ticker(&self, pairs: &str) -> Result<HashMap<String, TickerInfo>, KrakenError> {
341        self.wait_public().await?;
342        self.inner.get_ticker(pairs).await
343    }
344
345    async fn get_ohlc(&self, request: &OhlcRequest) -> Result<OhlcResponse, KrakenError> {
346        self.wait_public().await?;
347        self.inner.get_ohlc(request).await
348    }
349
350    async fn get_order_book(
351        &self,
352        request: &OrderBookRequest,
353    ) -> Result<HashMap<String, OrderBook>, KrakenError> {
354        // Order book has per-pair rate limiting
355        self.wait_orderbook(&request.pair).await?;
356        self.inner.get_order_book(request).await
357    }
358
359    async fn get_recent_trades(
360        &self,
361        request: &RecentTradesRequest,
362    ) -> Result<RecentTradesResponse, KrakenError> {
363        self.wait_public().await?;
364        self.inner.get_recent_trades(request).await
365    }
366
367    async fn get_recent_spreads(
368        &self,
369        request: &RecentSpreadsRequest,
370    ) -> Result<RecentSpreadsResponse, KrakenError> {
371        self.wait_public().await?;
372        self.inner.get_recent_spreads(request).await
373    }
374
375    // ========== Private Endpoints - Account ==========
376
377    async fn get_account_balance(&self) -> Result<HashMap<String, Decimal>, KrakenError> {
378        self.wait_private().await?;
379        self.inner.get_account_balance().await
380    }
381
382    async fn get_extended_balance(&self) -> Result<ExtendedBalances, KrakenError> {
383        self.wait_private().await?;
384        self.inner.get_extended_balance().await
385    }
386
387    async fn get_trade_balance(
388        &self,
389        request: Option<&TradeBalanceRequest>,
390    ) -> Result<TradeBalance, KrakenError> {
391        self.wait_private().await?;
392        self.inner.get_trade_balance(request).await
393    }
394
395    async fn get_open_orders(
396        &self,
397        request: Option<&OpenOrdersRequest>,
398    ) -> Result<OpenOrders, KrakenError> {
399        self.wait_private().await?;
400        self.inner.get_open_orders(request).await
401    }
402
403    async fn get_closed_orders(
404        &self,
405        request: Option<&ClosedOrdersRequest>,
406    ) -> Result<ClosedOrders, KrakenError> {
407        self.wait_private().await?;
408        self.inner.get_closed_orders(request).await
409    }
410
411    async fn query_orders(
412        &self,
413        request: &QueryOrdersRequest,
414    ) -> Result<HashMap<String, Order>, KrakenError> {
415        self.wait_private().await?;
416        self.inner.query_orders(request).await
417    }
418
419    async fn get_trades_history(
420        &self,
421        request: Option<&TradesHistoryRequest>,
422    ) -> Result<TradesHistory, KrakenError> {
423        self.wait_private().await?;
424        self.inner.get_trades_history(request).await
425    }
426
427    async fn get_open_positions(
428        &self,
429        request: Option<&OpenPositionsRequest>,
430    ) -> Result<HashMap<String, Position>, KrakenError> {
431        self.wait_private().await?;
432        self.inner.get_open_positions(request).await
433    }
434
435    async fn get_ledgers(
436        &self,
437        request: Option<&LedgersRequest>,
438    ) -> Result<LedgersInfo, KrakenError> {
439        self.wait_private().await?;
440        self.inner.get_ledgers(request).await
441    }
442
443    async fn get_trade_volume(
444        &self,
445        request: Option<&TradeVolumeRequest>,
446    ) -> Result<TradeVolume, KrakenError> {
447        self.wait_private().await?;
448        self.inner.get_trade_volume(request).await
449    }
450
451    // ========== Private Endpoints - Funding ==========
452
453    async fn get_deposit_methods(
454        &self,
455        request: &DepositMethodsRequest,
456    ) -> Result<Vec<DepositMethod>, KrakenError> {
457        self.wait_private().await?;
458        self.inner.get_deposit_methods(request).await
459    }
460
461    async fn get_deposit_addresses(
462        &self,
463        request: &DepositAddressesRequest,
464    ) -> Result<Vec<DepositAddress>, KrakenError> {
465        self.wait_private().await?;
466        self.inner.get_deposit_addresses(request).await
467    }
468
469    async fn get_deposit_status(
470        &self,
471        request: Option<&DepositStatusRequest>,
472    ) -> Result<DepositWithdrawStatusResponse, KrakenError> {
473        self.wait_private().await?;
474        self.inner.get_deposit_status(request).await
475    }
476
477    async fn get_withdraw_methods(
478        &self,
479        request: Option<&WithdrawMethodsRequest>,
480    ) -> Result<Vec<WithdrawMethod>, KrakenError> {
481        self.wait_private().await?;
482        self.inner.get_withdraw_methods(request).await
483    }
484
485    async fn get_withdraw_addresses(
486        &self,
487        request: Option<&WithdrawAddressesRequest>,
488    ) -> Result<Vec<WithdrawalAddress>, KrakenError> {
489        self.wait_private().await?;
490        self.inner.get_withdraw_addresses(request).await
491    }
492
493    async fn get_withdraw_info(
494        &self,
495        request: &WithdrawInfoRequest,
496    ) -> Result<WithdrawInfo, KrakenError> {
497        self.wait_private().await?;
498        self.inner.get_withdraw_info(request).await
499    }
500
501    async fn withdraw_funds(
502        &self,
503        request: &WithdrawRequest,
504    ) -> Result<ConfirmationRefId, KrakenError> {
505        self.wait_private().await?;
506        self.inner.withdraw_funds(request).await
507    }
508
509    async fn get_withdraw_status(
510        &self,
511        request: Option<&WithdrawStatusRequest>,
512    ) -> Result<DepositWithdrawStatusResponse, KrakenError> {
513        self.wait_private().await?;
514        self.inner.get_withdraw_status(request).await
515    }
516
517    async fn withdraw_cancel(&self, request: &WithdrawCancelRequest) -> Result<bool, KrakenError> {
518        self.wait_private().await?;
519        self.inner.withdraw_cancel(request).await
520    }
521
522    async fn wallet_transfer(
523        &self,
524        request: &WalletTransferRequest,
525    ) -> Result<ConfirmationRefId, KrakenError> {
526        self.wait_private().await?;
527        self.inner.wallet_transfer(request).await
528    }
529
530    // ========== Private Endpoints - Earn ==========
531
532    async fn earn_allocate(&self, request: &EarnAllocateRequest) -> Result<bool, KrakenError> {
533        self.wait_private().await?;
534        self.inner.earn_allocate(request).await
535    }
536
537    async fn earn_deallocate(&self, request: &EarnAllocateRequest) -> Result<bool, KrakenError> {
538        self.wait_private().await?;
539        self.inner.earn_deallocate(request).await
540    }
541
542    async fn get_earn_allocation_status(
543        &self,
544        request: &EarnAllocationStatusRequest,
545    ) -> Result<AllocationStatus, KrakenError> {
546        self.wait_private().await?;
547        self.inner.get_earn_allocation_status(request).await
548    }
549
550    async fn get_earn_deallocation_status(
551        &self,
552        request: &EarnAllocationStatusRequest,
553    ) -> Result<AllocationStatus, KrakenError> {
554        self.wait_private().await?;
555        self.inner.get_earn_deallocation_status(request).await
556    }
557
558    async fn list_earn_strategies(
559        &self,
560        request: Option<&EarnStrategiesRequest>,
561    ) -> Result<EarnStrategies, KrakenError> {
562        self.wait_private().await?;
563        self.inner.list_earn_strategies(request).await
564    }
565
566    async fn list_earn_allocations(
567        &self,
568        request: Option<&EarnAllocationsRequest>,
569    ) -> Result<EarnAllocations, KrakenError> {
570        self.wait_private().await?;
571        self.inner.list_earn_allocations(request).await
572    }
573
574    // ========== Private Endpoints - Trading ==========
575
576    async fn add_order(&self, request: &AddOrderRequest) -> Result<AddOrderResponse, KrakenError> {
577        // Trading operations use the trading rate limiter with order tracking
578        // For add_order, we generate a temporary ID (the real ID comes in the response)
579        let temp_id = format!("pending_{}", std::time::SystemTime::now()
580            .duration_since(std::time::UNIX_EPOCH)
581            .unwrap_or_default()
582            .as_nanos());
583
584        self.wait_trading_order(&temp_id, &request.pair).await?;
585        let result = self.inner.add_order(request).await?;
586
587        // Update the trading limiter with the real order ID
588        if let Some(order_id) = result.txid.as_ref().and_then(|ids| ids.first()) {
589            let mut limiter = self.trading_limiter.lock().await;
590            limiter.track_order(order_id.to_string(), OrderTrackingInfo::new(&request.pair));
591        }
592
593        Ok(result)
594    }
595
596    async fn cancel_order(
597        &self,
598        request: &CancelOrderRequest,
599    ) -> Result<CancelOrderResponse, KrakenError> {
600        // Apply cancellation penalty based on order age
601        self.wait_trading_cancel(&request.txid).await?;
602        self.inner.cancel_order(request).await
603    }
604
605    async fn cancel_all_orders(&self) -> Result<CancelOrderResponse, KrakenError> {
606        // Cancel all doesn't track individual orders
607        self.wait_private().await?;
608        self.inner.cancel_all_orders().await
609    }
610
611    // ========== Private Endpoints - WebSocket ==========
612
613    async fn get_websocket_token(&self) -> Result<WebSocketToken, KrakenError> {
614        self.wait_private().await?;
615        self.inner.get_websocket_token().await
616    }
617}
618
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a limiter and makes `attempts` acquisition attempts, ignoring
    /// whether each one was granted.
    fn limiter_after_attempts(
        max_counter: u32,
        decay_rate_per_sec: f64,
        attempts: usize,
    ) -> PrivateRateLimiter {
        let mut limiter = PrivateRateLimiter::new(max_counter, decay_rate_per_sec);
        for _ in 0..attempts {
            let _ = limiter.try_acquire();
        }
        limiter
    }

    #[test]
    fn test_private_rate_limiter_allows_initial_requests() {
        let mut limiter = PrivateRateLimiter::new(20, 1.0);

        // Fifteen requests stay below the capacity of twenty.
        let all_granted = (0..15).all(|_| limiter.try_acquire().is_ok());
        assert!(all_granted);
    }

    #[test]
    fn test_private_rate_limiter_blocks_when_full() {
        // Use up the whole capacity first.
        let mut limiter = limiter_after_attempts(20, 1.0, 20);

        // One more request must be refused.
        let outcome = limiter.try_acquire();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_private_rate_limiter_decay() {
        // A high decay rate keeps the required pause short.
        let mut limiter = limiter_after_attempts(20, 100.0, 10);

        // Give the counter time to drain.
        let pause = Duration::from_millis(150);
        std::thread::sleep(pause);

        // After applying decay the counter must be below the charged amount.
        limiter.update();
        assert!(limiter.counter < 1000);
    }
}