sentinel_core/core/flow/
rule.rs

1use crate::{base::SentinelRule, logging, system_metric, Error};
2use serde::{Deserialize, Serialize};
3use serde_json;
4use std::fmt;
5use std::hash::{Hash, Hasher};
6cfg_k8s! {
7    use schemars::JsonSchema;
8    use kube::CustomResource;
9}
10
11pub type Id = String;
12
13/// RelationStrategy indicates the flow control strategy based on the relation of invocations.
14#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
15#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
16pub enum RelationStrategy {
17    /// Current means flow control by current resource directly.
18    Current,
19    /// Associated means flow control by the associated resource rather than current resource.
20    Associated,
21}
22
23impl Default for RelationStrategy {
24    fn default() -> RelationStrategy {
25        RelationStrategy::Current
26    }
27}
28
29#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
30#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
31pub enum CalculateStrategy {
32    Direct,
33    WarmUp,
34    MemoryAdaptive,
35    #[serde(skip)]
36    Custom(u8),
37}
38
39impl Default for CalculateStrategy {
40    fn default() -> CalculateStrategy {
41        CalculateStrategy::Direct
42    }
43}
44
45#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
46#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
47pub enum ControlStrategy {
48    Reject,
49    /// Throttling indicates that pending requests will be throttled,
50    /// wait in queue (until free capacity is available)
51    Throttling,
52    #[serde(skip)]
53    Custom(u8),
54}
55
56impl Default for ControlStrategy {
57    fn default() -> ControlStrategy {
58        ControlStrategy::Reject
59    }
60}
61
62#[cfg_attr(
63    feature = "ds_k8s",
64    derive(CustomResource, JsonSchema),
65    kube(
66        group = "rust.datasource.sentinel.io",
67        version = "v1alpha1",
68        kind = "FlowResource",
69        namespaced
70    )
71)]
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(default)]
74/// Rule describes the strategy of flow control, the flow control strategy is based on QPS statistic metric
75pub struct Rule {
76    /// `id` represents the unique ID of the rule (optional).
77    pub id: Id,
78    /// `resource` represents the resource name.
79    pub resource: String,
80    pub ref_resource: String,
81    pub calculate_strategy: CalculateStrategy,
82    pub control_strategy: ControlStrategy,
83    pub relation_strategy: RelationStrategy,
84    /// `threshold` means the threshold during stat_interval_ms
85    /// If `stat_interval_ms` is 1000(1 second), `threshold` means QPS
86    pub threshold: f64,
87    pub warm_up_period_sec: u32,
88    pub warm_up_cold_factor: u32,
89    /// `max_queueing_time_ms` only takes effect when `control_strategy` is Throttling.
90    /// When `max_queueing_time_ms` is 0, it means Throttling only controls interval of requests,
91    /// and requests exceeding the threshold will be rejected directly.
92    pub max_queueing_time_ms: u32,
93    /// stat_interval_ms indicates the statistic interval and it's the optional setting for flow Rule.
94    /// If user doesn't set stat_interval_ms, that means using default metric statistic of resource.
95    /// If the stat_interval_ms user specifies can not reuse the global statistic of resource,
96    /// sentinel will generate independent statistic structure for this self.
97    pub stat_interval_ms: u32,
98
99    /// adaptive flow control algorithm related parameters
100    /// limitation: low_mem_usage_threshold > high_mem_usage_threshold && mem_high_water_mark > mem_low_water_mark
101    /// - if the current memory usage is less than or equals to mem_low_water_mark, threshold == low_mem_usage_threshold
102    /// - if the current memory usage is more than or equals to mem_high_water_mark, threshold == high_mem_usage_threshold
103    /// - if the current memory usage is in (mem_low_water_mark, mem_high_water_mark), threshold is in (high_mem_usage_threshold, low_mem_usage_threshold)
104    pub low_mem_usage_threshold: u64,
105    pub high_mem_usage_threshold: u64,
106    pub mem_low_water_mark: u64,
107    pub mem_high_water_mark: u64,
108}
109
110impl Hash for Rule {
111    fn hash<H: Hasher>(&self, state: &mut H) {
112        self.id.hash(state);
113        self.resource.hash(state);
114        self.ref_resource.hash(state);
115    }
116}
117
118impl Default for Rule {
119    fn default() -> Self {
120        Rule {
121            #[cfg(target_arch = "wasm32")]
122            id: String::new(),
123            #[cfg(not(target_arch = "wasm32"))]
124            id: uuid::Uuid::new_v4().to_string(),
125            resource: String::default(),
126            ref_resource: String::default(),
127            calculate_strategy: CalculateStrategy::default(),
128            control_strategy: ControlStrategy::default(),
129            relation_strategy: RelationStrategy::default(),
130            threshold: 0.0,
131            warm_up_period_sec: 0,
132            warm_up_cold_factor: 0,
133            max_queueing_time_ms: 0,
134            stat_interval_ms: 0,
135            low_mem_usage_threshold: 0,
136            high_mem_usage_threshold: 0,
137            mem_low_water_mark: 0,
138            mem_high_water_mark: 0,
139        }
140    }
141}
142
143impl Rule {
144    pub fn is_stat_reusable(&self, other: &Self) -> bool {
145        self.resource == other.resource
146            && self.relation_strategy == other.relation_strategy
147            && self.ref_resource == other.ref_resource
148            && self.stat_interval_ms == other.stat_interval_ms
149            && self.need_statistic()
150            && other.need_statistic()
151    }
152
153    pub fn need_statistic(&self) -> bool {
154        return self.calculate_strategy == CalculateStrategy::WarmUp
155            || self.control_strategy == ControlStrategy::Reject;
156    }
157}
158
159impl SentinelRule for Rule {
160    fn resource_name(&self) -> String {
161        self.resource.clone()
162    }
163
164    fn is_valid(&self) -> crate::Result<()> {
165        if self.resource.len() == 0 {
166            return Err(Error::msg("empty resource name"));
167        }
168        if self.threshold < 0.0 {
169            return Err(Error::msg("negative threshold"));
170        }
171        if self.relation_strategy == RelationStrategy::Associated && self.ref_resource.len() == 0 {
172            return Err(Error::msg("ref_resource must be non empty when relation_strategy is RelationStrategy::Associated"));
173        }
174        if self.calculate_strategy == CalculateStrategy::WarmUp {
175            if self.warm_up_period_sec == 0 {
176                return Err(Error::msg("warm_up_period_sec must be great than 0"));
177            }
178            if self.warm_up_cold_factor == 1 {
179                return Err(Error::msg("warm_up_cold_factor must be great than 1"));
180            }
181        }
182        if self.stat_interval_ms > 10 * 60 * 1000 {
183            logging::info!(
184                "stat_interval_ms is great than 10 minutes, less than 10 minutes is recommended."
185            )
186        }
187        if self.calculate_strategy == CalculateStrategy::MemoryAdaptive {
188            if self.mem_low_water_mark == 0
189                || self.mem_high_water_mark == 0
190                || self.high_mem_usage_threshold == 0
191                || self.low_mem_usage_threshold == 0
192            {
193                return Err(Error::msg(
194                    "memory water mark or usage threshold setting to 0",
195                ));
196            }
197            if self.high_mem_usage_threshold >= self.low_mem_usage_threshold {
198                return Err(Error::msg(
199                    "self.high_mem_usage_threshold >= self.low_mem_usage_threshold",
200                ));
201            }
202            if self.mem_high_water_mark > system_metric::get_total_memory_size() {
203                return Err(Error::msg("self.mem_high_water_mark should not be greater than current system's total memory size"));
204            }
205            if self.mem_low_water_mark >= self.mem_high_water_mark {
206                // can not be equal to defeat from zero overflow
207                return Err(Error::msg(
208                    "self.mem_low_water_mark >= self.mem_high_water_mark",
209                ));
210            }
211        }
212        Ok(())
213    }
214}
215
216impl PartialEq for Rule {
217    fn eq(&self, other: &Self) -> bool {
218        // todo: discuss under different strategies
219        self.resource == other.resource
220            && self.ref_resource == other.ref_resource
221            && self.calculate_strategy == other.calculate_strategy
222            && self.control_strategy == other.control_strategy
223            && self.relation_strategy == other.relation_strategy
224            && self.threshold == other.threshold
225            && self.warm_up_period_sec == other.warm_up_period_sec
226            && self.warm_up_cold_factor == other.warm_up_cold_factor
227            && self.max_queueing_time_ms == other.max_queueing_time_ms
228            && self.stat_interval_ms == other.stat_interval_ms
229            && self.low_mem_usage_threshold == other.low_mem_usage_threshold
230            && self.high_mem_usage_threshold == other.high_mem_usage_threshold
231            && self.mem_low_water_mark == other.mem_low_water_mark
232            && self.mem_high_water_mark == other.mem_high_water_mark
233    }
234}
235
236impl Eq for Rule {}
237
238impl fmt::Display for Rule {
239    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240        let fmtted = serde_json::to_string_pretty(self).unwrap();
241        write!(f, "{}", fmtted)
242    }
243}
244
245#[cfg(test)]
246mod test {
247    use super::*;
248
249    #[test]
250    fn need_statistic() {
251        // need
252        let r1 = Rule {
253            resource: "abc1".into(),
254            threshold: 100.0,
255            relation_strategy: RelationStrategy::Current,
256            calculate_strategy: CalculateStrategy::Direct,
257            control_strategy: ControlStrategy::Reject,
258            stat_interval_ms: 1000,
259            ..Default::default()
260        };
261        // no need
262        let r2 = Rule {
263            resource: "abc1".into(),
264            threshold: 200.0,
265            relation_strategy: RelationStrategy::Current,
266            calculate_strategy: CalculateStrategy::Direct,
267            control_strategy: ControlStrategy::Throttling,
268            max_queueing_time_ms: 10,
269            stat_interval_ms: 2000,
270            ..Default::default()
271        };
272        // need
273        let r3 = Rule {
274            resource: "abc1".into(),
275            threshold: 300.0,
276            relation_strategy: RelationStrategy::Current,
277            calculate_strategy: CalculateStrategy::WarmUp,
278            control_strategy: ControlStrategy::Reject,
279            max_queueing_time_ms: 10,
280            stat_interval_ms: 5000,
281            ..Default::default()
282        };
283        // need
284        let r4 = Rule {
285            resource: "abc1".into(),
286            threshold: 400.0,
287            relation_strategy: RelationStrategy::Current,
288            calculate_strategy: CalculateStrategy::WarmUp,
289            control_strategy: ControlStrategy::Throttling,
290            max_queueing_time_ms: 10,
291            stat_interval_ms: 50000,
292            ..Default::default()
293        };
294
295        assert!(r1.need_statistic());
296        assert!(!r2.need_statistic());
297        assert!(r3.need_statistic());
298        assert!(r4.need_statistic());
299    }
300
301    #[test]
302    fn is_stat_reusable() {
303        // Not same resource
304        let r11 = Rule {
305            resource: "abc1".into(),
306            threshold: 100.0,
307            relation_strategy: RelationStrategy::Current,
308            calculate_strategy: CalculateStrategy::Direct,
309            control_strategy: ControlStrategy::Reject,
310            stat_interval_ms: 1000,
311            ..Default::default()
312        };
313        let r12 = Rule {
314            resource: "abc2".into(),
315            threshold: 100.0,
316            relation_strategy: RelationStrategy::Current,
317            calculate_strategy: CalculateStrategy::Direct,
318            control_strategy: ControlStrategy::Reject,
319            stat_interval_ms: 1000,
320            ..Default::default()
321        };
322        assert!(!r11.is_stat_reusable(&r12));
323
324        // Not same relation strategy
325        let r21 = Rule {
326            resource: "abc1".into(),
327            threshold: 100.0,
328            relation_strategy: RelationStrategy::Current,
329            calculate_strategy: CalculateStrategy::Direct,
330            control_strategy: ControlStrategy::Reject,
331            stat_interval_ms: 1000,
332            ..Default::default()
333        };
334        let r22 = Rule {
335            resource: "abc1".into(),
336            threshold: 100.0,
337            relation_strategy: RelationStrategy::Associated,
338            ref_resource: "abc3".into(),
339            calculate_strategy: CalculateStrategy::Direct,
340            control_strategy: ControlStrategy::Reject,
341            stat_interval_ms: 1000,
342            ..Default::default()
343        };
344        assert!(!r21.is_stat_reusable(&r22));
345
346        // Not same ref resource
347        let r31 = Rule {
348            resource: "abc1".into(),
349            threshold: 100.0,
350            relation_strategy: RelationStrategy::Associated,
351            ref_resource: "abc3".into(),
352            calculate_strategy: CalculateStrategy::Direct,
353            control_strategy: ControlStrategy::Reject,
354            stat_interval_ms: 1000,
355            ..Default::default()
356        };
357        let r32 = Rule {
358            resource: "abc1".into(),
359            threshold: 100.0,
360            relation_strategy: RelationStrategy::Associated,
361            ref_resource: "abc4".into(),
362            calculate_strategy: CalculateStrategy::Direct,
363            control_strategy: ControlStrategy::Reject,
364            stat_interval_ms: 1000,
365            ..Default::default()
366        };
367        assert!(!r31.is_stat_reusable(&r32));
368
369        // Not same stat interval
370        let r41 = Rule {
371            resource: "abc1".into(),
372            threshold: 100.0,
373            relation_strategy: RelationStrategy::Current,
374            calculate_strategy: CalculateStrategy::Direct,
375            control_strategy: ControlStrategy::Reject,
376            stat_interval_ms: 1000,
377            ..Default::default()
378        };
379        let r42 = Rule {
380            resource: "abc1".into(),
381            threshold: 100.0,
382            relation_strategy: RelationStrategy::Current,
383            calculate_strategy: CalculateStrategy::Direct,
384            control_strategy: ControlStrategy::Reject,
385            stat_interval_ms: 2000,
386            ..Default::default()
387        };
388        assert!(!r41.is_stat_reusable(&r42));
389
390        // Not both need stat
391        let r51 = Rule {
392            resource: "abc1".into(),
393            threshold: 100.0,
394            relation_strategy: RelationStrategy::Current,
395            calculate_strategy: CalculateStrategy::Direct,
396            control_strategy: ControlStrategy::Reject,
397            stat_interval_ms: 1000,
398            ..Default::default()
399        };
400        let r52 = Rule {
401            resource: "abc1".into(),
402            threshold: 100.0,
403            relation_strategy: RelationStrategy::Current,
404            calculate_strategy: CalculateStrategy::Direct,
405            control_strategy: ControlStrategy::Throttling,
406            stat_interval_ms: 1000,
407            ..Default::default()
408        };
409        assert!(!r51.is_stat_reusable(&r52));
410
411        // Not same threshold
412        let r61 = Rule {
413            resource: "abc1".into(),
414            threshold: 100.0,
415            relation_strategy: RelationStrategy::Current,
416            calculate_strategy: CalculateStrategy::Direct,
417            control_strategy: ControlStrategy::Reject,
418            stat_interval_ms: 1000,
419            ..Default::default()
420        };
421        let r62 = Rule {
422            resource: "abc1".into(),
423            threshold: 200.0,
424            relation_strategy: RelationStrategy::Current,
425            calculate_strategy: CalculateStrategy::Direct,
426            control_strategy: ControlStrategy::Reject,
427            stat_interval_ms: 1000,
428            ..Default::default()
429        };
430        assert!(r61.is_stat_reusable(&r62));
431    }
432
433    #[test]
434    fn is_valid_flow_rule1() {
435        let bad_rule1 = Rule {
436            threshold: 1.0,
437            resource: "".into(),
438            ..Default::default()
439        };
440        let bad_rule2 = Rule {
441            threshold: -1.9,
442            resource: "test".into(),
443            ..Default::default()
444        };
445        let bad_rule3 = Rule {
446            threshold: 5.0,
447            resource: "test".into(),
448            calculate_strategy: CalculateStrategy::WarmUp,
449            control_strategy: ControlStrategy::Reject,
450            ..Default::default()
451        };
452        let bad_rule4 = Rule {
453            threshold: 5.0,
454            resource: "test".into(),
455            calculate_strategy: CalculateStrategy::WarmUp,
456            control_strategy: ControlStrategy::Reject,
457            stat_interval_ms: 6000000,
458            ..Default::default()
459        };
460
461        let good_rule1 = Rule {
462            threshold: 10.0,
463            resource: "test".into(),
464            calculate_strategy: CalculateStrategy::WarmUp,
465            control_strategy: ControlStrategy::Throttling,
466            warm_up_period_sec: 10,
467            max_queueing_time_ms: 10,
468            stat_interval_ms: 1000,
469            ..Default::default()
470        };
471        let good_rule2 = Rule {
472            threshold: 10.0,
473            resource: "test".into(),
474            calculate_strategy: CalculateStrategy::WarmUp,
475            control_strategy: ControlStrategy::Throttling,
476            warm_up_period_sec: 10,
477            max_queueing_time_ms: 0,
478            stat_interval_ms: 1000,
479            ..Default::default()
480        };
481
482        assert!(bad_rule1.is_valid().is_err());
483        assert!(bad_rule2.is_valid().is_err());
484        assert!(bad_rule3.is_valid().is_err());
485        assert!(bad_rule4.is_valid().is_err());
486
487        assert!(good_rule1.is_valid().is_ok());
488        assert!(good_rule2.is_valid().is_ok());
489    }
490
491    #[test]
492    fn is_valid_flow_rule2() {
493        let mut rule = Rule {
494            resource: "hello0".into(),
495            calculate_strategy: CalculateStrategy::MemoryAdaptive,
496            control_strategy: ControlStrategy::Reject,
497            stat_interval_ms: 10,
498            low_mem_usage_threshold: 2,
499            high_mem_usage_threshold: 1,
500            mem_low_water_mark: 1,
501            mem_high_water_mark: 2,
502            ..Default::default()
503        };
504        assert!(rule.is_valid().is_ok());
505
506        rule.low_mem_usage_threshold = 9;
507        rule.high_mem_usage_threshold = 9;
508        assert!(rule.is_valid().is_err());
509        rule.low_mem_usage_threshold = 10;
510        assert!(rule.is_valid().is_ok());
511
512        rule.mem_low_water_mark = 0;
513        assert!(rule.is_valid().is_err());
514
515        rule.mem_low_water_mark = 100 * 1024;
516        rule.mem_high_water_mark = 300 * 1024;
517        assert!(rule.is_valid().is_ok());
518
519        rule.mem_high_water_mark = 0;
520        assert!(rule.is_valid().is_err());
521
522        rule.mem_high_water_mark = 300 * 1024;
523        assert!(rule.is_valid().is_ok());
524
525        rule.mem_low_water_mark = 100 * 1024;
526        rule.mem_high_water_mark = 30 * 1024;
527        assert!(rule.is_valid().is_err());
528
529        rule.mem_high_water_mark = 300 * 1024;
530        assert!(rule.is_valid().is_ok());
531    }
532}