use async_nats::jetstream;
use net::{
AdapterConfig, BackpressureMode, BatchConfig, ConsumeRequest, Event, EventBus, EventBusConfig,
Filter, JetStreamAdapterConfig, Ordering,
};
use serde_json::json;
use std::time::Duration;
/// Returns the NATS server URL used by these tests.
///
/// The value of the `JETSTREAM_URL` environment variable is used when it can
/// be read; otherwise the function falls back to `nats://localhost:4222`.
fn jetstream_url() -> String {
    match std::env::var("JETSTREAM_URL") {
        Ok(url) => url,
        // Unset (or unreadable) variable: use the local default.
        Err(_) => String::from("nats://localhost:4222"),
    }
}
/// Builds a prefix of the form `test_<nanos>_<n>` that is distinct on every call.
///
/// `<nanos>` is the wall-clock time in nanoseconds since the Unix epoch and
/// `<n>` is a process-wide counter incremented on each call. The counter
/// guarantees that two calls made within the same clock tick (for example
/// from tests running in parallel on a platform with a coarse clock) still
/// receive different prefixes.
///
/// # Panics
///
/// Panics if the system clock reports a time earlier than the Unix epoch.
fn unique_prefix() -> String {
    use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
    use std::time::{SystemTime, UNIX_EPOCH};

    // Shared by every caller in this process; only uniqueness matters, so
    // relaxed ordering is sufficient.
    static NEXT_ID: AtomicU64 = AtomicU64::new(0);

    let ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
        .as_nanos();
    let id = NEXT_ID.fetch_add(1, AtomicOrdering::Relaxed);
    format!("test_{}_{}", ts, id)
}
/// Creates an [`EventBus`] backed by the JetStream adapter for a test.
///
/// The bus is configured with two shards, a ring buffer capacity of 1024,
/// `BackpressureMode::DropOldest`, the low-latency batch preset, and a
/// JetStream adapter pointing at [`jetstream_url`] with the given `prefix`
/// and a `max_messages` setting of 10000.
///
/// # Panics
///
/// Panics if the configuration cannot be built or the bus cannot be created.
async fn create_bus(prefix: &str) -> EventBus {
    let jetstream_cfg = JetStreamAdapterConfig::new(jetstream_url())
        .with_prefix(prefix)
        .with_max_messages(10000);

    let bus_cfg = EventBusConfig::builder()
        .num_shards(2)
        .ring_buffer_capacity(1024)
        .backpressure_mode(BackpressureMode::DropOldest)
        .batch(BatchConfig::low_latency())
        .adapter(AdapterConfig::JetStream(jetstream_cfg))
        .build()
        .expect("Failed to build config");

    let bus = EventBus::new(bus_cfg).await;
    bus.expect("Failed to create bus")
}
/// Ingests ten events, flushes, and checks that a single poll with a limit
/// of 100 returns at least ten events.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_ingest_and_poll() {
    let stream_prefix = unique_prefix();
    let bus = create_bus(&stream_prefix).await;

    (0..10).for_each(|idx| {
        let payload = format!(r#"{{"index": {}, "data": "test"}}"#, idx);
        let event = Event::from_str(&payload).unwrap();
        bus.ingest(event).expect("Failed to ingest");
    });

    bus.flush().await.expect("Failed to flush");
    // Fixed pause between the flush and the poll.
    // NOTE(review): presumably lets the server make the events visible — confirm.
    tokio::time::sleep(Duration::from_millis(100)).await;

    let request = ConsumeRequest::new(100);
    let response = bus.poll(request).await.expect("Failed to poll");

    let received = response.events.len();
    assert!(
        received >= 10,
        "Expected at least 10 events, got {}",
        received
    );

    bus.shutdown().await.expect("Failed to shutdown");
}
/// Ingests 25 events and reads them back in pages of 10, following the
/// `next_id` cursor until `has_more` is false, then checks that at least 25
/// events were collected.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_pagination() {
    // Far more pages than 25 events at 10 per page can need; turns a
    // non-terminating cursor into a test failure instead of a hang.
    const MAX_PAGES: usize = 100;

    let prefix = unique_prefix();
    let bus = create_bus(&prefix).await;

    for i in 0..25 {
        let event = Event::from_str(&format!(r#"{{"index": {}}}"#, i)).unwrap();
        bus.ingest(event).expect("Failed to ingest");
    }
    bus.flush().await.expect("Failed to flush");
    tokio::time::sleep(Duration::from_millis(300)).await;

    let mut all_events = Vec::new();
    let mut cursor: Option<String> = None;
    let mut pages = 0usize;
    loop {
        pages += 1;
        assert!(
            pages <= MAX_PAGES,
            "Pagination did not terminate within {} pages",
            MAX_PAGES
        );

        // The cursor is consumed here and replaced below, so no clone is needed.
        let request = match cursor.take() {
            Some(c) => ConsumeRequest::new(10).from(c),
            None => ConsumeRequest::new(10),
        };
        let response = bus.poll(request).await.expect("Failed to poll");
        all_events.extend(response.events);
        if !response.has_more {
            break;
        }
        // Without a cursor the next request would start over from the
        // beginning and the loop would never finish.
        cursor = Some(
            response
                .next_id
                .expect("has_more was true but next_id was None"),
        );
    }

    assert!(
        all_events.len() >= 25,
        "Expected at least 25 events, got {}",
        all_events.len()
    );
    bus.shutdown().await.expect("Failed to shutdown");
}
/// Ingests 20 events, polls with `Ordering::InsertionTs`, and checks that the
/// returned events have non-decreasing `insertion_ts` values.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_ordering() {
    let prefix = unique_prefix();
    let bus = create_bus(&prefix).await;

    for i in 0..20 {
        let event = Event::from_str(&format!(r#"{{"seq": {}}}"#, i)).unwrap();
        bus.ingest(event).expect("Failed to ingest");
    }
    bus.flush().await.expect("Failed to flush");
    tokio::time::sleep(Duration::from_millis(100)).await;

    let response = bus
        .poll(ConsumeRequest::new(100).ordering(Ordering::InsertionTs))
        .await
        .expect("Failed to poll");

    // An empty result would make the ordering loop below pass without
    // checking anything.
    assert!(
        !response.events.is_empty(),
        "Expected at least one event to check ordering, got none"
    );

    let mut prev_ts = 0u64;
    for event in &response.events {
        assert!(
            event.insertion_ts >= prev_ts,
            "Events not ordered: {} < {}",
            event.insertion_ts,
            prev_ts
        );
        prev_ts = event.insertion_ts;
    }

    bus.shutdown().await.expect("Failed to shutdown");
}
/// Ingests ten events alternating between `"token"` and `"tool_call"` types,
/// polls with an equality filter on `type == "token"`, and checks that every
/// returned event has the `"token"` type.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_filtering() {
    let prefix = unique_prefix();
    let bus = create_bus(&prefix).await;

    for i in 0..10 {
        // Even indices are "token" events, odd indices are "tool_call" events.
        let event_type = if i % 2 == 0 { "token" } else { "tool_call" };
        let event =
            Event::from_str(&format!(r#"{{"type": "{}", "index": {}}}"#, event_type, i)).unwrap();
        bus.ingest(event).expect("Failed to ingest");
    }
    bus.flush().await.expect("Failed to flush");
    tokio::time::sleep(Duration::from_millis(100)).await;

    let filter = Filter::eq("type", json!("token"));
    let response = bus
        .poll(ConsumeRequest::new(100).filter(filter))
        .await
        .expect("Failed to poll");

    // An empty result would make the per-event check below pass without
    // verifying that the filter lets matching events through.
    assert!(
        !response.events.is_empty(),
        "Expected at least one event matching the filter, got none"
    );

    for event in &response.events {
        let parsed = event.parse().expect("valid JSON");
        assert_eq!(
            parsed.get("type").and_then(|v| v.as_str()),
            Some("token"),
            "Expected token type, got {:?}",
            event.raw
        );
    }

    bus.shutdown().await.expect("Failed to shutdown");
}
/// Ingests 100 events through `ingest_batch`, checks the returned count, and
/// verifies that a poll with a limit of 200 returns at least 100 events.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_batch_ingest() {
    let stream_prefix = unique_prefix();
    let bus = create_bus(&stream_prefix).await;

    let mut events: Vec<Event> = Vec::new();
    for idx in 0..100 {
        let payload = format!(r#"{{"batch_index": {}}}"#, idx);
        events.push(Event::from_str(&payload).unwrap());
    }

    let count = bus.ingest_batch(events);
    assert_eq!(count, 100);

    bus.flush().await.expect("Failed to flush");
    tokio::time::sleep(Duration::from_millis(200)).await;

    let request = ConsumeRequest::new(200);
    let response = bus.poll(request).await.expect("Failed to poll");

    let received = response.events.len();
    assert!(
        received >= 100,
        "Expected at least 100 events, got {}",
        received
    );

    bus.shutdown().await.expect("Failed to shutdown");
}
/// Ingests 50 events and checks that the bus statistics report at least 50
/// ingested events. No flush or poll is performed before reading the counter.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_stats() {
    let stream_prefix = unique_prefix();
    let bus = create_bus(&stream_prefix).await;

    (0..50).for_each(|idx| {
        let payload = format!(r#"{{"stat_index": {}}}"#, idx);
        let event = Event::from_str(&payload).unwrap();
        bus.ingest(event).expect("Failed to ingest");
    });

    let stats = bus.stats();
    // Fully qualified path: the bare name `Ordering` in this file refers to
    // the event-bus ordering type, not the atomic memory ordering.
    let ingested = stats
        .events_ingested
        .load(std::sync::atomic::Ordering::Relaxed);
    assert!(ingested >= 50, "Expected at least 50 ingested events");

    bus.shutdown().await.expect("Failed to shutdown");
}
/// Creates a bus and checks that it reports itself as healthy.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_health() {
    let stream_prefix = unique_prefix();
    let bus = create_bus(&stream_prefix).await;

    let healthy = bus.is_healthy().await;
    assert!(healthy, "Bus should be healthy");

    bus.shutdown().await.expect("Failed to shutdown");
}
/// Writes ten events with one bus instance, shuts it down, then creates a
/// second bus with the same prefix and checks that a poll returns at least
/// ten events.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_persistence_across_instances() {
    let shared_prefix = unique_prefix();

    // First instance: write, flush, and shut down. The scope ensures the bus
    // value is dropped before the second instance is created.
    {
        let writer = create_bus(&shared_prefix).await;
        (0..10).for_each(|idx| {
            let payload = format!(r#"{{"persist_index": {}}}"#, idx);
            let event = Event::from_str(&payload).unwrap();
            writer.ingest(event).expect("Failed to ingest");
        });
        writer.flush().await.expect("Failed to flush");
        tokio::time::sleep(Duration::from_millis(100)).await;
        writer.shutdown().await.expect("Failed to shutdown");
    }

    // Second instance: same prefix, read only.
    {
        let reader = create_bus(&shared_prefix).await;
        let request = ConsumeRequest::new(100);
        let response = reader.poll(request).await.expect("Failed to poll");
        let received = response.events.len();
        assert!(
            received >= 10,
            "Expected at least 10 persisted events, got {}",
            received
        );
        reader.shutdown().await.expect("Failed to shutdown");
    }
}
/// Ingests 100 events into a single-shard bus whose JetStream adapter is
/// configured with `with_max_messages(50)`, then checks how many events a
/// poll with a limit of 200 returns.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_stream_limits() {
    let prefix = unique_prefix();
    let config = EventBusConfig::builder()
        .num_shards(1)
        .ring_buffer_capacity(1024)
        .adapter(AdapterConfig::JetStream(
            JetStreamAdapterConfig::new(jetstream_url())
                .with_prefix(&prefix)
                .with_max_messages(50),
        ))
        .build()
        .expect("Failed to build config");
    let bus = EventBus::new(config).await.expect("Failed to create bus");

    for i in 0..100 {
        let event = Event::from_str(&format!(r#"{{"limit_index": {}}}"#, i)).unwrap();
        bus.ingest(event).expect("Failed to ingest");
    }
    bus.flush().await.expect("Failed to flush");
    tokio::time::sleep(Duration::from_millis(500)).await;

    let response = bus
        .poll(ConsumeRequest::new(200))
        .await
        .expect("Failed to poll");

    let received = response.events.len();
    // NOTE(review): the bound checked is the number of events ingested (100),
    // not the configured limit of 50. Confirm whether `received <= 50` is
    // guaranteed before tightening it.
    assert!(
        received <= 100,
        "Expected at most 100 events (max_messages is 50), got {}",
        received
    );

    bus.shutdown().await.expect("Failed to shutdown");
}
/// Ingests 20 events, deletes several messages directly from the
/// `<prefix>_shard_0` stream through a separate NATS client, and checks that
/// paginated polling still returns a full first page of 10 events with
/// `has_more` set, followed by at least 5 more events on the second page.
///
/// Marked `#[ignore]`, so it only runs when ignored tests are requested.
#[tokio::test]
#[ignore]
async fn test_jetstream_sequence_gaps() {
    // Stream sequence numbers this test attempts to delete.
    const DELETED_SEQS: [u64; 5] = [3, 5, 7, 9, 11];

    let prefix = unique_prefix();
    let stream_name = format!("{}_shard_0", prefix);

    // Independent client used to manipulate the stream behind the bus's back.
    let url = jetstream_url();
    let client = async_nats::connect(&url)
        .await
        .expect("Failed to connect to NATS");
    let js = jetstream::new(client);

    let bus = create_bus(&prefix).await;
    (0..20).for_each(|idx| {
        let payload = format!(r#"{{"gap_index": {}}}"#, idx);
        let event = Event::from_str(&payload).unwrap();
        bus.ingest(event).expect("Failed to ingest");
    });
    bus.flush().await.expect("Failed to flush");
    tokio::time::sleep(Duration::from_millis(100)).await;

    let stream = js
        .get_stream(&stream_name)
        .await
        .expect("Failed to get stream");
    for seq in DELETED_SEQS {
        // The outcome of each delete is discarded.
        // NOTE(review): presumably because a given sequence may not exist in
        // this shard's stream — confirm.
        let _ = stream.delete_message(seq).await;
    }
    tokio::time::sleep(Duration::from_millis(50)).await;

    let first_page = bus
        .poll(ConsumeRequest::new(10))
        .await
        .expect("Failed to poll");
    let first_len = first_page.events.len();
    assert_eq!(
        first_len, 10,
        "Expected exactly 10 events despite sequence gaps, got {}",
        first_len
    );
    assert!(
        first_page.has_more,
        "Expected has_more=true since we have more events"
    );

    let resume_from = first_page.next_id.unwrap();
    let second_request = ConsumeRequest::new(10).from(resume_from);
    let second_page = bus
        .poll(second_request)
        .await
        .expect("Failed to poll second page");
    let second_len = second_page.events.len();
    assert!(
        second_len >= 5,
        "Expected at least 5 more events, got {}",
        second_len
    );

    bus.shutdown().await.expect("Failed to shutdown");
}