use async_trait::async_trait;
use pmcp::error::Result;
use pmcp::shared::{ConnectionPool, ConnectionPoolConfig, LoadBalanceStrategy, TransportMessage};
use std::time::Duration;
use tracing::{info, Level};
#[derive(Debug, Clone)]
struct MockTransport {
id: u32,
}
impl MockTransport {
fn new(id: u32) -> Self {
Self { id }
}
}
#[async_trait]
impl pmcp::shared::Transport for MockTransport {
async fn send(&mut self, _message: TransportMessage) -> Result<()> {
info!("MockTransport {} sending message", self.id);
Ok(())
}
async fn receive(&mut self) -> Result<TransportMessage> {
tokio::time::sleep(Duration::from_millis(100)).await;
Ok(TransportMessage::Notification(
pmcp::types::Notification::Progress(pmcp::types::ProgressNotification::new(
pmcp::types::ProgressToken::String(format!("mock-{}", self.id)),
50.0,
Some(format!("Mock message from transport {}", self.id)),
)),
))
}
async fn close(&mut self) -> Result<()> {
info!("MockTransport {} closed", self.id);
Ok(())
}
fn is_connected(&self) -> bool {
true
}
fn transport_type(&self) -> &'static str {
"mock"
}
}
#[tokio::main]
async fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
info!("🚀 Starting Connection Pool and Load Balancing Example");
let config = ConnectionPoolConfig {
max_connections: 5,
min_connections: 2,
strategy: LoadBalanceStrategy::RoundRobin,
health_check_interval: Duration::from_secs(10),
operation_timeout: Duration::from_secs(5),
max_idle_time: Duration::from_secs(60),
auto_scaling: true,
max_retries: 3,
retry_delay: Duration::from_secs(1),
};
info!("✅ Configuration:");
info!(" • Max connections: {}", config.max_connections);
info!(" • Min connections: {}", config.min_connections);
info!(" • Strategy: {:?}", config.strategy);
info!(
" • Health check interval: {:?}",
config.health_check_interval
);
info!(" • Auto scaling: {}", config.auto_scaling);
let mut pool: ConnectionPool<MockTransport> = ConnectionPool::new(config);
info!("🔧 Starting connection pool...");
pool.start(|| {
static COUNTER: std::sync::atomic::AtomicU32 = std::sync::atomic::AtomicU32::new(0);
let id = COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
info!("Creating new MockTransport connection #{}", id + 1);
Ok(MockTransport::new(id + 1))
})
.await?;
info!("✓ Connection pool started");
let stats = pool.get_stats().await;
info!("📊 Initial pool stats:");
info!(" • Total connections: {}", stats.total_connections);
info!(" • Healthy connections: {}", stats.healthy_connections);
info!(" • Strategy: {:?}", stats.strategy);
info!("🎯 Testing load balancing strategies...");
let strategies = vec![
LoadBalanceStrategy::RoundRobin,
LoadBalanceStrategy::LeastConnections,
LoadBalanceStrategy::WeightedRoundRobin,
LoadBalanceStrategy::Random,
];
for strategy in strategies {
info!("Testing strategy: {:?}", strategy);
let test_config = ConnectionPoolConfig {
strategy,
max_connections: 3,
min_connections: 2,
..Default::default()
};
let mut test_pool: ConnectionPool<MockTransport> = ConnectionPool::new(test_config);
test_pool
.start(|| {
static TEST_COUNTER: std::sync::atomic::AtomicU32 =
std::sync::atomic::AtomicU32::new(100);
let id = TEST_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
Ok(MockTransport::new(id))
})
.await?;
for i in 0..5 {
let message = TransportMessage::Notification(pmcp::types::Notification::Progress(
pmcp::types::ProgressNotification::new(
pmcp::types::ProgressToken::String(format!("test-{}", i)),
i as f64 * 20.0,
Some(format!("Load balancing test {}", i)),
),
));
if let Ok(connection_id) = test_pool.get_connection().await {
info!(
" Strategy {:?} selected connection: {}",
strategy, connection_id
);
let _ = test_pool.send_to_connection(connection_id, message).await;
}
}
test_pool.shutdown().await?;
}
info!("🏥 Demonstrating health monitoring...");
let health_stats = pool.get_stats().await;
info!("Health status distribution:");
info!(" • Healthy: {}", health_stats.healthy_connections);
info!(" • Degraded: {}", health_stats.degraded_connections);
info!(" • Unhealthy: {}", health_stats.unhealthy_connections);
info!("📈 Simulating load across connections...");
for i in 0..10 {
let message = TransportMessage::Notification(pmcp::types::Notification::Progress(
pmcp::types::ProgressNotification::new(
pmcp::types::ProgressToken::String(format!("load-test-{}", i)),
i as f64 * 10.0,
Some(format!("Load test message {}", i)),
),
));
if let Err(e) = pool.send_message(message).await {
info!("Failed to send message {}: {}", i, e);
} else {
info!("✓ Message {} sent through pool", i);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
let final_stats = pool.get_stats().await;
info!("📊 Final pool statistics:");
info!(" • Total connections: {}", final_stats.total_connections);
info!(
" • Healthy connections: {}",
final_stats.healthy_connections
);
info!(
" • Total requests processed: {}",
final_stats.total_requests
);
info!(
" • Currently active requests: {}",
final_stats.active_requests
);
info!("🔄 Connection pool benefits:");
info!(" • Automatic load distribution across connections");
info!(" • Health monitoring and automatic failover");
info!(" • Configurable load balancing strategies");
info!(" • Connection lifecycle management");
info!(" • Request/response correlation");
info!(" • Performance monitoring and statistics");
pool.shutdown().await?;
info!("👋 Connection pool shut down gracefully");
Ok(())
}