use anyhow::Result;
use chrono::Utc;
use oxirs_stream::*;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tokio::time::timeout;
use uuid::Uuid;
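/// Tunable knobs for one performance run. `Default` is a small, CI-friendly
/// workload; `full_performance()` scales the same knobs up for real
/// benchmarking and is selected by `from_env()` when `OXIRS_FULL_PERF_TEST=1`.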
pub struct PerformanceTestConfig {
pub event_count: usize,
pub concurrent_producers: usize,
pub concurrent_consumers: usize,
pub test_duration: Duration,
    pub target_throughput: f64,
    pub target_latency_p99: Duration,
pub target_success_rate: f64,
}
impl Default for PerformanceTestConfig {
fn default() -> Self {
Self {
            event_count: 10_000,
            concurrent_producers: 4,
            concurrent_consumers: 4,
            test_duration: Duration::from_secs(5),
            target_throughput: 10_000.0,
            target_latency_p99: Duration::from_millis(10),
            target_success_rate: 0.99,
        }
}
}
impl PerformanceTestConfig {
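    /// Full-scale profile: 100k events over 10 producers and 10 consumers,
    /// with tight latency and success-rate targets.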
#[allow(dead_code)]
fn full_performance() -> Self {
Self {
event_count: 100_000,
concurrent_producers: 10,
concurrent_consumers: 10,
test_duration: Duration::from_secs(60),
target_throughput: 100_000.0,
target_latency_p99: Duration::from_millis(10),
target_success_rate: 0.9999,
}
}
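    /// Selects `full_performance()` when `OXIRS_FULL_PERF_TEST=1`, otherwise
    /// the CI-friendly default.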
#[allow(dead_code)]
fn from_env() -> Self {
if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
Self::full_performance()
} else {
Self::default()
}
}
}
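/// Counters and samples collected during a run, with helpers for success
/// rate, average throughput, and latency percentiles.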
#[derive(Debug, Clone)]
pub struct PerformanceMetrics {
pub total_events_sent: usize,
pub total_events_received: usize,
pub successful_sends: usize,
pub failed_sends: usize,
pub latencies: Vec<Duration>,
pub throughput_samples: Vec<f64>,
pub test_duration: Duration,
}
impl Default for PerformanceMetrics {
fn default() -> Self {
Self::new()
}
}
impl PerformanceMetrics {
pub fn new() -> Self {
Self {
total_events_sent: 0,
total_events_received: 0,
successful_sends: 0,
failed_sends: 0,
latencies: Vec::new(),
throughput_samples: Vec::new(),
test_duration: Duration::from_secs(0),
}
}
pub fn success_rate(&self) -> f64 {
if self.total_events_sent == 0 {
1.0
} else {
self.successful_sends as f64 / self.total_events_sent as f64
}
}
pub fn average_throughput(&self) -> f64 {
if self.test_duration.as_secs_f64() == 0.0 {
0.0
} else {
self.total_events_sent as f64 / self.test_duration.as_secs_f64()
}
}
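    /// Nearest-rank percentile over all recorded send latencies: sorts a copy
    /// and indexes at `floor((n - 1) * percentile / 100)`, which biases small
    /// sample sets slightly low. Returns `None` if no latencies were recorded.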
pub fn latency_percentile(&self, percentile: f64) -> Option<Duration> {
if self.latencies.is_empty() {
return None;
}
let mut sorted_latencies = self.latencies.clone();
sorted_latencies.sort();
let index = ((sorted_latencies.len() - 1) as f64 * percentile / 100.0) as usize;
Some(sorted_latencies[index])
}
pub fn latency_p50(&self) -> Option<Duration> {
self.latency_percentile(50.0)
}
pub fn latency_p95(&self) -> Option<Duration> {
self.latency_percentile(95.0)
}
pub fn latency_p99(&self) -> Option<Duration> {
self.latency_percentile(99.0)
}
}
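/// Builds `count` synthetic `TripleAdded` events for one producer batch. Each
/// event carries a unique `perf_{batch_id}_{i}` id plus batch/index properties
/// so loss or duplication can be traced back to its producer.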
fn create_performance_test_events(count: usize, batch_id: usize) -> Vec<StreamEvent> {
(0..count)
.map(|i| {
let event_id = format!("perf_{batch_id}_{i}");
StreamEvent::TripleAdded {
subject: format!("http://perf.test/subject_{i}"),
predicate: "http://perf.test/predicate".to_string(),
object: format!("\"Performance test data {i}\""),
graph: Some(format!("http://perf.test/graph_{batch_id}")),
metadata: EventMetadata {
event_id: event_id.clone(),
timestamp: Utc::now(),
source: "performance_test".to_string(),
user: Some("perf_user".to_string()),
context: Some("performance_context".to_string()),
caused_by: None,
version: "1.0".to_string(),
properties: {
let mut props = HashMap::new();
props.insert("batch_id".to_string(), batch_id.to_string());
props.insert("event_index".to_string(), i.to_string());
props.insert("test_type".to_string(), "performance".to_string());
props
},
checksum: Some(format!("checksum_{event_id}")),
},
}
})
.collect()
}
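/// A throughput-oriented `StreamConfig` for the given backend: 100-event
/// batches flushed every 10 ms, LZ4 compression, batching and pipelining
/// enabled, and a unique per-test topic so concurrent runs do not collide.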
fn create_performance_stream_config(backend: StreamBackendType) -> StreamConfig {
let test_id = Uuid::new_v4();
StreamConfig {
backend,
topic: format!("perf-test-{test_id}"),
        batch_size: 100,
        flush_interval_ms: 10,
        max_connections: 20,
connection_timeout: Duration::from_secs(5),
enable_compression: true,
        compression_type: CompressionType::Lz4,
        retry_config: RetryConfig {
max_retries: 3,
initial_backoff: Duration::from_millis(10),
max_backoff: Duration::from_millis(500),
backoff_multiplier: 1.5,
            jitter: false,
        },
circuit_breaker: CircuitBreakerConfig {
enabled: true,
failure_threshold: 10,
timeout: Duration::from_secs(30),
success_threshold: 5,
half_open_max_calls: 3,
},
security: SecurityConfig {
            enable_tls: false,
            verify_certificates: true,
client_cert_path: None,
client_key_path: None,
ca_cert_path: None,
sasl_config: None,
},
performance: StreamPerformanceConfig {
enable_batching: true,
enable_pipelining: true,
buffer_size: 8192,
prefetch_count: 1000,
enable_zero_copy: true,
enable_simd: true,
parallel_processing: true,
worker_threads: Some(8),
},
monitoring: MonitoringConfig {
enable_metrics: true,
            enable_tracing: false,
            metrics_interval: Duration::from_secs(1),
health_check_interval: Duration::from_secs(5),
enable_profiling: false,
prometheus_endpoint: None,
jaeger_endpoint: None,
log_level: "info".to_string(),
},
}
}
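// The default profiles keep these tests CI-friendly. Set OXIRS_FULL_PERF_TEST=1
// to use the full-scale profiles, and pass `--ignored` to `cargo test` to run
// the Kafka-backed tests, which assume a broker listening on localhost:9092.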
#[cfg(test)]
mod throughput_tests {
use super::*;
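    /// Memory-backend throughput smoke test; thresholds scale with the
    /// `OXIRS_FULL_PERF_TEST` profile and the build type (debug vs release).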
#[tokio::test]
async fn test_memory_backend_throughput() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
let test_config = if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
PerformanceTestConfig {
event_count: 50_000,
concurrent_producers: 5,
concurrent_consumers: 5,
test_duration: Duration::from_secs(30),
target_throughput: 10_000.0,
..Default::default()
}
} else {
PerformanceTestConfig {
                event_count: 50,
                concurrent_producers: 1,
concurrent_consumers: 1,
                test_duration: Duration::from_millis(500),
                target_throughput: 100.0,
                ..Default::default()
}
};
let metrics = run_throughput_test(config, test_config).await?;
println!("Memory Backend Throughput Test Results:");
println!(" Total events sent: {}", metrics.total_events_sent);
println!(" Total events received: {}", metrics.total_events_received);
println!(" Success rate: {:.4}%", metrics.success_rate() * 100.0);
println!(
" Average throughput: {:.0} events/sec",
metrics.average_throughput()
);
if let Some(p99) = metrics.latency_p99() {
println!(" P99 latency: {p99:?}");
}
        let min_success_rate = if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
            0.99
        } else {
            0.95
        };
        // Debug builds are much slower, so the throughput floor is relaxed.
        let min_throughput = if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
            5_000.0
        } else if cfg!(debug_assertions) {
            10.0
        } else {
            350.0
        };
assert!(
metrics.success_rate() >= min_success_rate,
"Success rate should be >= {}%",
min_success_rate * 100.0
);
assert!(
metrics.average_throughput() >= min_throughput,
"Throughput should be >= {min_throughput} events/sec"
);
Ok(())
}
#[cfg(feature = "kafka")]
#[tokio::test]
    #[ignore] // requires a running Kafka broker on localhost:9092
    async fn test_kafka_backend_high_throughput() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Kafka {
brokers: vec!["localhost:9092".to_string()],
security_protocol: None,
sasl_config: None,
});
let test_config = PerformanceTestConfig {
event_count: 100_000,
concurrent_producers: 10,
concurrent_consumers: 10,
test_duration: Duration::from_secs(60),
target_throughput: 50_000.0,
..Default::default()
};
let metrics = run_throughput_test(config, test_config).await?;
println!("Kafka Backend Throughput Test Results:");
println!(" Total events sent: {}", metrics.total_events_sent);
println!(
" Average throughput: {:.0} events/sec",
metrics.average_throughput()
);
println!(" Success rate: {:.4}%", metrics.success_rate() * 100.0);
assert!(
metrics.average_throughput() >= 25_000.0,
"Kafka throughput should be >= 25K events/sec"
);
assert!(
metrics.success_rate() >= 0.995,
"Kafka success rate should be >= 99.5%"
);
Ok(())
}
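    /// Drives `concurrent_producers` publishing tasks and `concurrent_consumers`
    /// draining tasks over fresh `Stream` handles, recording per-send latency and
    /// per-producer throughput into shared `PerformanceMetrics`.
    ///
    /// A minimal usage sketch, assuming the in-memory backend (`ignore`d here
    /// because it needs an async runtime):
    ///
    /// ```ignore
    /// let config = create_performance_stream_config(StreamBackendType::Memory {
    ///     max_size: Some(1_000),
    ///     persistence: false,
    /// });
    /// let metrics = run_throughput_test(config, PerformanceTestConfig::default()).await?;
    /// assert!(metrics.success_rate() > 0.0);
    /// ```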
pub async fn run_throughput_test(
config: StreamConfig,
test_config: PerformanceTestConfig,
) -> Result<PerformanceMetrics> {
let metrics = Arc::new(Mutex::new(PerformanceMetrics::new()));
let start_time = Instant::now();
let mut producer_handles = Vec::new();
for producer_id in 0..test_config.concurrent_producers {
let config = config.clone();
let metrics = metrics.clone();
let events_per_producer = test_config.event_count / test_config.concurrent_producers;
let handle = tokio::spawn(async move {
let mut stream = Stream::new(config).await?;
let events = create_performance_test_events(events_per_producer, producer_id);
let producer_start = Instant::now();
for event in events {
let send_start = Instant::now();
let result = stream.publish(event).await;
let send_latency = send_start.elapsed();
let mut m = metrics.lock().expect("metrics mutex poisoned");
m.total_events_sent += 1;
m.latencies.push(send_latency);
if result.is_ok() {
m.successful_sends += 1;
} else {
m.failed_sends += 1;
}
}
let producer_duration = producer_start.elapsed();
let producer_throughput =
events_per_producer as f64 / producer_duration.as_secs_f64();
{
let mut m = metrics.lock().expect("metrics mutex poisoned");
m.throughput_samples.push(producer_throughput);
}
stream.close().await?;
Ok::<(), anyhow::Error>(())
});
producer_handles.push(handle);
}
let mut consumer_handles = Vec::new();
for _consumer_id in 0..test_config.concurrent_consumers {
let config = config.clone();
let metrics = metrics.clone();
let test_duration = test_config.test_duration;
let handle = tokio::spawn(async move {
let mut stream = Stream::new(config).await?;
let consumer_start = Instant::now();
while consumer_start.elapsed() < test_duration {
if let Ok(Ok(Some(_))) =
timeout(Duration::from_millis(100), stream.consume()).await
{
let mut m = metrics.lock().expect("metrics mutex poisoned");
m.total_events_received += 1;
}
}
stream.close().await?;
Ok::<(), anyhow::Error>(())
});
consumer_handles.push(handle);
}
for handle in producer_handles {
handle.await??;
}
        // Give consumers a moment to drain whatever the producers left behind.
        let catchup_duration = if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
            Duration::from_secs(5)
        } else {
            Duration::from_millis(100)
        };
tokio::time::sleep(catchup_duration).await;
for handle in consumer_handles {
handle.abort();
}
let total_duration = start_time.elapsed();
{
let mut m = metrics.lock().expect("metrics mutex poisoned");
m.test_duration = total_duration;
}
let final_metrics = {
let m = metrics.lock().expect("metrics mutex poisoned");
m.clone()
};
Ok(final_metrics)
}
}
#[cfg(test)]
mod latency_tests {
use super::*;
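    /// Round-trip latency with a single producer/consumer pair: each iteration
    /// publishes one event and times how long until it is consumed.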
#[tokio::test]
async fn test_end_to_end_latency() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config).await?;
let mut latencies = Vec::new();
let test_count = 1000;
for i in 0..test_count {
let start = Instant::now();
let event = StreamEvent::TripleAdded {
subject: format!("http://latency.test/subject_{i}"),
predicate: "http://latency.test/predicate".to_string(),
object: format!("\"Latency test data {i}\""),
graph: None,
metadata: EventMetadata {
event_id: format!("latency_test_{i}"),
timestamp: Utc::now(),
source: "latency_test".to_string(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: {
let mut props = HashMap::new();
props.insert("test_index".to_string(), i.to_string());
                        // Wall-clock send time, so a consumer could compute
                        // one-way latency from its own receive timestamp.
                        props.insert(
                            "start_time_nanos".to_string(),
                            Utc::now()
                                .timestamp_nanos_opt()
                                .unwrap_or_default()
                                .to_string(),
                        );
props
},
checksum: None,
},
};
producer.publish(event).await?;
if let Ok(Ok(Some(_))) = timeout(Duration::from_secs(5), consumer.consume()).await {
let latency = start.elapsed();
latencies.push(latency);
}
if i % 100 == 0 {
tokio::time::sleep(Duration::from_millis(10)).await;
}
}
producer.close().await?;
consumer.close().await?;
latencies.sort();
let count = latencies.len();
if count > 0 {
let p50 = latencies[count / 2];
let p95 = latencies[(count * 95) / 100];
let p99 = latencies[(count * 99) / 100];
let max = latencies[count - 1];
println!("End-to-End Latency Results (n={count}):");
println!(" P50: {p50:?}");
println!(" P95: {p95:?}");
println!(" P99: {p99:?}");
println!(" Max: {max:?}");
assert!(
p99 < Duration::from_millis(100),
"P99 latency should be under 100ms for memory backend"
);
assert!(
p95 < Duration::from_millis(50),
"P95 latency should be under 50ms for memory backend"
);
}
Ok(())
}
#[cfg(feature = "kafka")]
#[tokio::test]
    #[ignore] // requires a running Kafka broker on localhost:9092
    async fn test_kafka_latency_under_load() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Kafka {
brokers: vec!["localhost:9092".to_string()],
security_protocol: None,
sasl_config: None,
});
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config).await?;
let background_config = create_performance_stream_config(StreamBackendType::Kafka {
brokers: vec!["localhost:9092".to_string()],
security_protocol: None,
sasl_config: None,
});
let load_generator = tokio::spawn(async move {
let mut load_stream = Stream::new(background_config).await?;
for i in 0..10_000 {
let event = create_performance_test_events(1, i)[0].clone();
let _ = load_stream.publish(event).await;
if i % 100 == 0 {
tokio::time::sleep(Duration::from_millis(1)).await;
}
}
load_stream.close().await?;
Ok::<(), anyhow::Error>(())
});
let mut latencies = Vec::new();
let latency_test_count = 500;
tokio::time::sleep(Duration::from_millis(100)).await;
for i in 0..latency_test_count {
let start = Instant::now();
let event = StreamEvent::TripleAdded {
subject: format!("http://kafka.latency.test/subject_{i}"),
predicate: "http://kafka.latency.test/predicate".to_string(),
object: format!("\"Kafka latency test data {i}\""),
graph: None,
metadata: EventMetadata {
event_id: format!("kafka_latency_test_{i}"),
timestamp: Utc::now(),
source: "kafka_latency_test".to_string(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
},
};
producer.publish(event).await?;
if let Ok(Ok(Some(_))) = timeout(Duration::from_secs(10), consumer.consume()).await {
let latency = start.elapsed();
latencies.push(latency);
}
tokio::time::sleep(Duration::from_millis(20)).await;
}
load_generator.abort();
producer.close().await?;
consumer.close().await?;
latencies.sort();
let count = latencies.len();
if count > 0 {
let p50 = latencies[count / 2];
let p95 = latencies[(count * 95) / 100];
let p99 = latencies[(count * 99) / 100];
println!("Kafka Latency Under Load Results (n={count}):");
println!(" P50: {p50:?}");
println!(" P95: {p95:?}");
println!(" P99: {p99:?}");
assert!(
p99 < Duration::from_millis(1000),
"P99 latency should be under 1s even under load"
);
assert!(
p95 < Duration::from_millis(500),
"P95 latency should be under 500ms under load"
);
}
Ok(())
}
}
#[cfg(test)]
mod scalability_tests {
use super::*;
use crate::throughput_tests::run_throughput_test;
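    /// Runs the same workload with an increasing producer count; the scaling
    /// assertions only apply to full-profile runs, since the tiny CI workload
    /// is dominated by setup cost.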
#[tokio::test]
async fn test_concurrent_producers_scaling() -> Result<()> {
let base_config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
        let producer_counts = if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
            vec![1, 2, 5, 10, 20]
        } else {
            vec![1, 2, 4]
        };
let mut results = Vec::new();
for &producer_count in &producer_counts {
let test_config = if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
PerformanceTestConfig {
event_count: 10_000,
concurrent_producers: producer_count,
concurrent_consumers: 1,
test_duration: Duration::from_secs(30),
..Default::default()
}
} else {
PerformanceTestConfig {
                    event_count: 10,
                    concurrent_producers: producer_count,
concurrent_consumers: 1,
                    test_duration: Duration::from_millis(500),
                    ..Default::default()
}
};
let metrics = run_throughput_test(base_config.clone(), test_config).await?;
println!(
"Producers: {}, Throughput: {:.0} events/sec, Success: {:.2}%",
producer_count,
metrics.average_throughput(),
metrics.success_rate() * 100.0
);
results.push((producer_count, metrics.average_throughput()));
}
let single_producer_throughput = results[0].1;
let max_producer_throughput = results.last().unwrap().1;
        // Scaling assertions only make sense for the full profile; the tiny CI
        // workload is dominated by setup cost.
        if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
            assert!(
                max_producer_throughput > single_producer_throughput,
                "Throughput should increase with more producers"
            );
            let min_improvement_factor = 1.2;
            assert!(
                max_producer_throughput >= single_producer_throughput * min_improvement_factor,
                "Should see at least a modest throughput improvement with more producers (memory backend has lock contention). Single: {single_producer_throughput}, Max: {max_producer_throughput}"
            );
        }
Ok(())
}
#[tokio::test]
async fn test_message_size_scaling() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
        let message_sizes = vec![100, 1_000, 10_000, 50_000];
        let mut results = Vec::new();
for &size in &message_sizes {
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config.clone()).await?;
let large_data = "x".repeat(size);
let test_count = 100;
let start = Instant::now();
for i in 0..test_count {
let event = StreamEvent::TripleAdded {
subject: format!("http://size.test/subject_{i}"),
predicate: "http://size.test/predicate".to_string(),
object: format!("\"{large_data}\""),
graph: None,
metadata: EventMetadata {
event_id: format!("size_test_{size}_{i}"),
timestamp: Utc::now(),
source: "size_test".to_string(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
},
};
producer.publish(event).await?;
}
let mut received = 0;
for _ in 0..test_count {
if let Ok(Ok(Some(_))) =
timeout(Duration::from_millis(100), consumer.consume()).await
{
received += 1;
}
}
let duration = start.elapsed();
let throughput = received as f64 / duration.as_secs_f64();
let bytes_per_second = (received * size) as f64 / duration.as_secs_f64();
println!(
"Message size: {size} bytes, Throughput: {throughput:.0} msg/sec, {bytes_per_second:.0} bytes/sec"
);
results.push((size, throughput, bytes_per_second));
producer.close().await?;
consumer.close().await?;
}
for (size, throughput, _) in &results {
assert!(
*throughput > 0.0,
"Should handle messages of size {size} bytes"
);
}
Ok(())
}
#[cfg(feature = "kafka")]
#[tokio::test]
    #[ignore] // requires a running Kafka broker on localhost:9092
    async fn test_partition_scaling() -> Result<()> {
let partition_counts = vec![1, 4, 8, 16];
let mut results = Vec::new();
for &partitions in &partition_counts {
let _topic_name = format!("partition-scale-test-{partitions}");
let config = create_performance_stream_config(StreamBackendType::Kafka {
brokers: vec!["localhost:9092".to_string()],
security_protocol: None,
sasl_config: None,
});
let test_config = PerformanceTestConfig {
event_count: 50_000,
concurrent_producers: partitions,
concurrent_consumers: partitions,
test_duration: Duration::from_secs(30),
..Default::default()
};
let metrics = run_throughput_test(config, test_config).await?;
println!(
"Partitions: {}, Throughput: {:.0} events/sec",
partitions,
metrics.average_throughput()
);
results.push((partitions, metrics.average_throughput()));
}
let single_partition_throughput = results[0].1;
let max_partition_throughput = results.last().unwrap().1;
assert!(
max_partition_throughput > single_partition_throughput,
"Throughput should increase with more partitions"
);
Ok(())
}
}
#[cfg(test)]
mod reliability_tests {
use super::*;
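    /// Publishes a known set of event ids, then checks what the consumer saw
    /// against it to compute delivery and duplicate rates.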
#[tokio::test]
async fn test_message_delivery_reliability() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config).await?;
let test_count = 10_000;
let mut sent_events = HashMap::new();
for i in 0..test_count {
let event_id = format!("reliability_test_{i}");
let event = StreamEvent::TripleAdded {
subject: format!("http://reliability.test/subject_{i}"),
predicate: "http://reliability.test/predicate".to_string(),
object: format!("\"Reliability test data {i}\""),
graph: None,
metadata: EventMetadata {
event_id: event_id.clone(),
timestamp: Utc::now(),
source: "reliability_test".to_string(),
user: None,
context: None,
caused_by: None,
version: "1.0".to_string(),
properties: HashMap::new(),
checksum: None,
},
};
if producer.publish(event).await.is_ok() {
sent_events.insert(event_id, i);
}
}
let mut received_events = HashMap::new();
let mut duplicates = 0;
for _ in 0..test_count {
match timeout(Duration::from_millis(10), consumer.consume()).await {
Ok(Ok(Some(event))) => {
if let StreamEvent::TripleAdded { metadata, .. } = event {
if let std::collections::hash_map::Entry::Vacant(e) =
received_events.entry(metadata.event_id)
{
e.insert(true);
} else {
duplicates += 1;
}
}
}
_ => {
break;
}
}
}
producer.close().await?;
consumer.close().await?;
let sent_count = sent_events.len();
let received_count = received_events.len();
let delivery_rate = received_count as f64 / sent_count as f64;
let duplicate_rate = duplicates as f64 / received_count as f64;
println!("Reliability Test Results:");
println!(" Events sent: {sent_count}");
println!(" Events received: {received_count}");
println!(" Delivery rate: {:.4}%", delivery_rate * 100.0);
println!(" Duplicate rate: {:.4}%", duplicate_rate * 100.0);
assert!(delivery_rate >= 0.95, "Delivery rate should be >= 95%");
assert!(duplicate_rate <= 0.01, "Duplicate rate should be <= 1%");
let mut missing_events = Vec::new();
for event_id in sent_events.keys() {
if !received_events.contains_key(event_id) {
missing_events.push(event_id);
}
}
if !missing_events.is_empty() {
println!("Missing events: {}", missing_events.len());
for event_id in missing_events.iter().take(10) {
println!(" Missing: {event_id}");
}
}
Ok(())
}
#[tokio::test]
async fn test_failure_recovery() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config).await?;
for i in 0..100 {
let event = create_performance_test_events(1, i)[0].clone();
producer.publish(event).await?;
}
producer.close().await?;
consumer.close().await?;
tokio::time::sleep(Duration::from_millis(100)).await;
let config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config).await?;
for i in 100..200 {
let event = create_performance_test_events(1, i)[0].clone();
producer.publish(event).await?;
}
let mut received_count = 0;
for _ in 0..50 {
if let Ok(Ok(Some(_))) = timeout(Duration::from_millis(10), consumer.consume()).await {
received_count += 1;
}
}
println!("Received {received_count} events after recovery");
assert!(received_count > 0, "Should receive events after recovery");
producer.close().await?;
consumer.close().await?;
Ok(())
}
#[tokio::test]
async fn test_backpressure_handling() -> Result<()> {
let mut config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
        // Tiny batches with a long flush interval stress the producer-side queue.
        config.batch_size = 1;
        config.flush_interval_ms = 1000;
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config).await?;
let start = Instant::now();
let mut successful_sends = 0;
let mut failed_sends = 0;
for i in 0..1000 {
let event = create_performance_test_events(1, i)[0].clone();
match timeout(Duration::from_millis(100), producer.publish(event)).await {
Ok(Ok(_)) => successful_sends += 1,
Ok(Err(_)) => failed_sends += 1,
                Err(_) => failed_sends += 1, // publish timed out
            }
if i % 100 == 0 {
println!(
"Sent {} events, {successful_sends} successful, {failed_sends} failed",
i + 1
);
}
}
let send_duration = start.elapsed();
let mut received_count = 0;
for _ in 0..successful_sends {
if let Ok(Ok(Some(_))) = timeout(Duration::from_millis(100), consumer.consume()).await {
received_count += 1;
}
}
println!("Backpressure Test Results:");
println!(" Send duration: {send_duration:?}");
println!(" Successful sends: {successful_sends}");
println!(" Failed sends: {failed_sends}");
println!(" Received count: {received_count}");
assert!(
successful_sends > 0,
"Should send some events despite backpressure"
);
assert!(received_count > 0, "Should receive some events");
producer.close().await?;
consumer.close().await?;
Ok(())
}
}
#[cfg(test)]
mod resource_usage_tests {
use super::*;
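    /// Publishes and partially drains batches while sampling process memory
    /// once per second (via `get_process_memory_usage`, currently a fixed
    /// placeholder), then asserts bounded growth.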
#[tokio::test]
async fn test_memory_usage_under_load() -> Result<()> {
let config = create_performance_stream_config(StreamBackendType::Memory {
max_size: Some(100000),
persistence: false,
});
let mut producer = Stream::new(config.clone()).await?;
let mut consumer = Stream::new(config).await?;
        let monitor_duration = if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
            60
        } else {
            10
        };
let memory_monitor = tokio::spawn(async move {
let mut samples = Vec::new();
for _ in 0..monitor_duration {
let memory_usage = get_process_memory_usage();
samples.push(memory_usage);
tokio::time::sleep(Duration::from_secs(1)).await;
}
samples
});
let (events_per_batch, batch_count) =
if std::env::var("OXIRS_FULL_PERF_TEST").unwrap_or_default() == "1" {
                (1000, 100)
            } else {
                (100, 20)
            };
for batch in 0..batch_count {
let events = create_performance_test_events(events_per_batch, batch);
for event in events {
producer.publish(event).await?;
}
for _ in 0..events_per_batch / 2 {
                // Drain about half of each batch so the in-memory queue stays bounded;
                // the result is irrelevant here.
                let _ = timeout(Duration::from_millis(1), consumer.consume()).await;
}
if batch % (batch_count / 5).max(1) == 0 {
println!("Processed batch {batch}/{batch_count}");
}
}
let memory_samples = memory_monitor.await?;
producer.close().await?;
consumer.close().await?;
if !memory_samples.is_empty() {
let min_memory = memory_samples.iter().min().unwrap();
let max_memory = memory_samples.iter().max().unwrap();
let avg_memory = memory_samples.iter().sum::<u64>() / memory_samples.len() as u64;
println!("Memory Usage Analysis:");
println!(" Min: {} MB", min_memory / 1024 / 1024);
println!(" Max: {} MB", max_memory / 1024 / 1024);
println!(" Avg: {} MB", avg_memory / 1024 / 1024);
let memory_growth = max_memory - min_memory;
assert!(
memory_growth < 500 * 1024 * 1024,
"Memory growth should be under 500MB"
);
}
Ok(())
}
    fn get_process_memory_usage() -> u64 {
        // Placeholder: a fixed 100 MB reading so the test plumbing runs on any
        // platform; a real implementation would query the OS (see sketch below).
        1024 * 1024 * 100
    }
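    // A minimal Linux-only sketch of a real measurement (an assumption, not part
    // of the original suite): parse the VmRSS line of /proc/self/status, which
    // reports resident set size in kB.
    #[cfg(target_os = "linux")]
    #[allow(dead_code)]
    fn get_process_memory_usage_procfs() -> u64 {
        std::fs::read_to_string("/proc/self/status")
            .ok()
            .and_then(|status| {
                status
                    .lines()
                    .find(|line| line.starts_with("VmRSS:"))
                    .and_then(|line| line.split_whitespace().nth(1))
                    .and_then(|kb| kb.parse::<u64>().ok())
            })
            .map(|kb| kb * 1024) // kB -> bytes
            .unwrap_or(0)
    }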
}