Skip to main content

hyperinfer_core/
telemetry_consumer.rs

1//! Telemetry consumer for reading usage data from Redis Streams
2//!
3//! This consumer reads telemetry data pushed by hyperinfer-client from Redis Streams
4//! and can forward it to a database for persistence.
5
6use redis::aio::MultiplexedConnection;
7use redis::Client;
8use std::sync::Arc;
9use tokio_util::sync::CancellationToken;
10use tracing::{error, info, warn};
11
12use crate::types::UsageRecord;
13
/// Stream key used unless overridden with `with_stream_key`.
const DEFAULT_TELEMETRY_STREAM: &str = "hyperinfer:telemetry";
/// Consumer group used unless overridden with `with_consumer_group`.
const DEFAULT_CONSUMER_GROUP: &str = "telemetry-consumer";
/// Value passed to `XAUTOCLAIM` right after the consumer name, in
/// milliseconds (600000 ms = 10 minutes). Kept as a string because it is
/// forwarded verbatim as a command argument.
const XAUTOCLAIM_IDLE_MS: &str = "600000";
/// Value passed after `BLOCK` in `XREADGROUP`, in milliseconds.
const XREADGROUP_BLOCK_MS: u32 = 5000;
/// Value passed after `COUNT` in `XREADGROUP`.
const XREADGROUP_COUNT: u32 = 10;
/// Value passed after `COUNT` in `XAUTOCLAIM`.
const XAUTOCLAIM_COUNT: u32 = 100;
/// Upper bound, in seconds, for the reconnect backoff in `start_consuming`.
const MAX_BACKOFF_SECS: u64 = 60;

/// One stream entry: the message ID and its `(field, value)` pairs.
type StreamEntry = (String, Vec<(String, String)>);
23
/// Reads usage telemetry from a Redis Stream through a consumer group.
///
/// Build one with `new`, optionally adjust it with `with_stream_key` /
/// `with_consumer_group`, then call `start_consuming` to spawn the
/// background reader or `read_single_batch` for a one-off read.
pub struct TelemetryConsumer {
    /// Redis client handle; `start_consuming` clones this `Arc` into its task.
    client: Arc<Client>,
    /// Key of the stream to read from.
    stream_key: String,
    /// Consumer group name used for `XGROUP`, `XREADGROUP`, `XAUTOCLAIM` and `XACK`.
    consumer_group: String,
    /// This instance's consumer name, generated in `new` as `consumer-<uuid>`.
    consumer_name: String,
}
30
31impl TelemetryConsumer {
32    pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
33        let client = Client::open(redis_url)?;
34        let consumer_name = format!("consumer-{}", uuid::Uuid::new_v4());
35
36        Ok(Self {
37            client: Arc::new(client),
38            stream_key: DEFAULT_TELEMETRY_STREAM.to_string(),
39            consumer_group: DEFAULT_CONSUMER_GROUP.to_string(),
40            consumer_name,
41        })
42    }
43
44    pub fn with_stream_key(mut self, stream_key: &str) -> Self {
45        self.stream_key = stream_key.to_string();
46        self
47    }
48
49    pub fn with_consumer_group(mut self, group: &str) -> Self {
50        self.consumer_group = group.to_string();
51        self
52    }
53
54    async fn ensure_consumer_group(
55        conn: &mut MultiplexedConnection,
56        stream_key: &str,
57        consumer_group: &str,
58    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
59        info!(
60            "Creating consumer group {} for stream {}",
61            consumer_group, stream_key
62        );
63        let result: Result<(), redis::RedisError> = redis::cmd("XGROUP")
64            .arg("CREATE")
65            .arg(stream_key)
66            .arg(consumer_group)
67            .arg("0")
68            .arg("MKSTREAM")
69            .query_async(conn)
70            .await;
71
72        match result {
73            Ok(_) => {
74                info!("Consumer group created successfully");
75                Ok(())
76            }
77            Err(e) => {
78                if e.to_string().contains("BUSYGROUP") {
79                    info!("Consumer group already exists");
80                    Ok(())
81                } else {
82                    Err(e.into())
83                }
84            }
85        }
86    }
87
88    async fn ack_messages(
89        conn: &mut MultiplexedConnection,
90        stream_key: &str,
91        consumer_group: &str,
92        msg_ids: &[&str],
93    ) -> Result<(), redis::RedisError> {
94        Self::ack_messages_with_retry(conn, stream_key, consumer_group, msg_ids, 3, 50).await
95    }
96
97    async fn ack_messages_with_retry(
98        conn: &mut MultiplexedConnection,
99        stream_key: &str,
100        consumer_group: &str,
101        msg_ids: &[&str],
102        max_retries: u32,
103        base_delay_ms: u64,
104    ) -> Result<(), redis::RedisError> {
105        if msg_ids.is_empty() {
106            return Ok(());
107        }
108
109        let mut last_error = None;
110        for attempt in 0..max_retries {
111            match Self::do_ack_messages(conn, stream_key, consumer_group, msg_ids).await {
112                Ok(_) => return Ok(()),
113                Err(e) => {
114                    last_error = Some(e.clone());
115                    if attempt < max_retries - 1 {
116                        let delay_ms = base_delay_ms * (2_u64.pow(attempt));
117                        warn!(
118                            "XACK failed (attempt {}/{}), retrying in {}ms: {}",
119                            attempt + 1,
120                            max_retries,
121                            delay_ms,
122                            e
123                        );
124                        tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
125                    }
126                }
127            }
128        }
129        Err(last_error.unwrap())
130    }
131
132    async fn do_ack_messages(
133        conn: &mut MultiplexedConnection,
134        stream_key: &str,
135        consumer_group: &str,
136        msg_ids: &[&str],
137    ) -> Result<(), redis::RedisError> {
138        let mut cmd = redis::cmd("XACK");
139        cmd.arg(stream_key).arg(consumer_group);
140        for id in msg_ids {
141            cmd.arg(id);
142        }
143        let count: usize = cmd.query_async(conn).await?;
144        if count < msg_ids.len() {
145            let remaining = msg_ids.len() - count;
146            warn!(
147                "XACK only acknowledged {}/{} messages; {} may need recovery on reconnect",
148                count,
149                msg_ids.len(),
150                remaining
151            );
152        }
153        Ok(())
154    }
155
156    async fn process_entry<F, Fut>(msg_id: &str, fields: &[(String, String)], handler: &F) -> bool
157    where
158        F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
159        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
160            + Send,
161    {
162        if let Some(record) = Self::parse_entry(Some(msg_id), fields) {
163            match handler(record).await {
164                Ok(_) => true,
165                Err(e) => {
166                    warn!("Failed to process message {}: {:?}", msg_id, e);
167                    false
168                }
169            }
170        } else {
171            warn!("Failed to parse message {}", msg_id);
172            true
173        }
174    }
175
176    fn extract_string(value: &redis::Value) -> Option<String> {
177        match value {
178            redis::Value::BulkString(bytes) => Some(String::from_utf8_lossy(bytes).to_string()),
179            redis::Value::SimpleString(s) => Some(s.clone()),
180            _ => None,
181        }
182    }
183
184    fn extract_stream_entries(value: &redis::Value) -> Result<Vec<StreamEntry>, redis::RedisError> {
185        match value {
186            redis::Value::Array(entries) => {
187                let mut result = Vec::new();
188                for (i, entry) in entries.iter().enumerate() {
189                    match entry {
190                        redis::Value::Array(entry_data) => {
191                            result.push(Self::extract_stream_entry(entry_data)?);
192                        }
193                        other => {
194                            return Err(redis::RedisError::from((
195                                redis::ErrorKind::UnexpectedReturnType,
196                                "XAUTOCLAIM entry is not an array",
197                                format!("entry {}: {:?}", i, other),
198                            )));
199                        }
200                    }
201                }
202                Ok(result)
203            }
204            other => Err(redis::RedisError::from((
205                redis::ErrorKind::UnexpectedReturnType,
206                "XAUTOCLAIM claimed_messages is not an array",
207                format!("{:?}", other),
208            ))),
209        }
210    }
211
212    fn extract_stream_entry(
213        entry_data: &[redis::Value],
214    ) -> Result<(String, Vec<(String, String)>), redis::RedisError> {
215        if entry_data.len() < 2 {
216            return Err(redis::RedisError::from((
217                redis::ErrorKind::UnexpectedReturnType,
218                "XAUTOCLAIM entry has insufficient elements",
219                format!("expected >= 2, got {}", entry_data.len()),
220            )));
221        }
222        let msg_id = Self::extract_string(&entry_data[0]).ok_or_else(|| {
223            redis::RedisError::from((
224                redis::ErrorKind::UnexpectedReturnType,
225                "XAUTOCLAIM entry ID is not a valid string",
226                String::new(),
227            ))
228        })?;
229        let fields = Self::extract_fields(&entry_data[1]);
230        Ok((msg_id, fields))
231    }
232
233    fn extract_fields(value: &redis::Value) -> Vec<(String, String)> {
234        match value {
235            redis::Value::Array(field_pairs) => {
236                let mut pairs = Vec::new();
237                for chunk in field_pairs.chunks(2) {
238                    if chunk.len() == 2 {
239                        match (
240                            Self::extract_string(&chunk[0]),
241                            Self::extract_string(&chunk[1]),
242                        ) {
243                            (Some(key), Some(value)) => pairs.push((key, value)),
244                            _ => {
245                                warn!("Skipping malformed field pair: {:?}", chunk);
246                            }
247                        }
248                    }
249                }
250                pairs
251            }
252            _ => Vec::new(),
253        }
254    }
255
256    async fn recover_pending_messages<F, Fut>(
257        conn: &mut MultiplexedConnection,
258        stream_key: &str,
259        consumer_group: &str,
260        consumer_name: &str,
261        handler: &F,
262    ) -> Result<(), redis::RedisError>
263    where
264        F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
265        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
266            + Send,
267    {
268        let mut start_id = "0-0".to_string();
269        loop {
270            info!(
271                "XAUTOCLAIM: group={}, consumer={}, start={}",
272                consumer_group, consumer_name, start_id
273            );
274            let result: Result<redis::Value, redis::RedisError> = redis::cmd("XAUTOCLAIM")
275                .arg(stream_key)
276                .arg(consumer_group)
277                .arg(consumer_name)
278                .arg(XAUTOCLAIM_IDLE_MS)
279                .arg(&start_id)
280                .arg("COUNT")
281                .arg(XAUTOCLAIM_COUNT)
282                .query_async(conn)
283                .await;
284
285            let (next_start, claimed) = match result {
286                Ok(redis::Value::Array(arr)) => {
287                    // XAUTOCLAIM returns [cursor, claimed_messages, deleted_ids]
288                    // Validate expected 3-element structure
289                    if arr.len() != 3 {
290                        return Err(redis::RedisError::from((
291                            redis::ErrorKind::UnexpectedReturnType,
292                            "XAUTOCLAIM returned unexpected array length",
293                            format!("expected 3 elements, got {}", arr.len()),
294                        )));
295                    }
296                    let next_start = Self::extract_string(&arr[0]).ok_or_else(|| {
297                        redis::RedisError::from((
298                            redis::ErrorKind::UnexpectedReturnType,
299                            "XAUTOCLAIM cursor is not a valid string",
300                            String::new(),
301                        ))
302                    })?;
303                    let claimed = Self::extract_stream_entries(&arr[1])?;
304                    // arr[2] contains deleted message IDs that were claimed but no longer exist
305                    // We don't need to process them - they're already removed from the stream
306                    (next_start, claimed)
307                }
308                Ok(other) => {
309                    return Err(redis::RedisError::from((
310                        redis::ErrorKind::UnexpectedReturnType,
311                        "XAUTOCLAIM returned unexpected type",
312                        format!("{:?}", other),
313                    )));
314                }
315                Err(e) => {
316                    warn!("XAUTOCLAIM failed: {}", e);
317                    return Err(e);
318                }
319            };
320
321            info!(
322                "XAUTOCLAIM returned {} entries, next_start={}",
323                claimed.len(),
324                next_start
325            );
326
327            let mut ack_ids = Vec::with_capacity(claimed.len());
328            for (msg_id, fields) in &claimed {
329                if Self::process_entry(msg_id, fields, handler).await {
330                    ack_ids.push(msg_id.as_str());
331                }
332            }
333            if !ack_ids.is_empty() {
334                Self::ack_messages(conn, stream_key, consumer_group, &ack_ids).await?;
335            }
336
337            if next_start == "0-0" {
338                return Ok(());
339            }
340            start_id = next_start;
341        }
342    }
343
344    async fn read_and_process_batch<F, Fut>(
345        conn: &mut MultiplexedConnection,
346        stream_key: &str,
347        consumer_group: &str,
348        consumer_name: &str,
349        handler: &F,
350    ) -> Result<(), redis::RedisError>
351    where
352        F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
353        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
354            + Send,
355    {
356        info!(
357            "XREADGROUP: group={}, consumer={}, stream={}",
358            consumer_group, consumer_name, stream_key
359        );
360        #[allow(clippy::type_complexity)]
361        let results: Vec<(String, Vec<(String, Vec<(String, String)>)>)> = redis::cmd("XREADGROUP")
362            .arg("GROUP")
363            .arg(consumer_group)
364            .arg(consumer_name)
365            .arg("COUNT")
366            .arg(XREADGROUP_COUNT)
367            .arg("BLOCK")
368            .arg(XREADGROUP_BLOCK_MS)
369            .arg("STREAMS")
370            .arg(stream_key)
371            .arg(">")
372            .query_async(conn)
373            .await?;
374        info!("XREADGROUP returned {} streams", results.len());
375
376        for (_stream, entries) in results {
377            let mut ack_ids = Vec::with_capacity(entries.len());
378            for (entry_id, fields) in &entries {
379                if Self::process_entry(entry_id, fields, handler).await {
380                    ack_ids.push(entry_id.as_str());
381                }
382            }
383            if !ack_ids.is_empty() {
384                Self::ack_messages(conn, stream_key, consumer_group, &ack_ids).await?;
385            }
386        }
387
388        Ok(())
389    }
390
391    pub async fn start_consuming<F, Fut>(
392        &self,
393        handler: F,
394        cancellation_token: CancellationToken,
395    ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error + Send + Sync>>
396    where
397        F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
398        Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
399            + Send,
400    {
401        let client = Arc::clone(&self.client);
402        let stream_key = self.stream_key.clone();
403        let consumer_group = self.consumer_group.clone();
404        let consumer_name = self.consumer_name.clone();
405
406        let handle = tokio::spawn(async move {
407            let mut backoff = 1u64;
408
409            loop {
410                if cancellation_token.is_cancelled() {
411                    info!("Telemetry consumer shutting down");
412                    return;
413                }
414
415                let conn_result = client.get_multiplexed_async_connection().await;
416                if let Err(e) = &conn_result {
417                    error!(
418                        "Failed to connect to Redis: {}. Reconnecting in {}s",
419                        e, backoff
420                    );
421                    tokio::select! {
422                        _ = cancellation_token.cancelled() => {
423                            info!("Telemetry consumer shutting down");
424                            return;
425                        }
426                        _ = tokio::time::sleep(tokio::time::Duration::from_secs(backoff)) => {
427                            backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
428                        }
429                    }
430                    continue;
431                }
432
433                let mut conn = conn_result.unwrap();
434                if let Err(e) =
435                    Self::ensure_consumer_group(&mut conn, &stream_key, &consumer_group).await
436                {
437                    warn!("Failed to ensure consumer group: {}", e);
438                }
439
440                info!(
441                    "Starting telemetry consumption from stream: {} (group: {})",
442                    stream_key, consumer_group
443                );
444
445                let recover_result = Self::recover_pending_messages(
446                    &mut conn,
447                    &stream_key,
448                    &consumer_group,
449                    &consumer_name,
450                    &handler,
451                )
452                .await
453                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
454
455                let mut do_reconnect = false;
456                if let Err(e) = &recover_result {
457                    warn!("Failed to recover pending messages: {}", e);
458                    do_reconnect = true;
459                }
460
461                if do_reconnect {
462                    error!("Recovery failed, reconnecting to retry on next cycle");
463                    backoff = 1;
464                    continue;
465                }
466
467                loop {
468                    if cancellation_token.is_cancelled() {
469                        info!("Telemetry consumer shutting down");
470                        return;
471                    }
472
473                    tokio::select! {
474                        result = Self::read_and_process_batch(
475                            &mut conn,
476                            &stream_key,
477                            &consumer_group,
478                            &consumer_name,
479                            &handler,
480                        ) => {
481                            match result {
482                                Ok(_) => {
483                                    backoff = 1;
484                                }
485                                Err(e) => {
486                                    error!(
487                                        "Telemetry consumer error: {}. Reconnecting in {}s",
488                                        e, backoff
489                                    );
490                                    backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
491                                    break;
492                                }
493                            }
494                        }
495                        _ = cancellation_token.cancelled() => {
496                            info!("Telemetry consumer shutting down");
497                            return;
498                        }
499                    }
500                }
501            }
502        });
503
504        Ok(handle)
505    }
506
507    fn parse_entry(msg_id: Option<&str>, fields: &[(String, String)]) -> Option<UsageRecord> {
508        let mut key = None;
509        let mut model = None;
510        let mut input_tokens = None;
511        let mut output_tokens = None;
512        let mut response_time_ms = None;
513        let mut timestamp = None;
514
515        for (k, v) in fields {
516            match k.as_str() {
517                "key" => key = Some(v),
518                "model" => model = Some(v),
519                "input_tokens" => input_tokens = Some(v),
520                "output_tokens" => output_tokens = Some(v),
521                "response_time_ms" => response_time_ms = Some(v),
522                "timestamp" => timestamp = Some(v),
523                _ => {}
524            }
525        }
526
527        let key_val = key?;
528        let model_val = model?;
529
530        if key_val.trim().is_empty() || model_val.trim().is_empty() {
531            return None;
532        }
533
534        let key = key_val.clone();
535        let model = model_val.clone();
536
537        Some(UsageRecord {
538            key,
539            model,
540            input_tokens: input_tokens?.parse().ok()?,
541            output_tokens: output_tokens?.parse().ok()?,
542            response_time_ms: response_time_ms?.parse().ok()?,
543            timestamp: timestamp?.parse().ok()?,
544            msg_id: msg_id.map(String::from),
545        })
546    }
547
548    /// Read a single batch of messages from the stream.
549    ///
550    /// **Note**: This method always reads from the beginning of the stream (ID "0")
551    /// and is intended for one-time reads or testing purposes only. For production
552    /// use with repeated batch reads, use `start_consuming` which leverages
553    /// consumer groups for proper message tracking and acknowledgment.
554    pub async fn read_single_batch(
555        &self,
556    ) -> Result<Vec<UsageRecord>, Box<dyn std::error::Error + Send + Sync>> {
557        let mut conn = self.client.get_multiplexed_async_connection().await?;
558
559        #[allow(clippy::type_complexity)]
560        let results: Vec<(String, Vec<(String, Vec<(String, String)>)>)> = redis::cmd("XREAD")
561            .arg("COUNT")
562            .arg(100)
563            .arg("STREAMS")
564            .arg(&self.stream_key)
565            .arg("0")
566            .query_async(&mut conn)
567            .await?;
568
569        let mut records = Vec::new();
570        for (_stream, entries) in results {
571            for (_entry_id, fields) in entries {
572                if let Some(record) = Self::parse_entry(None, &fields) {
573                    records.push(record);
574                }
575            }
576        }
577
578        Ok(records)
579    }
580}
581
#[cfg(test)]
mod tests {
    use super::*;

    /// Converts borrowed `(name, value)` pairs into the owned form that
    /// `parse_entry` takes.
    fn owned(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|&(name, value)| (name.to_string(), value.to_string()))
            .collect()
    }

    /// Builds the six required telemetry fields in a fixed order.
    fn entry(
        key: &str,
        model: &str,
        input_tokens: &str,
        output_tokens: &str,
        response_time_ms: &str,
        timestamp: &str,
    ) -> Vec<(String, String)> {
        owned(&[
            ("key", key),
            ("model", model),
            ("input_tokens", input_tokens),
            ("output_tokens", output_tokens),
            ("response_time_ms", response_time_ms),
            ("timestamp", timestamp),
        ])
    }

    /// Parses `fields` without a message ID.
    fn parse(fields: &[(String, String)]) -> Option<UsageRecord> {
        TelemetryConsumer::parse_entry(None, fields)
    }

    #[test]
    fn test_parse_entry_valid() {
        let fields = entry("test-key", "gpt-4", "100", "50", "250", "1700000000000");

        let record = parse(&fields).expect("complete entry should parse");
        assert_eq!(record.key, "test-key");
        assert_eq!(record.model, "gpt-4");
        assert_eq!(record.input_tokens, 100);
        assert_eq!(record.output_tokens, 50);
        assert_eq!(record.response_time_ms, 250);
        assert_eq!(record.timestamp, 1700000000000);
    }

    #[test]
    fn test_parse_entry_with_msg_id() {
        let fields = entry("test-key", "gpt-4", "100", "50", "250", "1700000000000");

        let record = TelemetryConsumer::parse_entry(Some("1234567890-0"), &fields)
            .expect("complete entry should parse");
        assert_eq!(record.key, "test-key");
        assert_eq!(record.model, "gpt-4");
        assert_eq!(record.msg_id, Some("1234567890-0".to_string()));
    }

    #[test]
    fn test_parse_entry_missing_field() {
        let fields = owned(&[("key", "test-key"), ("model", "gpt-4")]);

        assert!(parse(&fields).is_none());
    }

    #[test]
    fn test_parse_entry_invalid_number() {
        let fields = entry(
            "test-key",
            "gpt-4",
            "not-a-number",
            "50",
            "250",
            "1700000000000",
        );

        assert!(parse(&fields).is_none());
    }

    #[tokio::test]
    async fn test_telemetry_consumer_new() {
        let consumer = TelemetryConsumer::new("redis://localhost:6379")
            .await
            .expect("a well-formed URL should be accepted");
        assert_eq!(consumer.stream_key, "hyperinfer:telemetry");
    }

    #[tokio::test]
    async fn test_telemetry_consumer_with_options() {
        let consumer = TelemetryConsumer::new("redis://localhost:6379")
            .await
            .unwrap()
            .with_stream_key("custom:stream")
            .with_consumer_group("custom-group");

        assert_eq!(consumer.stream_key, "custom:stream");
        assert_eq!(consumer.consumer_group, "custom-group");
    }

    #[test]
    fn test_parse_entry_extra_fields() {
        let mut fields = entry("test-key", "gpt-4", "100", "50", "250", "1700000000000");
        fields.push(("extra_field".to_string(), "ignored".to_string()));

        let record = parse(&fields).expect("unknown fields should be ignored");
        assert_eq!(record.key, "test-key");
    }

    #[test]
    fn test_parse_entry_empty() {
        let no_fields: Vec<(String, String)> = Vec::new();

        assert!(parse(&no_fields).is_none());
    }

    #[test]
    fn test_parse_entry_partial_fields() {
        let fields = owned(&[
            ("key", "test-key"),
            ("model", "gpt-4"),
            ("input_tokens", "100"),
        ]);

        assert!(parse(&fields).is_none());
    }

    #[test]
    fn test_parse_entry_negative_numbers() {
        let fields = entry("test-key", "gpt-4", "-100", "50", "250", "1700000000000");

        assert!(parse(&fields).is_none());
    }

    #[test]
    fn test_parse_entry_overflow_u32() {
        let fields = entry(
            "test-key",
            "gpt-4",
            "4294967296",
            "50",
            "250",
            "1700000000000",
        );

        assert!(parse(&fields).is_none());
    }

    #[test]
    fn test_parse_entry_overflow_u64() {
        let fields = entry(
            "test-key",
            "gpt-4",
            "100",
            "50",
            "18446744073709551616",
            "1700000000000",
        );

        assert!(parse(&fields).is_none());
    }

    #[test]
    fn test_parse_entry_max_values() {
        let max_u32 = u32::MAX.to_string();
        let max_u64 = u64::MAX.to_string();
        let fields = entry("test-key", "gpt-4", &max_u32, &max_u32, &max_u64, &max_u64);

        let record = parse(&fields).expect("maximum values should parse");
        assert_eq!(record.input_tokens, u32::MAX);
        assert_eq!(record.output_tokens, u32::MAX);
        assert_eq!(record.response_time_ms, u64::MAX);
        assert_eq!(record.timestamp, u64::MAX);
    }

    #[test]
    fn test_parse_entry_zero_values() {
        let fields = entry("test-key", "gpt-4", "0", "0", "0", "0");

        let record = parse(&fields).expect("zero values should parse");
        assert_eq!(record.input_tokens, 0);
        assert_eq!(record.output_tokens, 0);
        assert_eq!(record.response_time_ms, 0);
        assert_eq!(record.timestamp, 0);
    }

    #[test]
    fn test_parse_entry_empty_strings() {
        let fields = entry("", "", "100", "50", "250", "1700000000000");

        assert!(parse(&fields).is_none());
    }

    #[test]
    fn test_parse_entry_whitespace_strings() {
        let fields = entry("   ", "   ", "100", "50", "250", "1700000000000");

        assert!(parse(&fields).is_none());
    }

    #[test]
    fn test_parse_entry_special_characters() {
        let fields = entry(
            "test-key-!@#$%",
            "gpt-4-turbo-preview",
            "100",
            "50",
            "250",
            "1700000000000",
        );

        let record = parse(&fields).expect("special characters should be accepted");
        assert_eq!(record.key, "test-key-!@#$%");
        assert_eq!(record.model, "gpt-4-turbo-preview");
    }

    #[test]
    fn test_parse_entry_unicode() {
        let fields = entry("test-key-🔑", "gpt-4", "100", "50", "250", "1700000000000");

        let record = parse(&fields).expect("unicode keys should be accepted");
        assert_eq!(record.key, "test-key-🔑");
    }

    #[test]
    fn test_parse_entry_very_long_strings() {
        let long_key = "a".repeat(10000);
        let long_model = "b".repeat(10000);
        let fields = entry(&long_key, &long_model, "100", "50", "250", "1700000000000");

        let record = parse(&fields).expect("long strings should be accepted");
        assert_eq!(record.key, long_key);
        assert_eq!(record.model, long_model);
    }
}