1use super::*;
2use crate::{
3 core::{
4 base,
5 base::{nop_read_stat, nop_write_stat, ResourceType, SentinelRule, StatNode},
6 config, stat,
7 stat::ResourceNode,
8 },
9 logging, utils, Error, Result,
10};
11use lazy_static::lazy_static;
12use std::collections::{HashMap, HashSet};
13use std::hash::Hash;
14use std::sync::{Arc, Mutex, RwLock, Weak};
15
16pub type ControllerGenfn =
18 dyn Send + Sync + Fn(Arc<Rule>, Option<Arc<StandaloneStat>>) -> Result<Arc<Controller>>;
19
20#[derive(Hash, PartialEq, Eq)]
21pub struct ControllerGenKey {
22 calculate_strategy: CalculateStrategy,
23 control_strategy: ControlStrategy,
24}
25
26impl ControllerGenKey {
27 pub fn new(calculate_strategy: CalculateStrategy, control_strategy: ControlStrategy) -> Self {
28 ControllerGenKey {
29 calculate_strategy,
30 control_strategy,
31 }
32 }
33}
34
35pub type ControllerMap = HashMap<String, Vec<Arc<Controller>>>;
37pub type RuleMap = HashMap<String, HashSet<Arc<Rule>>>;
38
39lazy_static! {
40 static ref GEN_FUN_MAP: RwLock<HashMap<ControllerGenKey, Box<ControllerGenfn>>> = {
41 let mut gen_fun_map: HashMap<ControllerGenKey, Box<ControllerGenfn>> = HashMap::new();
42
43 insert_flow_generator!(
44 gen_fun_map,
45 CalculateStrategy::Direct,
46 ControlStrategy::Reject,
47 DirectCalculator,
48 RejectChecker
49 );
50 insert_flow_generator!(
51 gen_fun_map,
52 CalculateStrategy::Direct,
53 ControlStrategy::Throttling,
54 DirectCalculator,
55 ThrottlingChecker
56 );
57 insert_flow_generator!(
58 gen_fun_map,
59 CalculateStrategy::WarmUp,
60 ControlStrategy::Reject,
61 WarmUpCalculator,
62 RejectChecker
63 );
64 insert_flow_generator!(
65 gen_fun_map,
66 CalculateStrategy::WarmUp,
67 ControlStrategy::Throttling,
68 WarmUpCalculator,
69 ThrottlingChecker
70 );
71 insert_flow_generator!(
72 gen_fun_map,
73 CalculateStrategy::MemoryAdaptive,
74 ControlStrategy::Reject,
75 MemoryAdaptiveCalculator,
76 RejectChecker
77 );
78 insert_flow_generator!(
79 gen_fun_map,
80 CalculateStrategy::MemoryAdaptive,
81 ControlStrategy::Throttling,
82 MemoryAdaptiveCalculator,
83 ThrottlingChecker
84 );
85
86 RwLock::new(gen_fun_map)
87 };
88 static ref CONTROLLER_MAP: Mutex<ControllerMap> = Mutex::new(HashMap::new());
89 static ref NOP_STAT: Arc<StandaloneStat> = Arc::new(StandaloneStat::new(
90 false,
91 nop_read_stat(),
92 Some(nop_write_stat())
93 ));
94 static ref RULE_MAP: Mutex<RuleMap> = Mutex::new(HashMap::new());
95}
96
97fn log_rule_update(map: &RuleMap) {
98 if map.len() == 0 {
99 logging::info!("[FlowRuleManager] Flow rules were cleared")
100 } else {
101 logging::info!(
102 "[FlowRuleManager] Flow rules were loaded: {:?}",
103 map.values()
104 )
105 }
106}
107
108pub fn append_rule(rule: Arc<Rule>) -> bool {
110 if RULE_MAP
111 .lock()
112 .unwrap()
113 .get(&rule.resource)
114 .unwrap_or(&HashSet::new())
115 .contains(&rule)
116 {
117 return false;
118 }
119 match rule.is_valid() {
120 Ok(_) => {
121 RULE_MAP
122 .lock()
123 .unwrap()
124 .entry(rule.resource.clone())
125 .or_insert(HashSet::new())
126 .insert(Arc::clone(&rule));
127 }
128 Err(err) => logging::warn!(
129 "[Flow load_rules] Ignoring invalid flow rule {:?}, reason: {:?}",
130 rule,
131 err
132 ),
133 }
134 let mut placeholder = Vec::new();
135 let new_tcs_of_res = build_resource_traffic_shaping_controller(
136 &rule.resource,
137 RULE_MAP.lock().unwrap().get(&rule.resource).unwrap(),
138 CONTROLLER_MAP
139 .lock()
140 .unwrap()
141 .get_mut(&rule.resource)
142 .unwrap_or(&mut placeholder),
143 );
144 if new_tcs_of_res.len() > 0 {
145 CONTROLLER_MAP
146 .lock()
147 .unwrap()
148 .entry(rule.resource.clone())
149 .or_insert(Vec::new())
150 .push(Arc::clone(&new_tcs_of_res[0]));
151 }
152 true
153}
154
155pub fn load_rules(rules: Vec<Arc<Rule>>) -> bool {
160 let mut rule_map: RuleMap = HashMap::new();
161 for rule in rules {
166 let entry = rule_map
167 .entry(rule.resource.clone())
168 .or_insert(HashSet::new());
169 entry.insert(rule);
170 }
171
172 let mut global_rule_map = RULE_MAP.lock().unwrap();
173 if &*global_rule_map == &rule_map {
174 logging::info!(
175 "[Flow] Load rules is the same with current rules, so ignore load operation."
176 );
177 return false;
178 }
179 let mut valid_rules_map = HashMap::with_capacity(rule_map.len());
182 for (res, rules) in &rule_map {
183 let mut valid_rules = HashSet::new();
184 for rule in rules {
185 match rule.is_valid() {
186 Ok(_) => {
187 valid_rules.insert(Arc::clone(&rule));
188 }
189 Err(err) => logging::warn!(
190 "[Flow load_rules] Ignoring invalid flow rule {:?}, reason: {:?}",
191 rule,
192 err
193 ),
194 }
195 }
196 if valid_rules.len() > 0 {
197 valid_rules_map.insert(res.clone(), valid_rules);
198 }
199 }
200
201 let start = utils::curr_time_nanos();
202 let mut controller_map = CONTROLLER_MAP.lock().unwrap();
203 let mut valid_controller_map = HashMap::with_capacity(valid_rules_map.len());
204
205 for (res, rules) in valid_rules_map.iter() {
207 let mut placeholder = Vec::new();
208 let new_tcs_of_res = build_resource_traffic_shaping_controller(
209 res,
210 rules,
211 controller_map.get_mut(res).unwrap_or(&mut placeholder),
212 );
213 if new_tcs_of_res.len() > 0 {
214 valid_controller_map.insert(res.clone(), new_tcs_of_res);
215 }
216 }
217 *controller_map = valid_controller_map;
218 *global_rule_map = rule_map;
219 drop(global_rule_map);
220 drop(controller_map);
221 logging::debug!(
222 "[Flow load_rules] Time statistic(ns) for updating flow rule, time cost {}",
223 utils::curr_time_nanos() - start
224 );
225 log_rule_update(&valid_rules_map);
226 return true;
227}
228
229pub fn load_rules_of_resource(res: &String, rules: Vec<Arc<Rule>>) -> Result<bool> {
234 if res.len() == 0 {
235 return Err(Error::msg("empty resource"));
236 }
237 let rules: HashSet<_> = rules.into_iter().collect();
238 let mut global_rule_map = RULE_MAP.lock().unwrap();
239 let mut global_controller_map = CONTROLLER_MAP.lock().unwrap();
240 if rules.len() == 0 {
242 global_rule_map.remove(res);
243 global_controller_map.remove(res);
244 logging::info!("[Flow] clear resource level rules, resource {}", res);
245 return Ok(true);
246 }
247 if global_rule_map.get(res).unwrap_or(&HashSet::new()) == &rules {
249 logging::info!("[Flow] Load resource level rules is the same with current resource level rules, so ignore load operation.");
250 return Ok(false);
251 }
252
253 let mut valid_res_rules = HashSet::with_capacity(res.len());
254 for rule in &rules {
255 match rule.is_valid() {
256 Ok(_) => {
257 valid_res_rules.insert(Arc::clone(&rule));
258 }
259 Err(err) => logging::warn!(
260 "[Flow load_rules_of_resource] Ignoring invalid flow rule {:?}, reason: {:?}",
261 rule,
262 err
263 ),
264 }
265 }
266 let start = utils::curr_time_nanos();
268 let mut placeholder = Vec::new();
269 let mut old_res_tcs = global_controller_map
270 .get_mut(res)
271 .unwrap_or(&mut placeholder);
272
273 let valid_res_rules_string = format!("{:?}", &valid_res_rules);
274 let new_res_tcs =
275 build_resource_traffic_shaping_controller(res, &valid_res_rules, &mut old_res_tcs);
276
277 if new_res_tcs.len() == 0 {
278 global_controller_map.remove(res);
279 } else {
280 global_controller_map.insert(res.clone(), new_res_tcs);
281 }
282
283 global_rule_map.insert(res.clone(), rules);
284 logging::debug!(
285 "[Flow load_rules_of_resource] Time statistic(ns) for updating flow rule, timeCost: {}",
286 utils::curr_time_nanos() - start
287 );
288 logging::info!(
289 "[Flow] load resource level rules, resource: {}, valid_res_rules: {}",
290 res,
291 valid_res_rules_string
292 );
293
294 Ok(true)
295}
296
297pub fn get_rules() -> Vec<Arc<Rule>> {
301 let mut rules = Vec::new();
302 let controller_map = CONTROLLER_MAP.lock().unwrap();
303 for (_, controllers) in controller_map.iter() {
304 for c in controllers {
305 rules.push(Arc::clone(c.rule()));
306 }
307 }
308 rules
309}
310
311pub fn get_rules_of_resource(res: &String) -> Vec<Arc<Rule>> {
315 let controller_map = CONTROLLER_MAP.lock().unwrap();
316 let placeholder = Vec::new();
317 let controllers = controller_map.get(res).unwrap_or(&placeholder);
318 let mut rules = Vec::with_capacity(controllers.len());
319 for c in controllers {
320 rules.push(Arc::clone(c.rule()));
321 }
322 rules
323}
324
325pub fn clear_rules() {
329 RULE_MAP.lock().unwrap().clear();
330 CONTROLLER_MAP.lock().unwrap().clear();
331}
332
333pub fn clear_rules_of_resource(res: &String) {
337 RULE_MAP.lock().unwrap().remove(res);
338 CONTROLLER_MAP.lock().unwrap().remove(res);
339}
340
341pub fn get_traffic_controller_list_for(name: &String) -> Vec<Arc<Controller>> {
344 let controller_map = CONTROLLER_MAP.lock().unwrap();
345 let controllers = controller_map.get(name);
346 match controllers {
347 Some(controllers) => controllers.clone(),
348 None => Vec::new(),
349 }
350}
351
352fn generate_stat_for(rule: &Arc<Rule>) -> Result<Arc<StandaloneStat>> {
357 if !rule.need_statistic() {
358 return Ok(NOP_STAT.clone());
359 }
360
361 let interval_ms = rule.stat_interval_ms;
362
363 let res_node: Arc<ResourceNode> = {
364 if rule.relation_strategy == RelationStrategy::Associated {
365 stat::get_or_create_resource_node(&rule.ref_resource, &ResourceType::Common)
367 } else {
368 stat::get_or_create_resource_node(&rule.resource, &ResourceType::Common)
369 }
370 };
371
372 if interval_ms == 0 || interval_ms == config::metric_stat_interval_ms() {
373 let metric = res_node.default_metric();
375 let ret_stat = Arc::new(StandaloneStat::new(true, metric, None));
376 return Ok(ret_stat);
377 }
378
379 let mut sample_count: u32 = 1;
380 if interval_ms > config::global_stat_bucket_length_ms()
382 && interval_ms < config::global_stat_interval_ms_total()
383 && interval_ms % config::global_stat_bucket_length_ms() == 0
384 {
385 sample_count = interval_ms / config::global_stat_bucket_length_ms();
386 }
387
388 let validity = base::check_validity_for_reuse_statistic(
389 sample_count,
390 interval_ms,
391 config::global_stat_sample_count_total(),
392 config::global_stat_interval_ms_total(),
393 );
394 let _err = Error::msg(base::GLOBAL_STATISTIC_NON_REUSABLE_ERROR);
395 match validity {
396 Ok(_) => {
397 let metric = res_node.generate_read_stat(sample_count, interval_ms)?;
398 let ret_stat = Arc::new(StandaloneStat::new(true, metric, None));
399 Ok(ret_stat)
400 }
401 Err(_err) => {
402 logging::info!("[FlowRuleManager] Flow rule couldn't reuse global statistic and will generate independent statistic, rule: {:?}", rule);
403 let write_stat = Arc::new(stat::BucketLeapArray::new(sample_count, interval_ms)?);
404 let read_stat = Arc::new(stat::SlidingWindowMetric::new(
405 sample_count,
406 interval_ms,
407 write_stat.clone(),
408 )?);
409 let res_stat = Arc::new(StandaloneStat::new(false, read_stat, Some(write_stat)));
410 Ok(res_stat)
411 }
412 }
413}
414
415pub fn set_traffic_shaping_generator(
421 calculate_strategy: CalculateStrategy,
422 control_strategy: ControlStrategy,
423 generator: Box<ControllerGenfn>,
424) -> Result<()> {
425 match (calculate_strategy, control_strategy) {
426 (CalculateStrategy::Custom(_), _) | (_, ControlStrategy::Custom(_)) => {
427 GEN_FUN_MAP.write().unwrap().insert(
428 ControllerGenKey::new(calculate_strategy, control_strategy),
429 generator,
430 );
431 Ok(())
432 }
433 _ => Err(Error::msg(
434 "Default control behaviors are not allowed to be modified.",
435 )),
436 }
437}
438
439pub fn remove_traffic_shaping_generator(
442 calculate_strategy: CalculateStrategy,
443 control_strategy: ControlStrategy,
444) -> Result<()> {
445 match (calculate_strategy, control_strategy) {
446 (CalculateStrategy::Custom(_), _) | (_, ControlStrategy::Custom(_)) => {
447 GEN_FUN_MAP
448 .write()
449 .unwrap()
450 .remove(&ControllerGenKey::new(calculate_strategy, control_strategy));
451 Ok(())
452 }
453 _ => Err(Error::msg(
454 "Default control behaviors are not allowed to be removed.",
455 )),
456 }
457}
458
459fn calculate_reuse_index_for(r: &Arc<Rule>, old_res_tcs: &Vec<Arc<Controller>>) -> (usize, usize) {
460 let mut eq_idx = usize::MAX;
462 let mut reuse_stat_idx = usize::MAX;
464
465 for (idx, old_tc) in old_res_tcs.iter().enumerate() {
466 let old_rule = old_tc.rule();
467 if old_rule == r {
468 eq_idx = idx;
470 break;
471 }
472 if reuse_stat_idx == usize::MAX && old_rule.is_stat_reusable(r) {
474 reuse_stat_idx = idx;
475 }
476 }
477 (eq_idx, reuse_stat_idx)
478}
479
480pub fn build_resource_traffic_shaping_controller(
482 res: &String,
483 rules_of_res: &HashSet<Arc<Rule>>,
484 old_res_tcs: &mut Vec<Arc<Controller>>,
485) -> Vec<Arc<Controller>> {
486 let mut new_res_tcs = Vec::with_capacity(rules_of_res.len());
487 for rule in rules_of_res {
488 if res != &rule.resource {
489 logging::error!("unmatched resource name expect: {}, actual: {}. Unmatched resource name in flow::build_resource_traffic_shaping_controller(), rule: {:?}", res, rule.resource, rule);
490 continue;
491 }
492 let (eq_idx, reuse_stat_idx) = calculate_reuse_index_for(&rule, old_res_tcs);
493
494 if eq_idx != usize::MAX {
496 let eq_old_tc = Arc::clone(&old_res_tcs[eq_idx]);
498 new_res_tcs.push(eq_old_tc);
499 old_res_tcs.remove(eq_idx);
501 continue;
502 }
503
504 let gen_fun_map = GEN_FUN_MAP.read().unwrap();
505 let key = ControllerGenKey::new(
506 rule.calculate_strategy.clone(),
507 rule.control_strategy.clone(),
508 );
509 let generator = gen_fun_map.get(&key);
510
511 if generator.is_none() {
512 logging::error!("[FlowRuleManager build_resource_traffic_shaping_controller] Unsupported flow control strategy. Ignoring the rule due to unsupported control behavior in flow::build_resource_traffic_shaping_controller(), rule: {}", rule);
513 continue;
514 }
515 let generator = generator.unwrap();
516
517 let tc = {
518 if reuse_stat_idx != usize::MAX {
519 generator(
520 Arc::clone(&rule),
521 Some(Arc::clone(old_res_tcs[reuse_stat_idx].stat())),
522 )
523 } else {
524 generator(Arc::clone(&rule), None)
525 }
526 };
527
528 if tc.is_err() {
529 logging::error!("[FlowRuleManager build_resource_traffic_shaping_controller] Bad generated traffic controller. Ignoring the rule due to bad generated traffic controller in flow::build_resource_traffic_shaping_controller(), rule: {:?}, error: {:?}", rule, tc);
530 continue;
531 }
532 let tc = tc.unwrap();
533 if reuse_stat_idx != usize::MAX {
534 old_res_tcs.remove(reuse_stat_idx);
536 }
537 new_res_tcs.push(tc);
538 }
539 new_res_tcs
540}
541
542#[cfg(test)]
543mod test {
544 #![allow(clippy::vtable_address_comparisons)]
547
548 use super::*;
549
550 #[test]
551 #[should_panic(expected = "Default control behaviors are not allowed to be modified.")]
552 fn illegal_set() {
553 set_traffic_shaping_generator(
554 CalculateStrategy::Direct,
555 ControlStrategy::Reject,
556 Box::new(
557 |_: Arc<Rule>, _: Option<Arc<StandaloneStat>>| -> Result<Arc<Controller>> {
558 let rule = Arc::new(Rule::default());
559 let stat = generate_stat_for(&rule).unwrap();
560 let tsc = Arc::new(Controller::new(rule, stat));
561 Ok(tsc)
562 },
563 ),
564 )
565 .unwrap();
566 }
567
568 #[test]
569 #[should_panic(expected = "Default control behaviors are not allowed to be removed.")]
570 fn illegal_remove() {
571 remove_traffic_shaping_generator(CalculateStrategy::Direct, ControlStrategy::Reject)
572 .unwrap();
573 }
574
575 #[test]
576 #[ignore]
577 fn set_and_remove_generator() {
578 clear_rules();
579 const STRATEGY: u8 = 1;
580 set_traffic_shaping_generator(
581 CalculateStrategy::Custom(STRATEGY),
582 ControlStrategy::Custom(STRATEGY),
583 Box::new(
584 |_: Arc<Rule>, _: Option<Arc<StandaloneStat>>| -> Result<Arc<Controller>> {
585 let rule = Arc::new(Rule::default());
586 let stat = generate_stat_for(&rule).unwrap();
587 let tsc = Arc::new(Controller::new(rule, stat));
588 Ok(tsc)
589 },
590 ),
591 )
592 .unwrap();
593 let resource = String::from("test-customized-tc");
594 load_rules(vec![Arc::new(Rule {
595 threshold: 20.0,
596 resource: resource.clone(),
597 calculate_strategy: CalculateStrategy::Custom(STRATEGY),
598 control_strategy: ControlStrategy::Custom(STRATEGY),
599 ..Default::default()
600 })]);
601 let key = ControllerGenKey {
602 calculate_strategy: CalculateStrategy::Custom(STRATEGY),
603 control_strategy: ControlStrategy::Custom(STRATEGY),
604 };
605
606 let controller_map = CONTROLLER_MAP.lock().unwrap();
607
608 assert!(GEN_FUN_MAP.read().unwrap().contains_key(&key));
609 assert!(controller_map[&resource].len() > 0);
610 remove_traffic_shaping_generator(
611 CalculateStrategy::Custom(STRATEGY),
612 ControlStrategy::Custom(STRATEGY),
613 )
614 .unwrap();
615 assert!(!GEN_FUN_MAP.read().unwrap().contains_key(&key));
616 drop(controller_map);
617 clear_rules();
618 }
619
620 #[test]
621 #[ignore]
622 fn get_rules1() {
623 clear_rules();
624 let r1 = Arc::new(Rule {
625 resource: "abc1".into(),
626 calculate_strategy: CalculateStrategy::Direct,
627 control_strategy: ControlStrategy::Reject,
628 ..Default::default()
629 });
630 let r2 = Arc::new(Rule {
631 resource: "abc2".into(),
632 calculate_strategy: CalculateStrategy::Direct,
633 control_strategy: ControlStrategy::Throttling,
634 max_queueing_time_ms: 10,
635 stat_interval_ms: 1000,
636 ..Default::default()
637 });
638 load_rules(vec![Arc::clone(&r1), Arc::clone(&r2)]);
639 let rs = get_rules();
640
641 if rs[0].resource == String::from("abc1") {
642 assert_eq!(rs[0], r1);
644 assert_eq!(rs[1], r2);
645 } else {
646 assert_eq!(rs[0], r2);
647 assert_eq!(rs[1], r1);
648 }
649 clear_rules();
650 }
651
652 #[test]
653 #[ignore]
654 fn get_rules2() {
655 clear_rules();
656 let r1 = Arc::new(Rule {
657 resource: "abc1".into(),
658 calculate_strategy: CalculateStrategy::Direct,
659 control_strategy: ControlStrategy::Reject,
660 ..Default::default()
661 });
662 let r2 = Arc::new(Rule {
663 resource: "abc2".into(),
664 calculate_strategy: CalculateStrategy::Direct,
665 control_strategy: ControlStrategy::Throttling,
666 max_queueing_time_ms: 10,
667 stat_interval_ms: 1000,
668 ..Default::default()
669 });
670 load_rules(vec![r1.clone(), r2.clone()]);
671 let rs = get_rules();
672
673 if rs[0].resource == String::from("abc1") {
674 assert_eq!(rs[0], r1);
676 assert_eq!(rs[1], r2);
677 } else {
678 assert_eq!(rs[0], r2);
679 assert_eq!(rs[1], r1);
680 }
681
682 let controller_map = CONTROLLER_MAP.lock().unwrap();
683
684 assert_eq!(1, controller_map["abc2"].len());
685 assert_eq!(false, controller_map["abc2"][0].stat().reuse_global());
686
687 assert!(Arc::ptr_eq(
688 controller_map["abc2"][0].stat().read_only_metric(),
689 NOP_STAT.read_only_metric()
690 ));
691 assert!(Arc::ptr_eq(
692 controller_map["abc2"][0]
693 .stat()
694 .write_only_metric()
695 .unwrap(),
696 NOP_STAT.write_only_metric().unwrap()
697 ));
698 drop(controller_map);
699 clear_rules();
700 }
701
702 #[test]
703 fn generate_stat_for_default_metric_stat() {
704 let r1 = Arc::new(Rule {
705 resource: "abc".into(),
706 calculate_strategy: CalculateStrategy::Direct,
707 control_strategy: ControlStrategy::Reject,
708 threshold: 100.0,
709 relation_strategy: RelationStrategy::Current,
710 ..Default::default()
711 });
712 let bound_stat = generate_stat_for(&r1).unwrap();
713 assert!(bound_stat.reuse_global());
714
715 let res_node = stat::get_resource_node(&String::from("abc")).unwrap();
716 let stat = res_node.default_metric();
717 assert!(Arc::ptr_eq(bound_stat.read_only_metric(), &stat));
718 }
719
720 #[test]
721 fn generate_stat_for_reuse_global_stat() {
722 let r1 = Arc::new(Rule {
723 resource: "abc".into(),
724 calculate_strategy: CalculateStrategy::Direct,
725 control_strategy: ControlStrategy::Reject,
726 stat_interval_ms: 5000,
727 threshold: 100.0,
728 relation_strategy: RelationStrategy::Current,
729 ..Default::default()
730 });
731 let bound_stat = generate_stat_for(&r1).unwrap();
732 assert!(bound_stat.reuse_global());
733 assert!(bound_stat.write_only_metric().is_none());
734
735 let res_node = stat::get_resource_node(&String::from("abc")).unwrap();
736 let stat = res_node.default_metric();
737 assert!(!Arc::ptr_eq(bound_stat.read_only_metric(), &stat));
738 }
739
740 #[test]
741 fn generate_stat_for_standalone_stat() {
742 let r1 = Arc::new(Rule {
743 resource: "abc".into(),
744 calculate_strategy: CalculateStrategy::Direct,
745 control_strategy: ControlStrategy::Reject,
746 stat_interval_ms: 50000,
747 threshold: 100.0,
748 relation_strategy: RelationStrategy::Current,
749 ..Default::default()
750 });
751
752 let stat = generate_stat_for(&r1).unwrap();
753 assert!(!stat.reuse_global());
754 assert!(stat.write_only_metric().is_some());
755 }
756
757 #[test]
758 #[ignore]
759 fn build_controller1() {
760 clear_rules();
761 let r1 = Arc::new(Rule {
762 resource: "abc1".into(),
763 threshold: 100.0,
764 relation_strategy: RelationStrategy::Current,
765 calculate_strategy: CalculateStrategy::Direct,
766 control_strategy: ControlStrategy::Reject,
767 ..Default::default()
768 });
769
770 let r2 = Arc::new(Rule {
771 resource: "abc1".into(),
772 threshold: 200.0,
773 relation_strategy: RelationStrategy::Current,
774 calculate_strategy: CalculateStrategy::Direct,
775 control_strategy: ControlStrategy::Throttling,
776 max_queueing_time_ms: 10,
777 ..Default::default()
778 });
779
780 let mut controller_map = CONTROLLER_MAP.lock().unwrap();
781 assert_eq!(
782 0,
783 controller_map
784 .entry(String::from("abc1"))
785 .or_insert(Vec::new())
786 .len()
787 );
788
789 let mut placeholder = Vec::new();
790 let mut set = HashSet::new();
791 set.insert(Arc::clone(&r1));
792 set.insert(Arc::clone(&r2));
793 let tcs = build_resource_traffic_shaping_controller(
794 &String::from("abc1"),
795 &set,
796 controller_map.get_mut("abc1").unwrap_or(&mut placeholder),
797 );
798 assert_eq!(2, tcs.len());
799 assert!(&r1 == tcs[0].rule() || &r2 == tcs[0].rule());
800 assert!(&r1 == tcs[1].rule() || &r2 == tcs[1].rule());
801 drop(controller_map);
802 clear_rules();
803 }
804
805 #[test]
806 #[ignore]
807 fn build_controller2() {
808 clear_rules();
809 let r0 = Arc::new(Rule {
811 resource: "abc1".into(),
812 threshold: 100.0,
813 relation_strategy: RelationStrategy::Current,
814 calculate_strategy: CalculateStrategy::Direct,
815 control_strategy: ControlStrategy::Throttling,
816 stat_interval_ms: 1000,
817 ..Default::default()
818 });
819 let r1 = Arc::new(Rule {
821 resource: "abc1".into(),
822 threshold: 100.0,
823 relation_strategy: RelationStrategy::Current,
824 calculate_strategy: CalculateStrategy::Direct,
825 control_strategy: ControlStrategy::Reject,
826 stat_interval_ms: 1000,
827 ..Default::default()
828 });
829 let r2 = Arc::new(Rule {
831 resource: "abc1".into(),
832 threshold: 200.0,
833 relation_strategy: RelationStrategy::Current,
834 calculate_strategy: CalculateStrategy::Direct,
835 control_strategy: ControlStrategy::Reject,
836 max_queueing_time_ms: 10,
837 stat_interval_ms: 2000,
838 ..Default::default()
839 });
840 let r3 = Arc::new(Rule {
842 resource: "abc1".into(),
843 threshold: 300.0,
844 relation_strategy: RelationStrategy::Current,
845 calculate_strategy: CalculateStrategy::Direct,
846 control_strategy: ControlStrategy::Reject,
847 max_queueing_time_ms: 10,
848 stat_interval_ms: 5000,
849 ..Default::default()
850 });
851 let r4 = Arc::new(Rule {
853 resource: "abc1".into(),
854 threshold: 400.0,
855 relation_strategy: RelationStrategy::Current,
856 calculate_strategy: CalculateStrategy::Direct,
857 control_strategy: ControlStrategy::Reject,
858 max_queueing_time_ms: 10,
859 stat_interval_ms: 50000,
860 ..Default::default()
861 });
862
863 let s0 = generate_stat_for(&r0).unwrap();
864 let fake_tc0 = Arc::new(Controller::new(Arc::clone(&r0), s0));
865 let stat0 = fake_tc0.stat();
866 assert!(Arc::ptr_eq(&NOP_STAT, stat0));
867 assert_eq!(false, stat0.reuse_global());
868 assert!(stat0.write_only_metric().is_some());
869
870 let s1 = generate_stat_for(&r1).unwrap();
871 let fake_tc1 = Arc::new(Controller::new(Arc::clone(&r1), s1));
872 let stat1 = fake_tc1.stat();
873 assert!(!Arc::ptr_eq(&NOP_STAT, stat1));
874 assert_eq!(true, stat1.reuse_global());
875 assert!(stat1.write_only_metric().is_none());
876
877 let s2 = generate_stat_for(&r2).unwrap();
878 let fake_tc2 = Arc::new(Controller::new(Arc::clone(&r2), s2));
879 let stat2 = fake_tc2.stat();
880 assert!(!Arc::ptr_eq(&NOP_STAT, stat2));
881 assert_eq!(true, stat2.reuse_global());
882 assert!(stat2.write_only_metric().is_none());
883
884 let s3 = generate_stat_for(&r3).unwrap();
885 let fake_tc3 = Arc::new(Controller::new(Arc::clone(&r3), s3));
886 let stat3 = fake_tc3.stat();
887 assert!(!Arc::ptr_eq(&NOP_STAT, stat3));
888 assert_eq!(true, stat3.reuse_global());
889 assert!(stat3.write_only_metric().is_none());
890
891 let s4 = generate_stat_for(&r4).unwrap();
892 let fake_tc4 = Arc::new(Controller::new(Arc::clone(&r4), s4));
893 let stat4 = fake_tc4.stat();
894 assert!(!Arc::ptr_eq(&NOP_STAT, stat4));
895 assert_eq!(false, stat4.reuse_global());
896 assert!(stat4.write_only_metric().is_some());
897
898 let mut controller_map = CONTROLLER_MAP.lock().unwrap();
899
900 controller_map.insert(
901 "abc1".into(),
902 vec![
903 Arc::clone(&fake_tc0),
904 Arc::clone(&fake_tc1),
905 Arc::clone(&fake_tc2),
906 Arc::clone(&fake_tc3),
907 Arc::clone(&fake_tc4),
908 ],
909 );
910 assert_eq!(5, controller_map["abc1"].len());
911 let r12 = Arc::new(Rule {
913 resource: "abc1".into(),
914 threshold: 300.0,
915 relation_strategy: RelationStrategy::Current,
916 calculate_strategy: CalculateStrategy::Direct,
917 control_strategy: ControlStrategy::Reject,
918 stat_interval_ms: 1000,
919 ..Default::default()
920 });
921 let r22 = Arc::new(Rule {
923 resource: "abc1".into(),
924 threshold: 400.0,
925 relation_strategy: RelationStrategy::Current,
926 calculate_strategy: CalculateStrategy::Direct,
927 control_strategy: ControlStrategy::Reject,
928 max_queueing_time_ms: 10,
929 stat_interval_ms: 10000,
930 ..Default::default()
931 });
932 let r32 = Arc::new(Rule {
934 resource: "abc1".into(),
935 threshold: 300.0,
936 relation_strategy: RelationStrategy::Current,
937 calculate_strategy: CalculateStrategy::Direct,
938 control_strategy: ControlStrategy::Reject,
939 max_queueing_time_ms: 10,
940 stat_interval_ms: 5000,
941 ..Default::default()
942 });
943 let r42 = Arc::new(Rule {
945 resource: "abc1".into(),
946 threshold: 4000.0,
947 relation_strategy: RelationStrategy::Current,
948 calculate_strategy: CalculateStrategy::Direct,
949 control_strategy: ControlStrategy::Reject,
950 max_queueing_time_ms: 10,
951 stat_interval_ms: 50000,
952 ..Default::default()
953 });
954
955 let mut set = HashSet::new();
956 set.insert(Arc::clone(&r12));
957 set.insert(Arc::clone(&r22));
958 set.insert(Arc::clone(&r32));
959 set.insert(Arc::clone(&r42));
960 let tcs = build_resource_traffic_shaping_controller(
961 &String::from("abc1"),
962 &set,
963 controller_map.get_mut("abc1").unwrap(),
964 );
965
966 assert_eq!(4, tcs.len());
967
968 drop(controller_map);
972 clear_rules();
973 }
974
975 #[test]
976 #[ignore]
977 fn load_resource_by_rule() {
978 let r11 = Arc::new(Rule {
979 resource: "abc1".into(),
980 calculate_strategy: CalculateStrategy::Direct,
981 control_strategy: ControlStrategy::Reject,
982 threshold: 10.0,
983 ..Default::default()
984 });
985 let r12 = Arc::new(Rule {
986 resource: "abc1".into(),
987 calculate_strategy: CalculateStrategy::Direct,
988 control_strategy: ControlStrategy::Reject,
989 threshold: 20.0,
990 ..Default::default()
991 });
992 let r21 = Arc::new(Rule {
993 resource: "abc2".into(),
994 threshold: 10.0,
995 calculate_strategy: CalculateStrategy::Direct,
996 control_strategy: ControlStrategy::Reject,
997 ..Default::default()
998 });
999 let r22 = Arc::new(Rule {
1000 resource: "abc2".into(),
1001 threshold: 20.0,
1002 calculate_strategy: CalculateStrategy::Direct,
1003 control_strategy: ControlStrategy::Reject,
1004 ..Default::default()
1005 });
1006
1007 load_rules(vec![r11.clone(), r12.clone(), r21.clone(), r22.clone()]);
1008
1009 let result = load_rules_of_resource(&String::from(""), vec![r11.clone(), r12.clone()]);
1010 assert!(result.is_err());
1011
1012 let result = load_rules_of_resource(&String::from("abc1"), vec![r11, r12]);
1013 assert!(!result.unwrap());
1014
1015 let result = load_rules_of_resource(&String::from("abc1"), vec![]);
1016 assert!(result.unwrap());
1017
1018 let rule_map = RULE_MAP.lock().unwrap();
1019 let controller_map = CONTROLLER_MAP.lock().unwrap();
1020
1021 assert_eq!(0, controller_map.get("abc1").unwrap_or(&Vec::new()).len());
1022 assert_eq!(0, rule_map.get("abc1").unwrap_or(&HashSet::new()).len());
1023 assert_eq!(2, controller_map["abc2"].len());
1024 assert_eq!(2, rule_map["abc2"].len());
1025 }
1026
1027 #[test]
1028 #[ignore]
1029 fn clear_rules_by_resource() {
1030 let r11 = Arc::new(Rule {
1031 resource: "abc1".into(),
1032 calculate_strategy: CalculateStrategy::Direct,
1033 control_strategy: ControlStrategy::Reject,
1034 threshold: 10.0,
1035 ..Default::default()
1036 });
1037 let r12 = Arc::new(Rule {
1038 resource: "abc1".into(),
1039 calculate_strategy: CalculateStrategy::Direct,
1040 control_strategy: ControlStrategy::Reject,
1041 threshold: 20.0,
1042 ..Default::default()
1043 });
1044 let r21 = Arc::new(Rule {
1045 resource: "abc2".into(),
1046 threshold: 10.0,
1047 calculate_strategy: CalculateStrategy::Direct,
1048 control_strategy: ControlStrategy::Reject,
1049 ..Default::default()
1050 });
1051 let r22 = Arc::new(Rule {
1052 resource: "abc2".into(),
1053 threshold: 20.0,
1054 calculate_strategy: CalculateStrategy::Direct,
1055 control_strategy: ControlStrategy::Reject,
1056 ..Default::default()
1057 });
1058
1059 load_rules(vec![r11, r12, r21, r22]);
1060 clear_rules_of_resource(&String::from("abc1"));
1061
1062 let rule_map = RULE_MAP.lock().unwrap();
1063 let controller_map = CONTROLLER_MAP.lock().unwrap();
1064
1065 assert_eq!(0, controller_map.get("abc1").unwrap_or(&Vec::new()).len());
1066 assert_eq!(0, rule_map.get("abc1").unwrap_or(&HashSet::new()).len());
1067 assert_eq!(2, controller_map["abc2"].len());
1068 assert_eq!(2, rule_map["abc2"].len());
1069 drop(controller_map);
1070 drop(rule_map);
1071 clear_rules();
1072 }
1073}