sentinel_core/core/hotspot/
rule.rs

1use crate::{
2    base::{ParamKey, SentinelRule},
3    Error,
4};
5use serde::{Deserialize, Serialize};
6use serde_json;
7use std::collections::HashMap;
8use std::fmt::{self, Debug};
9use std::hash::{Hash, Hasher};
10cfg_k8s! {
11    use schemars::JsonSchema;
12    use kube::CustomResource;
13}
14
15/// ControlStrategy indicates the traffic shaping strategy.
16#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
17#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
18pub enum ControlStrategy {
19    Reject,
20    Throttling,
21    #[serde(skip)]
22    Custom(u8),
23}
24
25impl Default for ControlStrategy {
26    fn default() -> Self {
27        ControlStrategy::Reject
28    }
29}
30
31// MetricType represents the target metric type.
32#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
33#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
34pub enum MetricType {
35    /// Concurrency represents concurrency count.
36    Concurrency,
37    /// QPS represents request count per second.
38    QPS,
39}
40
41impl Default for MetricType {
42    fn default() -> Self {
43        MetricType::Concurrency
44    }
45}
46
47/// Rule represents the hotspot(frequent) parameter flow control rule
48#[cfg_attr(
49    feature = "ds_k8s",
50    derive(CustomResource, JsonSchema),
51    kube(
52        group = "rust.datasource.sentinel.io",
53        version = "v1alpha1",
54        kind = "HotspotResource",
55        namespaced
56    )
57)]
58#[derive(Debug, Clone, Serialize, Deserialize)]
59#[serde(default)]
60pub struct Rule {
61    /// `id` is the unique id
62    pub id: String,
63    /// `resource` is the resource name
64    pub resource: String,
65    /// `metric_type` indicates the metric type for checking logic.
66    /// For Concurrency metric, hotspot module will check the each hot parameter's concurrency,
67    ///		if concurrency exceeds the threshold, reject the traffic directly.
68    /// For QPS metric, hotspot module will check the each hot parameter's QPS,
69    ///		the `control_strategy` decides the behavior of traffic shaping controller
70    pub metric_type: MetricType,
71    /// `control_strategy` indicates the traffic shaping behaviour.
72    /// `control_strategy` only takes effect when `metric_type` is QPS
73    pub control_strategy: ControlStrategy,
74    /// ``param_index`` is the index in context arguments slice.
75    /// `param_index` means the <`param_index`>-th parameter
76    pub param_index: isize,
77    /// `param_key` is the key in EntryContext.Input.Attachments map.
78    /// `param_key` can be used as a supplement to `param_index` to facilitate rules to quickly obtain parameter from a large number of parameters
79    /// `param_key` is mutually exclusive with `param_index`, `param_key` has the higher priority than `param_index`
80    pub param_key: String,
81    /// threshold is the threshold to trigger rejection
82    pub threshold: u64,
83    /// max_queueing_time_ms only takes effect when `control_strategy` is `Throttling` and `metric_type` is `QPS`
84    pub max_queueing_time_ms: u64,
85    /// `burst_count` is the silent count
86    /// `burst_count` only takes effect when `control_strategy` is `Reject` and `metric_type` is `QPS`
87    pub burst_count: u64,
88    /// `duration_in_sec` is the time interval in statistic
89    /// `duration_in_sec` only takes effect when `metric_type` is QPS
90    pub duration_in_sec: u64,
91    /// `params_max_capacity` is the max capacity of cache statistic
92    pub params_max_capacity: usize,
93    /// `specific_items` indicates the special threshold for specific value
94    pub specific_items: HashMap<ParamKey, u64>,
95}
96
97impl Default for Rule {
98    fn default() -> Self {
99        Rule {
100            #[cfg(target_arch = "wasm32")]
101            id: String::new(),
102            #[cfg(not(target_arch = "wasm32"))]
103            id: uuid::Uuid::new_v4().to_string(),
104            resource: String::default(),
105            metric_type: MetricType::default(),
106            control_strategy: ControlStrategy::default(),
107            param_index: 0,
108            param_key: String::default(),
109            threshold: 0,
110            max_queueing_time_ms: 0,
111            burst_count: 0,
112            duration_in_sec: 0,
113            params_max_capacity: 0,
114            specific_items: HashMap::default(),
115        }
116    }
117}
118
119impl Rule {
120    pub fn is_stat_reusable(&self, other: &Self) -> bool {
121        self.resource == other.resource
122            && self.control_strategy == other.control_strategy
123            && self.params_max_capacity == other.params_max_capacity
124            && self.duration_in_sec == other.duration_in_sec
125            && self.metric_type == other.metric_type
126    }
127}
128
129impl Eq for Rule {}
130
131impl Hash for Rule {
132    fn hash<H: Hasher>(&self, state: &mut H) {
133        self.id.hash(state);
134        self.resource.hash(state);
135    }
136}
137
138impl SentinelRule for Rule {
139    fn resource_name(&self) -> String {
140        self.resource.clone()
141    }
142
143    fn is_valid(&self) -> crate::Result<()> {
144        if self.resource.len() == 0 {
145            return Err(Error::msg("empty resource name"));
146        }
147        if self.metric_type == MetricType::QPS && self.duration_in_sec == 0 {
148            return Err(Error::msg("invalid duration"));
149        }
150        if self.param_index > 0 && self.param_key.len() != 0 {
151            return Err(Error::msg(
152                "param index and param key are mutually exclusive",
153            ));
154        }
155        Ok(())
156    }
157}
158
159impl PartialEq for Rule {
160    fn eq(&self, other: &Self) -> bool {
161        self.resource == other.resource
162            && self.metric_type == other.metric_type
163            && self.control_strategy == other.control_strategy
164            && self.params_max_capacity == other.params_max_capacity
165            && self.param_index == other.param_index
166            && self.param_key == other.param_key
167            && self.threshold == other.threshold
168            && self.duration_in_sec == other.duration_in_sec
169            && self.specific_items == other.specific_items
170            && ((self.control_strategy == ControlStrategy::Reject
171                && self.burst_count == other.burst_count)
172                || (self.control_strategy == ControlStrategy::Throttling
173                    && self.max_queueing_time_ms == other.max_queueing_time_ms))
174    }
175}
176
177impl fmt::Display for Rule {
178    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
179        let fmtted = serde_json::to_string_pretty(self).unwrap();
180        write!(f, "{}", fmtted)
181    }
182}
183
#[cfg(test)]
mod test {
    use super::*;

    /// Builds the fully specified rule compared in `test_eq`.
    fn fully_specified_rule(specific_items: HashMap<ParamKey, u64>) -> Rule {
        Rule {
            id: String::from("abc"),
            resource: String::from("abc"),
            metric_type: MetricType::Concurrency,
            control_strategy: ControlStrategy::Reject,
            param_index: 0,
            param_key: String::from("key"),
            threshold: 110,
            max_queueing_time_ms: 5,
            burst_count: 10,
            duration_in_sec: 1,
            params_max_capacity: 10000,
            specific_items,
        }
    }

    // A default rule has an empty resource name and must be rejected.
    #[test]
    #[should_panic(expected = "empty resource name")]
    fn invalid_name() {
        Rule::default().is_valid().unwrap();
    }

    // A QPS rule keeping the default (zero) duration must be rejected.
    #[test]
    #[should_panic(expected = "invalid duration")]
    fn invalid_duration() {
        let mut candidate = Rule::default();
        candidate.resource = String::from("name");
        candidate.metric_type = MetricType::QPS;
        candidate.is_valid().unwrap();
    }

    // A positive index combined with a non-empty key must be rejected.
    #[test]
    #[should_panic(expected = "param index and param key are mutually exclusive")]
    fn invalid_param() {
        let mut candidate = Rule::default();
        candidate.resource = String::from("abc");
        candidate.metric_type = MetricType::QPS;
        candidate.control_strategy = ControlStrategy::Reject;
        candidate.duration_in_sec = 1;
        candidate.param_index = 10;
        candidate.param_key = String::from("test2");
        candidate.is_valid().unwrap();
    }

    // Two rules built from identical settings compare equal.
    #[test]
    fn test_eq() {
        let overrides: HashMap<ParamKey, u64> = vec![("sss".into(), 1), ("1123".into(), 3)]
            .into_iter()
            .collect();
        let left = fully_specified_rule(overrides.clone());
        let right = fully_specified_rule(overrides);
        assert_eq!(left, right);
    }
}
258}