echo_server_v1/
echo_server_v1.rs

1//! # Zerust Echo Server 测试程序
2//!
3//! 本示例用于验证 Zerust 框架的核心功能:
4//! - 异步 TCP 服务器启动与连接处理
5//! - 路由分发机制(msg_id -> handler)
6//! - 客户端请求发送与响应解析
7//! - 服务器的**优雅启动与主动关闭**
8//! - 集成测试的完整生命周期控制
9//!
10//! ✅ 运行方式:
11//! ```bash
12//! cargo run --example echo_server_v1
13//! ```
14
15use std::sync::Arc;
16use std::time::Duration;
17use tokio::io::{AsyncReadExt, AsyncWriteExt};
18use tokio::net::TcpStream;
19use tokio::sync::oneshot;
20use zerust::datapack::DataPack;
21use zerust::{DefaultRouter, Response, Server};
22
23#[tokio::main]
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25    // ========================================
26    // 1. 创建关闭通道:用于外部控制服务器生命周期
27    // ========================================
28    // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29    // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32    // ========================================
33    // 2. 创建并配置路由器
34    // ========================================
35    let router = Arc::new(DefaultRouter::new());
36
37    // 注册 msg_id = 1 的回显处理函数
38    let router_clone = router.clone();
39    router_clone.add_route(1, |req| {
40        println!("Received echo request: {:?}", req.data());
41        Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42    });
43
44    // ========================================
45    // 3. 启动服务器(异步任务)
46    // ========================================
47    let server = Server::new("127.0.0.1:8000", router);
48    let server_handle = tokio::spawn(async move {
49        if let Err(e) = server.run(shutdown_rx).await {
50            eprintln!("[Zerust] Server runtime error: {}", e);
51        }
52    });
53
54    // ========================================
55    // 4. 等待服务器就绪(端口探测)
56    // ========================================
57    // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58    if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59        eprintln!("[Client] Failed to connect to server within 5 seconds.");
60        return Err("Server did not start in time".into());
61    }
62    println!("[Client] Server is ready. Proceeding with test...");
63
64    // ========================================
65    // 5. 运行客户端测试
66    // ========================================
67    match client().await {
68        Ok(()) => println!("✅ Client finished successfully."),
69        Err(e) => eprintln!("❌ Client error: {}", e),
70    }
71
72    // ========================================
73    // 6. 发送关闭信号
74    // ========================================
75    // 客户端完成,通知服务器关闭
76    let _ = shutdown_tx.send(());
77    println!("[Main] Shutdown signal sent.");
78
79    // ========================================
80    // 7. 等待服务器完全停止
81    // ========================================
82    // 确保 server.run() 任务完全结束,避免资源泄漏
83    let _ = server_handle.await;
84
85    println!("🎉 Program exited gracefully.");
86    Ok(())
87}
88
89/// 客户端:连接服务器,发送测试请求,接收并验证响应
90async fn client() -> Result<(), Box<dyn std::error::Error>> {
91    let mut stream = TcpStream::connect("127.0.0.1:8000").await?;
92    println!("Connected to server");
93
94    // 构造请求:msg_id=1, data="test"
95    let bytes = DataPack::pack(1, b"test");
96    stream.write_all(&bytes).await?;
97    println!("Sent request: msg_id=1, data=test");
98
99    // 读取响应头(8字节)
100    let mut header = [0u8; 8];
101    stream.read_exact(&mut header).await?;
102    let (msg_id, data_len) = DataPack::unpack_header(&header)?;
103    println!(
104        "Received response header: msg_id={}, data_len={}",
105        msg_id, data_len
106    );
107
108    // 读取响应数据
109    let mut data = vec![0u8; data_len as usize];
110    stream.read_exact(&mut data).await?;
111    println!(
112        "Received response: msg_id={}, data={:?}",
113        msg_id,
114        String::from_utf8_lossy(&data)
115    );
116
117    Ok(())
118}
119
120/// 等待服务器在指定端口上启动
121///
122/// # 参数
123/// - `port`: 要探测的端口
124/// - `timeout`: 最大等待时间
125///
126/// # 返回
127/// - `Ok(())`: 在超时前成功连接
128/// - `Err(_)`: 超时或持续连接失败
129async fn wait_for_server(port: u16, timeout: Duration) -> Result<(), Box<dyn std::error::Error>> {
130    let deadline = tokio::time::Instant::now() + timeout;
131    loop {
132        // 尝试连接,带剩余时间限制
133        let connect_fut = TcpStream::connect(("127.0.0.1", port));
134        if tokio::time::timeout(deadline - tokio::time::Instant::now(), connect_fut)
135            .await
136            .is_ok()
137        {
138            return Ok(()); // 连接成功,退出
139        }
140        // 短暂休眠后重试
141        tokio::time::sleep(Duration::from_millis(10)).await;
142    }
143}