use pmcp::shared::{OptimizedSseConfig, OptimizedSseTransport, Transport, TransportMessage};
use std::time::Duration;
use tracing::{info, Level};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
info!("🚀 Starting Optimized SSE Transport Example");
let config = OptimizedSseConfig {
url: "http://localhost:8080/sse".to_string(),
connection_timeout: Duration::from_secs(30),
keepalive_interval: Duration::from_secs(15),
max_reconnects: 5,
reconnect_delay: Duration::from_secs(1),
buffer_size: 100,
flush_interval: Duration::from_millis(100),
enable_pooling: true,
max_connections: 10,
enable_compression: false,
};
info!("✅ Configuration:");
info!(" • URL: {}", config.url);
info!(" • Connection pooling: {}", config.enable_pooling);
info!(" • Max connections: {}", config.max_connections);
info!(" • Buffer size: {}", config.buffer_size);
info!(" • Flush interval: {:?}", config.flush_interval);
info!(" • Keepalive: {:?}", config.keepalive_interval);
info!(" • Compression: {}", config.enable_compression);
let mut transport = OptimizedSseTransport::new(config);
info!(
"📊 Transport created with type: {}",
transport.transport_type()
);
info!("📤 Sending test messages...");
let notification = TransportMessage::Notification(pmcp::types::Notification::Progress(
pmcp::types::ProgressNotification::new(
pmcp::types::ProgressToken::String("task-001".to_string()),
25.0,
Some("Processing started".to_string()),
),
));
if let Err(e) = transport.send(notification).await {
info!("Failed to send notification: {}", e);
} else {
info!("✓ Notification sent");
}
let request = TransportMessage::Request {
id: pmcp::types::RequestId::from(1i64),
request: pmcp::types::Request::Client(Box::new(pmcp::types::ClientRequest::Ping)),
};
if let Err(e) = transport.send(request).await {
info!("Failed to send request: {}", e);
} else {
info!("✓ Request sent");
}
info!("📦 Sending batch of messages...");
for i in 0..10 {
let progress_msg = TransportMessage::Notification(pmcp::types::Notification::Progress(
pmcp::types::ProgressNotification::new(
pmcp::types::ProgressToken::String(format!("batch-{}", i)),
i as f64 * 10.0,
Some(format!("Batch message {}", i)),
),
));
if let Err(e) = transport.send(progress_msg).await {
info!("Failed to send batch message {}: {}", i, e);
}
}
info!("✓ Batch messages queued (will be coalesced and flushed)");
info!(
"🔌 Connection status: {}",
if transport.is_connected() {
"Connected"
} else {
"Disconnected"
}
);
info!("📥 Attempting to receive messages...");
match tokio::time::timeout(Duration::from_secs(2), transport.receive()).await {
Ok(Ok(msg)) => {
info!("Received message: {:?}", msg);
},
Ok(Err(e)) => {
info!("Receive error: {}", e);
},
Err(_) => {
info!("No messages received (timeout)");
},
}
info!("🔄 Connection pooling benefits:");
info!(" • Reuses existing connections");
info!(" • Reduces latency for subsequent requests");
info!(" • Maintains TCP keepalive");
info!(" • Automatic reconnection on failure");
transport.close().await?;
info!("👋 Transport closed");
Ok(())
}