aptos_logger_link/
sample.rs

1// Copyright (c) Aptos
2// SPDX-License-Identifier: Apache-2.0
3
4//! Periodic sampling for logs, metrics, and other use cases through a simple macro
5
6use std::{
7    sync::atomic::{AtomicU64, Ordering},
8    time::{Duration, SystemTime},
9};
10
11/// The rate at which a `sample!` macro will run it's given function
12#[derive(Debug)]
13pub enum SampleRate {
14    /// Only sample a single time during a window of time. This rate only has a resolution in
15    /// seconds.
16    Duration(Duration),
17    /// Sample based on the frequency of the event. The provided u64 is the inverse of the
18    /// frequency (1/x), for example Frequency(2) means that 1 out of every 2 events will be
19    /// sampled (1/2).
20    Frequency(u64),
21    /// Always Sample
22    Always,
23}
24
25/// An internal struct that can be checked if a sample is ready for the `sample!` macro
26pub struct Sampling {
27    rate: SampleRate,
28    state: AtomicU64,
29}
30
31impl Sampling {
32    pub const fn new(rate: SampleRate) -> Self {
33        Self {
34            rate,
35            state: AtomicU64::new(0),
36        }
37    }
38
39    pub fn sample(&self) -> bool {
40        match &self.rate {
41            SampleRate::Duration(rate) => Self::sample_duration(rate, &self.state),
42            SampleRate::Frequency(rate) => Self::sample_frequency(*rate, &self.state),
43            SampleRate::Always => true,
44        }
45    }
46
47    fn sample_frequency(rate: u64, count: &AtomicU64) -> bool {
48        let previous_count = count
49            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
50                let new_count = if count == 0 {
51                    rate.saturating_sub(1)
52                } else {
53                    count.saturating_sub(1)
54                };
55                Some(new_count)
56            })
57            .expect("Closure should always returns 'Some'. This is a Bug.");
58
59        previous_count == 0
60    }
61
62    fn sample_duration(rate: &Duration, last_sample: &AtomicU64) -> bool {
63        let rate = rate.as_secs();
64        // Seconds since Unix Epoch
65        let now = SystemTime::now()
66            .duration_since(SystemTime::UNIX_EPOCH)
67            .expect("SystemTime before UNIX EPOCH!")
68            .as_secs();
69
70        last_sample
71            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |last_sample| {
72                if now.saturating_sub(last_sample) >= rate {
73                    Some(now)
74                } else {
75                    None
76                }
77            })
78            .is_ok()
79    }
80}
81
82/// Samples a given function at a `SampleRate`, useful for periodically emitting logs or metrics on
83/// high throughput pieces of code.
84#[macro_export]
85macro_rules! sample {
86    ($sample_rate:expr, $($args:expr)+ ,) => {
87        $crate::sample!($sample_rate, $($args)+);
88    };
89
90    ($sample_rate:expr, $($args:tt)+) => {{
91        static SAMPLING: $crate::sample::Sampling = $crate::sample::Sampling::new($sample_rate);
92        if SAMPLING.sample() {
93            $($args)+
94        }
95    }};
96}
97
98#[cfg(test)]
99mod tests {
100    use super::*;
101
102    #[test]
103    fn frequency() {
104        // Frequency
105        let sampling = Sampling::new(SampleRate::Frequency(10));
106        let mut v = Vec::new();
107        for i in 0..=25 {
108            if sampling.sample() {
109                v.push(i);
110            }
111        }
112
113        assert_eq!(v, vec![0, 10, 20]);
114    }
115
116    #[test]
117    fn always() {
118        // Always
119        let sampling = Sampling::new(SampleRate::Always);
120        let mut v = Vec::new();
121        for i in 0..5 {
122            if sampling.sample() {
123                v.push(i);
124            }
125        }
126
127        assert_eq!(v, vec![0, 1, 2, 3, 4]);
128    }
129
130    #[ignore]
131    #[test]
132    fn duration() {
133        // Duration
134        let sampling = Sampling::new(SampleRate::Duration(Duration::from_secs(1)));
135        let mut v = Vec::new();
136        for i in 0..5 {
137            if sampling.sample() {
138                v.push(i);
139            }
140
141            std::thread::sleep(Duration::from_millis(500));
142        }
143
144        assert_eq!(v.len(), 2);
145    }
146
147    #[test]
148    fn macro_expansion() {
149        for i in 0..10 {
150            sample!(
151                SampleRate::Frequency(2),
152                println!("loooooooooooooooooooooooooong hello {}", i),
153            );
154
155            sample!(SampleRate::Frequency(2), {
156                println!("hello {}", i);
157            });
158
159            sample!(SampleRate::Frequency(2), println!("hello {}", i));
160
161            sample! {
162                SampleRate::Frequency(2),
163
164                for j in 10..20 {
165                    println!("hello {}", j);
166                }
167            }
168        }
169    }
170
171    #[test]
172    fn threaded() {
173        fn work() -> usize {
174            let mut count = 0;
175
176            for _ in 0..1000 {
177                sample!(SampleRate::Frequency(5), count += 1);
178            }
179
180            count
181        }
182
183        let mut handles = Vec::new();
184        for _ in 0..10 {
185            handles.push(std::thread::spawn(work));
186        }
187
188        let mut count = 0;
189        for handle in handles {
190            count += handle.join().unwrap();
191        }
192
193        assert_eq!(count, 2000);
194    }
195}