Skip to main content

aster/tracing/
rate_limiter.rs

1use std::time::{Duration, Instant};
2use tokio::sync::mpsc;
3use tokio::time::sleep;
4use tracing::{info, warn};
5
6pub struct RateLimitedTelemetrySender {
7    sender: mpsc::UnboundedSender<TelemetryEvent>,
8}
9
10#[derive(Debug, Clone)]
11pub enum TelemetryEvent {
12    Span(SpanData),
13    Metric(MetricData),
14}
15
/// Payload describing one span to be emitted.
#[derive(Clone, Debug)]
pub struct SpanData {
    /// Name reported for the span.
    pub name: String,
    /// Key/value attribute pairs attached to the span.
    pub attributes: Vec<(String, String)>,
    /// Duration associated with the span, if one was supplied.
    pub duration: Option<Duration>,
}
22
/// Payload describing one metric sample to be emitted.
#[derive(Clone, Debug)]
pub struct MetricData {
    /// Name reported for the metric.
    pub name: String,
    /// Sampled value.
    pub value: f64,
    /// Key/value label pairs attached to the sample.
    pub labels: Vec<(String, String)>,
}
29
30impl RateLimitedTelemetrySender {
31    pub fn new(rate_limit_ms: u64) -> Self {
32        let (sender, mut receiver) = mpsc::unbounded_channel::<TelemetryEvent>();
33
34        tokio::spawn(async move {
35            let mut last_send = Instant::now();
36            let rate_limit_duration = Duration::from_millis(rate_limit_ms);
37
38            info!(
39                "Starting rate-limited telemetry sender with {}ms delay",
40                rate_limit_ms
41            );
42
43            while let Some(event) = receiver.recv().await {
44                let elapsed = last_send.elapsed();
45                if elapsed < rate_limit_duration {
46                    let sleep_duration = rate_limit_duration - elapsed;
47                    sleep(sleep_duration).await;
48                }
49
50                match event {
51                    TelemetryEvent::Span(span_data) => {
52                        Self::process_span(span_data).await;
53                    }
54                    TelemetryEvent::Metric(metric_data) => {
55                        Self::process_metric(metric_data).await;
56                    }
57                }
58
59                last_send = Instant::now();
60            }
61
62            warn!("Rate-limited telemetry sender shutting down");
63        });
64
65        Self { sender }
66    }
67
68    pub fn send_span(
69        &self,
70        span_data: SpanData,
71    ) -> Result<(), mpsc::error::SendError<TelemetryEvent>> {
72        self.sender.send(TelemetryEvent::Span(span_data))
73    }
74
75    pub fn send_metric(
76        &self,
77        metric_data: MetricData,
78    ) -> Result<(), mpsc::error::SendError<TelemetryEvent>> {
79        self.sender.send(TelemetryEvent::Metric(metric_data))
80    }
81
82    async fn process_span(span_data: SpanData) {
83        let span = tracing::info_span!("telemetry_span", name = %span_data.name);
84        let _enter = span.enter();
85
86        for (key, value) in span_data.attributes {
87            tracing::Span::current().record(key.as_str(), value.as_str());
88        }
89
90        if let Some(duration) = span_data.duration {
91            info!(duration_ms = duration.as_millis(), "span_duration");
92        }
93    }
94
95    async fn process_metric(metric_data: MetricData) {
96        info!(
97            metric_name = %metric_data.name,
98            metric_value = metric_data.value,
99            labels = ?metric_data.labels,
100            "telemetry_metric"
101        );
102    }
103}
104
105impl Default for RateLimitedTelemetrySender {
106    fn default() -> Self {
107        Self::new(400)
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: a span and a metric are both accepted, and the background
    /// task is still receiving after it has had time to process them.
    #[tokio::test]
    async fn test_rate_limited_sender() {
        let sender = RateLimitedTelemetrySender::new(100); // 100ms rate limit for testing

        let span_data = SpanData {
            name: "test_span".to_string(),
            attributes: vec![("key".to_string(), "value".to_string())],
            duration: Some(Duration::from_millis(50)),
        };

        let metric_data = MetricData {
            name: "test_metric".to_string(),
            value: 42.0,
            labels: vec![("label".to_string(), "value".to_string())],
        };

        // Send events
        sender
            .send_span(span_data)
            .expect("span should be accepted while the worker is running");
        sender
            .send_metric(metric_data.clone())
            .expect("metric should be accepted while the worker is running");

        // Give the worker time to process both events. A plain sleep is used
        // because wrapping a fixed sleep in a timeout checks nothing and can
        // only fail spuriously when the scheduler is slow.
        tokio::time::sleep(Duration::from_millis(300)).await;

        // If the worker had panicked while processing, the receiver would have
        // been dropped and this send would return an error.
        assert!(sender.send_metric(metric_data).is_ok());
    }
}