1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
use crate::models::SlackTeamId;
use crate::ratectl::*;
use std::collections::{BinaryHeap, HashMap};
use std::time::{Duration, Instant};
/// Mutable rate-limiting state used to compute how long a Slack API call
/// should be delayed.
///
/// The state is advanced by `calc_throttle_delay`, which replaces every
/// counter it consults with that counter's updated value.
#[derive(Debug)]
pub struct SlackRateThrottler {
/// Rate-control configuration supplied to `new`; read when creating
/// per-team state and when looking up tier limits.
pub config: SlackApiRateControlConfig,
/// Counter derived in `new` from `config.global_max_rate_limit`;
/// `None` when that option is not set.
global_max_rate_limit_counter: Option<ThrottlingCounter>,
/// Special-limit counters keyed by special limit key, used by
/// `calc_throttle_delay` only when no team id is given.
global_all_team_special_limits: HashMap<SlackApiRateControlSpecialLimitKey, ThrottlingCounter>,
/// Per-team limit state keyed by team id; entries are created on demand
/// the first time a team id is passed to `calc_throttle_delay`.
rate_limit_per_team: HashMap<SlackTeamId, SlackTeamLimits>,
}
impl SlackRateThrottler {
pub fn new(rate_control_config: SlackApiRateControlConfig) -> Self {
let global_max_rate_limit_counter = rate_control_config
.global_max_rate_limit
.clone()
.map(|rl| rl.to_throttling_counter());
Self {
config: rate_control_config,
global_max_rate_limit_counter,
global_all_team_special_limits: HashMap::new(),
rate_limit_per_team: HashMap::new(),
}
}
pub fn calc_throttle_delay(
&mut self,
method_rate_ctl: &SlackApiMethodRateControlConfig,
team_id: Option<SlackTeamId>,
min_delayed: Option<Duration>,
) -> Option<Duration> {
let mut delays_heap: BinaryHeap<Duration> = BinaryHeap::new();
let now = Instant::now();
min_delayed
.iter()
.filter(|d| !d.is_zero())
.for_each(|d| delays_heap.push(*d));
self.global_max_rate_limit_counter
.as_ref()
.map(|c| c.update(now))
.into_iter()
.for_each(|updated_counter| {
if !updated_counter.delay().is_zero() {
delays_heap.push(*updated_counter.delay())
}
self.global_max_rate_limit_counter = Some(updated_counter);
});
match team_id {
Some(team_id) => {
let team_limits = self
.rate_limit_per_team
.entry(team_id)
.or_insert_with(|| SlackTeamLimits::new(&self.config));
team_limits.updated = now;
team_limits
.team_limit_counter
.as_ref()
.map(|c| c.update(now))
.into_iter()
.for_each(|updated_counter| {
if !updated_counter.delay().is_zero() {
delays_heap.push(*updated_counter.delay())
}
team_limits.team_limit_counter = Some(updated_counter);
});
if let Some(ref special_rate_limit) = method_rate_ctl.special_rate_limit {
let special_team_limit = team_limits
.special_limits
.entry(special_rate_limit.key.clone())
.or_insert_with(|| special_rate_limit.limit.to_throttling_counter());
*special_team_limit = special_team_limit.update(now);
if !special_team_limit.delay().is_zero() {
delays_heap.push(*special_team_limit.delay())
}
}
if let Some(ref tier) = method_rate_ctl.tier {
if let Some(tier_limit) = self.config.tiers_limits.get(tier) {
let tier_team_limit = team_limits
.tier_limits
.entry(tier.clone())
.or_insert_with(|| tier_limit.to_throttling_counter());
*tier_team_limit = tier_team_limit.update(now);
if !tier_team_limit.delay().is_zero() {
delays_heap.push(*tier_team_limit.delay())
}
}
}
self.rate_limit_per_team
.retain(|_, v| v.updated.duration_since(now).as_secs() < 3600);
}
None => {
if let Some(ref special_method_limits) = method_rate_ctl.special_rate_limit {
let special_team_limit = self
.global_all_team_special_limits
.entry(special_method_limits.key.clone())
.or_insert_with(|| special_method_limits.limit.to_throttling_counter());
*special_team_limit = special_team_limit.update(now);
if !special_team_limit.delay().is_zero() {
delays_heap.push(*special_team_limit.delay())
}
}
}
}
delays_heap.pop()
}
}