Skip to main content

fast_telemetry_export/
sweeper.rs

1//! Periodic sweeper for stale dynamic metric series.
2//!
//! On every tick, invokes a caller-supplied closure that advances the global
//! eviction cycle and evicts series that have been inactive for longer than the
//! configured threshold. This bounds memory usage from dynamic labels regardless
//! of which exporters are active.
6//!
7//! The actual eviction logic is provided by the caller via a closure,
8//! making this work with any metrics struct that has dynamic series.
9
10use std::time::Duration;
11
12use tokio_util::sync::CancellationToken;
13
/// Sweep period used when the caller does not configure one (ten seconds).
const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_millis(10_000);
16
/// Number of sweep cycles a series may stay inactive before it is evicted,
/// when the caller does not configure a threshold. Combined with the default
/// ten-second sweep interval, this is roughly five minutes of inactivity.
const DEFAULT_EVICTION_THRESHOLD: u32 = 30;
20
/// Configuration for the stale-series sweeper.
///
/// Build one with `SweepConfig::default()` / `SweepConfig::new()` plus the
/// `with_*` builder methods, or set the public fields directly.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SweepConfig {
    /// How often to run the sweep (default: 10s). Should be non-zero.
    pub interval: Duration,
    /// Number of consecutive idle cycles before a series is evicted (default: 30).
    pub eviction_threshold: u32,
}
29
30impl Default for SweepConfig {
31    fn default() -> Self {
32        Self {
33            interval: DEFAULT_SWEEP_INTERVAL,
34            eviction_threshold: DEFAULT_EVICTION_THRESHOLD,
35        }
36    }
37}
38
39impl SweepConfig {
40    pub fn new() -> Self {
41        Self::default()
42    }
43
44    pub fn with_interval(mut self, interval: Duration) -> Self {
45        self.interval = interval;
46        self
47    }
48
49    pub fn with_eviction_threshold(mut self, threshold: u32) -> Self {
50        self.eviction_threshold = threshold;
51        self
52    }
53}
54
55/// Run the stale-series sweep loop.
56///
57/// `sweep_fn` is called each cycle with the eviction threshold. It should
58/// advance the eviction cycle and evict stale series, returning the number
59/// of series evicted.
60///
61/// Runs until `cancel` is triggered.
62///
63/// # Example
64///
65/// ```ignore
66/// use std::sync::Arc;
67///
68/// use fast_telemetry::advance_cycle;
69/// use fast_telemetry_export::sweeper::{SweepConfig, run};
70/// use tokio_util::sync::CancellationToken;
71///
72/// let metrics = Arc::new(MyMetrics::new());
73/// let cancel = CancellationToken::new();
74///
75/// let m = metrics.clone();
76/// tokio::spawn(run(SweepConfig::default(), cancel, move |threshold| {
77///     advance_cycle();
78///     m.requests_by_endpoint.evict_stale(threshold)
79///         + m.latency_by_endpoint.evict_stale(threshold)
80/// }));
81/// ```
82pub async fn run<F>(config: SweepConfig, cancel: CancellationToken, mut sweep_fn: F)
83where
84    F: FnMut(u32) -> usize,
85{
86    use tokio::time::MissedTickBehavior;
87
88    log::info!(
89        "Starting stale-series sweeper, interval={}s, eviction_threshold={}",
90        config.interval.as_secs(),
91        config.eviction_threshold
92    );
93
94    let mut interval = tokio::time::interval(config.interval);
95    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
96    interval.tick().await;
97
98    loop {
99        tokio::select! {
100            _ = interval.tick() => {}
101            _ = cancel.cancelled() => {
102                log::info!("Stale-series sweeper shutting down");
103                return;
104            }
105        }
106
107        let evicted = sweep_fn(config.eviction_threshold);
108
109        if evicted > 0 {
110            log::debug!("Evicted {evicted} stale metric series");
111        }
112    }
113}