Skip to main content

hyperinfer_client/
telemetry.rs

1use hex;
2use sha2::{Digest, Sha256};
3use std::time::{SystemTime, UNIX_EPOCH};
4
5const DEFAULT_STREAM_KEY: &str = "hyperinfer:telemetry";
6
7#[derive(Clone)]
8pub struct Telemetry {
9    manager: Option<redis::aio::ConnectionManager>,
10    stream_key: String,
11}
12
13impl Telemetry {
14    /// Returns a truncated hash suffix of the key for safe logging
15    fn key_id(key: &str) -> String {
16        let mut hasher = Sha256::new();
17        hasher.update(key.as_bytes());
18        let hash = hasher.finalize();
19        let hex_hash = hex::encode(hash);
20        // Return last 8 characters of hash
21        if hex_hash.len() >= 8 {
22            format!("...{}", &hex_hash[hex_hash.len() - 8..])
23        } else {
24            hex_hash
25        }
26    }
27    pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
28        let manager = match redis::Client::open(redis_url) {
29            Ok(client) => match redis::aio::ConnectionManager::new(client).await {
30                Ok(m) => Some(m),
31                Err(e) => {
32                    tracing::warn!("Failed to create Redis connection manager: {}", e);
33                    None
34                }
35            },
36            Err(e) => {
37                tracing::warn!("Invalid Redis URL for telemetry: {}", e);
38                None
39            }
40        };
41
42        Ok(Self {
43            manager,
44            stream_key: DEFAULT_STREAM_KEY.to_string(),
45        })
46    }
47
48    pub fn with_stream_key(mut self, stream_key: &str) -> Self {
49        if !stream_key.trim().is_empty() {
50            self.stream_key = stream_key.to_string();
51        }
52        self
53    }
54
55    pub async fn record(
56        &self,
57        key: &str,
58        model: &str,
59        response_time_ms: u64,
60    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
61        let input_tokens = 0u32;
62        let output_tokens = 0u32;
63
64        self.record_with_tokens(key, model, input_tokens, output_tokens, response_time_ms)
65            .await
66    }
67
68    pub async fn record_with_tokens(
69        &self,
70        key: &str,
71        model: &str,
72        input_tokens: u32,
73        output_tokens: u32,
74        response_time_ms: u64,
75    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
76        let timestamp = SystemTime::now()
77            .duration_since(UNIX_EPOCH)
78            .unwrap_or_default()
79            .as_millis() as u64;
80
81        if let Some(ref manager) = self.manager {
82            let stream_key = self.stream_key.clone();
83            let key_clone = key.to_string();
84            let model_clone = model.to_string();
85            let mut manager = manager.clone();
86
87            tokio::spawn(async move {
88                let result: Result<(), redis::RedisError> = redis::cmd("XADD")
89                    .arg(&stream_key)
90                    .arg("*")
91                    .arg("key")
92                    .arg(&key_clone)
93                    .arg("model")
94                    .arg(&model_clone)
95                    .arg("input_tokens")
96                    .arg(input_tokens.to_string())
97                    .arg("output_tokens")
98                    .arg(output_tokens.to_string())
99                    .arg("response_time_ms")
100                    .arg(response_time_ms.to_string())
101                    .arg("timestamp")
102                    .arg(timestamp.to_string())
103                    .query_async(&mut manager)
104                    .await;
105
106                if let Err(e) = result {
107                    tracing::error!("Failed to push telemetry to Redis stream: {:?}", e);
108                }
109            });
110        } else {
111            tracing::debug!(
112                "Telemetry skipped (Redis unavailable): key_id={}, model={}, input_tokens={}, output_tokens={}, response_time_ms={}",
113                Self::key_id(key), model, input_tokens, output_tokens, response_time_ms
114            );
115        }
116
117        Ok(())
118    }
119}
120
#[cfg(test)]
mod tests {
    // These tests pass whether or not a Redis server is reachable.
    // `Telemetry::new` falls back to a no-op publisher when it cannot connect,
    // and `record` / `record_with_tokens` always return `Ok(())`.
    // NOTE(review): if a Redis server *is* listening on localhost:6379, the
    // record tests append real entries to the `hyperinfer:telemetry` stream.
    use super::*;

    #[test]
    fn test_key_id_known_digests() {
        // Last 8 hex characters of SHA-256("") and SHA-256("abc").
        assert_eq!(Telemetry::key_id(""), "...7852b855");
        assert_eq!(Telemetry::key_id("abc"), "...f20015ad");
    }

    #[test]
    fn test_key_id_does_not_leak_key() {
        let key = "sk-super-secret-api-key";
        let id = Telemetry::key_id(key);
        assert_eq!(id.len(), 11);
        assert!(id.starts_with("..."));
        assert!(id[3..].chars().all(|c| c.is_ascii_hexdigit()));
        assert!(!id.contains(key));
        // Deterministic, so the same key can be correlated across log lines.
        assert_eq!(id, Telemetry::key_id(key));
    }

    #[tokio::test]
    async fn test_telemetry_invalid_url_has_no_manager() {
        let telemetry = Telemetry::new("invalid-url").await.unwrap();
        assert!(telemetry.manager.is_none());
        assert_eq!(telemetry.stream_key, DEFAULT_STREAM_KEY);
    }

    #[tokio::test]
    async fn test_telemetry_new() {
        let result = Telemetry::new("redis://localhost:6379").await;
        assert!(result.is_ok());
        let telemetry = result.unwrap();
        assert_eq!(telemetry.stream_key, "hyperinfer:telemetry");
    }

    #[tokio::test]
    async fn test_telemetry_new_different_url() {
        let result = Telemetry::new("redis://custom-host:1234/0").await;
        assert!(result.is_ok());
        let telemetry = result.unwrap();
        assert_eq!(telemetry.stream_key, "hyperinfer:telemetry");
    }

    #[tokio::test]
    async fn test_telemetry_with_stream_key() {
        let telemetry = Telemetry::new("redis://localhost:6379")
            .await
            .unwrap()
            .with_stream_key("custom:stream");
        assert_eq!(telemetry.stream_key, "custom:stream");
    }

    #[tokio::test]
    async fn test_telemetry_record() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry.record("test-key", "gpt-4", 250).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_with_tokens() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry
            .record_with_tokens("test-key", "gpt-4", 100, 50, 250)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_multiple_calls() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();

        assert!(telemetry.record("key1", "gpt-4", 100).await.is_ok());
        assert!(telemetry.record("key2", "claude-3", 200).await.is_ok());
        assert!(telemetry.record("key1", "gpt-4", 150).await.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_zero_response_time() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry.record("test-key", "gpt-4", 0).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_large_response_time() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry.record("test-key", "gpt-4", 999999).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_invalid_redis() {
        let telemetry = Telemetry::new("invalid-url").await.unwrap();
        let result = telemetry.record("test-key", "gpt-4", 250).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_with_empty_stream_key() {
        let telemetry = Telemetry::new("redis://localhost:6379")
            .await
            .unwrap()
            .with_stream_key("");
        assert_eq!(telemetry.stream_key, DEFAULT_STREAM_KEY);
    }

    #[tokio::test]
    async fn test_telemetry_with_whitespace_stream_key() {
        let telemetry = Telemetry::new("redis://localhost:6379")
            .await
            .unwrap()
            .with_stream_key("   ");
        assert_eq!(telemetry.stream_key, DEFAULT_STREAM_KEY);
    }

    #[tokio::test]
    async fn test_telemetry_record_empty_key() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry.record("", "gpt-4", 250).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_empty_model() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry.record("test-key", "", 250).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_max_values() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry
            .record_with_tokens("test-key", "gpt-4", u32::MAX, u32::MAX, u64::MAX)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_special_characters_in_key() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry.record("test-key-!@#$%^&*()", "gpt-4", 250).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_special_characters_in_model() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry
            .record("test-key", "gpt-4-turbo-preview-2024", 250)
            .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_unicode() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let result = telemetry.record("test-key-🔑", "gpt-4", 250).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_record_very_long_strings() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let long_key = "a".repeat(10000);
        let long_model = "b".repeat(10000);
        let result = telemetry.record(&long_key, &long_model, 250).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    async fn test_telemetry_concurrent_records() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();
        let telemetry = std::sync::Arc::new(telemetry);

        let mut handles = vec![];
        for i in 0..10 {
            let telemetry_clone = telemetry.clone();
            let handle = tokio::spawn(async move {
                telemetry_clone
                    .record(&format!("key-{}", i), "gpt-4", 100 + i * 10)
                    .await
            });
            handles.push(handle);
        }

        for handle in handles {
            let result = handle.await.unwrap();
            assert!(result.is_ok());
        }
    }

    #[tokio::test]
    async fn test_telemetry_record_sequential() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();

        for i in 0..5 {
            let result = telemetry
                .record(&format!("key-{}", i), "gpt-4", 100 + i * 10)
                .await;
            assert!(result.is_ok());
        }
    }

    #[tokio::test]
    async fn test_telemetry_record_different_token_counts() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();

        let test_cases = vec![
            (0, 0),
            (1, 1),
            (100, 50),
            (u32::MAX, u32::MAX),
            (500, 0),
            (0, 500),
        ];

        for (input, output) in test_cases {
            let result = telemetry
                .record_with_tokens("test-key", "gpt-4", input, output, 250)
                .await;
            assert!(result.is_ok());
        }
    }

    #[tokio::test]
    async fn test_telemetry_with_very_long_stream_key() {
        let long_key = "a".repeat(1000);
        let telemetry = Telemetry::new("redis://localhost:6379")
            .await
            .unwrap()
            .with_stream_key(&long_key);
        assert_eq!(telemetry.stream_key, long_key);
    }

    #[tokio::test]
    async fn test_telemetry_record_rapid_succession() {
        let telemetry = Telemetry::new("redis://localhost:6379").await.unwrap();

        for _ in 0..100 {
            let result = telemetry.record("test-key", "gpt-4", 250).await;
            assert!(result.is_ok());
        }
    }
}
339            let result = telemetry.record("test-key", "gpt-4", 250).await;
340            assert!(result.is_ok());
341        }
342    }
343}