1use redis::aio::MultiplexedConnection;
7use redis::Client;
8use std::sync::Arc;
9use tokio_util::sync::CancellationToken;
10use tracing::{error, info, warn};
11
12use crate::types::UsageRecord;
13
// --- Defaults applied by `TelemetryConsumer::new` ---

/// Stream key used until `with_stream_key` overrides it.
const DEFAULT_TELEMETRY_STREAM: &str = "hyperinfer:telemetry";
/// Consumer group used until `with_consumer_group` overrides it.
const DEFAULT_CONSUMER_GROUP: &str = "telemetry-consumer";

// --- XREADGROUP tuning ---

/// `COUNT` argument sent with every XREADGROUP call.
const XREADGROUP_COUNT: u32 = 10;
/// `BLOCK` argument (milliseconds) sent with every XREADGROUP call.
const XREADGROUP_BLOCK_MS: u32 = 5_000;

// --- XAUTOCLAIM tuning ---

/// `COUNT` argument sent with every XAUTOCLAIM call.
const XAUTOCLAIM_COUNT: u32 = 100;
/// Minimum idle time (milliseconds, i.e. ten minutes) sent with every XAUTOCLAIM call.
/// Kept as a string because it is forwarded verbatim as a command argument.
const XAUTOCLAIM_IDLE_MS: &str = "600000";

/// Upper bound, in seconds, for the reconnect backoff.
const MAX_BACKOFF_SECS: u64 = 60;

/// One stream entry: its ID followed by its `(field, value)` pairs.
type StreamEntry = (String, Vec<(String, String)>);
23
24pub struct TelemetryConsumer {
25 client: Arc<Client>,
26 stream_key: String,
27 consumer_group: String,
28 consumer_name: String,
29}
30
31impl TelemetryConsumer {
32 pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
33 let client = Client::open(redis_url)?;
34 let consumer_name = format!("consumer-{}", uuid::Uuid::new_v4());
35
36 Ok(Self {
37 client: Arc::new(client),
38 stream_key: DEFAULT_TELEMETRY_STREAM.to_string(),
39 consumer_group: DEFAULT_CONSUMER_GROUP.to_string(),
40 consumer_name,
41 })
42 }
43
44 pub fn with_stream_key(mut self, stream_key: &str) -> Self {
45 self.stream_key = stream_key.to_string();
46 self
47 }
48
49 pub fn with_consumer_group(mut self, group: &str) -> Self {
50 self.consumer_group = group.to_string();
51 self
52 }
53
54 async fn ensure_consumer_group(
55 conn: &mut MultiplexedConnection,
56 stream_key: &str,
57 consumer_group: &str,
58 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
59 info!(
60 "Creating consumer group {} for stream {}",
61 consumer_group, stream_key
62 );
63 let result: Result<(), redis::RedisError> = redis::cmd("XGROUP")
64 .arg("CREATE")
65 .arg(stream_key)
66 .arg(consumer_group)
67 .arg("0")
68 .arg("MKSTREAM")
69 .query_async(conn)
70 .await;
71
72 match result {
73 Ok(_) => {
74 info!("Consumer group created successfully");
75 Ok(())
76 }
77 Err(e) => {
78 if e.to_string().contains("BUSYGROUP") {
79 info!("Consumer group already exists");
80 Ok(())
81 } else {
82 Err(e.into())
83 }
84 }
85 }
86 }
87
88 async fn ack_messages(
89 conn: &mut MultiplexedConnection,
90 stream_key: &str,
91 consumer_group: &str,
92 msg_ids: &[&str],
93 ) -> Result<(), redis::RedisError> {
94 Self::ack_messages_with_retry(conn, stream_key, consumer_group, msg_ids, 3, 50).await
95 }
96
97 async fn ack_messages_with_retry(
98 conn: &mut MultiplexedConnection,
99 stream_key: &str,
100 consumer_group: &str,
101 msg_ids: &[&str],
102 max_retries: u32,
103 base_delay_ms: u64,
104 ) -> Result<(), redis::RedisError> {
105 if msg_ids.is_empty() {
106 return Ok(());
107 }
108
109 let mut last_error = None;
110 for attempt in 0..max_retries {
111 match Self::do_ack_messages(conn, stream_key, consumer_group, msg_ids).await {
112 Ok(_) => return Ok(()),
113 Err(e) => {
114 last_error = Some(e.clone());
115 if attempt < max_retries - 1 {
116 let delay_ms = base_delay_ms * (2_u64.pow(attempt));
117 warn!(
118 "XACK failed (attempt {}/{}), retrying in {}ms: {}",
119 attempt + 1,
120 max_retries,
121 delay_ms,
122 e
123 );
124 tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
125 }
126 }
127 }
128 }
129 Err(last_error.unwrap())
130 }
131
132 async fn do_ack_messages(
133 conn: &mut MultiplexedConnection,
134 stream_key: &str,
135 consumer_group: &str,
136 msg_ids: &[&str],
137 ) -> Result<(), redis::RedisError> {
138 let mut cmd = redis::cmd("XACK");
139 cmd.arg(stream_key).arg(consumer_group);
140 for id in msg_ids {
141 cmd.arg(id);
142 }
143 let count: usize = cmd.query_async(conn).await?;
144 if count < msg_ids.len() {
145 let remaining = msg_ids.len() - count;
146 warn!(
147 "XACK only acknowledged {}/{} messages; {} may need recovery on reconnect",
148 count,
149 msg_ids.len(),
150 remaining
151 );
152 }
153 Ok(())
154 }
155
156 async fn process_entry<F, Fut>(msg_id: &str, fields: &[(String, String)], handler: &F) -> bool
157 where
158 F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
159 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
160 + Send,
161 {
162 if let Some(record) = Self::parse_entry(Some(msg_id), fields) {
163 match handler(record).await {
164 Ok(_) => true,
165 Err(e) => {
166 warn!("Failed to process message {}: {:?}", msg_id, e);
167 false
168 }
169 }
170 } else {
171 warn!("Failed to parse message {}", msg_id);
172 true
173 }
174 }
175
176 fn extract_string(value: &redis::Value) -> Option<String> {
177 match value {
178 redis::Value::BulkString(bytes) => Some(String::from_utf8_lossy(bytes).to_string()),
179 redis::Value::SimpleString(s) => Some(s.clone()),
180 _ => None,
181 }
182 }
183
184 fn extract_stream_entries(value: &redis::Value) -> Result<Vec<StreamEntry>, redis::RedisError> {
185 match value {
186 redis::Value::Array(entries) => {
187 let mut result = Vec::new();
188 for (i, entry) in entries.iter().enumerate() {
189 match entry {
190 redis::Value::Array(entry_data) => {
191 result.push(Self::extract_stream_entry(entry_data)?);
192 }
193 other => {
194 return Err(redis::RedisError::from((
195 redis::ErrorKind::UnexpectedReturnType,
196 "XAUTOCLAIM entry is not an array",
197 format!("entry {}: {:?}", i, other),
198 )));
199 }
200 }
201 }
202 Ok(result)
203 }
204 other => Err(redis::RedisError::from((
205 redis::ErrorKind::UnexpectedReturnType,
206 "XAUTOCLAIM claimed_messages is not an array",
207 format!("{:?}", other),
208 ))),
209 }
210 }
211
212 fn extract_stream_entry(
213 entry_data: &[redis::Value],
214 ) -> Result<(String, Vec<(String, String)>), redis::RedisError> {
215 if entry_data.len() < 2 {
216 return Err(redis::RedisError::from((
217 redis::ErrorKind::UnexpectedReturnType,
218 "XAUTOCLAIM entry has insufficient elements",
219 format!("expected >= 2, got {}", entry_data.len()),
220 )));
221 }
222 let msg_id = Self::extract_string(&entry_data[0]).ok_or_else(|| {
223 redis::RedisError::from((
224 redis::ErrorKind::UnexpectedReturnType,
225 "XAUTOCLAIM entry ID is not a valid string",
226 String::new(),
227 ))
228 })?;
229 let fields = Self::extract_fields(&entry_data[1]);
230 Ok((msg_id, fields))
231 }
232
233 fn extract_fields(value: &redis::Value) -> Vec<(String, String)> {
234 match value {
235 redis::Value::Array(field_pairs) => {
236 let mut pairs = Vec::new();
237 for chunk in field_pairs.chunks(2) {
238 if chunk.len() == 2 {
239 match (
240 Self::extract_string(&chunk[0]),
241 Self::extract_string(&chunk[1]),
242 ) {
243 (Some(key), Some(value)) => pairs.push((key, value)),
244 _ => {
245 warn!("Skipping malformed field pair: {:?}", chunk);
246 }
247 }
248 }
249 }
250 pairs
251 }
252 _ => Vec::new(),
253 }
254 }
255
256 async fn recover_pending_messages<F, Fut>(
257 conn: &mut MultiplexedConnection,
258 stream_key: &str,
259 consumer_group: &str,
260 consumer_name: &str,
261 handler: &F,
262 ) -> Result<(), redis::RedisError>
263 where
264 F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
265 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
266 + Send,
267 {
268 let mut start_id = "0-0".to_string();
269 loop {
270 info!(
271 "XAUTOCLAIM: group={}, consumer={}, start={}",
272 consumer_group, consumer_name, start_id
273 );
274 let result: Result<redis::Value, redis::RedisError> = redis::cmd("XAUTOCLAIM")
275 .arg(stream_key)
276 .arg(consumer_group)
277 .arg(consumer_name)
278 .arg(XAUTOCLAIM_IDLE_MS)
279 .arg(&start_id)
280 .arg("COUNT")
281 .arg(XAUTOCLAIM_COUNT)
282 .query_async(conn)
283 .await;
284
285 let (next_start, claimed) = match result {
286 Ok(redis::Value::Array(arr)) => {
287 if arr.len() != 3 {
290 return Err(redis::RedisError::from((
291 redis::ErrorKind::UnexpectedReturnType,
292 "XAUTOCLAIM returned unexpected array length",
293 format!("expected 3 elements, got {}", arr.len()),
294 )));
295 }
296 let next_start = Self::extract_string(&arr[0]).ok_or_else(|| {
297 redis::RedisError::from((
298 redis::ErrorKind::UnexpectedReturnType,
299 "XAUTOCLAIM cursor is not a valid string",
300 String::new(),
301 ))
302 })?;
303 let claimed = Self::extract_stream_entries(&arr[1])?;
304 (next_start, claimed)
307 }
308 Ok(other) => {
309 return Err(redis::RedisError::from((
310 redis::ErrorKind::UnexpectedReturnType,
311 "XAUTOCLAIM returned unexpected type",
312 format!("{:?}", other),
313 )));
314 }
315 Err(e) => {
316 warn!("XAUTOCLAIM failed: {}", e);
317 return Err(e);
318 }
319 };
320
321 info!(
322 "XAUTOCLAIM returned {} entries, next_start={}",
323 claimed.len(),
324 next_start
325 );
326
327 let mut ack_ids = Vec::with_capacity(claimed.len());
328 for (msg_id, fields) in &claimed {
329 if Self::process_entry(msg_id, fields, handler).await {
330 ack_ids.push(msg_id.as_str());
331 }
332 }
333 if !ack_ids.is_empty() {
334 Self::ack_messages(conn, stream_key, consumer_group, &ack_ids).await?;
335 }
336
337 if next_start == "0-0" {
338 return Ok(());
339 }
340 start_id = next_start;
341 }
342 }
343
344 async fn read_and_process_batch<F, Fut>(
345 conn: &mut MultiplexedConnection,
346 stream_key: &str,
347 consumer_group: &str,
348 consumer_name: &str,
349 handler: &F,
350 ) -> Result<(), redis::RedisError>
351 where
352 F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
353 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
354 + Send,
355 {
356 info!(
357 "XREADGROUP: group={}, consumer={}, stream={}",
358 consumer_group, consumer_name, stream_key
359 );
360 #[allow(clippy::type_complexity)]
361 let results: Vec<(String, Vec<(String, Vec<(String, String)>)>)> = redis::cmd("XREADGROUP")
362 .arg("GROUP")
363 .arg(consumer_group)
364 .arg(consumer_name)
365 .arg("COUNT")
366 .arg(XREADGROUP_COUNT)
367 .arg("BLOCK")
368 .arg(XREADGROUP_BLOCK_MS)
369 .arg("STREAMS")
370 .arg(stream_key)
371 .arg(">")
372 .query_async(conn)
373 .await?;
374 info!("XREADGROUP returned {} streams", results.len());
375
376 for (_stream, entries) in results {
377 let mut ack_ids = Vec::with_capacity(entries.len());
378 for (entry_id, fields) in &entries {
379 if Self::process_entry(entry_id, fields, handler).await {
380 ack_ids.push(entry_id.as_str());
381 }
382 }
383 if !ack_ids.is_empty() {
384 Self::ack_messages(conn, stream_key, consumer_group, &ack_ids).await?;
385 }
386 }
387
388 Ok(())
389 }
390
391 pub async fn start_consuming<F, Fut>(
392 &self,
393 handler: F,
394 cancellation_token: CancellationToken,
395 ) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error + Send + Sync>>
396 where
397 F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
398 Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
399 + Send,
400 {
401 let client = Arc::clone(&self.client);
402 let stream_key = self.stream_key.clone();
403 let consumer_group = self.consumer_group.clone();
404 let consumer_name = self.consumer_name.clone();
405
406 let handle = tokio::spawn(async move {
407 let mut backoff = 1u64;
408
409 loop {
410 if cancellation_token.is_cancelled() {
411 info!("Telemetry consumer shutting down");
412 return;
413 }
414
415 let conn_result = client.get_multiplexed_async_connection().await;
416 if let Err(e) = &conn_result {
417 error!(
418 "Failed to connect to Redis: {}. Reconnecting in {}s",
419 e, backoff
420 );
421 tokio::select! {
422 _ = cancellation_token.cancelled() => {
423 info!("Telemetry consumer shutting down");
424 return;
425 }
426 _ = tokio::time::sleep(tokio::time::Duration::from_secs(backoff)) => {
427 backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
428 }
429 }
430 continue;
431 }
432
433 let mut conn = conn_result.unwrap();
434 if let Err(e) =
435 Self::ensure_consumer_group(&mut conn, &stream_key, &consumer_group).await
436 {
437 warn!("Failed to ensure consumer group: {}", e);
438 }
439
440 info!(
441 "Starting telemetry consumption from stream: {} (group: {})",
442 stream_key, consumer_group
443 );
444
445 let recover_result = Self::recover_pending_messages(
446 &mut conn,
447 &stream_key,
448 &consumer_group,
449 &consumer_name,
450 &handler,
451 )
452 .await
453 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
454
455 let mut do_reconnect = false;
456 if let Err(e) = &recover_result {
457 warn!("Failed to recover pending messages: {}", e);
458 do_reconnect = true;
459 }
460
461 if do_reconnect {
462 error!("Recovery failed, reconnecting to retry on next cycle");
463 backoff = 1;
464 continue;
465 }
466
467 loop {
468 if cancellation_token.is_cancelled() {
469 info!("Telemetry consumer shutting down");
470 return;
471 }
472
473 tokio::select! {
474 result = Self::read_and_process_batch(
475 &mut conn,
476 &stream_key,
477 &consumer_group,
478 &consumer_name,
479 &handler,
480 ) => {
481 match result {
482 Ok(_) => {
483 backoff = 1;
484 }
485 Err(e) => {
486 error!(
487 "Telemetry consumer error: {}. Reconnecting in {}s",
488 e, backoff
489 );
490 backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
491 break;
492 }
493 }
494 }
495 _ = cancellation_token.cancelled() => {
496 info!("Telemetry consumer shutting down");
497 return;
498 }
499 }
500 }
501 }
502 });
503
504 Ok(handle)
505 }
506
507 fn parse_entry(msg_id: Option<&str>, fields: &[(String, String)]) -> Option<UsageRecord> {
508 let mut key = None;
509 let mut model = None;
510 let mut input_tokens = None;
511 let mut output_tokens = None;
512 let mut response_time_ms = None;
513 let mut timestamp = None;
514
515 for (k, v) in fields {
516 match k.as_str() {
517 "key" => key = Some(v),
518 "model" => model = Some(v),
519 "input_tokens" => input_tokens = Some(v),
520 "output_tokens" => output_tokens = Some(v),
521 "response_time_ms" => response_time_ms = Some(v),
522 "timestamp" => timestamp = Some(v),
523 _ => {}
524 }
525 }
526
527 let key_val = key?;
528 let model_val = model?;
529
530 if key_val.trim().is_empty() || model_val.trim().is_empty() {
531 return None;
532 }
533
534 let key = key_val.clone();
535 let model = model_val.clone();
536
537 Some(UsageRecord {
538 key,
539 model,
540 input_tokens: input_tokens?.parse().ok()?,
541 output_tokens: output_tokens?.parse().ok()?,
542 response_time_ms: response_time_ms?.parse().ok()?,
543 timestamp: timestamp?.parse().ok()?,
544 msg_id: msg_id.map(String::from),
545 })
546 }
547
548 pub async fn read_single_batch(
555 &self,
556 ) -> Result<Vec<UsageRecord>, Box<dyn std::error::Error + Send + Sync>> {
557 let mut conn = self.client.get_multiplexed_async_connection().await?;
558
559 #[allow(clippy::type_complexity)]
560 let results: Vec<(String, Vec<(String, Vec<(String, String)>)>)> = redis::cmd("XREAD")
561 .arg("COUNT")
562 .arg(100)
563 .arg("STREAMS")
564 .arg(&self.stream_key)
565 .arg("0")
566 .query_async(&mut conn)
567 .await?;
568
569 let mut records = Vec::new();
570 for (_stream, entries) in results {
571 for (_entry_id, fields) in entries {
572 if let Some(record) = Self::parse_entry(None, &fields) {
573 records.push(record);
574 }
575 }
576 }
577
578 Ok(records)
579 }
580}
581
582#[cfg(test)]
583mod tests {
584 use super::*;
585
586 #[test]
587 fn test_parse_entry_valid() {
588 let fields = vec![
589 ("key".to_string(), "test-key".to_string()),
590 ("model".to_string(), "gpt-4".to_string()),
591 ("input_tokens".to_string(), "100".to_string()),
592 ("output_tokens".to_string(), "50".to_string()),
593 ("response_time_ms".to_string(), "250".to_string()),
594 ("timestamp".to_string(), "1700000000000".to_string()),
595 ];
596
597 let record = TelemetryConsumer::parse_entry(None, &fields);
598 assert!(record.is_some());
599 let record = record.unwrap();
600 assert_eq!(record.key, "test-key");
601 assert_eq!(record.model, "gpt-4");
602 assert_eq!(record.input_tokens, 100);
603 assert_eq!(record.output_tokens, 50);
604 assert_eq!(record.response_time_ms, 250);
605 assert_eq!(record.timestamp, 1700000000000);
606 }
607
608 #[test]
609 fn test_parse_entry_with_msg_id() {
610 let fields = vec![
611 ("key".to_string(), "test-key".to_string()),
612 ("model".to_string(), "gpt-4".to_string()),
613 ("input_tokens".to_string(), "100".to_string()),
614 ("output_tokens".to_string(), "50".to_string()),
615 ("response_time_ms".to_string(), "250".to_string()),
616 ("timestamp".to_string(), "1700000000000".to_string()),
617 ];
618
619 let record = TelemetryConsumer::parse_entry(Some("1234567890-0"), &fields);
620 assert!(record.is_some());
621 let record = record.unwrap();
622 assert_eq!(record.key, "test-key");
623 assert_eq!(record.model, "gpt-4");
624 assert_eq!(record.msg_id, Some("1234567890-0".to_string()));
625 }
626
627 #[test]
628 fn test_parse_entry_missing_field() {
629 let fields = vec![
630 ("key".to_string(), "test-key".to_string()),
631 ("model".to_string(), "gpt-4".to_string()),
632 ];
633
634 let record = TelemetryConsumer::parse_entry(None, &fields);
635 assert!(record.is_none());
636 }
637
638 #[test]
639 fn test_parse_entry_invalid_number() {
640 let fields = vec![
641 ("key".to_string(), "test-key".to_string()),
642 ("model".to_string(), "gpt-4".to_string()),
643 ("input_tokens".to_string(), "not-a-number".to_string()),
644 ("output_tokens".to_string(), "50".to_string()),
645 ("response_time_ms".to_string(), "250".to_string()),
646 ("timestamp".to_string(), "1700000000000".to_string()),
647 ];
648
649 let record = TelemetryConsumer::parse_entry(None, &fields);
650 assert!(record.is_none());
651 }
652
653 #[tokio::test]
654 async fn test_telemetry_consumer_new() {
655 let result = TelemetryConsumer::new("redis://localhost:6379").await;
656 assert!(result.is_ok());
657 let consumer = result.unwrap();
658 assert_eq!(consumer.stream_key, "hyperinfer:telemetry");
659 }
660
661 #[tokio::test]
662 async fn test_telemetry_consumer_with_options() {
663 let consumer = TelemetryConsumer::new("redis://localhost:6379")
664 .await
665 .unwrap()
666 .with_stream_key("custom:stream")
667 .with_consumer_group("custom-group");
668
669 assert_eq!(consumer.stream_key, "custom:stream");
670 assert_eq!(consumer.consumer_group, "custom-group");
671 }
672
673 #[test]
674 fn test_parse_entry_extra_fields() {
675 let fields = vec![
676 ("key".to_string(), "test-key".to_string()),
677 ("model".to_string(), "gpt-4".to_string()),
678 ("input_tokens".to_string(), "100".to_string()),
679 ("output_tokens".to_string(), "50".to_string()),
680 ("response_time_ms".to_string(), "250".to_string()),
681 ("timestamp".to_string(), "1700000000000".to_string()),
682 ("extra_field".to_string(), "ignored".to_string()),
683 ];
684
685 let record = TelemetryConsumer::parse_entry(None, &fields);
686 assert!(record.is_some());
687 let record = record.unwrap();
688 assert_eq!(record.key, "test-key");
689 }
690
691 #[test]
692 fn test_parse_entry_empty() {
693 let fields = vec![];
694 let record = TelemetryConsumer::parse_entry(None, &fields);
695 assert!(record.is_none());
696 }
697
698 #[test]
699 fn test_parse_entry_partial_fields() {
700 let fields = vec![
701 ("key".to_string(), "test-key".to_string()),
702 ("model".to_string(), "gpt-4".to_string()),
703 ("input_tokens".to_string(), "100".to_string()),
704 ];
705
706 let record = TelemetryConsumer::parse_entry(None, &fields);
707 assert!(record.is_none());
708 }
709
710 #[test]
711 fn test_parse_entry_negative_numbers() {
712 let fields = vec![
713 ("key".to_string(), "test-key".to_string()),
714 ("model".to_string(), "gpt-4".to_string()),
715 ("input_tokens".to_string(), "-100".to_string()),
716 ("output_tokens".to_string(), "50".to_string()),
717 ("response_time_ms".to_string(), "250".to_string()),
718 ("timestamp".to_string(), "1700000000000".to_string()),
719 ];
720
721 let record = TelemetryConsumer::parse_entry(None, &fields);
722 assert!(record.is_none());
723 }
724
725 #[test]
726 fn test_parse_entry_overflow_u32() {
727 let fields = vec![
728 ("key".to_string(), "test-key".to_string()),
729 ("model".to_string(), "gpt-4".to_string()),
730 ("input_tokens".to_string(), "4294967296".to_string()),
731 ("output_tokens".to_string(), "50".to_string()),
732 ("response_time_ms".to_string(), "250".to_string()),
733 ("timestamp".to_string(), "1700000000000".to_string()),
734 ];
735
736 let record = TelemetryConsumer::parse_entry(None, &fields);
737 assert!(record.is_none());
738 }
739
740 #[test]
741 fn test_parse_entry_overflow_u64() {
742 let fields = vec![
743 ("key".to_string(), "test-key".to_string()),
744 ("model".to_string(), "gpt-4".to_string()),
745 ("input_tokens".to_string(), "100".to_string()),
746 ("output_tokens".to_string(), "50".to_string()),
747 (
748 "response_time_ms".to_string(),
749 "18446744073709551616".to_string(),
750 ),
751 ("timestamp".to_string(), "1700000000000".to_string()),
752 ];
753
754 let record = TelemetryConsumer::parse_entry(None, &fields);
755 assert!(record.is_none());
756 }
757
758 #[test]
759 fn test_parse_entry_max_values() {
760 let fields = vec![
761 ("key".to_string(), "test-key".to_string()),
762 ("model".to_string(), "gpt-4".to_string()),
763 ("input_tokens".to_string(), u32::MAX.to_string()),
764 ("output_tokens".to_string(), u32::MAX.to_string()),
765 ("response_time_ms".to_string(), u64::MAX.to_string()),
766 ("timestamp".to_string(), u64::MAX.to_string()),
767 ];
768
769 let record = TelemetryConsumer::parse_entry(None, &fields);
770 assert!(record.is_some());
771 let record = record.unwrap();
772 assert_eq!(record.input_tokens, u32::MAX);
773 assert_eq!(record.output_tokens, u32::MAX);
774 assert_eq!(record.response_time_ms, u64::MAX);
775 assert_eq!(record.timestamp, u64::MAX);
776 }
777
778 #[test]
779 fn test_parse_entry_zero_values() {
780 let fields = vec![
781 ("key".to_string(), "test-key".to_string()),
782 ("model".to_string(), "gpt-4".to_string()),
783 ("input_tokens".to_string(), "0".to_string()),
784 ("output_tokens".to_string(), "0".to_string()),
785 ("response_time_ms".to_string(), "0".to_string()),
786 ("timestamp".to_string(), "0".to_string()),
787 ];
788
789 let record = TelemetryConsumer::parse_entry(None, &fields);
790 assert!(record.is_some());
791 let record = record.unwrap();
792 assert_eq!(record.input_tokens, 0);
793 assert_eq!(record.output_tokens, 0);
794 assert_eq!(record.response_time_ms, 0);
795 assert_eq!(record.timestamp, 0);
796 }
797
798 #[test]
799 fn test_parse_entry_empty_strings() {
800 let fields = vec![
801 ("key".to_string(), "".to_string()),
802 ("model".to_string(), "".to_string()),
803 ("input_tokens".to_string(), "100".to_string()),
804 ("output_tokens".to_string(), "50".to_string()),
805 ("response_time_ms".to_string(), "250".to_string()),
806 ("timestamp".to_string(), "1700000000000".to_string()),
807 ];
808
809 let record = TelemetryConsumer::parse_entry(None, &fields);
810 assert!(record.is_none());
811 }
812
813 #[test]
814 fn test_parse_entry_whitespace_strings() {
815 let fields = vec![
816 ("key".to_string(), " ".to_string()),
817 ("model".to_string(), " ".to_string()),
818 ("input_tokens".to_string(), "100".to_string()),
819 ("output_tokens".to_string(), "50".to_string()),
820 ("response_time_ms".to_string(), "250".to_string()),
821 ("timestamp".to_string(), "1700000000000".to_string()),
822 ];
823
824 let record = TelemetryConsumer::parse_entry(None, &fields);
825 assert!(record.is_none());
826 }
827
828 #[test]
829 fn test_parse_entry_special_characters() {
830 let fields = vec![
831 ("key".to_string(), "test-key-!@#$%".to_string()),
832 ("model".to_string(), "gpt-4-turbo-preview".to_string()),
833 ("input_tokens".to_string(), "100".to_string()),
834 ("output_tokens".to_string(), "50".to_string()),
835 ("response_time_ms".to_string(), "250".to_string()),
836 ("timestamp".to_string(), "1700000000000".to_string()),
837 ];
838
839 let record = TelemetryConsumer::parse_entry(None, &fields);
840 assert!(record.is_some());
841 let record = record.unwrap();
842 assert_eq!(record.key, "test-key-!@#$%");
843 assert_eq!(record.model, "gpt-4-turbo-preview");
844 }
845
846 #[test]
847 fn test_parse_entry_unicode() {
848 let fields = vec![
849 ("key".to_string(), "test-key-🔑".to_string()),
850 ("model".to_string(), "gpt-4".to_string()),
851 ("input_tokens".to_string(), "100".to_string()),
852 ("output_tokens".to_string(), "50".to_string()),
853 ("response_time_ms".to_string(), "250".to_string()),
854 ("timestamp".to_string(), "1700000000000".to_string()),
855 ];
856
857 let record = TelemetryConsumer::parse_entry(None, &fields);
858 assert!(record.is_some());
859 let record = record.unwrap();
860 assert_eq!(record.key, "test-key-🔑");
861 }
862
863 #[test]
864 fn test_parse_entry_very_long_strings() {
865 let long_key = "a".repeat(10000);
866 let long_model = "b".repeat(10000);
867 let fields = vec![
868 ("key".to_string(), long_key.clone()),
869 ("model".to_string(), long_model.clone()),
870 ("input_tokens".to_string(), "100".to_string()),
871 ("output_tokens".to_string(), "50".to_string()),
872 ("response_time_ms".to_string(), "250".to_string()),
873 ("timestamp".to_string(), "1700000000000".to_string()),
874 ];
875
876 let record = TelemetryConsumer::parse_entry(None, &fields);
877 assert!(record.is_some());
878 let record = record.unwrap();
879 assert_eq!(record.key, long_key);
880 assert_eq!(record.model, long_model);
881 }
882}