use net::{
AdapterConfig, BackpressureMode, BatchConfig, ConsumeRequest, Event, EventBus, EventBusConfig,
Filter, Ordering, RedisAdapterConfig,
};
use serde_json::json;
use std::sync::atomic::Ordering as AtomicOrdering;
use std::time::Duration;
fn redis_url() -> String {
std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://localhost:6379".to_string())
}
fn unique_prefix() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
format!("test_{}", ts)
}
async fn create_bus(prefix: &str) -> EventBus {
let config = EventBusConfig::builder()
.num_shards(2)
.ring_buffer_capacity(1024)
.backpressure_mode(BackpressureMode::DropOldest)
.batch(BatchConfig::low_latency())
.adapter(AdapterConfig::Redis(
RedisAdapterConfig::new(redis_url())
.with_prefix(prefix)
.with_max_stream_len(10000),
))
.build()
.expect("Failed to build config");
EventBus::new(config).await.expect("Failed to create bus")
}
#[tokio::test]
#[ignore] async fn test_redis_ingest_and_poll() {
let prefix = unique_prefix();
let bus = create_bus(&prefix).await;
for i in 0..10 {
let event = Event::from_str(&format!(r#"{{"index": {}, "data": "test"}}"#, i)).unwrap();
bus.ingest(event).expect("Failed to ingest");
}
bus.flush().await.expect("Failed to flush");
tokio::time::sleep(Duration::from_millis(100)).await;
let response = bus
.poll(ConsumeRequest::new(100))
.await
.expect("Failed to poll");
assert!(
response.events.len() >= 10,
"Expected at least 10 events, got {}",
response.events.len()
);
bus.shutdown().await.expect("Failed to shutdown");
}
#[tokio::test]
#[ignore] async fn test_redis_pagination() {
let prefix = unique_prefix();
let bus = create_bus(&prefix).await;
for i in 0..25 {
let event = Event::from_str(&format!(r#"{{"index": {}}}"#, i)).unwrap();
bus.ingest(event).expect("Failed to ingest");
}
bus.flush().await.expect("Failed to flush");
tokio::time::sleep(Duration::from_millis(100)).await;
let mut all_events = Vec::new();
let mut cursor: Option<String> = None;
loop {
let request = match &cursor {
Some(c) => ConsumeRequest::new(10).from(c.clone()),
None => ConsumeRequest::new(10),
};
let response = bus.poll(request).await.expect("Failed to poll");
all_events.extend(response.events);
if !response.has_more {
break;
}
cursor = response.next_id;
}
assert!(
all_events.len() >= 25,
"Expected at least 25 events, got {}",
all_events.len()
);
bus.shutdown().await.expect("Failed to shutdown");
}
#[tokio::test]
#[ignore] async fn test_redis_ordering() {
let prefix = unique_prefix();
let bus = create_bus(&prefix).await;
for i in 0..20 {
let event = Event::from_str(&format!(r#"{{"seq": {}}}"#, i)).unwrap();
bus.ingest(event).expect("Failed to ingest");
}
bus.flush().await.expect("Failed to flush");
tokio::time::sleep(Duration::from_millis(100)).await;
let response = bus
.poll(ConsumeRequest::new(100).ordering(Ordering::InsertionTs))
.await
.expect("Failed to poll");
let mut prev_ts = 0u64;
for event in &response.events {
assert!(
event.insertion_ts >= prev_ts,
"Events not ordered: {} < {}",
event.insertion_ts,
prev_ts
);
prev_ts = event.insertion_ts;
}
bus.shutdown().await.expect("Failed to shutdown");
}
#[tokio::test]
#[ignore] async fn test_redis_filtering() {
let prefix = unique_prefix();
let bus = create_bus(&prefix).await;
for i in 0..10 {
let event_type = if i % 2 == 0 { "token" } else { "tool_call" };
let event =
Event::from_str(&format!(r#"{{"type": "{}", "index": {}}}"#, event_type, i)).unwrap();
bus.ingest(event).expect("Failed to ingest");
}
bus.flush().await.expect("Failed to flush");
tokio::time::sleep(Duration::from_millis(100)).await;
let filter = Filter::eq("type", json!("token"));
let response = bus
.poll(ConsumeRequest::new(100).filter(filter))
.await
.expect("Failed to poll");
for event in &response.events {
let parsed = event.parse().expect("valid JSON");
assert_eq!(
parsed.get("type").and_then(|v| v.as_str()),
Some("token"),
"Expected token type, got {:?}",
event.raw
);
}
bus.shutdown().await.expect("Failed to shutdown");
}
#[tokio::test]
#[ignore] async fn test_redis_batch_ingest() {
let prefix = unique_prefix();
let bus = create_bus(&prefix).await;
let events: Vec<Event> = (0..100)
.map(|i| Event::from_str(&format!(r#"{{"batch_index": {}}}"#, i)).unwrap())
.collect();
let count = bus.ingest_batch(events);
assert_eq!(count, 100);
bus.flush().await.expect("Failed to flush");
tokio::time::sleep(Duration::from_millis(200)).await;
let response = bus
.poll(ConsumeRequest::new(200))
.await
.expect("Failed to poll");
assert!(
response.events.len() >= 100,
"Expected at least 100 events, got {}",
response.events.len()
);
bus.shutdown().await.expect("Failed to shutdown");
}
#[tokio::test]
#[ignore] async fn test_redis_stats() {
let prefix = unique_prefix();
let bus = create_bus(&prefix).await;
for i in 0..50 {
let event = Event::from_str(&format!(r#"{{"stat_index": {}}}"#, i)).unwrap();
bus.ingest(event).expect("Failed to ingest");
}
let stats = bus.stats();
assert!(
stats.events_ingested.load(AtomicOrdering::Relaxed) >= 50,
"Expected at least 50 ingested events"
);
bus.shutdown().await.expect("Failed to shutdown");
}
#[tokio::test]
#[ignore] async fn test_redis_reconnection() {
let prefix = unique_prefix();
let bus = create_bus(&prefix).await;
assert!(bus.is_healthy().await, "Bus should be healthy initially");
let event = Event::from_str(r#"{"test": "reconnect"}"#).unwrap();
bus.ingest(event).expect("Failed to ingest");
bus.flush().await.expect("Failed to flush");
bus.shutdown().await.expect("Failed to shutdown");
}
#[tokio::test]
#[ignore] async fn test_redis_persistence_across_instances() {
let prefix = unique_prefix();
{
let bus = create_bus(&prefix).await;
for i in 0..10 {
let event = Event::from_str(&format!(r#"{{"persist_index": {}}}"#, i)).unwrap();
bus.ingest(event).expect("Failed to ingest");
}
bus.flush().await.expect("Failed to flush");
tokio::time::sleep(Duration::from_millis(100)).await;
bus.shutdown().await.expect("Failed to shutdown");
}
{
let bus = create_bus(&prefix).await;
let response = bus
.poll(ConsumeRequest::new(100))
.await
.expect("Failed to poll");
assert!(
response.events.len() >= 10,
"Expected at least 10 persisted events, got {}",
response.events.len()
);
bus.shutdown().await.expect("Failed to shutdown");
}
}