smux_rust 0.2.1

A simple multiplexing library for Rust, inspired by xtaci/smux
//! 高级功能测试

use smux_rust::{client, server};
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};

#[tokio::test]
async fn test_num_streams() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;

    let server_handle = tokio::spawn(async move {
        let (conn, _) = listener.accept().await.unwrap();
        let session = server(Box::new(conn), None).await.unwrap();
        
        // 初始应该是 0
        assert_eq!(session.num_streams().await, 0);
        
        // 接受一个流
        let _stream = session.accept_stream().await.unwrap();
        assert_eq!(session.num_streams().await, 1);
        
        tokio::time::sleep(Duration::from_millis(100)).await;
    });

    let conn = TcpStream::connect(addr).await?;
    let session = client(Box::new(conn), None).await?;
    
    // 打开一个流
    let stream = session.open_stream().await?;
    assert_eq!(session.num_streams().await, 1);
    
    stream.close().await?;
    tokio::time::sleep(Duration::from_millis(50)).await;
    
    server_handle.await?;
    Ok(())
}

#[tokio::test]
async fn test_stream_deadline() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;

    let server_handle = tokio::spawn(async move {
        let (conn, _) = listener.accept().await.unwrap();
        let session = server(Box::new(conn), None).await.unwrap();
        let mut stream = session.accept_stream().await.unwrap();
        
        // 不发送数据,让客户端超时
        tokio::time::sleep(Duration::from_secs(2)).await;
        let _ = stream.close().await;
    });

    let conn = TcpStream::connect(addr).await?;
    let session = client(Box::new(conn), None).await?;
    let stream = session.open_stream().await?;
    
    // 设置很短的读取超时
    stream.set_read_deadline(Some(Instant::now() + Duration::from_millis(100))).await;
    
    let mut buf = [0u8; 1024];
    let result = stream.read(&mut buf).await;
    
    // 应该超时
    assert!(result.is_err());
    if let Err(e) = result {
        assert_eq!(e.kind(), std::io::ErrorKind::TimedOut);
    }
    
    server_handle.await?;
    Ok(())
}

#[tokio::test]
async fn test_die_notifier() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;

    let server_handle = tokio::spawn(async move {
        let (conn, _) = listener.accept().await.unwrap();
        let session = server(Box::new(conn), None).await.unwrap();
        let stream = session.accept_stream().await.unwrap();
        
        tokio::time::sleep(Duration::from_millis(100)).await;
        let _ = stream.close().await;
    });

    let conn = TcpStream::connect(addr).await?;
    let session = client(Box::new(conn), None).await?;
    let stream = session.open_stream().await?;
    
    let die_notifier = stream.get_die_notifier();
    
    // 在另一个任务中等待关闭通知
    let notify_handle = tokio::spawn(async move {
        die_notifier.notified().await;
        true
    });
    
    // 关闭流
    tokio::time::sleep(Duration::from_millis(50)).await;
    stream.close().await?;
    
    // 验证通知被触发
    let notified = tokio::time::timeout(Duration::from_secs(1), notify_handle).await??;
    assert!(notified);
    
    server_handle.await?;
    Ok(())
}

#[tokio::test]
async fn test_async_read_write() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;

    let server_handle = tokio::spawn(async move {
        let (conn, _) = listener.accept().await.unwrap();
        let session = server(Box::new(conn), None).await.unwrap();
        let stream = session.accept_stream().await.unwrap();
        
        // 读取数据
        let mut buf = [0u8; 1024];
        let n = stream.read(&mut buf).await.unwrap();
        assert_eq!(&buf[..n], b"ping");
        
        // 写入响应
        stream.write_all(b"pong").await.unwrap();
        stream.close().await.unwrap();
    });

    let conn = TcpStream::connect(addr).await?;
    let session = client(Box::new(conn), None).await?;
    let stream = session.open_stream().await?;
    
    // 写入数据
    stream.write_all(b"ping").await?;
    
    // 读取响应
    let mut buf = [0u8; 1024];
    let n = stream.read(&mut buf).await?;
    assert_eq!(&buf[..n], b"pong");
    
    stream.close().await?;
    server_handle.await?;
    Ok(())
}

#[tokio::test]
async fn test_session_deadline() -> Result<(), Box<dyn std::error::Error>> {
    let listener = TcpListener::bind("127.0.0.1:0").await?;
    let addr = listener.local_addr()?;

    let server_handle = tokio::spawn(async move {
        let (conn, _) = listener.accept().await.unwrap();
        let session = server(Box::new(conn), None).await.unwrap();
        
        // 设置会话截止时间
        session.set_deadline(Some(Instant::now() + Duration::from_secs(1))).await;
        
        tokio::time::sleep(Duration::from_millis(100)).await;
    });

    let conn = TcpStream::connect(addr).await?;
    let session = client(Box::new(conn), None).await?;
    
    // 设置会话截止时间
    session.set_deadline(Some(Instant::now() + Duration::from_secs(1))).await;
    
    let stream = session.open_stream().await?;
    stream.write_all(b"test").await?;
    stream.close().await?;
    
    server_handle.await?;
    Ok(())
}