sentinel_core/core/flow/
rule_manager.rs

1use super::*;
2use crate::{
3    core::{
4        base,
5        base::{nop_read_stat, nop_write_stat, ResourceType, SentinelRule, StatNode},
6        config, stat,
7        stat::ResourceNode,
8    },
9    logging, utils, Error, Result,
10};
11use lazy_static::lazy_static;
12use std::collections::{HashMap, HashSet};
13use std::hash::Hash;
14use std::sync::{Arc, Mutex, RwLock, Weak};
15
16/// ControllerGenfn represents the Traffic Controller generator function of a specific control behavior.
17pub type ControllerGenfn =
18    dyn Send + Sync + Fn(Arc<Rule>, Option<Arc<StandaloneStat>>) -> Result<Arc<Controller>>;
19
20#[derive(Hash, PartialEq, Eq)]
21pub struct ControllerGenKey {
22    calculate_strategy: CalculateStrategy,
23    control_strategy: ControlStrategy,
24}
25
26impl ControllerGenKey {
27    pub fn new(calculate_strategy: CalculateStrategy, control_strategy: ControlStrategy) -> Self {
28        ControllerGenKey {
29            calculate_strategy,
30            control_strategy,
31        }
32    }
33}
34
/// ControllerMap represents the map storage for Controller.
/// In this file it is keyed by resource name and holds the controllers built
/// for that resource's rules.
pub type ControllerMap = HashMap<String, Vec<Arc<Controller>>>;
/// RuleMap is the map storage for rules: in this file it is keyed by resource
/// name and holds the de-duplicated set of rules loaded for that resource.
pub type RuleMap = HashMap<String, HashSet<Arc<Rule>>>;
38
39lazy_static! {
40    static ref GEN_FUN_MAP: RwLock<HashMap<ControllerGenKey, Box<ControllerGenfn>>> = {
41        let mut gen_fun_map: HashMap<ControllerGenKey, Box<ControllerGenfn>> = HashMap::new();
42
43        insert_flow_generator!(
44            gen_fun_map,
45            CalculateStrategy::Direct,
46            ControlStrategy::Reject,
47            DirectCalculator,
48            RejectChecker
49        );
50        insert_flow_generator!(
51            gen_fun_map,
52            CalculateStrategy::Direct,
53            ControlStrategy::Throttling,
54            DirectCalculator,
55            ThrottlingChecker
56        );
57        insert_flow_generator!(
58            gen_fun_map,
59            CalculateStrategy::WarmUp,
60            ControlStrategy::Reject,
61            WarmUpCalculator,
62            RejectChecker
63        );
64        insert_flow_generator!(
65            gen_fun_map,
66            CalculateStrategy::WarmUp,
67            ControlStrategy::Throttling,
68            WarmUpCalculator,
69            ThrottlingChecker
70        );
71        insert_flow_generator!(
72            gen_fun_map,
73            CalculateStrategy::MemoryAdaptive,
74            ControlStrategy::Reject,
75            MemoryAdaptiveCalculator,
76            RejectChecker
77        );
78        insert_flow_generator!(
79            gen_fun_map,
80            CalculateStrategy::MemoryAdaptive,
81            ControlStrategy::Throttling,
82            MemoryAdaptiveCalculator,
83            ThrottlingChecker
84        );
85
86        RwLock::new(gen_fun_map)
87    };
88    static ref CONTROLLER_MAP: Mutex<ControllerMap> = Mutex::new(HashMap::new());
89    static ref NOP_STAT: Arc<StandaloneStat> = Arc::new(StandaloneStat::new(
90        false,
91        nop_read_stat(),
92        Some(nop_write_stat())
93    ));
94    static ref RULE_MAP: Mutex<RuleMap> = Mutex::new(HashMap::new());
95}
96
97fn log_rule_update(map: &RuleMap) {
98    if map.len() == 0 {
99        logging::info!("[FlowRuleManager] Flow rules were cleared")
100    } else {
101        logging::info!(
102            "[FlowRuleManager] Flow rules were loaded: {:?}",
103            map.values()
104        )
105    }
106}
107
108/// different from
109pub fn append_rule(rule: Arc<Rule>) -> bool {
110    if RULE_MAP
111        .lock()
112        .unwrap()
113        .get(&rule.resource)
114        .unwrap_or(&HashSet::new())
115        .contains(&rule)
116    {
117        return false;
118    }
119    match rule.is_valid() {
120        Ok(_) => {
121            RULE_MAP
122                .lock()
123                .unwrap()
124                .entry(rule.resource.clone())
125                .or_insert(HashSet::new())
126                .insert(Arc::clone(&rule));
127        }
128        Err(err) => logging::warn!(
129            "[Flow load_rules] Ignoring invalid flow rule {:?}, reason: {:?}",
130            rule,
131            err
132        ),
133    }
134    let mut placeholder = Vec::new();
135    let new_tcs_of_res = build_resource_traffic_shaping_controller(
136        &rule.resource,
137        RULE_MAP.lock().unwrap().get(&rule.resource).unwrap(),
138        CONTROLLER_MAP
139            .lock()
140            .unwrap()
141            .get_mut(&rule.resource)
142            .unwrap_or(&mut placeholder),
143    );
144    if new_tcs_of_res.len() > 0 {
145        CONTROLLER_MAP
146            .lock()
147            .unwrap()
148            .entry(rule.resource.clone())
149            .or_insert(Vec::new())
150            .push(Arc::clone(&new_tcs_of_res[0]));
151    }
152    true
153}
154
155/// `load_rules` loads the given flow rules to the rule manager, while all previous rules will be replaced.
156/// The returned `bool` indicates whether do real load operation, if the rules is the same with previous rules, return false
157// This func acquires locks on global `RULE_MAP` and `CONTROLLER_MAP`,
158// please release your locks on them before calling this func
159pub fn load_rules(rules: Vec<Arc<Rule>>) -> bool {
160    let mut rule_map: RuleMap = HashMap::new();
161    // todo: validate rules here,
162    // neglect invalid rules,
163    // instead of dealing with them in
164    // `on_rule_update`
165    for rule in rules {
166        let entry = rule_map
167            .entry(rule.resource.clone())
168            .or_insert(HashSet::new());
169        entry.insert(rule);
170    }
171
172    let mut global_rule_map = RULE_MAP.lock().unwrap();
173    if &*global_rule_map == &rule_map {
174        logging::info!(
175            "[Flow] Load rules is the same with current rules, so ignore load operation."
176        );
177        return false;
178    }
179    // when rule_map is different with global one, update the global one
180    // ignore invalid rules
181    let mut valid_rules_map = HashMap::with_capacity(rule_map.len());
182    for (res, rules) in &rule_map {
183        let mut valid_rules = HashSet::new();
184        for rule in rules {
185            match rule.is_valid() {
186                Ok(_) => {
187                    valid_rules.insert(Arc::clone(&rule));
188                }
189                Err(err) => logging::warn!(
190                    "[Flow load_rules] Ignoring invalid flow rule {:?}, reason: {:?}",
191                    rule,
192                    err
193                ),
194            }
195        }
196        if valid_rules.len() > 0 {
197            valid_rules_map.insert(res.clone(), valid_rules);
198        }
199    }
200
201    let start = utils::curr_time_nanos();
202    let mut controller_map = CONTROLLER_MAP.lock().unwrap();
203    let mut valid_controller_map = HashMap::with_capacity(valid_rules_map.len());
204
205    // build controller_map according to valid rules
206    for (res, rules) in valid_rules_map.iter() {
207        let mut placeholder = Vec::new();
208        let new_tcs_of_res = build_resource_traffic_shaping_controller(
209            res,
210            rules,
211            controller_map.get_mut(res).unwrap_or(&mut placeholder),
212        );
213        if new_tcs_of_res.len() > 0 {
214            valid_controller_map.insert(res.clone(), new_tcs_of_res);
215        }
216    }
217    *controller_map = valid_controller_map;
218    *global_rule_map = rule_map;
219    drop(global_rule_map);
220    drop(controller_map);
221    logging::debug!(
222        "[Flow load_rules] Time statistic(ns) for updating flow rule, time cost {}",
223        utils::curr_time_nanos() - start
224    );
225    log_rule_update(&valid_rules_map);
226    return true;
227}
228
229/// `load_rules_of_resource` loads the given resource's flow rules to the rule manager, while all previous resource's rules will be replaced.
230/// The first returned value indicates whether do real load operation, if the rules is the same with previous resource's rules, return false
231// This func acquires locks on global `RULE_MAP` and `CONTROLLER_MAP`,
232// please release your locks on them before calling this func
233pub fn load_rules_of_resource(res: &String, rules: Vec<Arc<Rule>>) -> Result<bool> {
234    if res.len() == 0 {
235        return Err(Error::msg("empty resource"));
236    }
237    let rules: HashSet<_> = rules.into_iter().collect();
238    let mut global_rule_map = RULE_MAP.lock().unwrap();
239    let mut global_controller_map = CONTROLLER_MAP.lock().unwrap();
240    // clear resource rules
241    if rules.len() == 0 {
242        global_rule_map.remove(res);
243        global_controller_map.remove(res);
244        logging::info!("[Flow] clear resource level rules, resource {}", res);
245        return Ok(true);
246    }
247    // load resource level rules
248    if global_rule_map.get(res).unwrap_or(&HashSet::new()) == &rules {
249        logging::info!("[Flow] Load resource level rules is the same with current resource level rules, so ignore load operation.");
250        return Ok(false);
251    }
252
253    let mut valid_res_rules = HashSet::with_capacity(res.len());
254    for rule in &rules {
255        match rule.is_valid() {
256            Ok(_) => {
257                valid_res_rules.insert(Arc::clone(&rule));
258            }
259            Err(err) => logging::warn!(
260                "[Flow load_rules_of_resource] Ignoring invalid flow rule {:?}, reason: {:?}",
261                rule,
262                err
263            ),
264        }
265    }
266    // the `res` related rules changes, have to update
267    let start = utils::curr_time_nanos();
268    let mut placeholder = Vec::new();
269    let mut old_res_tcs = global_controller_map
270        .get_mut(res)
271        .unwrap_or(&mut placeholder);
272
273    let valid_res_rules_string = format!("{:?}", &valid_res_rules);
274    let new_res_tcs =
275        build_resource_traffic_shaping_controller(res, &valid_res_rules, &mut old_res_tcs);
276
277    if new_res_tcs.len() == 0 {
278        global_controller_map.remove(res);
279    } else {
280        global_controller_map.insert(res.clone(), new_res_tcs);
281    }
282
283    global_rule_map.insert(res.clone(), rules);
284    logging::debug!(
285        "[Flow load_rules_of_resource] Time statistic(ns) for updating flow rule, timeCost: {}",
286        utils::curr_time_nanos() - start
287    );
288    logging::info!(
289        "[Flow] load resource level rules, resource: {}, valid_res_rules: {}",
290        res,
291        valid_res_rules_string
292    );
293
294    Ok(true)
295}
296
297/// `get_rules` returns all the rules in `CONTROLLER_MAP`
298// This func acquires the locks on global `CONTROLLER_MAP`,
299// please release your lock on it before calling this func
300pub fn get_rules() -> Vec<Arc<Rule>> {
301    let mut rules = Vec::new();
302    let controller_map = CONTROLLER_MAP.lock().unwrap();
303    for (_, controllers) in controller_map.iter() {
304        for c in controllers {
305            rules.push(Arc::clone(c.rule()));
306        }
307    }
308    rules
309}
310
311/// `get_rules_of_resource` returns specific resource's rules
312// This func acquires the lock on global `CONTROLLER_MAP`,
313// please release your locks on them before calling this func
314pub fn get_rules_of_resource(res: &String) -> Vec<Arc<Rule>> {
315    let controller_map = CONTROLLER_MAP.lock().unwrap();
316    let placeholder = Vec::new();
317    let controllers = controller_map.get(res).unwrap_or(&placeholder);
318    let mut rules = Vec::with_capacity(controllers.len());
319    for c in controllers {
320        rules.push(Arc::clone(c.rule()));
321    }
322    rules
323}
324
325/// clear_rules clears all the rules in flow module.
326// This func acquires locks on global `RULE_MAP` and `CONTROLLER_MAP`,
327// please release your locks on them before calling this func
328pub fn clear_rules() {
329    RULE_MAP.lock().unwrap().clear();
330    CONTROLLER_MAP.lock().unwrap().clear();
331}
332
333/// `clear_rules_of_resource` clears resource level rules in flow module.
334// This func acquires locks on global `RULE_MAP` and `CONTROLLER_MAP`,
335// please release your locks on them before calling this func
336pub fn clear_rules_of_resource(res: &String) {
337    RULE_MAP.lock().unwrap().remove(res);
338    CONTROLLER_MAP.lock().unwrap().remove(res);
339}
340
341// This func acquires the lock on global `CONTROLLER_MAP`,
342// please release your lock on it before calling this func
343pub fn get_traffic_controller_list_for(name: &String) -> Vec<Arc<Controller>> {
344    let controller_map = CONTROLLER_MAP.lock().unwrap();
345    let controllers = controller_map.get(name);
346    match controllers {
347        Some(controllers) => controllers.clone(),
348        None => Vec::new(),
349    }
350}
351
352/// `generate_stat_for` generates a `StandaloneStat` according to the rule,
353/// it may generate a cloned pointer to the global `NOP_STAT`,
354/// a new stat node with default global metrics
355/// or a new stat node with new metrics.
356fn generate_stat_for(rule: &Arc<Rule>) -> Result<Arc<StandaloneStat>> {
357    if !rule.need_statistic() {
358        return Ok(NOP_STAT.clone());
359    }
360
361    let interval_ms = rule.stat_interval_ms;
362
363    let res_node: Arc<ResourceNode> = {
364        if rule.relation_strategy == RelationStrategy::Associated {
365            // use associated statistic
366            stat::get_or_create_resource_node(&rule.ref_resource, &ResourceType::Common)
367        } else {
368            stat::get_or_create_resource_node(&rule.resource, &ResourceType::Common)
369        }
370    };
371
372    if interval_ms == 0 || interval_ms == config::metric_stat_interval_ms() {
373        // default case, use the resource's default statistic
374        let metric = res_node.default_metric();
375        let ret_stat = Arc::new(StandaloneStat::new(true, metric, None));
376        return Ok(ret_stat);
377    }
378
379    let mut sample_count: u32 = 1;
380    //calculate the sample count
381    if interval_ms > config::global_stat_bucket_length_ms()
382        && interval_ms < config::global_stat_interval_ms_total()
383        && interval_ms % config::global_stat_bucket_length_ms() == 0
384    {
385        sample_count = interval_ms / config::global_stat_bucket_length_ms();
386    }
387
388    let validity = base::check_validity_for_reuse_statistic(
389        sample_count,
390        interval_ms,
391        config::global_stat_sample_count_total(),
392        config::global_stat_interval_ms_total(),
393    );
394    let _err = Error::msg(base::GLOBAL_STATISTIC_NON_REUSABLE_ERROR);
395    match validity {
396        Ok(_) => {
397            let metric = res_node.generate_read_stat(sample_count, interval_ms)?;
398            let ret_stat = Arc::new(StandaloneStat::new(true, metric, None));
399            Ok(ret_stat)
400        }
401        Err(_err) => {
402            logging::info!("[FlowRuleManager] Flow rule couldn't reuse global statistic and will generate independent statistic, rule: {:?}", rule);
403            let write_stat = Arc::new(stat::BucketLeapArray::new(sample_count, interval_ms)?);
404            let read_stat = Arc::new(stat::SlidingWindowMetric::new(
405                sample_count,
406                interval_ms,
407                write_stat.clone(),
408            )?);
409            let res_stat = Arc::new(StandaloneStat::new(false, read_stat, Some(write_stat)));
410            Ok(res_stat)
411        }
412    }
413}
414
415/// `set_traffic_shaping_generator` sets the traffic controller generator for the given CalculateStrategy and ControlStrategy.
416/// Note that modifying the generator of default control strategy is not allowed.
417/// it is type safe
418// This func acquires the lock on global `GEN_FUN_MAP`,
419// please release your lock on it before calling this func
420pub fn set_traffic_shaping_generator(
421    calculate_strategy: CalculateStrategy,
422    control_strategy: ControlStrategy,
423    generator: Box<ControllerGenfn>,
424) -> Result<()> {
425    match (calculate_strategy, control_strategy) {
426        (CalculateStrategy::Custom(_), _) | (_, ControlStrategy::Custom(_)) => {
427            GEN_FUN_MAP.write().unwrap().insert(
428                ControllerGenKey::new(calculate_strategy, control_strategy),
429                generator,
430            );
431            Ok(())
432        }
433        _ => Err(Error::msg(
434            "Default control behaviors are not allowed to be modified.",
435        )),
436    }
437}
438
439// This func acquires the lock on global `GEN_FUN_MAP`,
440// please release your lock on it before calling this func
441pub fn remove_traffic_shaping_generator(
442    calculate_strategy: CalculateStrategy,
443    control_strategy: ControlStrategy,
444) -> Result<()> {
445    match (calculate_strategy, control_strategy) {
446        (CalculateStrategy::Custom(_), _) | (_, ControlStrategy::Custom(_)) => {
447            GEN_FUN_MAP
448                .write()
449                .unwrap()
450                .remove(&ControllerGenKey::new(calculate_strategy, control_strategy));
451            Ok(())
452        }
453        _ => Err(Error::msg(
454            "Default control behaviors are not allowed to be removed.",
455        )),
456    }
457}
458
459fn calculate_reuse_index_for(r: &Arc<Rule>, old_res_tcs: &Vec<Arc<Controller>>) -> (usize, usize) {
460    // the index of equivalent rule in old traffic shaping controller slice
461    let mut eq_idx = usize::MAX;
462    // the index of statistic reusable rule in old traffic shaping controller slice
463    let mut reuse_stat_idx = usize::MAX;
464
465    for (idx, old_tc) in old_res_tcs.iter().enumerate() {
466        let old_rule = old_tc.rule();
467        if old_rule == r {
468            // break if there is equivalent rule
469            eq_idx = idx;
470            break;
471        }
472        // search the index of first stat reusable rule
473        if reuse_stat_idx == usize::MAX && old_rule.is_stat_reusable(r) {
474            reuse_stat_idx = idx;
475        }
476    }
477    (eq_idx, reuse_stat_idx)
478}
479
480/// build_resource_traffic_shaping_controller builds Controller slice from rules. the resource of rules must be equals to res
481pub fn build_resource_traffic_shaping_controller(
482    res: &String,
483    rules_of_res: &HashSet<Arc<Rule>>,
484    old_res_tcs: &mut Vec<Arc<Controller>>,
485) -> Vec<Arc<Controller>> {
486    let mut new_res_tcs = Vec::with_capacity(rules_of_res.len());
487    for rule in rules_of_res {
488        if res != &rule.resource {
489            logging::error!("unmatched resource name expect: {}, actual: {}. Unmatched resource name in flow::build_resource_traffic_shaping_controller(), rule: {:?}", res, rule.resource, rule);
490            continue;
491        }
492        let (eq_idx, reuse_stat_idx) = calculate_reuse_index_for(&rule, old_res_tcs);
493
494        // First check equals scenario
495        if eq_idx != usize::MAX {
496            // reuse the old tc
497            let eq_old_tc = Arc::clone(&old_res_tcs[eq_idx]);
498            new_res_tcs.push(eq_old_tc);
499            // remove old tc from old_res_tcs
500            old_res_tcs.remove(eq_idx);
501            continue;
502        }
503
504        let gen_fun_map = GEN_FUN_MAP.read().unwrap();
505        let key = ControllerGenKey::new(
506            rule.calculate_strategy.clone(),
507            rule.control_strategy.clone(),
508        );
509        let generator = gen_fun_map.get(&key);
510
511        if generator.is_none() {
512            logging::error!("[FlowRuleManager build_resource_traffic_shaping_controller] Unsupported flow control strategy. Ignoring the rule due to unsupported control behavior in flow::build_resource_traffic_shaping_controller(), rule: {}",  rule);
513            continue;
514        }
515        let generator = generator.unwrap();
516
517        let tc = {
518            if reuse_stat_idx != usize::MAX {
519                generator(
520                    Arc::clone(&rule),
521                    Some(Arc::clone(old_res_tcs[reuse_stat_idx].stat())),
522                )
523            } else {
524                generator(Arc::clone(&rule), None)
525            }
526        };
527
528        if tc.is_err() {
529            logging::error!("[FlowRuleManager build_resource_traffic_shaping_controller] Bad generated traffic controller. Ignoring the rule due to bad generated traffic controller in flow::build_resource_traffic_shaping_controller(), rule: {:?}, error: {:?}", rule, tc);
530            continue;
531        }
532        let tc = tc.unwrap();
533        if reuse_stat_idx != usize::MAX {
534            // remove old tc from old_res_tcs
535            old_res_tcs.remove(reuse_stat_idx);
536        }
537        new_res_tcs.push(tc);
538    }
539    new_res_tcs
540}
541
542#[cfg(test)]
543mod test {
544    //! Some tests cannot run in parallel, since we cannot promise that
545    //! the global data structs are not modified before assertion.
546    #![allow(clippy::vtable_address_comparisons)]
547
548    use super::*;
549
550    #[test]
551    #[should_panic(expected = "Default control behaviors are not allowed to be modified.")]
552    fn illegal_set() {
553        set_traffic_shaping_generator(
554            CalculateStrategy::Direct,
555            ControlStrategy::Reject,
556            Box::new(
557                |_: Arc<Rule>, _: Option<Arc<StandaloneStat>>| -> Result<Arc<Controller>> {
558                    let rule = Arc::new(Rule::default());
559                    let stat = generate_stat_for(&rule).unwrap();
560                    let tsc = Arc::new(Controller::new(rule, stat));
561                    Ok(tsc)
562                },
563            ),
564        )
565        .unwrap();
566    }
567
568    #[test]
569    #[should_panic(expected = "Default control behaviors are not allowed to be removed.")]
570    fn illegal_remove() {
571        remove_traffic_shaping_generator(CalculateStrategy::Direct, ControlStrategy::Reject)
572            .unwrap();
573    }
574
575    #[test]
576    #[ignore]
577    fn set_and_remove_generator() {
578        clear_rules();
579        const STRATEGY: u8 = 1;
580        set_traffic_shaping_generator(
581            CalculateStrategy::Custom(STRATEGY),
582            ControlStrategy::Custom(STRATEGY),
583            Box::new(
584                |_: Arc<Rule>, _: Option<Arc<StandaloneStat>>| -> Result<Arc<Controller>> {
585                    let rule = Arc::new(Rule::default());
586                    let stat = generate_stat_for(&rule).unwrap();
587                    let tsc = Arc::new(Controller::new(rule, stat));
588                    Ok(tsc)
589                },
590            ),
591        )
592        .unwrap();
593        let resource = String::from("test-customized-tc");
594        load_rules(vec![Arc::new(Rule {
595            threshold: 20.0,
596            resource: resource.clone(),
597            calculate_strategy: CalculateStrategy::Custom(STRATEGY),
598            control_strategy: ControlStrategy::Custom(STRATEGY),
599            ..Default::default()
600        })]);
601        let key = ControllerGenKey {
602            calculate_strategy: CalculateStrategy::Custom(STRATEGY),
603            control_strategy: ControlStrategy::Custom(STRATEGY),
604        };
605
606        let controller_map = CONTROLLER_MAP.lock().unwrap();
607
608        assert!(GEN_FUN_MAP.read().unwrap().contains_key(&key));
609        assert!(controller_map[&resource].len() > 0);
610        remove_traffic_shaping_generator(
611            CalculateStrategy::Custom(STRATEGY),
612            ControlStrategy::Custom(STRATEGY),
613        )
614        .unwrap();
615        assert!(!GEN_FUN_MAP.read().unwrap().contains_key(&key));
616        drop(controller_map);
617        clear_rules();
618    }
619
620    #[test]
621    #[ignore]
622    fn get_rules1() {
623        clear_rules();
624        let r1 = Arc::new(Rule {
625            resource: "abc1".into(),
626            calculate_strategy: CalculateStrategy::Direct,
627            control_strategy: ControlStrategy::Reject,
628            ..Default::default()
629        });
630        let r2 = Arc::new(Rule {
631            resource: "abc2".into(),
632            calculate_strategy: CalculateStrategy::Direct,
633            control_strategy: ControlStrategy::Throttling,
634            max_queueing_time_ms: 10,
635            stat_interval_ms: 1000,
636            ..Default::default()
637        });
638        load_rules(vec![Arc::clone(&r1), Arc::clone(&r2)]);
639        let rs = get_rules();
640
641        if rs[0].resource == String::from("abc1") {
642            // Arc<T> equals when inner T equals, even if they are different pointers
643            assert_eq!(rs[0], r1);
644            assert_eq!(rs[1], r2);
645        } else {
646            assert_eq!(rs[0], r2);
647            assert_eq!(rs[1], r1);
648        }
649        clear_rules();
650    }
651
652    #[test]
653    #[ignore]
654    fn get_rules2() {
655        clear_rules();
656        let r1 = Arc::new(Rule {
657            resource: "abc1".into(),
658            calculate_strategy: CalculateStrategy::Direct,
659            control_strategy: ControlStrategy::Reject,
660            ..Default::default()
661        });
662        let r2 = Arc::new(Rule {
663            resource: "abc2".into(),
664            calculate_strategy: CalculateStrategy::Direct,
665            control_strategy: ControlStrategy::Throttling,
666            max_queueing_time_ms: 10,
667            stat_interval_ms: 1000,
668            ..Default::default()
669        });
670        load_rules(vec![r1.clone(), r2.clone()]);
671        let rs = get_rules();
672
673        if rs[0].resource == String::from("abc1") {
674            // Arc<T> equals when inner T equals, even if they are different pointers
675            assert_eq!(rs[0], r1);
676            assert_eq!(rs[1], r2);
677        } else {
678            assert_eq!(rs[0], r2);
679            assert_eq!(rs[1], r1);
680        }
681
682        let controller_map = CONTROLLER_MAP.lock().unwrap();
683
684        assert_eq!(1, controller_map["abc2"].len());
685        assert_eq!(false, controller_map["abc2"][0].stat().reuse_global());
686
687        assert!(Arc::ptr_eq(
688            controller_map["abc2"][0].stat().read_only_metric(),
689            NOP_STAT.read_only_metric()
690        ));
691        assert!(Arc::ptr_eq(
692            controller_map["abc2"][0]
693                .stat()
694                .write_only_metric()
695                .unwrap(),
696            NOP_STAT.write_only_metric().unwrap()
697        ));
698        drop(controller_map);
699        clear_rules();
700    }
701
702    #[test]
703    fn generate_stat_for_default_metric_stat() {
704        let r1 = Arc::new(Rule {
705            resource: "abc".into(),
706            calculate_strategy: CalculateStrategy::Direct,
707            control_strategy: ControlStrategy::Reject,
708            threshold: 100.0,
709            relation_strategy: RelationStrategy::Current,
710            ..Default::default()
711        });
712        let bound_stat = generate_stat_for(&r1).unwrap();
713        assert!(bound_stat.reuse_global());
714
715        let res_node = stat::get_resource_node(&String::from("abc")).unwrap();
716        let stat = res_node.default_metric();
717        assert!(Arc::ptr_eq(bound_stat.read_only_metric(), &stat));
718    }
719
720    #[test]
721    fn generate_stat_for_reuse_global_stat() {
722        let r1 = Arc::new(Rule {
723            resource: "abc".into(),
724            calculate_strategy: CalculateStrategy::Direct,
725            control_strategy: ControlStrategy::Reject,
726            stat_interval_ms: 5000,
727            threshold: 100.0,
728            relation_strategy: RelationStrategy::Current,
729            ..Default::default()
730        });
731        let bound_stat = generate_stat_for(&r1).unwrap();
732        assert!(bound_stat.reuse_global());
733        assert!(bound_stat.write_only_metric().is_none());
734
735        let res_node = stat::get_resource_node(&String::from("abc")).unwrap();
736        let stat = res_node.default_metric();
737        assert!(!Arc::ptr_eq(bound_stat.read_only_metric(), &stat));
738    }
739
740    #[test]
741    fn generate_stat_for_standalone_stat() {
742        let r1 = Arc::new(Rule {
743            resource: "abc".into(),
744            calculate_strategy: CalculateStrategy::Direct,
745            control_strategy: ControlStrategy::Reject,
746            stat_interval_ms: 50000,
747            threshold: 100.0,
748            relation_strategy: RelationStrategy::Current,
749            ..Default::default()
750        });
751
752        let stat = generate_stat_for(&r1).unwrap();
753        assert!(!stat.reuse_global());
754        assert!(stat.write_only_metric().is_some());
755    }
756
757    #[test]
758    #[ignore]
759    fn build_controller1() {
760        clear_rules();
761        let r1 = Arc::new(Rule {
762            resource: "abc1".into(),
763            threshold: 100.0,
764            relation_strategy: RelationStrategy::Current,
765            calculate_strategy: CalculateStrategy::Direct,
766            control_strategy: ControlStrategy::Reject,
767            ..Default::default()
768        });
769
770        let r2 = Arc::new(Rule {
771            resource: "abc1".into(),
772            threshold: 200.0,
773            relation_strategy: RelationStrategy::Current,
774            calculate_strategy: CalculateStrategy::Direct,
775            control_strategy: ControlStrategy::Throttling,
776            max_queueing_time_ms: 10,
777            ..Default::default()
778        });
779
780        let mut controller_map = CONTROLLER_MAP.lock().unwrap();
781        assert_eq!(
782            0,
783            controller_map
784                .entry(String::from("abc1"))
785                .or_insert(Vec::new())
786                .len()
787        );
788
789        let mut placeholder = Vec::new();
790        let mut set = HashSet::new();
791        set.insert(Arc::clone(&r1));
792        set.insert(Arc::clone(&r2));
793        let tcs = build_resource_traffic_shaping_controller(
794            &String::from("abc1"),
795            &set,
796            controller_map.get_mut("abc1").unwrap_or(&mut placeholder),
797        );
798        assert_eq!(2, tcs.len());
799        assert!(&r1 == tcs[0].rule() || &r2 == tcs[0].rule());
800        assert!(&r1 == tcs[1].rule() || &r2 == tcs[1].rule());
801        drop(controller_map);
802        clear_rules();
803    }
804
805    #[test]
806    #[ignore]
807    fn build_controller2() {
808        clear_rules();
809        // use nop statistics because of no need statistics
810        let r0 = Arc::new(Rule {
811            resource: "abc1".into(),
812            threshold: 100.0,
813            relation_strategy: RelationStrategy::Current,
814            calculate_strategy: CalculateStrategy::Direct,
815            control_strategy: ControlStrategy::Throttling,
816            stat_interval_ms: 1000,
817            ..Default::default()
818        });
819        // reuse resource node default leap array
820        let r1 = Arc::new(Rule {
821            resource: "abc1".into(),
822            threshold: 100.0,
823            relation_strategy: RelationStrategy::Current,
824            calculate_strategy: CalculateStrategy::Direct,
825            control_strategy: ControlStrategy::Reject,
826            stat_interval_ms: 1000,
827            ..Default::default()
828        });
829        // reuse resource node default leap array
830        let r2 = Arc::new(Rule {
831            resource: "abc1".into(),
832            threshold: 200.0,
833            relation_strategy: RelationStrategy::Current,
834            calculate_strategy: CalculateStrategy::Direct,
835            control_strategy: ControlStrategy::Reject,
836            max_queueing_time_ms: 10,
837            stat_interval_ms: 2000,
838            ..Default::default()
839        });
840        // reuse resource node default leap array
841        let r3 = Arc::new(Rule {
842            resource: "abc1".into(),
843            threshold: 300.0,
844            relation_strategy: RelationStrategy::Current,
845            calculate_strategy: CalculateStrategy::Direct,
846            control_strategy: ControlStrategy::Reject,
847            max_queueing_time_ms: 10,
848            stat_interval_ms: 5000,
849            ..Default::default()
850        });
851        // use independent leap array because of too big interval
852        let r4 = Arc::new(Rule {
853            resource: "abc1".into(),
854            threshold: 400.0,
855            relation_strategy: RelationStrategy::Current,
856            calculate_strategy: CalculateStrategy::Direct,
857            control_strategy: ControlStrategy::Reject,
858            max_queueing_time_ms: 10,
859            stat_interval_ms: 50000,
860            ..Default::default()
861        });
862
863        let s0 = generate_stat_for(&r0).unwrap();
864        let fake_tc0 = Arc::new(Controller::new(Arc::clone(&r0), s0));
865        let stat0 = fake_tc0.stat();
866        assert!(Arc::ptr_eq(&NOP_STAT, stat0));
867        assert_eq!(false, stat0.reuse_global());
868        assert!(stat0.write_only_metric().is_some());
869
870        let s1 = generate_stat_for(&r1).unwrap();
871        let fake_tc1 = Arc::new(Controller::new(Arc::clone(&r1), s1));
872        let stat1 = fake_tc1.stat();
873        assert!(!Arc::ptr_eq(&NOP_STAT, stat1));
874        assert_eq!(true, stat1.reuse_global());
875        assert!(stat1.write_only_metric().is_none());
876
877        let s2 = generate_stat_for(&r2).unwrap();
878        let fake_tc2 = Arc::new(Controller::new(Arc::clone(&r2), s2));
879        let stat2 = fake_tc2.stat();
880        assert!(!Arc::ptr_eq(&NOP_STAT, stat2));
881        assert_eq!(true, stat2.reuse_global());
882        assert!(stat2.write_only_metric().is_none());
883
884        let s3 = generate_stat_for(&r3).unwrap();
885        let fake_tc3 = Arc::new(Controller::new(Arc::clone(&r3), s3));
886        let stat3 = fake_tc3.stat();
887        assert!(!Arc::ptr_eq(&NOP_STAT, stat3));
888        assert_eq!(true, stat3.reuse_global());
889        assert!(stat3.write_only_metric().is_none());
890
891        let s4 = generate_stat_for(&r4).unwrap();
892        let fake_tc4 = Arc::new(Controller::new(Arc::clone(&r4), s4));
893        let stat4 = fake_tc4.stat();
894        assert!(!Arc::ptr_eq(&NOP_STAT, stat4));
895        assert_eq!(false, stat4.reuse_global());
896        assert!(stat4.write_only_metric().is_some());
897
898        let mut controller_map = CONTROLLER_MAP.lock().unwrap();
899
900        controller_map.insert(
901            "abc1".into(),
902            vec![
903                Arc::clone(&fake_tc0),
904                Arc::clone(&fake_tc1),
905                Arc::clone(&fake_tc2),
906                Arc::clone(&fake_tc3),
907                Arc::clone(&fake_tc4),
908            ],
909        );
910        assert_eq!(5, controller_map["abc1"].len());
911        // reuse stat with rule 1
912        let r12 = Arc::new(Rule {
913            resource: "abc1".into(),
914            threshold: 300.0,
915            relation_strategy: RelationStrategy::Current,
916            calculate_strategy: CalculateStrategy::Direct,
917            control_strategy: ControlStrategy::Reject,
918            stat_interval_ms: 1000,
919            ..Default::default()
920        });
921        // can't reuse stat with rule 2, generate from resource's global statistic
922        let r22 = Arc::new(Rule {
923            resource: "abc1".into(),
924            threshold: 400.0,
925            relation_strategy: RelationStrategy::Current,
926            calculate_strategy: CalculateStrategy::Direct,
927            control_strategy: ControlStrategy::Reject,
928            max_queueing_time_ms: 10,
929            stat_interval_ms: 10000,
930            ..Default::default()
931        });
932        // equals with rule 3
933        let r32 = Arc::new(Rule {
934            resource: "abc1".into(),
935            threshold: 300.0,
936            relation_strategy: RelationStrategy::Current,
937            calculate_strategy: CalculateStrategy::Direct,
938            control_strategy: ControlStrategy::Reject,
939            max_queueing_time_ms: 10,
940            stat_interval_ms: 5000,
941            ..Default::default()
942        });
943        // reuse independent stat with rule 4
944        let r42 = Arc::new(Rule {
945            resource: "abc1".into(),
946            threshold: 4000.0,
947            relation_strategy: RelationStrategy::Current,
948            calculate_strategy: CalculateStrategy::Direct,
949            control_strategy: ControlStrategy::Reject,
950            max_queueing_time_ms: 10,
951            stat_interval_ms: 50000,
952            ..Default::default()
953        });
954
955        let mut set = HashSet::new();
956        set.insert(Arc::clone(&r12));
957        set.insert(Arc::clone(&r22));
958        set.insert(Arc::clone(&r32));
959        set.insert(Arc::clone(&r42));
960        let tcs = build_resource_traffic_shaping_controller(
961            &String::from("abc1"),
962            &set,
963            controller_map.get_mut("abc1").unwrap(),
964        );
965
966        assert_eq!(4, tcs.len());
967
968        // cannot automatically drop,
969        // since the `controller_map` has not leave its scope
970        // explicitly drop it or put the codes above in an individual `{}`
971        drop(controller_map);
972        clear_rules();
973    }
974
975    #[test]
976    #[ignore]
977    fn load_resource_by_rule() {
978        let r11 = Arc::new(Rule {
979            resource: "abc1".into(),
980            calculate_strategy: CalculateStrategy::Direct,
981            control_strategy: ControlStrategy::Reject,
982            threshold: 10.0,
983            ..Default::default()
984        });
985        let r12 = Arc::new(Rule {
986            resource: "abc1".into(),
987            calculate_strategy: CalculateStrategy::Direct,
988            control_strategy: ControlStrategy::Reject,
989            threshold: 20.0,
990            ..Default::default()
991        });
992        let r21 = Arc::new(Rule {
993            resource: "abc2".into(),
994            threshold: 10.0,
995            calculate_strategy: CalculateStrategy::Direct,
996            control_strategy: ControlStrategy::Reject,
997            ..Default::default()
998        });
999        let r22 = Arc::new(Rule {
1000            resource: "abc2".into(),
1001            threshold: 20.0,
1002            calculate_strategy: CalculateStrategy::Direct,
1003            control_strategy: ControlStrategy::Reject,
1004            ..Default::default()
1005        });
1006
1007        load_rules(vec![r11.clone(), r12.clone(), r21.clone(), r22.clone()]);
1008
1009        let result = load_rules_of_resource(&String::from(""), vec![r11.clone(), r12.clone()]);
1010        assert!(result.is_err());
1011
1012        let result = load_rules_of_resource(&String::from("abc1"), vec![r11, r12]);
1013        assert!(!result.unwrap());
1014
1015        let result = load_rules_of_resource(&String::from("abc1"), vec![]);
1016        assert!(result.unwrap());
1017
1018        let rule_map = RULE_MAP.lock().unwrap();
1019        let controller_map = CONTROLLER_MAP.lock().unwrap();
1020
1021        assert_eq!(0, controller_map.get("abc1").unwrap_or(&Vec::new()).len());
1022        assert_eq!(0, rule_map.get("abc1").unwrap_or(&HashSet::new()).len());
1023        assert_eq!(2, controller_map["abc2"].len());
1024        assert_eq!(2, rule_map["abc2"].len());
1025    }
1026
1027    #[test]
1028    #[ignore]
1029    fn clear_rules_by_resource() {
1030        let r11 = Arc::new(Rule {
1031            resource: "abc1".into(),
1032            calculate_strategy: CalculateStrategy::Direct,
1033            control_strategy: ControlStrategy::Reject,
1034            threshold: 10.0,
1035            ..Default::default()
1036        });
1037        let r12 = Arc::new(Rule {
1038            resource: "abc1".into(),
1039            calculate_strategy: CalculateStrategy::Direct,
1040            control_strategy: ControlStrategy::Reject,
1041            threshold: 20.0,
1042            ..Default::default()
1043        });
1044        let r21 = Arc::new(Rule {
1045            resource: "abc2".into(),
1046            threshold: 10.0,
1047            calculate_strategy: CalculateStrategy::Direct,
1048            control_strategy: ControlStrategy::Reject,
1049            ..Default::default()
1050        });
1051        let r22 = Arc::new(Rule {
1052            resource: "abc2".into(),
1053            threshold: 20.0,
1054            calculate_strategy: CalculateStrategy::Direct,
1055            control_strategy: ControlStrategy::Reject,
1056            ..Default::default()
1057        });
1058
1059        load_rules(vec![r11, r12, r21, r22]);
1060        clear_rules_of_resource(&String::from("abc1"));
1061
1062        let rule_map = RULE_MAP.lock().unwrap();
1063        let controller_map = CONTROLLER_MAP.lock().unwrap();
1064
1065        assert_eq!(0, controller_map.get("abc1").unwrap_or(&Vec::new()).len());
1066        assert_eq!(0, rule_map.get("abc1").unwrap_or(&HashSet::new()).len());
1067        assert_eq!(2, controller_map["abc2"].len());
1068        assert_eq!(2, rule_map["abc2"].len());
1069        drop(controller_map);
1070        drop(rule_map);
1071        clear_rules();
1072    }
1073}