#[cfg(test)]
mod tests {
use fred::prelude::*;
use waypoint::config::RedisConfig;
use waypoint::redis::client::Redis;
fn test_config() -> RedisConfig {
RedisConfig {
url: "redis://localhost:6379".to_string(),
pool_size: 5,
batch_size: 10,
enable_dead_letter: false,
consumer_rebalance_interval_seconds: 300,
metrics_collection_interval_seconds: 60,
connection_timeout_ms: 5000,
idle_timeout_secs: 300,
max_connection_lifetime_secs: 300,
}
}
#[tokio::test]
#[ignore] async fn test_xreadgroup_with_binary_data() {
let config = test_config();
let redis = Redis::new(&config).await.expect("Failed to connect to Redis");
let stream_key = "test:binary_stream";
let group_name = "test_group";
let consumer_name = "test_consumer";
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
let binary_data = vec![
0x08, 0x01, 0x12, 0x14, 0x68, 0x75, 0x62, 0x3a, 0x73, 0x6e, 0x61, 0x70, 0x2e, 0x75,
0x6e, 0x6f, 0x2e, 0x66, 0x75, 0x6e, 0x1a, 0x10, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0xff, 0x62, 0x60, 0x60, 0x60,
];
let message_id =
redis.xadd(stream_key, &binary_data).await.expect("Failed to add message to stream");
println!("Added message with ID: {}", message_id);
let _: Result<String, _> =
redis.pool.xgroup_create(stream_key, group_name, "$", true).await;
let result = redis.xreadgroup(group_name, consumer_name, stream_key, 10).await;
match result {
Ok(messages) => {
println!("Successfully read {} messages", messages.len());
assert!(!messages.is_empty(), "Should have read at least one message");
let (read_id, read_data) = &messages[0];
println!("Read message ID: {}, data length: {}", read_id, read_data.len());
assert_eq!(read_data, &binary_data, "Binary data should match what was stored");
let _ = redis.xack(stream_key, group_name, read_id).await;
},
Err(e) => {
panic!("XREADGROUP failed with binary data: {}", e);
},
}
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
}
#[tokio::test]
#[ignore] async fn test_xreadgroup_with_mixed_data_types() {
let config = test_config();
let redis = Redis::new(&config).await.expect("Failed to connect to Redis");
let stream_key = "test:mixed_stream";
let group_name = "test_group";
let consumer_name = "test_consumer";
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
let string_data = "hello world".as_bytes().to_vec();
let binary_data = vec![0x00, 0x01, 0x02, 0x03, 0xFF, 0xFE, 0xFD];
let json_like_data = r#"{"type":"cast","data":"test"}"#.as_bytes().to_vec();
let _ = redis.xadd(stream_key, &string_data).await.expect("Failed to add string message");
let _ = redis.xadd(stream_key, &binary_data).await.expect("Failed to add binary message");
let _ = redis.xadd(stream_key, &json_like_data).await.expect("Failed to add JSON message");
let _: Result<String, _> =
redis.pool.xgroup_create(stream_key, group_name, "$", true).await;
let result = redis.xreadgroup(group_name, consumer_name, stream_key, 10).await;
match result {
Ok(messages) => {
println!("Successfully read {} messages with mixed data types", messages.len());
assert_eq!(messages.len(), 3, "Should have read 3 messages");
assert_eq!(messages[0].1, string_data);
assert_eq!(messages[1].1, binary_data);
assert_eq!(messages[2].1, json_like_data);
for (id, _) in &messages {
let _ = redis.xack(stream_key, group_name, id).await;
}
},
Err(e) => {
panic!("XREADGROUP failed with mixed data types: {}", e);
},
}
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
}
#[tokio::test]
#[ignore] async fn test_xclaim_with_binary_data() {
let config = test_config();
let redis = Redis::new(&config).await.expect("Failed to connect to Redis");
let stream_key = "test:xclaim_stream";
let group_name = "test_group";
let consumer1 = "consumer1";
let consumer2 = "consumer2";
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
let binary_data = vec![
0xFF, 0xFE, 0xFD, 0xFC, 0xFB, 0xFA, 0xF9, 0xF8, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
0x06, 0x07,
];
let message_id = redis.xadd(stream_key, &binary_data).await.expect("Failed to add message");
println!("Added message with ID: {}", message_id);
let _: Result<String, _> =
redis.pool.xgroup_create(stream_key, group_name, "0", true).await;
let _: Result<fred::types::RedisValue, _> = redis
.pool
.xreadgroup(group_name, consumer1, Some(1), Some(0), false, stream_key, ">")
.await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
let result = redis
.xclaim(
stream_key,
group_name,
consumer2,
std::time::Duration::from_millis(50),
&[message_id.clone()],
)
.await;
match result {
Ok(claimed) => {
println!("Successfully claimed {} messages", claimed.len());
assert!(!claimed.is_empty(), "Should have claimed at least one message");
let (claimed_id, claimed_data) = &claimed[0];
println!("Claimed message ID: {}, data length: {}", claimed_id, claimed_data.len());
assert_eq!(claimed_data, &binary_data, "Binary data should match what was stored");
let _ = redis.xack(stream_key, group_name, claimed_id).await;
},
Err(e) => {
panic!("XCLAIM failed with binary data: {}", e);
},
}
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
}
#[tokio::test]
#[ignore] async fn test_xreadgroup_empty_stream() {
let config = test_config();
let redis = Redis::new(&config).await.expect("Failed to connect to Redis");
let stream_key = "test:empty_stream";
let group_name = "test_group";
let consumer_name = "test_consumer";
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
let temp_data = b"temp".to_vec();
let msg_id = redis.xadd(stream_key, &temp_data).await.expect("Failed to create stream");
let _: u64 =
redis.pool.xdel(stream_key, msg_id).await.expect("Failed to delete temp message");
let _: Result<String, _> =
redis.pool.xgroup_create(stream_key, group_name, "0", true).await;
let result = redis.xreadgroup(group_name, consumer_name, stream_key, 10).await;
match result {
Ok(messages) => {
println!("Successfully read from empty stream: {} messages", messages.len());
assert!(messages.is_empty(), "Empty stream should return empty vec");
},
Err(e) => {
panic!("XREADGROUP failed on empty stream: {}", e);
},
}
let _: Result<u64, _> = redis.pool.del(stream_key).await;
let _: Result<u64, _> = redis.pool.xgroup_destroy(stream_key, group_name).await;
}
}