elfo_core/dumping/
control.rs

1use std::sync::Arc;
2
3use arc_swap::ArcSwap;
4use parking_lot::Mutex;
5use smallvec::SmallVec;
6
7use elfo_utils::{CachePadded, RateLimit, RateLimiter};
8
9use super::{
10    config::DumpingConfig,
11    sequence_no::{SequenceNo, SequenceNoGenerator},
12};
13
14#[stability::unstable]
15#[derive(Default)]
16pub struct DumpingControl {
17    config: Mutex<DumpingConfig>,
18    sequence_no_gen: CachePadded<SequenceNoGenerator>,
19    classes: ArcSwap<SmallVec<[PerClass; 1]>>, // TODO: use `SecondaryMap`?
20}
21
22#[derive(Clone)]
23struct PerClass {
24    class: &'static str,
25    disabled: bool,
26    limiter: Arc<CachePadded<RateLimiter>>,
27}
28
29impl PerClass {
30    fn new(class: &'static str) -> Self {
31        Self {
32            class,
33            disabled: true,
34            limiter: Default::default(),
35        }
36    }
37
38    fn with_config(&self, config: &DumpingConfig) -> Self {
39        let limiter = self.limiter.clone();
40        limiter.configure(RateLimit::Rps(config.max_rate));
41
42        Self {
43            class: self.class,
44            disabled: config.disabled,
45            limiter,
46        }
47    }
48
49    fn check(&self) -> CheckResult {
50        if self.disabled {
51            CheckResult::NotInterested
52        } else if self.limiter.acquire() {
53            CheckResult::Passed
54        } else {
55            CheckResult::Limited
56        }
57    }
58}
59
60impl DumpingControl {
61    pub(crate) fn configure(&self, config: &DumpingConfig) {
62        // All structural updates must be performed under the lock.
63        let mut config_lock = self.config.lock();
64        *config_lock = config.clone();
65
66        let new_classes = self
67            .classes
68            .load()
69            .iter()
70            .map(|class| class.with_config(config))
71            .collect();
72
73        self.classes.store(Arc::new(new_classes));
74    }
75
76    pub(crate) fn next_sequence_no(&self) -> SequenceNo {
77        self.sequence_no_gen.generate()
78    }
79
80    #[stability::unstable]
81    pub fn check(&self, class: &'static str) -> CheckResult {
82        if let Some(per_class) = find_class(&self.classes.load(), class) {
83            per_class.check()
84        } else {
85            self.add_class(class);
86            find_class(&self.classes.load(), class)
87                .expect("absent class")
88                .check()
89        }
90    }
91
92    #[cold]
93    #[inline(never)]
94    fn add_class(&self, class: &'static str) {
95        // All structural updates must be performed under the lock.
96        let config = self.config.lock();
97        let classes = self.classes.load();
98
99        // Check again under the lock.
100        if find_class(&classes, class).is_some() {
101            return;
102        }
103
104        let mut new_classes = (**classes).clone();
105        new_classes.push(PerClass::new(class).with_config(&config));
106        self.classes.store(Arc::new(new_classes));
107    }
108}
109
110#[stability::unstable]
111pub enum CheckResult {
112    Passed,
113    NotInterested,
114    Limited,
115}
116
117fn find_class<'a>(classes: &'a [PerClass], class: &'static str) -> Option<&'a PerClass> {
118    classes.iter().find(|c| c.class == class)
119}