// aster/tracing/rate_limiter.rs
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{info, warn};
5
6pub struct RateLimitedTelemetrySender {
7 sender: mpsc::UnboundedSender<TelemetryEvent>,
8}
9
10#[derive(Debug, Clone)]
11pub enum TelemetryEvent {
12 Span(SpanData),
13 Metric(MetricData),
14}
15
/// Description of a span to be reported through the telemetry sender.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SpanData {
    /// Span name.
    pub name: String,
    /// Key/value attribute pairs attached to the span.
    pub attributes: Vec<(String, String)>,
    /// Duration associated with the span, if one is known.
    pub duration: Option<Duration>,
}
22
/// A single metric sample to be reported through the telemetry sender.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct MetricData {
    /// Metric name.
    pub name: String,
    /// Sampled value.
    pub value: f64,
    /// Key/value label pairs attached to the sample.
    pub labels: Vec<(String, String)>,
}
29
30impl RateLimitedTelemetrySender {
31 pub fn new(rate_limit_ms: u64) -> Self {
32 let (sender, mut receiver) = mpsc::unbounded_channel::<TelemetryEvent>();
33
34 tokio::spawn(async move {
35 let mut last_send = Instant::now();
36 let rate_limit_duration = Duration::from_millis(rate_limit_ms);
37
38 info!(
39 "Starting rate-limited telemetry sender with {}ms delay",
40 rate_limit_ms
41 );
42
43 while let Some(event) = receiver.recv().await {
44 let elapsed = last_send.elapsed();
45 if elapsed < rate_limit_duration {
46 let sleep_duration = rate_limit_duration - elapsed;
47 sleep(sleep_duration).await;
48 }
49
50 match event {
51 TelemetryEvent::Span(span_data) => {
52 Self::process_span(span_data).await;
53 }
54 TelemetryEvent::Metric(metric_data) => {
55 Self::process_metric(metric_data).await;
56 }
57 }
58
59 last_send = Instant::now();
60 }
61
62 warn!("Rate-limited telemetry sender shutting down");
63 });
64
65 Self { sender }
66 }
67
68 pub fn send_span(
69 &self,
70 span_data: SpanData,
71 ) -> Result<(), mpsc::error::SendError<TelemetryEvent>> {
72 self.sender.send(TelemetryEvent::Span(span_data))
73 }
74
75 pub fn send_metric(
76 &self,
77 metric_data: MetricData,
78 ) -> Result<(), mpsc::error::SendError<TelemetryEvent>> {
79 self.sender.send(TelemetryEvent::Metric(metric_data))
80 }
81
82 async fn process_span(span_data: SpanData) {
83 let span = tracing::info_span!("telemetry_span", name = %span_data.name);
84 let _enter = span.enter();
85
86 for (key, value) in span_data.attributes {
87 tracing::Span::current().record(key.as_str(), value.as_str());
88 }
89
90 if let Some(duration) = span_data.duration {
91 info!(duration_ms = duration.as_millis(), "span_duration");
92 }
93 }
94
95 async fn process_metric(metric_data: MetricData) {
96 info!(
97 metric_name = %metric_data.name,
98 metric_value = metric_data.value,
99 labels = ?metric_data.labels,
100 "telemetry_metric"
101 );
102 }
103}
104
105impl Default for RateLimitedTelemetrySender {
106 fn default() -> Self {
107 Self::new(400)
108 }
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: both event kinds are accepted while the background task is
    /// alive, and the task is given time to process them.
    #[tokio::test]
    async fn test_rate_limited_sender() {
        let sender = RateLimitedTelemetrySender::new(100);

        let span_data = SpanData {
            name: "test_span".to_string(),
            attributes: vec![("key".to_string(), "value".to_string())],
            duration: Some(Duration::from_millis(50)),
        };

        let metric_data = MetricData {
            name: "test_metric".to_string(),
            value: 42.0,
            labels: vec![("label".to_string(), "value".to_string())],
        };

        assert!(sender.send_span(span_data).is_ok());
        assert!(sender.send_metric(metric_data).is_ok());

        // Two events at a 100 ms interval take roughly 200 ms to process.
        // Wait a little longer so both are handled before the runtime shuts
        // down. Wrapping this sleep in a longer timeout added nothing, so the
        // sleep is awaited directly.
        tokio::time::sleep(Duration::from_millis(300)).await;
    }
}