# smux_rust
一个用 Rust 实现的多路复用库,参考了 [xtaci/smux](https://github.com/xtaci/smux) 的 Go 实现。
## 功能特性
- **多路复用**:在单个底层连接上创建多个流
- **流量控制**:Token bucket 和滑动窗口控制
- **内存优化**:对象池减少内存分配开销
- **流量整形**:轮询调度算法保证流之间的公平性
- **协议支持**:支持协议版本 1 和版本 2
## 使用方法
### 基本使用
```rust
use smux_rust::{client, server, Config};
use std::io::{Read, Write};
use std::net::TcpStream;
// 客户端示例
fn client_example() -> Result<(), Box<dyn std::error::Error>> {
// 建立底层连接(例如 TCP)
let tcp_conn = TcpStream::connect("127.0.0.1:8080")?;
// 创建客户端会话
let session = client(Box::new(tcp_conn), None)?;
// 打开一个新流
let mut stream = session.open_stream()?;
// 写入数据
stream.write_all(b"Hello, Server!")?;
// 读取响应
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf)?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
// 关闭流
stream.close()?;
// 关闭会话
session.close()?;
Ok(())
}
// 服务器端示例
fn server_example() -> Result<(), Box<dyn std::error::Error>> {
use std::net::{TcpListener, TcpStream};
let listener = TcpListener::bind("127.0.0.1:8080")?;
for stream in listener.incoming() {
let tcp_conn = stream?;
// 创建服务器会话
let session = server(Box::new(tcp_conn), None)?;
// 接受流
let mut stream = session.accept_stream()?;
// 读取数据
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf)?;
println!("Received: {}", String::from_utf8_lossy(&buf[..n]));
// 写入响应
stream.write_all(b"Hello, Client!")?;
// 关闭流
stream.close()?;
}
Ok(())
}
```
### 自定义配置
```rust
use smux_rust::Config;
use std::time::Duration;
let config = Config {
version: 2, // 使用协议版本 2
keep_alive_disabled: false,
keep_alive_interval: Duration::from_secs(10),
keep_alive_timeout: Duration::from_secs(30),
max_frame_size: 32768,
max_receive_buffer: 4_194_304, // 4MB
max_stream_buffer: 65536, // 64KB
};
// 验证配置
config.verify()?;
// 使用自定义配置创建会话
let session = client(Box::new(tcp_conn), Some(config))?;
```
### 多流使用
```rust
// 在同一个会话中打开多个流
let stream1 = session.open_stream().await?;
let stream2 = session.open_stream().await?;
let stream3 = session.open_stream().await?;
// 每个流都可以独立读写
stream1.write_all(b"Stream 1 data").await?;
stream2.write_all(b"Stream 2 data").await?;
stream3.write_all(b"Stream 3 data").await?;
```
### 高级功能
```rust
use std::time::{Duration, Instant};
// 1. 获取活跃流数量
let num = session.num_streams().await;
println!("活跃流数量: {}", num);
// 2. 设置会话截止时间
session.set_deadline(Some(Instant::now() + Duration::from_secs(30))).await;
// 3. 设置流的读写超时
stream.set_read_deadline(Some(Instant::now() + Duration::from_secs(5))).await;
stream.set_write_deadline(Some(Instant::now() + Duration::from_secs(5))).await;
// 4. 获取流关闭通知
let die_notifier = stream.get_die_notifier();
tokio::spawn(async move {
die_notifier.notified().await;
println!("流已关闭");
});
// 5. 检查流是否关闭
if stream.is_closed().await {
println!("流已关闭");
}
// 6. 等待多个流中任意一个就绪(PollWait)
let streams = vec![&stream1, &stream2, &stream3];
let idx = session.poll_wait(&streams).await?;
println!("流 {} 有数据可读", idx);
// 7. 获取流地址
if let Some(addr) = stream.local_addr() {
println!("本地地址: {}", addr);
}
// 8. 高效复制流数据到文件或其他 AsyncWrite
let mut file = tokio::fs::File::create("output.dat").await?;
let bytes_copied = stream.copy_to(&mut file).await?;
println!("复制了 {} 字节", bytes_copied);
```
## 协议规范
帧格式:
```
命令类型:
- `cmdSYN(0)` - 流打开
- `cmdFIN(1)` - 流关闭
- `cmdPSH(2)` - 数据推送
- `cmdNOP(3)` - 无操作
- `cmdUPD(4)` - 窗口更新(仅版本 2)
流ID分配:
- 客户端使用奇数,从 1 开始
- 服务器端使用偶数,从 0 开始
## 依赖
- `bytes` - 字节缓冲区处理
- `parking_lot` - 高性能互斥锁
## 许可证
MIT License