fast_telemetry_export/sweeper.rs
//! Periodic sweeper for stale dynamic metric series.
//!
//! On a fixed interval, invokes a caller-supplied closure that advances the
//! global eviction cycle and evicts series that have been inactive for longer
//! than the configured threshold. This bounds memory usage from dynamic labels
//! regardless of which exporters are active.
//!
//! Because the actual eviction logic is provided by the caller via that
//! closure, this works with any metrics struct that has dynamic series.
9
10use std::time::Duration;
11
12use tokio_util::sync::CancellationToken;
13
/// Default sweep interval.
const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_secs(10);

/// Default eviction threshold: series inactive for this many sweep cycles are
/// evicted. With the default 10s interval this equals 5 minutes of inactivity.
const DEFAULT_EVICTION_THRESHOLD: u32 = 30;

/// Configuration for the stale-series sweeper.
///
/// Start from [`SweepConfig::default`] (or [`SweepConfig::new`]) and adjust it
/// with the `with_*` builder methods.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SweepConfig {
    /// How often to run the sweep (default: 10s). Should be non-zero.
    pub interval: Duration,
    /// Number of consecutive idle cycles before a series is evicted (default: 30).
    ///
    /// This value is passed through unchanged to the caller's sweep closure.
    pub eviction_threshold: u32,
}

impl Default for SweepConfig {
    fn default() -> Self {
        Self {
            interval: DEFAULT_SWEEP_INTERVAL,
            eviction_threshold: DEFAULT_EVICTION_THRESHOLD,
        }
    }
}

impl SweepConfig {
    /// Creates a configuration with the default interval and threshold.
    ///
    /// Equivalent to [`SweepConfig::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Returns this configuration with the sweep interval replaced.
    ///
    /// The interval should be non-zero.
    #[must_use]
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }

    /// Returns this configuration with the eviction threshold (in sweep cycles)
    /// replaced.
    #[must_use]
    pub fn with_eviction_threshold(mut self, threshold: u32) -> Self {
        self.eviction_threshold = threshold;
        self
    }
}
54
55/// Run the stale-series sweep loop.
56///
57/// `sweep_fn` is called each cycle with the eviction threshold. It should
58/// advance the eviction cycle and evict stale series, returning the number
59/// of series evicted.
60///
61/// Runs until `cancel` is triggered.
62///
63/// # Example
64///
65/// ```ignore
66/// use std::sync::Arc;
67///
68/// use fast_telemetry::advance_cycle;
69/// use fast_telemetry_export::sweeper::{SweepConfig, run};
70/// use tokio_util::sync::CancellationToken;
71///
72/// let metrics = Arc::new(MyMetrics::new());
73/// let cancel = CancellationToken::new();
74///
75/// let m = metrics.clone();
76/// tokio::spawn(run(SweepConfig::default(), cancel, move |threshold| {
77/// advance_cycle();
78/// m.requests_by_endpoint.evict_stale(threshold)
79/// + m.latency_by_endpoint.evict_stale(threshold)
80/// }));
81/// ```
82pub async fn run<F>(config: SweepConfig, cancel: CancellationToken, mut sweep_fn: F)
83where
84 F: FnMut(u32) -> usize,
85{
86 use tokio::time::MissedTickBehavior;
87
88 log::info!(
89 "Starting stale-series sweeper, interval={}s, eviction_threshold={}",
90 config.interval.as_secs(),
91 config.eviction_threshold
92 );
93
94 let mut interval = tokio::time::interval(config.interval);
95 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
96 interval.tick().await;
97
98 loop {
99 tokio::select! {
100 _ = interval.tick() => {}
101 _ = cancel.cancelled() => {
102 log::info!("Stale-series sweeper shutting down");
103 return;
104 }
105 }
106
107 let evicted = sweep_fn(config.eviction_threshold);
108
109 if evicted > 0 {
110 log::debug!("Evicted {evicted} stale metric series");
111 }
112 }
113}