1use crate::{base::SentinelRule, logging, system_metric, Error};
2use serde::{Deserialize, Serialize};
3use serde_json;
4use std::fmt;
5use std::hash::{Hash, Hasher};
6cfg_k8s! {
7 use schemars::JsonSchema;
8 use kube::CustomResource;
9}
10
11pub type Id = String;
12
/// Relation strategy of a flow [`Rule`], stored in `Rule::relation_strategy`.
///
/// The default is [`RelationStrategy::Current`].
// Variants are annotated with plain `//` comments (not `///`) so that the
// schema derived under the `ds_k8s` feature is not affected by them.
#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize)]
pub enum RelationStrategy {
    // Default variant.
    // NOTE(review): presumably the rule is evaluated against its own
    // `resource` — confirm against the checker implementation.
    Current,
    // Rule validation requires a non-empty `ref_resource` for this variant.
    // NOTE(review): presumably the rule is evaluated against `ref_resource`
    // — confirm against the checker implementation.
    Associated,
}
22
23impl Default for RelationStrategy {
24 fn default() -> RelationStrategy {
25 RelationStrategy::Current
26 }
27}
28
/// Calculation strategy of a flow [`Rule`], stored in `Rule::calculate_strategy`.
///
/// The default is [`CalculateStrategy::Direct`].
// Variants are annotated with plain `//` comments (not `///`) so that the
// schema derived under the `ds_k8s` feature is not affected by them.
#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
pub enum CalculateStrategy {
    // Default variant.
    // NOTE(review): presumably `threshold` is applied as-is — confirm.
    Direct,
    // Rule validation requires `warm_up_period_sec != 0` and
    // `warm_up_cold_factor != 1` for this variant.
    WarmUp,
    // Rule validation requires non-zero memory water marks and memory usage
    // thresholds for this variant.
    MemoryAdaptive,
    // Excluded from (de)serialization via `#[serde(skip)]`.
    // NOTE(review): presumably the `u8` identifies a user-registered
    // strategy — confirm.
    #[serde(skip)]
    Custom(u8),
}
38
39impl Default for CalculateStrategy {
40 fn default() -> CalculateStrategy {
41 CalculateStrategy::Direct
42 }
43}
44
/// Control strategy of a flow [`Rule`], stored in `Rule::control_strategy`.
///
/// The default is [`ControlStrategy::Reject`].
// Variants are annotated with plain `//` comments (not `///`) so that the
// schema derived under the `ds_k8s` feature is not affected by them.
#[cfg_attr(feature = "ds_k8s", derive(JsonSchema))]
#[derive(Debug, Copy, Clone, PartialEq, Serialize, Deserialize, Hash, Eq)]
pub enum ControlStrategy {
    // Default variant. Rules using it always report `need_statistic() == true`.
    Reject,
    // NOTE(review): presumably requests are queued for up to
    // `max_queueing_time_ms` instead of being rejected — confirm.
    Throttling,
    // Excluded from (de)serialization via `#[serde(skip)]`.
    // NOTE(review): presumably the `u8` identifies a user-registered
    // strategy — confirm.
    #[serde(skip)]
    Custom(u8),
}
55
56impl Default for ControlStrategy {
57 fn default() -> ControlStrategy {
58 ControlStrategy::Reject
59 }
60}
61
/// A flow rule.
///
/// Missing fields are filled from [`Default`] when deserializing
/// (`#[serde(default)]`). With the `ds_k8s` feature enabled the struct also
/// derives a namespaced `FlowResource` custom resource definition.
#[cfg_attr(
    feature = "ds_k8s",
    derive(CustomResource, JsonSchema),
    kube(
        group = "rust.datasource.sentinel.io",
        version = "v1alpha1",
        kind = "FlowResource",
        namespaced
    )
)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct Rule {
    /// Identifier of the rule. Defaults to a freshly generated UUID string
    /// (an empty string on `wasm32`). It does not take part in `==`.
    pub id: Id,
    /// Name of the resource the rule applies to; must not be empty.
    pub resource: String,
    /// Name of the referenced resource; must not be empty when
    /// `relation_strategy` is [`RelationStrategy::Associated`].
    pub ref_resource: String,
    /// Calculation strategy; see [`CalculateStrategy`].
    pub calculate_strategy: CalculateStrategy,
    /// Control strategy; see [`ControlStrategy`].
    pub control_strategy: ControlStrategy,
    /// Relation strategy; see [`RelationStrategy`].
    pub relation_strategy: RelationStrategy,
    /// Flow threshold; must not be negative.
    // NOTE(review): unit (e.g. requests per statistic interval) is not
    // visible here — confirm.
    pub threshold: f64,
    /// Warm-up period in seconds; must be non-zero for
    /// [`CalculateStrategy::WarmUp`].
    pub warm_up_period_sec: u32,
    /// Warm-up cold factor; must not be `1` for
    /// [`CalculateStrategy::WarmUp`].
    pub warm_up_cold_factor: u32,
    /// Maximum queueing time in milliseconds.
    // NOTE(review): presumably only used by `ControlStrategy::Throttling`
    // — confirm.
    pub max_queueing_time_ms: u32,
    /// Statistic interval in milliseconds. Values above 10 minutes are
    /// accepted but logged as not recommended during validation.
    pub stat_interval_ms: u32,

    /// Threshold paired with the low memory water mark; only validated for
    /// [`CalculateStrategy::MemoryAdaptive`], where it must be non-zero and
    /// strictly greater than `high_mem_usage_threshold`.
    pub low_mem_usage_threshold: u64,
    /// Threshold paired with the high memory water mark; only validated for
    /// [`CalculateStrategy::MemoryAdaptive`], where it must be non-zero and
    /// strictly less than `low_mem_usage_threshold`.
    pub high_mem_usage_threshold: u64,
    /// Low memory water mark; for [`CalculateStrategy::MemoryAdaptive`] it
    /// must be non-zero and strictly below `mem_high_water_mark`.
    // NOTE(review): presumably expressed in bytes — confirm.
    pub mem_low_water_mark: u64,
    /// High memory water mark; for [`CalculateStrategy::MemoryAdaptive`] it
    /// must be non-zero and must not exceed the system's total memory size.
    // NOTE(review): presumably expressed in bytes — confirm.
    pub mem_high_water_mark: u64,
}
109
110impl Hash for Rule {
111 fn hash<H: Hasher>(&self, state: &mut H) {
112 self.id.hash(state);
113 self.resource.hash(state);
114 self.ref_resource.hash(state);
115 }
116}
117
118impl Default for Rule {
119 fn default() -> Self {
120 Rule {
121 #[cfg(target_arch = "wasm32")]
122 id: String::new(),
123 #[cfg(not(target_arch = "wasm32"))]
124 id: uuid::Uuid::new_v4().to_string(),
125 resource: String::default(),
126 ref_resource: String::default(),
127 calculate_strategy: CalculateStrategy::default(),
128 control_strategy: ControlStrategy::default(),
129 relation_strategy: RelationStrategy::default(),
130 threshold: 0.0,
131 warm_up_period_sec: 0,
132 warm_up_cold_factor: 0,
133 max_queueing_time_ms: 0,
134 stat_interval_ms: 0,
135 low_mem_usage_threshold: 0,
136 high_mem_usage_threshold: 0,
137 mem_low_water_mark: 0,
138 mem_high_water_mark: 0,
139 }
140 }
141}
142
143impl Rule {
144 pub fn is_stat_reusable(&self, other: &Self) -> bool {
145 self.resource == other.resource
146 && self.relation_strategy == other.relation_strategy
147 && self.ref_resource == other.ref_resource
148 && self.stat_interval_ms == other.stat_interval_ms
149 && self.need_statistic()
150 && other.need_statistic()
151 }
152
153 pub fn need_statistic(&self) -> bool {
154 return self.calculate_strategy == CalculateStrategy::WarmUp
155 || self.control_strategy == ControlStrategy::Reject;
156 }
157}
158
159impl SentinelRule for Rule {
160 fn resource_name(&self) -> String {
161 self.resource.clone()
162 }
163
164 fn is_valid(&self) -> crate::Result<()> {
165 if self.resource.len() == 0 {
166 return Err(Error::msg("empty resource name"));
167 }
168 if self.threshold < 0.0 {
169 return Err(Error::msg("negative threshold"));
170 }
171 if self.relation_strategy == RelationStrategy::Associated && self.ref_resource.len() == 0 {
172 return Err(Error::msg("ref_resource must be non empty when relation_strategy is RelationStrategy::Associated"));
173 }
174 if self.calculate_strategy == CalculateStrategy::WarmUp {
175 if self.warm_up_period_sec == 0 {
176 return Err(Error::msg("warm_up_period_sec must be great than 0"));
177 }
178 if self.warm_up_cold_factor == 1 {
179 return Err(Error::msg("warm_up_cold_factor must be great than 1"));
180 }
181 }
182 if self.stat_interval_ms > 10 * 60 * 1000 {
183 logging::info!(
184 "stat_interval_ms is great than 10 minutes, less than 10 minutes is recommended."
185 )
186 }
187 if self.calculate_strategy == CalculateStrategy::MemoryAdaptive {
188 if self.mem_low_water_mark == 0
189 || self.mem_high_water_mark == 0
190 || self.high_mem_usage_threshold == 0
191 || self.low_mem_usage_threshold == 0
192 {
193 return Err(Error::msg(
194 "memory water mark or usage threshold setting to 0",
195 ));
196 }
197 if self.high_mem_usage_threshold >= self.low_mem_usage_threshold {
198 return Err(Error::msg(
199 "self.high_mem_usage_threshold >= self.low_mem_usage_threshold",
200 ));
201 }
202 if self.mem_high_water_mark > system_metric::get_total_memory_size() {
203 return Err(Error::msg("self.mem_high_water_mark should not be greater than current system's total memory size"));
204 }
205 if self.mem_low_water_mark >= self.mem_high_water_mark {
206 return Err(Error::msg(
208 "self.mem_low_water_mark >= self.mem_high_water_mark",
209 ));
210 }
211 }
212 Ok(())
213 }
214}
215
216impl PartialEq for Rule {
217 fn eq(&self, other: &Self) -> bool {
218 self.resource == other.resource
220 && self.ref_resource == other.ref_resource
221 && self.calculate_strategy == other.calculate_strategy
222 && self.control_strategy == other.control_strategy
223 && self.relation_strategy == other.relation_strategy
224 && self.threshold == other.threshold
225 && self.warm_up_period_sec == other.warm_up_period_sec
226 && self.warm_up_cold_factor == other.warm_up_cold_factor
227 && self.max_queueing_time_ms == other.max_queueing_time_ms
228 && self.stat_interval_ms == other.stat_interval_ms
229 && self.low_mem_usage_threshold == other.low_mem_usage_threshold
230 && self.high_mem_usage_threshold == other.high_mem_usage_threshold
231 && self.mem_low_water_mark == other.mem_low_water_mark
232 && self.mem_high_water_mark == other.mem_high_water_mark
233 }
234}
235
// Marker impl: equality itself is defined by `PartialEq for Rule`.
// NOTE(review): `threshold` is an `f64` compared with `==`, so a rule whose
// threshold is NaN is not equal to itself — confirm NaN thresholds can never
// reach code that relies on `Eq`.
impl Eq for Rule {}
237
238impl fmt::Display for Rule {
239 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240 let fmtted = serde_json::to_string_pretty(self).unwrap();
241 write!(f, "{}", fmtted)
242 }
243}
244
#[cfg(test)]
mod test {
    use super::*;

    /// Builds the rule most cases start from: resource `"abc1"`, threshold
    /// 100, `Current` / `Direct` / `Reject`, and a 1000 ms statistic interval.
    fn baseline() -> Rule {
        Rule {
            resource: "abc1".into(),
            threshold: 100.0,
            relation_strategy: RelationStrategy::Current,
            calculate_strategy: CalculateStrategy::Direct,
            control_strategy: ControlStrategy::Reject,
            stat_interval_ms: 1000,
            ..Default::default()
        }
    }

    /// Builds a `WarmUp` rule for resource `"test"` with threshold 5 and the
    /// given control strategy; all other fields keep their defaults.
    fn warm_up_rule(control_strategy: ControlStrategy) -> Rule {
        Rule {
            threshold: 5.0,
            resource: "test".into(),
            calculate_strategy: CalculateStrategy::WarmUp,
            control_strategy,
            ..Default::default()
        }
    }

    #[test]
    fn need_statistic() {
        // (threshold, calculate strategy, control strategy,
        //  max queueing time, statistic interval, expected result)
        let cases = [
            (
                100.0,
                CalculateStrategy::Direct,
                ControlStrategy::Reject,
                0,
                1000,
                true,
            ),
            (
                200.0,
                CalculateStrategy::Direct,
                ControlStrategy::Throttling,
                10,
                2000,
                false,
            ),
            (
                300.0,
                CalculateStrategy::WarmUp,
                ControlStrategy::Reject,
                10,
                5000,
                true,
            ),
            (
                400.0,
                CalculateStrategy::WarmUp,
                ControlStrategy::Throttling,
                10,
                50000,
                true,
            ),
        ];

        for &(
            threshold,
            calculate_strategy,
            control_strategy,
            max_queueing_time_ms,
            stat_interval_ms,
            expected,
        ) in cases.iter()
        {
            let rule = Rule {
                resource: "abc1".into(),
                threshold,
                relation_strategy: RelationStrategy::Current,
                calculate_strategy,
                control_strategy,
                max_queueing_time_ms,
                stat_interval_ms,
                ..Default::default()
            };
            assert_eq!(rule.need_statistic(), expected);
        }
    }

    #[test]
    fn is_stat_reusable() {
        let base = baseline();

        // A different resource name prevents reuse.
        let other_resource = Rule {
            resource: "abc2".into(),
            ..baseline()
        };
        assert!(!base.is_stat_reusable(&other_resource));

        // A different relation strategy prevents reuse.
        let associated_abc3 = Rule {
            relation_strategy: RelationStrategy::Associated,
            ref_resource: "abc3".into(),
            ..baseline()
        };
        assert!(!base.is_stat_reusable(&associated_abc3));

        // Same relation strategy but a different referenced resource.
        let associated_abc4 = Rule {
            relation_strategy: RelationStrategy::Associated,
            ref_resource: "abc4".into(),
            ..baseline()
        };
        assert!(!associated_abc3.is_stat_reusable(&associated_abc4));

        // A different statistic interval prevents reuse.
        let slower_interval = Rule {
            stat_interval_ms: 2000,
            ..baseline()
        };
        assert!(!base.is_stat_reusable(&slower_interval));

        // The other rule does not need statistics at all.
        let throttling = Rule {
            control_strategy: ControlStrategy::Throttling,
            ..baseline()
        };
        assert!(!base.is_stat_reusable(&throttling));

        // Only the threshold differs: statistics are reusable.
        let higher_threshold = Rule {
            threshold: 200.0,
            ..baseline()
        };
        assert!(base.is_stat_reusable(&higher_threshold));
    }

    #[test]
    fn is_valid_flow_rule1() {
        let invalid_rules = vec![
            // Empty resource name.
            Rule {
                threshold: 1.0,
                resource: "".into(),
                ..Default::default()
            },
            // Negative threshold.
            Rule {
                threshold: -1.9,
                resource: "test".into(),
                ..Default::default()
            },
            // WarmUp without a warm-up period.
            warm_up_rule(ControlStrategy::Reject),
            // Same, additionally with a very long statistic interval.
            Rule {
                stat_interval_ms: 6000000,
                ..warm_up_rule(ControlStrategy::Reject)
            },
        ];
        for rule in &invalid_rules {
            assert!(rule.is_valid().is_err());
        }

        // Valid WarmUp rules, with and without a queueing time.
        for &max_queueing_time_ms in [10, 0].iter() {
            let rule = Rule {
                threshold: 10.0,
                warm_up_period_sec: 10,
                max_queueing_time_ms,
                stat_interval_ms: 1000,
                ..warm_up_rule(ControlStrategy::Throttling)
            };
            assert!(rule.is_valid().is_ok());
        }
    }

    #[test]
    fn is_valid_flow_rule2() {
        const K: u64 = 1024;

        let mut rule = Rule {
            resource: "hello0".into(),
            calculate_strategy: CalculateStrategy::MemoryAdaptive,
            control_strategy: ControlStrategy::Reject,
            stat_interval_ms: 10,
            low_mem_usage_threshold: 2,
            high_mem_usage_threshold: 1,
            mem_low_water_mark: 1,
            mem_high_water_mark: 2,
            ..Default::default()
        };
        assert!(rule.is_valid().is_ok());

        // Equal usage thresholds are rejected; low above high is accepted.
        rule.low_mem_usage_threshold = 9;
        rule.high_mem_usage_threshold = 9;
        assert!(rule.is_valid().is_err());
        rule.low_mem_usage_threshold = 10;
        assert!(rule.is_valid().is_ok());

        // A zero low water mark is rejected.
        rule.mem_low_water_mark = 0;
        assert!(rule.is_valid().is_err());

        rule.mem_low_water_mark = 100 * K;
        rule.mem_high_water_mark = 300 * K;
        assert!(rule.is_valid().is_ok());

        // A zero high water mark is rejected.
        rule.mem_high_water_mark = 0;
        assert!(rule.is_valid().is_err());

        rule.mem_high_water_mark = 300 * K;
        assert!(rule.is_valid().is_ok());

        // The low water mark must stay below the high water mark.
        rule.mem_low_water_mark = 100 * K;
        rule.mem_high_water_mark = 30 * K;
        assert!(rule.is_valid().is_err());

        rule.mem_high_water_mark = 300 * K;
        assert!(rule.is_valid().is_ok());
    }
}