1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use crate::TimeStorage;
pub enum TokenAcquisition {
Acquired(f64),
// the zero time at which the bucket is empty
ZeroedAt(f64),
}
/// Primitive storage for a token bucket using a policy provided timestamp.
/// The timestamp represents the "origin time" at which the bucket would have
/// zero tokens.
///
/// Heavily inspired by folly's TokenBucket algorithm.
#[derive(Debug)]
pub struct TokenBucketStorage<S> {
inner: S,
}
impl<S: TimeStorage> TokenBucketStorage<S> {
/// Create a new storage object with the provided zero time.
pub fn new(zero_time: f64) -> Self {
Self {
inner: S::new(zero_time),
}
}
/// Reset the bucket to a given origin time.
///
/// The origin time is the time at which the token
pub fn reset(&self, zero_time: f64) {
self.inner.store(zero_time);
}
fn load_zero_time(&self) -> f64 {
self.inner.load()
}
/// Current token balance given a rate, burst size and current time.
pub fn balance(&self, rate: f64, burst_size: f64, now: f64) -> f64 {
debug_assert!(rate > 0.0);
debug_assert!(burst_size > 0.0);
let zt = self.load_zero_time();
((now - zt) * rate).min(burst_size)
}
/// The callback is given the currently available tokens and returns the
/// number it would like to consume. The function returns the amount
/// actually consumed.
pub fn consume<F>(&self, rate: f64, burst_size: f64, now: f64, f: F) -> f64
where
F: Fn(f64) -> f64,
{
debug_assert!(rate > 0.0);
debug_assert!(burst_size > 0.0);
let mut zero_time_old = self.load_zero_time();
loop {
let available = ((now - zero_time_old) * rate).min(burst_size);
let consumed = f(available);
if consumed == 0.0 {
return 0.0;
}
let tokens_new = available - consumed;
let zero_time_new = now - tokens_new / rate;
match self
.inner
.compare_exchange_weak(zero_time_old, zero_time_new)
{
Ok(()) => return consumed,
Err(actual_zero_time) => {
zero_time_old = actual_zero_time;
}
}
}
}
/// The callback is given the currently available tokens and returns the
/// number it would like to consume. The function returns the amount
/// actually consumed.
pub fn consume2<F>(&self, rate: f64, burst_size: f64, now: f64, f: F) -> TokenAcquisition
where
F: Fn(f64) -> f64,
{
debug_assert!(rate > 0.0);
debug_assert!(burst_size > 0.0);
let mut zero_time_old = self.load_zero_time();
loop {
let available = ((now - zero_time_old) * rate).min(burst_size);
let consumed = f(available);
if consumed == 0.0 {
return TokenAcquisition::ZeroedAt(zero_time_old);
}
let tokens_new = available - consumed;
let zero_time_new = now - tokens_new / rate;
match self
.inner
.compare_exchange_weak(zero_time_old, zero_time_new)
{
Ok(()) => return TokenAcquisition::Acquired(consumed),
Err(actual_zero_time) => {
zero_time_old = actual_zero_time;
}
}
}
}
/// Return tokens back to the bucket (tokens can be negative to borrow
/// from the future). The resulting zero time is returned in fractional seconds.
pub fn return_tokens(&self, tokens: f64, rate: f64) -> f64 {
debug_assert!(rate > 0.0);
let mut zero_time_old = self.load_zero_time();
loop {
let zero_time_new = zero_time_old - tokens / rate;
match self
.inner
.compare_exchange_weak(zero_time_old, zero_time_new)
{
Ok(()) => return zero_time_new,
Err(actual_zero_time) => {
zero_time_old = actual_zero_time;
}
}
}
}
/// Time when the bucket will have `target` tokens available.
pub fn time_when_bucket(&self, rate: f64, target: f64) -> f64 {
self.load_zero_time() + target / rate
}
}