smux_rust 0.2.1

A simple multiplexing library for Rust, inspired by xtaci/smux
//! 高级功能示例
//!
//! 这个示例展示了 smux_rust 的高级功能:
//! - num_streams() 获取流数量
//! - set_deadline() 设置超时
//! - get_die_notifier() 流关闭通知
//! - 异步 I/O 操作

use smux_rust::{client, server, Config};
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::sleep;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("=== smux_rust 高级功能示例 ===\n");

    // 启动服务器
    let server_handle = tokio::spawn(async {
        if let Err(e) = server_example().await {
            eprintln!("服务器错误: {}", e);
        }
    });

    // 等待服务器启动
    sleep(Duration::from_millis(100)).await;

    // 运行客户端
    client_example().await?;

    // 等待服务器完成
    tokio::time::timeout(Duration::from_secs(5), server_handle).await??;

    Ok(())
}

async fn server_example() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:9090").await?;
    println!("[服务器] 监听在 127.0.0.1:9090");

    let (tcp_conn, addr) = listener.accept().await?;
    println!("[服务器] 接受来自 {} 的连接", addr);

    // 创建服务器会话(使用与客户端匹配的配置)
    let config = Config {
        version: 2, // 与客户端保持一致
        keep_alive_disabled: false,
        keep_alive_interval: Duration::from_secs(5),
        keep_alive_timeout: Duration::from_secs(15),
        max_frame_size: 32768,
        max_receive_buffer: 4_194_304,
        max_stream_buffer: 65536,
    };
    let session = server(Box::new(tcp_conn), Some(config)).await?;
    println!("[服务器] 会话已创建");

    // 功能1:监控流数量
    let session_clone = session.clone();
    let monitor_handle = tokio::spawn(async move {
        loop {
            let num = session_clone.num_streams().await;
            if num > 0 {
                println!("[服务器] 当前活跃流数量: {}", num);
            }
            sleep(Duration::from_millis(500)).await;
            if session_clone.is_closed() {
                break;
            }
        }
    });

    // 接受并处理多个流(并发处理)
    let mut handles = Vec::new();
    for i in 0..3 {
        let session_clone = session.clone();
        let handle = tokio::spawn(async move {
            match session_clone.accept_stream().await {
                Ok(stream) => {
                    println!("[服务器] 接受流 #{} (ID: {})", i + 1, stream.id());

                    // 功能2:使用流关闭通知
                    let die_notifier = stream.get_die_notifier();
                    let stream_id = stream.id();
                    tokio::spawn(async move {
                        die_notifier.notified().await;
                        println!("[服务器] 流 {} 已关闭(通过通知)", stream_id);
                    });

                    // 功能3:设置读取超时
                    stream
                        .set_read_deadline(Some(Instant::now() + Duration::from_secs(5)))
                        .await;

                    // 功能4:读取和响应
                    let mut buf = vec![0u8; 1024];
                    match stream.read(&mut buf).await {
                        Ok(n) => {
                            println!(
                                "[服务器] 流 {} 收到数据: {}",
                                stream.id(),
                                String::from_utf8_lossy(&buf[..n])
                            );

                            // 功能5:发送响应
                            let response = format!("Response from server to stream {}", stream.id());
                            if let Err(e) = stream.write_all(response.as_bytes()).await {
                                eprintln!("[服务器] 流 {} 写入错误: {}", stream.id(), e);
                            } else {
                                println!("[服务器] 流 {} 发送响应", stream.id());
                            }
                        }
                        Err(e) => {
                            eprintln!("[服务器] 流 {} 读取错误: {}", stream.id(), e);
                        }
                    }

                    if let Err(e) = stream.close().await {
                        eprintln!("[服务器] 流 {} 关闭错误: {}", stream.id(), e);
                    }
                }
                Err(e) => {
                    if !session_clone.is_closed() {
                        eprintln!("[服务器] 接受流 #{} 错误: {}", i + 1, e);
                    }
                }
            }
        });
        handles.push(handle);
    }
    
    // 等待所有流处理完成
    for handle in handles {
        let _ = handle.await;
    }

    // 等待监控任务
    monitor_handle.await?;

    println!("[服务器] 关闭会话");
    session.close().await?;

    Ok(())
}

async fn client_example() -> Result<(), Box<dyn std::error::Error>> {
    println!("[客户端] 连接到 127.0.0.1:9090");

    let tcp_conn = TcpStream::connect("127.0.0.1:9090").await?;

    // 创建自定义配置
    let config = Config {
        version: 2, // 使用协议版本 2
        keep_alive_disabled: false,
        keep_alive_interval: Duration::from_secs(5),
        keep_alive_timeout: Duration::from_secs(15),
        max_frame_size: 32768,
        max_receive_buffer: 4_194_304,
        max_stream_buffer: 65536,
    };

    let config_version = config.version;
    let session = client(Box::new(tcp_conn), Some(config)).await?;
    println!("[客户端] 会话已创建(协议版本 {}", config_version);

    // 功能1:设置会话截止时间
    session
        .set_deadline(Some(Instant::now() + Duration::from_secs(30)))
        .await;

    // 创建多个流
    for i in 0..3 {
        let stream = session.open_stream().await?;
        println!("[客户端] 打开流 #{} (ID: {})", i + 1, stream.id());

        // 功能2:设置写入超时
        stream
            .set_write_deadline(Some(Instant::now() + Duration::from_secs(3)))
            .await;

        // 功能3:检查流是否关闭
        if !stream.is_closed().await {
            let message = format!("Message {} from client", i + 1);
            stream.write_all(message.as_bytes()).await?;
            println!("[客户端] 流 {} 发送: {}", stream.id(), message);
        }

        // 功能4:使用 AsyncReadExt trait
        let mut buf = vec![0u8; 1024];
        match stream.read(&mut buf).await {
            Ok(n) => {
                println!(
                    "[客户端] 流 {} 收到响应: {}",
                    stream.id(),
                    String::from_utf8_lossy(&buf[..n])
                );
            }
            Err(e) => {
                eprintln!("[客户端] 流 {} 读取错误: {}", stream.id(), e);
            }
        }

        stream.close().await?;
        println!("[客户端] 流 {} 已关闭", stream.id());

        sleep(Duration::from_millis(100)).await;
    }

    // 功能5:获取活跃流数量
    let num_streams = session.num_streams().await;
    println!("[客户端] 当前活跃流数量: {}", num_streams);

    println!("[客户端] 关闭会话");
    session.close().await?;

    Ok(())
}