use redis::aio::MultiplexedConnection;
use redis::Client;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
use tracing::{error, info, warn};
use crate::types::UsageRecord;
const DEFAULT_TELEMETRY_STREAM: &str = "hyperinfer:telemetry";
const DEFAULT_CONSUMER_GROUP: &str = "telemetry-consumer";
const XAUTOCLAIM_IDLE_MS: &str = "600000";
const XREADGROUP_BLOCK_MS: u32 = 5000;
const XREADGROUP_COUNT: u32 = 10;
const XAUTOCLAIM_COUNT: u32 = 100;
const MAX_BACKOFF_SECS: u64 = 60;
type StreamEntry = (String, Vec<(String, String)>);
pub struct TelemetryConsumer {
client: Arc<Client>,
stream_key: String,
consumer_group: String,
consumer_name: String,
}
impl TelemetryConsumer {
pub async fn new(redis_url: &str) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let client = Client::open(redis_url)?;
let consumer_name = format!("consumer-{}", uuid::Uuid::new_v4());
Ok(Self {
client: Arc::new(client),
stream_key: DEFAULT_TELEMETRY_STREAM.to_string(),
consumer_group: DEFAULT_CONSUMER_GROUP.to_string(),
consumer_name,
})
}
pub fn with_stream_key(mut self, stream_key: &str) -> Self {
self.stream_key = stream_key.to_string();
self
}
pub fn with_consumer_group(mut self, group: &str) -> Self {
self.consumer_group = group.to_string();
self
}
async fn ensure_consumer_group(
conn: &mut MultiplexedConnection,
stream_key: &str,
consumer_group: &str,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Creating consumer group {} for stream {}",
consumer_group, stream_key
);
let result: Result<(), redis::RedisError> = redis::cmd("XGROUP")
.arg("CREATE")
.arg(stream_key)
.arg(consumer_group)
.arg("0")
.arg("MKSTREAM")
.query_async(conn)
.await;
match result {
Ok(_) => {
info!("Consumer group created successfully");
Ok(())
}
Err(e) => {
if e.to_string().contains("BUSYGROUP") {
info!("Consumer group already exists");
Ok(())
} else {
Err(e.into())
}
}
}
}
async fn ack_messages(
conn: &mut MultiplexedConnection,
stream_key: &str,
consumer_group: &str,
msg_ids: &[&str],
) -> Result<(), redis::RedisError> {
Self::ack_messages_with_retry(conn, stream_key, consumer_group, msg_ids, 3, 50).await
}
async fn ack_messages_with_retry(
conn: &mut MultiplexedConnection,
stream_key: &str,
consumer_group: &str,
msg_ids: &[&str],
max_retries: u32,
base_delay_ms: u64,
) -> Result<(), redis::RedisError> {
if msg_ids.is_empty() {
return Ok(());
}
let mut last_error = None;
for attempt in 0..max_retries {
match Self::do_ack_messages(conn, stream_key, consumer_group, msg_ids).await {
Ok(_) => return Ok(()),
Err(e) => {
last_error = Some(e.clone());
if attempt < max_retries - 1 {
let delay_ms = base_delay_ms * (2_u64.pow(attempt));
warn!(
"XACK failed (attempt {}/{}), retrying in {}ms: {}",
attempt + 1,
max_retries,
delay_ms,
e
);
tokio::time::sleep(tokio::time::Duration::from_millis(delay_ms)).await;
}
}
}
}
Err(last_error.unwrap())
}
async fn do_ack_messages(
conn: &mut MultiplexedConnection,
stream_key: &str,
consumer_group: &str,
msg_ids: &[&str],
) -> Result<(), redis::RedisError> {
let mut cmd = redis::cmd("XACK");
cmd.arg(stream_key).arg(consumer_group);
for id in msg_ids {
cmd.arg(id);
}
let count: usize = cmd.query_async(conn).await?;
if count < msg_ids.len() {
let remaining = msg_ids.len() - count;
warn!(
"XACK only acknowledged {}/{} messages; {} may need recovery on reconnect",
count,
msg_ids.len(),
remaining
);
}
Ok(())
}
async fn process_entry<F, Fut>(msg_id: &str, fields: &[(String, String)], handler: &F) -> bool
where
F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
+ Send,
{
if let Some(record) = Self::parse_entry(Some(msg_id), fields) {
match handler(record).await {
Ok(_) => true,
Err(e) => {
warn!("Failed to process message {}: {:?}", msg_id, e);
false
}
}
} else {
warn!("Failed to parse message {}", msg_id);
true
}
}
fn extract_string(value: &redis::Value) -> Option<String> {
match value {
redis::Value::BulkString(bytes) => Some(String::from_utf8_lossy(bytes).to_string()),
redis::Value::SimpleString(s) => Some(s.clone()),
_ => None,
}
}
fn extract_stream_entries(value: &redis::Value) -> Result<Vec<StreamEntry>, redis::RedisError> {
match value {
redis::Value::Array(entries) => {
let mut result = Vec::new();
for (i, entry) in entries.iter().enumerate() {
match entry {
redis::Value::Array(entry_data) => {
result.push(Self::extract_stream_entry(entry_data)?);
}
other => {
return Err(redis::RedisError::from((
redis::ErrorKind::UnexpectedReturnType,
"XAUTOCLAIM entry is not an array",
format!("entry {}: {:?}", i, other),
)));
}
}
}
Ok(result)
}
other => Err(redis::RedisError::from((
redis::ErrorKind::UnexpectedReturnType,
"XAUTOCLAIM claimed_messages is not an array",
format!("{:?}", other),
))),
}
}
fn extract_stream_entry(
entry_data: &[redis::Value],
) -> Result<(String, Vec<(String, String)>), redis::RedisError> {
if entry_data.len() < 2 {
return Err(redis::RedisError::from((
redis::ErrorKind::UnexpectedReturnType,
"XAUTOCLAIM entry has insufficient elements",
format!("expected >= 2, got {}", entry_data.len()),
)));
}
let msg_id = Self::extract_string(&entry_data[0]).ok_or_else(|| {
redis::RedisError::from((
redis::ErrorKind::UnexpectedReturnType,
"XAUTOCLAIM entry ID is not a valid string",
String::new(),
))
})?;
let fields = Self::extract_fields(&entry_data[1]);
Ok((msg_id, fields))
}
fn extract_fields(value: &redis::Value) -> Vec<(String, String)> {
match value {
redis::Value::Array(field_pairs) => {
let mut pairs = Vec::new();
for chunk in field_pairs.chunks(2) {
if chunk.len() == 2 {
match (
Self::extract_string(&chunk[0]),
Self::extract_string(&chunk[1]),
) {
(Some(key), Some(value)) => pairs.push((key, value)),
_ => {
warn!("Skipping malformed field pair: {:?}", chunk);
}
}
}
}
pairs
}
_ => Vec::new(),
}
}
async fn recover_pending_messages<F, Fut>(
conn: &mut MultiplexedConnection,
stream_key: &str,
consumer_group: &str,
consumer_name: &str,
handler: &F,
) -> Result<(), redis::RedisError>
where
F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
+ Send,
{
let mut start_id = "0-0".to_string();
loop {
info!(
"XAUTOCLAIM: group={}, consumer={}, start={}",
consumer_group, consumer_name, start_id
);
let result: Result<redis::Value, redis::RedisError> = redis::cmd("XAUTOCLAIM")
.arg(stream_key)
.arg(consumer_group)
.arg(consumer_name)
.arg(XAUTOCLAIM_IDLE_MS)
.arg(&start_id)
.arg("COUNT")
.arg(XAUTOCLAIM_COUNT)
.query_async(conn)
.await;
let (next_start, claimed) = match result {
Ok(redis::Value::Array(arr)) => {
if arr.len() != 3 {
return Err(redis::RedisError::from((
redis::ErrorKind::UnexpectedReturnType,
"XAUTOCLAIM returned unexpected array length",
format!("expected 3 elements, got {}", arr.len()),
)));
}
let next_start = Self::extract_string(&arr[0]).ok_or_else(|| {
redis::RedisError::from((
redis::ErrorKind::UnexpectedReturnType,
"XAUTOCLAIM cursor is not a valid string",
String::new(),
))
})?;
let claimed = Self::extract_stream_entries(&arr[1])?;
(next_start, claimed)
}
Ok(other) => {
return Err(redis::RedisError::from((
redis::ErrorKind::UnexpectedReturnType,
"XAUTOCLAIM returned unexpected type",
format!("{:?}", other),
)));
}
Err(e) => {
warn!("XAUTOCLAIM failed: {}", e);
return Err(e);
}
};
info!(
"XAUTOCLAIM returned {} entries, next_start={}",
claimed.len(),
next_start
);
let mut ack_ids = Vec::with_capacity(claimed.len());
for (msg_id, fields) in &claimed {
if Self::process_entry(msg_id, fields, handler).await {
ack_ids.push(msg_id.as_str());
}
}
if !ack_ids.is_empty() {
Self::ack_messages(conn, stream_key, consumer_group, &ack_ids).await?;
}
if next_start == "0-0" {
return Ok(());
}
start_id = next_start;
}
}
async fn read_and_process_batch<F, Fut>(
conn: &mut MultiplexedConnection,
stream_key: &str,
consumer_group: &str,
consumer_name: &str,
handler: &F,
) -> Result<(), redis::RedisError>
where
F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
+ Send,
{
info!(
"XREADGROUP: group={}, consumer={}, stream={}",
consumer_group, consumer_name, stream_key
);
#[allow(clippy::type_complexity)]
let results: Vec<(String, Vec<(String, Vec<(String, String)>)>)> = redis::cmd("XREADGROUP")
.arg("GROUP")
.arg(consumer_group)
.arg(consumer_name)
.arg("COUNT")
.arg(XREADGROUP_COUNT)
.arg("BLOCK")
.arg(XREADGROUP_BLOCK_MS)
.arg("STREAMS")
.arg(stream_key)
.arg(">")
.query_async(conn)
.await?;
info!("XREADGROUP returned {} streams", results.len());
for (_stream, entries) in results {
let mut ack_ids = Vec::with_capacity(entries.len());
for (entry_id, fields) in &entries {
if Self::process_entry(entry_id, fields, handler).await {
ack_ids.push(entry_id.as_str());
}
}
if !ack_ids.is_empty() {
Self::ack_messages(conn, stream_key, consumer_group, &ack_ids).await?;
}
}
Ok(())
}
pub async fn start_consuming<F, Fut>(
&self,
handler: F,
cancellation_token: CancellationToken,
) -> Result<tokio::task::JoinHandle<()>, Box<dyn std::error::Error + Send + Sync>>
where
F: Fn(UsageRecord) -> Fut + Send + Sync + 'static,
Fut: std::future::Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
+ Send,
{
let client = Arc::clone(&self.client);
let stream_key = self.stream_key.clone();
let consumer_group = self.consumer_group.clone();
let consumer_name = self.consumer_name.clone();
let handle = tokio::spawn(async move {
let mut backoff = 1u64;
loop {
if cancellation_token.is_cancelled() {
info!("Telemetry consumer shutting down");
return;
}
let conn_result = client.get_multiplexed_async_connection().await;
if let Err(e) = &conn_result {
error!(
"Failed to connect to Redis: {}. Reconnecting in {}s",
e, backoff
);
tokio::select! {
_ = cancellation_token.cancelled() => {
info!("Telemetry consumer shutting down");
return;
}
_ = tokio::time::sleep(tokio::time::Duration::from_secs(backoff)) => {
backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
}
}
continue;
}
let mut conn = conn_result.unwrap();
if let Err(e) =
Self::ensure_consumer_group(&mut conn, &stream_key, &consumer_group).await
{
warn!("Failed to ensure consumer group: {}", e);
}
info!(
"Starting telemetry consumption from stream: {} (group: {})",
stream_key, consumer_group
);
let recover_result = Self::recover_pending_messages(
&mut conn,
&stream_key,
&consumer_group,
&consumer_name,
&handler,
)
.await
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>);
let mut do_reconnect = false;
if let Err(e) = &recover_result {
warn!("Failed to recover pending messages: {}", e);
do_reconnect = true;
}
if do_reconnect {
error!("Recovery failed, reconnecting to retry on next cycle");
backoff = 1;
continue;
}
loop {
if cancellation_token.is_cancelled() {
info!("Telemetry consumer shutting down");
return;
}
tokio::select! {
result = Self::read_and_process_batch(
&mut conn,
&stream_key,
&consumer_group,
&consumer_name,
&handler,
) => {
match result {
Ok(_) => {
backoff = 1;
}
Err(e) => {
error!(
"Telemetry consumer error: {}. Reconnecting in {}s",
e, backoff
);
backoff = (backoff * 2).min(MAX_BACKOFF_SECS);
break;
}
}
}
_ = cancellation_token.cancelled() => {
info!("Telemetry consumer shutting down");
return;
}
}
}
}
});
Ok(handle)
}
fn parse_entry(msg_id: Option<&str>, fields: &[(String, String)]) -> Option<UsageRecord> {
let mut key = None;
let mut model = None;
let mut input_tokens = None;
let mut output_tokens = None;
let mut response_time_ms = None;
let mut timestamp = None;
for (k, v) in fields {
match k.as_str() {
"key" => key = Some(v),
"model" => model = Some(v),
"input_tokens" => input_tokens = Some(v),
"output_tokens" => output_tokens = Some(v),
"response_time_ms" => response_time_ms = Some(v),
"timestamp" => timestamp = Some(v),
_ => {}
}
}
let key_val = key?;
let model_val = model?;
if key_val.trim().is_empty() || model_val.trim().is_empty() {
return None;
}
let key = key_val.clone();
let model = model_val.clone();
Some(UsageRecord {
key,
model,
input_tokens: input_tokens?.parse().ok()?,
output_tokens: output_tokens?.parse().ok()?,
response_time_ms: response_time_ms?.parse().ok()?,
timestamp: timestamp?.parse().ok()?,
msg_id: msg_id.map(String::from),
})
}
pub async fn read_single_batch(
&self,
) -> Result<Vec<UsageRecord>, Box<dyn std::error::Error + Send + Sync>> {
let mut conn = self.client.get_multiplexed_async_connection().await?;
#[allow(clippy::type_complexity)]
let results: Vec<(String, Vec<(String, Vec<(String, String)>)>)> = redis::cmd("XREAD")
.arg("COUNT")
.arg(100)
.arg("STREAMS")
.arg(&self.stream_key)
.arg("0")
.query_async(&mut conn)
.await?;
let mut records = Vec::new();
for (_stream, entries) in results {
for (_entry_id, fields) in entries {
if let Some(record) = Self::parse_entry(None, &fields) {
records.push(record);
}
}
}
Ok(records)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_entry_valid() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.key, "test-key");
assert_eq!(record.model, "gpt-4");
assert_eq!(record.input_tokens, 100);
assert_eq!(record.output_tokens, 50);
assert_eq!(record.response_time_ms, 250);
assert_eq!(record.timestamp, 1700000000000);
}
#[test]
fn test_parse_entry_with_msg_id() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(Some("1234567890-0"), &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.key, "test-key");
assert_eq!(record.model, "gpt-4");
assert_eq!(record.msg_id, Some("1234567890-0".to_string()));
}
#[test]
fn test_parse_entry_missing_field() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_invalid_number() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "not-a-number".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[tokio::test]
async fn test_telemetry_consumer_new() {
let result = TelemetryConsumer::new("redis://localhost:6379").await;
assert!(result.is_ok());
let consumer = result.unwrap();
assert_eq!(consumer.stream_key, "hyperinfer:telemetry");
}
#[tokio::test]
async fn test_telemetry_consumer_with_options() {
let consumer = TelemetryConsumer::new("redis://localhost:6379")
.await
.unwrap()
.with_stream_key("custom:stream")
.with_consumer_group("custom-group");
assert_eq!(consumer.stream_key, "custom:stream");
assert_eq!(consumer.consumer_group, "custom-group");
}
#[test]
fn test_parse_entry_extra_fields() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
("extra_field".to_string(), "ignored".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.key, "test-key");
}
#[test]
fn test_parse_entry_empty() {
let fields = vec![];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_partial_fields() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "100".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_negative_numbers() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "-100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_overflow_u32() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "4294967296".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_overflow_u64() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
(
"response_time_ms".to_string(),
"18446744073709551616".to_string(),
),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_max_values() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), u32::MAX.to_string()),
("output_tokens".to_string(), u32::MAX.to_string()),
("response_time_ms".to_string(), u64::MAX.to_string()),
("timestamp".to_string(), u64::MAX.to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.input_tokens, u32::MAX);
assert_eq!(record.output_tokens, u32::MAX);
assert_eq!(record.response_time_ms, u64::MAX);
assert_eq!(record.timestamp, u64::MAX);
}
#[test]
fn test_parse_entry_zero_values() {
let fields = vec![
("key".to_string(), "test-key".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "0".to_string()),
("output_tokens".to_string(), "0".to_string()),
("response_time_ms".to_string(), "0".to_string()),
("timestamp".to_string(), "0".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.input_tokens, 0);
assert_eq!(record.output_tokens, 0);
assert_eq!(record.response_time_ms, 0);
assert_eq!(record.timestamp, 0);
}
#[test]
fn test_parse_entry_empty_strings() {
let fields = vec![
("key".to_string(), "".to_string()),
("model".to_string(), "".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_whitespace_strings() {
let fields = vec![
("key".to_string(), " ".to_string()),
("model".to_string(), " ".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_none());
}
#[test]
fn test_parse_entry_special_characters() {
let fields = vec![
("key".to_string(), "test-key-!@#$%".to_string()),
("model".to_string(), "gpt-4-turbo-preview".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.key, "test-key-!@#$%");
assert_eq!(record.model, "gpt-4-turbo-preview");
}
#[test]
fn test_parse_entry_unicode() {
let fields = vec![
("key".to_string(), "test-key-🔑".to_string()),
("model".to_string(), "gpt-4".to_string()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.key, "test-key-🔑");
}
#[test]
fn test_parse_entry_very_long_strings() {
let long_key = "a".repeat(10000);
let long_model = "b".repeat(10000);
let fields = vec![
("key".to_string(), long_key.clone()),
("model".to_string(), long_model.clone()),
("input_tokens".to_string(), "100".to_string()),
("output_tokens".to_string(), "50".to_string()),
("response_time_ms".to_string(), "250".to_string()),
("timestamp".to_string(), "1700000000000".to_string()),
];
let record = TelemetryConsumer::parse_entry(None, &fields);
assert!(record.is_some());
let record = record.unwrap();
assert_eq!(record.key, long_key);
assert_eq!(record.model, long_model);
}
}