use smux_rust::{client, server, Config};
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("=== smux_rust 高级功能示例 ===\n");
let server_handle = tokio::spawn(async {
if let Err(e) = server_example().await {
eprintln!("服务器错误: {}", e);
}
});
sleep(Duration::from_millis(100)).await;
client_example().await?;
tokio::time::timeout(Duration::from_secs(5), server_handle).await??;
Ok(())
}
async fn server_example() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:9090").await?;
println!("[服务器] 监听在 127.0.0.1:9090");
let (tcp_conn, addr) = listener.accept().await?;
println!("[服务器] 接受来自 {} 的连接", addr);
let config = Config {
version: 2, keep_alive_disabled: false,
keep_alive_interval: Duration::from_secs(5),
keep_alive_timeout: Duration::from_secs(15),
max_frame_size: 32768,
max_receive_buffer: 4_194_304,
max_stream_buffer: 65536,
};
let session = server(Box::new(tcp_conn), Some(config)).await?;
println!("[服务器] 会话已创建");
let session_clone = session.clone();
let monitor_handle = tokio::spawn(async move {
loop {
let num = session_clone.num_streams().await;
if num > 0 {
println!("[服务器] 当前活跃流数量: {}", num);
}
sleep(Duration::from_millis(500)).await;
if session_clone.is_closed() {
break;
}
}
});
let mut handles = Vec::new();
for i in 0..3 {
let session_clone = session.clone();
let handle = tokio::spawn(async move {
match session_clone.accept_stream().await {
Ok(stream) => {
println!("[服务器] 接受流 #{} (ID: {})", i + 1, stream.id());
let die_notifier = stream.get_die_notifier();
let stream_id = stream.id();
tokio::spawn(async move {
die_notifier.notified().await;
println!("[服务器] 流 {} 已关闭(通过通知)", stream_id);
});
stream
.set_read_deadline(Some(Instant::now() + Duration::from_secs(5)))
.await;
let mut buf = vec![0u8; 1024];
match stream.read(&mut buf).await {
Ok(n) => {
println!(
"[服务器] 流 {} 收到数据: {}",
stream.id(),
String::from_utf8_lossy(&buf[..n])
);
let response = format!("Response from server to stream {}", stream.id());
if let Err(e) = stream.write_all(response.as_bytes()).await {
eprintln!("[服务器] 流 {} 写入错误: {}", stream.id(), e);
} else {
println!("[服务器] 流 {} 发送响应", stream.id());
}
}
Err(e) => {
eprintln!("[服务器] 流 {} 读取错误: {}", stream.id(), e);
}
}
if let Err(e) = stream.close().await {
eprintln!("[服务器] 流 {} 关闭错误: {}", stream.id(), e);
}
}
Err(e) => {
if !session_clone.is_closed() {
eprintln!("[服务器] 接受流 #{} 错误: {}", i + 1, e);
}
}
}
});
handles.push(handle);
}
for handle in handles {
let _ = handle.await;
}
monitor_handle.await?;
println!("[服务器] 关闭会话");
session.close().await?;
Ok(())
}
async fn client_example() -> Result<(), Box<dyn std::error::Error>> {
println!("[客户端] 连接到 127.0.0.1:9090");
let tcp_conn = TcpStream::connect("127.0.0.1:9090").await?;
let config = Config {
version: 2, keep_alive_disabled: false,
keep_alive_interval: Duration::from_secs(5),
keep_alive_timeout: Duration::from_secs(15),
max_frame_size: 32768,
max_receive_buffer: 4_194_304,
max_stream_buffer: 65536,
};
let config_version = config.version;
let session = client(Box::new(tcp_conn), Some(config)).await?;
println!("[客户端] 会话已创建(协议版本 {})", config_version);
session
.set_deadline(Some(Instant::now() + Duration::from_secs(30)))
.await;
for i in 0..3 {
let stream = session.open_stream().await?;
println!("[客户端] 打开流 #{} (ID: {})", i + 1, stream.id());
stream
.set_write_deadline(Some(Instant::now() + Duration::from_secs(3)))
.await;
if !stream.is_closed().await {
let message = format!("Message {} from client", i + 1);
stream.write_all(message.as_bytes()).await?;
println!("[客户端] 流 {} 发送: {}", stream.id(), message);
}
let mut buf = vec![0u8; 1024];
match stream.read(&mut buf).await {
Ok(n) => {
println!(
"[客户端] 流 {} 收到响应: {}",
stream.id(),
String::from_utf8_lossy(&buf[..n])
);
}
Err(e) => {
eprintln!("[客户端] 流 {} 读取错误: {}", stream.id(), e);
}
}
stream.close().await?;
println!("[客户端] 流 {} 已关闭", stream.id());
sleep(Duration::from_millis(100)).await;
}
let num_streams = session.num_streams().await;
println!("[客户端] 当前活跃流数量: {}", num_streams);
println!("[客户端] 关闭会话");
session.close().await?;
Ok(())
}