1use hex;
2use sha2::{Digest, Sha256};
3use std::time::{SystemTime, UNIX_EPOCH};
4
5const DEFAULT_STREAM_KEY: &str = "hyperinfer:telemetry";
6
7#[derive(Clone)]
8pub struct Telemetry {
9 manager: Option<redis::aio::ConnectionManager>,
10 stream_key: String,
11}
12
13impl Telemetry {
14 fn key_id(key: &str) -> String {
16 let mut hasher = Sha256::new();
17 hasher.update(key.as_bytes());
18 let hash = hasher.finalize();
19 let hex_hash = hex::encode(hash);
20 if hex_hash.len() >= 8 {
22 format!("...{}", &hex_hash[hex_hash.len() - 8..])
23 } else {
24 hex_hash
25 }
26 }
27 pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
28 let manager = match redis::Client::open(redis_url) {
29 Ok(client) => match redis::aio::ConnectionManager::new(client).await {
30 Ok(m) => Some(m),
31 Err(e) => {
32 tracing::warn!("Failed to create Redis connection manager: {}", e);
33 None
34 }
35 },
36 Err(e) => {
37 tracing::warn!("Invalid Redis URL for telemetry: {}", e);
38 None
39 }
40 };
41
42 Ok(Self {
43 manager,
44 stream_key: DEFAULT_STREAM_KEY.to_string(),
45 })
46 }
47
48 pub fn with_stream_key(mut self, stream_key: &str) -> Self {
49 if !stream_key.trim().is_empty() {
50 self.stream_key = stream_key.to_string();
51 }
52 self
53 }
54
55 pub async fn record(
56 &self,
57 key: &str,
58 model: &str,
59 response_time_ms: u64,
60 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
61 let input_tokens = 0u32;
62 let output_tokens = 0u32;
63
64 self.record_with_tokens(key, model, input_tokens, output_tokens, response_time_ms)
65 .await
66 }
67
68 pub async fn record_with_tokens(
69 &self,
70 key: &str,
71 model: &str,
72 input_tokens: u32,
73 output_tokens: u32,
74 response_time_ms: u64,
75 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
76 let timestamp = SystemTime::now()
77 .duration_since(UNIX_EPOCH)
78 .unwrap_or_default()
79 .as_millis() as u64;
80
81 if let Some(ref manager) = self.manager {
82 let stream_key = self.stream_key.clone();
83 let key_clone = key.to_string();
84 let model_clone = model.to_string();
85 let mut manager = manager.clone();
86
87 tokio::spawn(async move {
88 let result: Result<(), redis::RedisError> = redis::cmd("XADD")
89 .arg(&stream_key)
90 .arg("*")
91 .arg("key")
92 .arg(&key_clone)
93 .arg("model")
94 .arg(&model_clone)
95 .arg("input_tokens")
96 .arg(input_tokens.to_string())
97 .arg("output_tokens")
98 .arg(output_tokens.to_string())
99 .arg("response_time_ms")
100 .arg(response_time_ms.to_string())
101 .arg("timestamp")
102 .arg(timestamp.to_string())
103 .query_async(&mut manager)
104 .await;
105
106 if let Err(e) = result {
107 tracing::error!("Failed to push telemetry to Redis stream: {:?}", e);
108 }
109 });
110 } else {
111 tracing::debug!(
112 "Telemetry skipped (Redis unavailable): key_id={}, model={}, input_tokens={}, output_tokens={}, response_time_ms={}",
113 Self::key_id(key), model, input_tokens, output_tokens, response_time_ms
114 );
115 }
116
117 Ok(())
118 }
119}
120
#[cfg(test)]
mod tests {
    use super::*;

    /// Redis URL used by every test that targets the local instance.
    const LOCAL_REDIS_URL: &str = "redis://localhost:6379";

    /// Builds a recorder pointed at the local Redis URL, panicking if
    /// construction reports an error.
    async fn local_telemetry() -> Telemetry {
        Telemetry::new(LOCAL_REDIS_URL).await.unwrap()
    }

    // --- construction -----------------------------------------------------

    #[tokio::test]
    async fn test_telemetry_new() {
        let created = Telemetry::new(LOCAL_REDIS_URL).await;
        assert!(created.is_ok());
        assert_eq!(created.unwrap().stream_key, "hyperinfer:telemetry");
    }

    #[tokio::test]
    async fn test_telemetry_new_different_url() {
        let created = Telemetry::new("redis://custom-host:1234/0").await;
        assert!(created.is_ok());
        assert_eq!(created.unwrap().stream_key, "hyperinfer:telemetry");
    }

    #[tokio::test]
    async fn test_telemetry_with_stream_key() {
        let telemetry = local_telemetry().await.with_stream_key("custom:stream");
        assert_eq!(telemetry.stream_key, "custom:stream");
    }

    // --- basic recording --------------------------------------------------

    #[tokio::test]
    async fn test_telemetry_record() {
        let telemetry = local_telemetry().await;
        assert!(telemetry.record("test-key", "gpt-4", 250).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_with_tokens() {
        let telemetry = local_telemetry().await;
        let outcome = telemetry
            .record_with_tokens("test-key", "gpt-4", 100, 50, 250)
            .await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_multiple_calls() {
        let telemetry = local_telemetry().await;

        assert!(telemetry.record("key1", "gpt-4", 100).await.is_ok());
        assert!(telemetry.record("key2", "claude-3", 200).await.is_ok());
        assert!(telemetry.record("key1", "gpt-4", 150).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_zero_response_time() {
        let telemetry = local_telemetry().await;
        assert!(telemetry.record("test-key", "gpt-4", 0).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_large_response_time() {
        let telemetry = local_telemetry().await;
        assert!(telemetry.record("test-key", "gpt-4", 999999).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_invalid_redis() {
        // An unparsable URL still yields a recorder; recording must not fail.
        let telemetry = Telemetry::new("invalid-url").await.unwrap();
        assert!(telemetry.record("test-key", "gpt-4", 250).await.is_ok());
    }

    // --- stream key validation ---------------------------------------------

    #[tokio::test]
    async fn test_telemetry_with_empty_stream_key() {
        let telemetry = local_telemetry().await.with_stream_key("");
        assert_eq!(telemetry.stream_key, DEFAULT_STREAM_KEY);
    }

    #[tokio::test]
    async fn test_telemetry_with_whitespace_stream_key() {
        let telemetry = local_telemetry().await.with_stream_key(" ");
        assert_eq!(telemetry.stream_key, DEFAULT_STREAM_KEY);
    }

    // --- unusual inputs ----------------------------------------------------

    #[tokio::test]
    async fn test_telemetry_record_empty_key() {
        let telemetry = local_telemetry().await;
        assert!(telemetry.record("", "gpt-4", 250).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_empty_model() {
        let telemetry = local_telemetry().await;
        assert!(telemetry.record("test-key", "", 250).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_max_values() {
        let telemetry = local_telemetry().await;
        let outcome = telemetry
            .record_with_tokens("test-key", "gpt-4", u32::MAX, u32::MAX, u64::MAX)
            .await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_special_characters_in_key() {
        let telemetry = local_telemetry().await;
        let outcome = telemetry.record("test-key-!@#$%^&*()", "gpt-4", 250).await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_special_characters_in_model() {
        let telemetry = local_telemetry().await;
        let outcome = telemetry
            .record("test-key", "gpt-4-turbo-preview-2024", 250)
            .await;
        assert!(outcome.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_unicode() {
        let telemetry = local_telemetry().await;
        assert!(telemetry.record("test-key-🔑", "gpt-4", 250).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_very_long_strings() {
        let telemetry = local_telemetry().await;
        let key = "a".repeat(10000);
        let model = "b".repeat(10000);
        assert!(telemetry.record(&key, &model, 250).await.is_ok());
    }

    // --- repeated and concurrent use ---------------------------------------

    #[tokio::test]
    async fn test_telemetry_concurrent_records() {
        let shared = std::sync::Arc::new(local_telemetry().await);

        // Launch all ten tasks first, then wait for each of them in order.
        let tasks: Vec<_> = (0..10u64)
            .map(|i| {
                let telemetry = std::sync::Arc::clone(&shared);
                tokio::spawn(async move {
                    telemetry
                        .record(&format!("key-{}", i), "gpt-4", 100 + i * 10)
                        .await
                })
            })
            .collect();

        for task in tasks {
            let outcome = task.await.unwrap();
            assert!(outcome.is_ok());
        }
    }

    #[tokio::test]
    async fn test_telemetry_record_sequential() {
        let telemetry = local_telemetry().await;

        for i in 0..5u64 {
            let key = format!("key-{}", i);
            let outcome = telemetry.record(&key, "gpt-4", 100 + i * 10).await;
            assert!(outcome.is_ok());
        }
    }

    #[tokio::test]
    async fn test_telemetry_record_different_token_counts() {
        let telemetry = local_telemetry().await;

        // (input_tokens, output_tokens) pairs, including both extremes.
        let token_pairs: [(u32, u32); 6] = [
            (0, 0),
            (1, 1),
            (100, 50),
            (u32::MAX, u32::MAX),
            (500, 0),
            (0, 500),
        ];

        for &(input, output) in &token_pairs {
            let outcome = telemetry
                .record_with_tokens("test-key", "gpt-4", input, output, 250)
                .await;
            assert!(outcome.is_ok());
        }
    }

    #[tokio::test]
    async fn test_telemetry_with_very_long_stream_key() {
        let long_key = "a".repeat(1000);
        let telemetry = local_telemetry().await.with_stream_key(&long_key);
        assert_eq!(telemetry.stream_key, long_key);
    }

    #[tokio::test]
    async fn test_telemetry_record_rapid_succession() {
        let telemetry = local_telemetry().await;

        for _ in 0..100 {
            assert!(telemetry.record("test-key", "gpt-4", 250).await.is_ok());
        }
    }
}