use smux_rust::{client, server};
use std::time::{Duration, Instant};
use tokio::net::{TcpListener, TcpStream};
#[tokio::test]
async fn test_num_streams() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server_handle = tokio::spawn(async move {
let (conn, _) = listener.accept().await.unwrap();
let session = server(Box::new(conn), None).await.unwrap();
assert_eq!(session.num_streams().await, 0);
let _stream = session.accept_stream().await.unwrap();
assert_eq!(session.num_streams().await, 1);
tokio::time::sleep(Duration::from_millis(100)).await;
});
let conn = TcpStream::connect(addr).await?;
let session = client(Box::new(conn), None).await?;
let stream = session.open_stream().await?;
assert_eq!(session.num_streams().await, 1);
stream.close().await?;
tokio::time::sleep(Duration::from_millis(50)).await;
server_handle.await?;
Ok(())
}
#[tokio::test]
async fn test_stream_deadline() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server_handle = tokio::spawn(async move {
let (conn, _) = listener.accept().await.unwrap();
let session = server(Box::new(conn), None).await.unwrap();
let mut stream = session.accept_stream().await.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let _ = stream.close().await;
});
let conn = TcpStream::connect(addr).await?;
let session = client(Box::new(conn), None).await?;
let stream = session.open_stream().await?;
stream.set_read_deadline(Some(Instant::now() + Duration::from_millis(100))).await;
let mut buf = [0u8; 1024];
let result = stream.read(&mut buf).await;
assert!(result.is_err());
if let Err(e) = result {
assert_eq!(e.kind(), std::io::ErrorKind::TimedOut);
}
server_handle.await?;
Ok(())
}
#[tokio::test]
async fn test_die_notifier() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server_handle = tokio::spawn(async move {
let (conn, _) = listener.accept().await.unwrap();
let session = server(Box::new(conn), None).await.unwrap();
let stream = session.accept_stream().await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let _ = stream.close().await;
});
let conn = TcpStream::connect(addr).await?;
let session = client(Box::new(conn), None).await?;
let stream = session.open_stream().await?;
let die_notifier = stream.get_die_notifier();
let notify_handle = tokio::spawn(async move {
die_notifier.notified().await;
true
});
tokio::time::sleep(Duration::from_millis(50)).await;
stream.close().await?;
let notified = tokio::time::timeout(Duration::from_secs(1), notify_handle).await??;
assert!(notified);
server_handle.await?;
Ok(())
}
#[tokio::test]
async fn test_async_read_write() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server_handle = tokio::spawn(async move {
let (conn, _) = listener.accept().await.unwrap();
let session = server(Box::new(conn), None).await.unwrap();
let stream = session.accept_stream().await.unwrap();
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf).await.unwrap();
assert_eq!(&buf[..n], b"ping");
stream.write_all(b"pong").await.unwrap();
stream.close().await.unwrap();
});
let conn = TcpStream::connect(addr).await?;
let session = client(Box::new(conn), None).await?;
let stream = session.open_stream().await?;
stream.write_all(b"ping").await?;
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf).await?;
assert_eq!(&buf[..n], b"pong");
stream.close().await?;
server_handle.await?;
Ok(())
}
#[tokio::test]
async fn test_session_deadline() -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server_handle = tokio::spawn(async move {
let (conn, _) = listener.accept().await.unwrap();
let session = server(Box::new(conn), None).await.unwrap();
session.set_deadline(Some(Instant::now() + Duration::from_secs(1))).await;
tokio::time::sleep(Duration::from_millis(100)).await;
});
let conn = TcpStream::connect(addr).await?;
let session = client(Box::new(conn), None).await?;
session.set_deadline(Some(Instant::now() + Duration::from_secs(1))).await;
let stream = session.open_stream().await?;
stream.write_all(b"test").await?;
stream.close().await?;
server_handle.await?;
Ok(())
}