fast_telemetry_export/
sweeper.rs1use std::time::Duration;
11
12use tokio_util::sync::CancellationToken;
13
/// Period between sweep cycles used by `SweepConfig::default()`: ten seconds.
const DEFAULT_SWEEP_INTERVAL: Duration = Duration::from_secs(10);

/// Eviction threshold used by `SweepConfig::default()`.
///
/// The sweepers hand this value to the sweep callback unchanged; its unit is
/// defined by that callback (presumably a count of idle sweep cycles — confirm
/// against the callback implementation).
const DEFAULT_EVICTION_THRESHOLD: u32 = 30;
20
/// Settings for the stale-series sweeper loops in this module.
///
/// `Debug`, `PartialEq` and `Eq` are derived so a configuration can be logged
/// with `{:?}` and compared directly in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SweepConfig {
    /// Time between consecutive sweep cycles. The runtime interval timers the
    /// sweepers build from this value require a non-zero period.
    pub interval: Duration,
    /// Value passed to the sweep callback on every cycle. Its unit is defined
    /// by the callback (presumably idle sweep cycles before eviction — confirm
    /// against the callback implementation).
    pub eviction_threshold: u32,
}
29
30impl Default for SweepConfig {
31 fn default() -> Self {
32 Self {
33 interval: DEFAULT_SWEEP_INTERVAL,
34 eviction_threshold: DEFAULT_EVICTION_THRESHOLD,
35 }
36 }
37}
38
39impl SweepConfig {
40 pub fn new() -> Self {
41 Self::default()
42 }
43
44 pub fn with_interval(mut self, interval: Duration) -> Self {
45 self.interval = interval;
46 self
47 }
48
49 pub fn with_eviction_threshold(mut self, threshold: u32) -> Self {
50 self.eviction_threshold = threshold;
51 self
52 }
53}
54
55pub async fn run<F>(config: SweepConfig, cancel: CancellationToken, mut sweep_fn: F)
83where
84 F: FnMut(u32) -> usize,
85{
86 use tokio::time::MissedTickBehavior;
87
88 log::info!(
89 "Starting stale-series sweeper, interval={}s, eviction_threshold={}",
90 config.interval.as_secs(),
91 config.eviction_threshold
92 );
93
94 let mut interval = tokio::time::interval(config.interval);
95 interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
96 interval.tick().await;
97
98 loop {
99 tokio::select! {
100 _ = interval.tick() => {}
101 _ = cancel.cancelled() => {
102 log::info!("Stale-series sweeper shutting down");
103 return;
104 }
105 }
106
107 let evicted = sweep_fn(config.eviction_threshold);
108
109 if evicted > 0 {
110 log::debug!("Evicted {evicted} stale metric series");
111 }
112 }
113}
114
/// Runs the stale-series sweep loop on a monoio runtime until `cancel` fires.
///
/// This is the monoio counterpart of `run` and is only compiled with the
/// `monoio` feature enabled.
///
/// # Behavior
///
/// * After the initial tick has been consumed, each further tick calls
///   `sweep_fn(config.eviction_threshold)`. The returned `usize` is treated
///   as the number of evicted series and is logged at debug level when
///   non-zero.
/// * The interval is configured with `MissedTickBehavior::Skip`.
/// * When `cancel` is cancelled while waiting for the next tick, the function
///   logs a shutdown message and returns.
///
/// A zero `config.interval` is clamped to one millisecond (with a warning).
/// NOTE(review): this assumes `monoio::time::interval` rejects a zero period
/// the way tokio's interval does — confirm against the monoio documentation.
#[cfg(feature = "monoio")]
pub async fn run_monoio<F>(config: SweepConfig, cancel: CancellationToken, mut sweep_fn: F)
where
    F: FnMut(u32) -> usize,
{
    use monoio::time::MissedTickBehavior;

    // Smallest period handed to the timer; guards against a zero period.
    const MIN_PERIOD: Duration = Duration::from_millis(1);

    let period = if config.interval.is_zero() {
        log::warn!(
            "monoio stale-series sweeper interval is zero; clamping to {:?}",
            MIN_PERIOD
        );
        MIN_PERIOD
    } else {
        config.interval
    };

    // `Duration`'s Debug output keeps whole seconds as e.g. `10s` while also
    // rendering sub-second intervals correctly (`as_secs()` would print `0`).
    log::info!(
        "Starting monoio stale-series sweeper, interval={:?}, eviction_threshold={}",
        period,
        config.eviction_threshold
    );

    let mut interval = monoio::time::interval(period);
    interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // Consume the initial tick before entering the loop, mirroring `run`.
    // NOTE(review): presumably this tick completes immediately as in tokio — confirm.
    interval.tick().await;

    loop {
        monoio::select! {
            _ = interval.tick() => {}
            _ = cancel.cancelled() => {
                log::info!("monoio stale-series sweeper shutting down");
                return;
            }
        }

        let evicted = sweep_fn(config.eviction_threshold);

        if evicted > 0 {
            log::debug!("Evicted {evicted} stale metric series");
        }
    }
}