aptos_logger_link/
sample.rs1use std::{
7 sync::atomic::{AtomicU64, Ordering},
8 time::{Duration, SystemTime},
9};
10
/// Policy that decides how often a [`Sampling`] gate lets a call through.
///
/// The type is a small plain value, so it is `Copy` and comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SampleRate {
    /// Let a call through only when at least this much time has passed since
    /// the previously accepted call. The very first call is accepted.
    Duration(Duration),
    /// Let the first call through and then one call out of every `N`.
    /// `Frequency(0)` and `Frequency(1)` accept every call.
    Frequency(u64),
    /// Let every call through.
    Always,
}
24
25pub struct Sampling {
27 rate: SampleRate,
28 state: AtomicU64,
29}
30
31impl Sampling {
32 pub const fn new(rate: SampleRate) -> Self {
33 Self {
34 rate,
35 state: AtomicU64::new(0),
36 }
37 }
38
39 pub fn sample(&self) -> bool {
40 match &self.rate {
41 SampleRate::Duration(rate) => Self::sample_duration(rate, &self.state),
42 SampleRate::Frequency(rate) => Self::sample_frequency(*rate, &self.state),
43 SampleRate::Always => true,
44 }
45 }
46
47 fn sample_frequency(rate: u64, count: &AtomicU64) -> bool {
48 let previous_count = count
49 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
50 let new_count = if count == 0 {
51 rate.saturating_sub(1)
52 } else {
53 count.saturating_sub(1)
54 };
55 Some(new_count)
56 })
57 .expect("Closure should always returns 'Some'. This is a Bug.");
58
59 previous_count == 0
60 }
61
62 fn sample_duration(rate: &Duration, last_sample: &AtomicU64) -> bool {
63 let rate = rate.as_secs();
64 let now = SystemTime::now()
66 .duration_since(SystemTime::UNIX_EPOCH)
67 .expect("SystemTime before UNIX EPOCH!")
68 .as_secs();
69
70 last_sample
71 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |last_sample| {
72 if now.saturating_sub(last_sample) >= rate {
73 Some(now)
74 } else {
75 None
76 }
77 })
78 .is_ok()
79 }
80}
81
/// Runs a piece of code only when a per-call-site sampling gate lets it
/// through.
///
/// The first argument is a `SampleRate` expression; everything after the
/// first comma is the code to run. Each expansion declares its own `static`
/// gate, so every call site is sampled independently of all others, and the
/// rate expression must be usable in a `static` initialiser.
///
/// ```ignore
/// sample!(SampleRate::Frequency(10), println!("one call in ten"));
/// ```
#[macro_export]
macro_rules! sample {
    // Expression body followed by a trailing comma: drop the comma and
    // re-dispatch to the general arm below.
    ($rate:expr, $($body:expr)+ ,) => {
        $crate::sample!($rate, $($body)+);
    };

    // General form: the body is an arbitrary token sequence that is pasted
    // into the guarded branch.
    ($rate:expr, $($body:tt)+) => {{
        static SAMPLING: $crate::sample::Sampling = $crate::sample::Sampling::new($rate);
        if SAMPLING.sample() {
            $($body)+
        }
    }};
}
97
#[cfg(test)]
mod tests {
    use super::*;

    /// With `Frequency(10)` the first call and every tenth call after it are
    /// accepted.
    #[test]
    fn frequency() {
        let sampling = Sampling::new(SampleRate::Frequency(10));
        let mut v = Vec::new();
        for i in 0..=25 {
            if sampling.sample() {
                v.push(i);
            }
        }

        assert_eq!(v, vec![0, 10, 20]);
    }

    /// `Always` accepts every call.
    #[test]
    fn always() {
        let sampling = Sampling::new(SampleRate::Always);
        let mut v = Vec::new();
        for i in 0..5 {
            if sampling.sample() {
                v.push(i);
            }
        }

        assert_eq!(v, vec![0, 1, 2, 3, 4]);
    }

    /// Polls a one-second gate five times, 500 ms apart.
    ///
    /// Ignored by default because it sleeps for about 2.5 seconds.
    #[ignore]
    #[test]
    fn duration() {
        let sampling = Sampling::new(SampleRate::Duration(Duration::from_secs(1)));
        let mut v = Vec::new();
        for i in 0..5 {
            if sampling.sample() {
                v.push(i);
            }

            std::thread::sleep(Duration::from_millis(500));
        }

        // The polls happen at roughly 0 s, 0.5 s, 1 s, 1.5 s and 2 s, so the
        // gate opens three times: at about 0 s, 1 s and 2 s.
        assert_eq!(v.len(), 3);
    }

    /// Checks that every supported invocation shape of `sample!` expands and
    /// compiles: trailing comma, block body, plain expression, and a
    /// statement body inside braces.
    #[test]
    fn macro_expansion() {
        for i in 0..10 {
            sample!(
                SampleRate::Frequency(2),
                println!("loooooooooooooooooooooooooong hello {}", i),
            );

            sample!(SampleRate::Frequency(2), {
                println!("hello {}", i);
            });

            sample!(SampleRate::Frequency(2), println!("hello {}", i));

            sample! {
                SampleRate::Frequency(2),

                for j in 10..20 {
                    println!("hello {}", j);
                }
            }
        }
    }

    /// Ten threads share the single gate declared at the `sample!` call site
    /// in `work`; across 10 * 1000 calls at `Frequency(5)` exactly 2000 must
    /// be accepted.
    #[test]
    fn threaded() {
        fn work() -> usize {
            let mut count = 0;

            for _ in 0..1000 {
                sample!(SampleRate::Frequency(5), count += 1);
            }

            count
        }

        let mut handles = Vec::new();
        for _ in 0..10 {
            handles.push(std::thread::spawn(work));
        }

        let mut count = 0;
        for handle in handles {
            count += handle.join().unwrap();
        }

        assert_eq!(count, 2000);
    }
}