pub struct Server { /* private fields */ }Expand description
表示一个TCP服务器
Server 是框架的主要入口点,负责监听TCP连接并处理客户端请求。
它使用 Router 来分发请求,使用 Connection 来管理客户端连接。
Implementations§
Source§impl Server
impl Server
Sourcepub fn new(addr: &str, router: Arc<dyn Router + Send + Sync>) -> Self
pub fn new(addr: &str, router: Arc<dyn Router + Send + Sync>) -> Self
Examples found in repository?
examples/benchmark_server.rs (line 87)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68 // 创建关闭通道
69 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71 // 创建路由器并注册回显处理函数
72 let router = Arc::new(DefaultRouter::new());
73 let router_clone = router.clone();
74
75 // 计数器,用于统计处理的请求数
76 let request_counter = Arc::new(AtomicUsize::new(0));
77 let counter_clone = request_counter.clone();
78
79 // 注册高性能回显处理函数 - 不打印日志,直接返回
80 router_clone.add_route(1, move |req| {
81 counter_clone.fetch_add(1, Ordering::Relaxed);
82 Response::new(req.msg_id(), req.data().to_vec())
83 });
84
85 // 启动服务器
86 let server_addr = "127.0.0.1:8888";
87 let server = Server::new(server_addr, router);
88 println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90 // 启动统计任务
91 let stats_handle = tokio::spawn(async move {
92 let mut last_count = 0;
93 let mut last_time = Instant::now();
94
95 loop {
96 sleep(Duration::from_secs(1)).await;
97 let current_count = request_counter.load(Ordering::Relaxed);
98 let current_time = Instant::now();
99 let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101 let rps = (current_count - last_count) as f64 / elapsed;
102 println!(
103 "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104 rps, current_count
105 );
106
107 last_count = current_count;
108 last_time = current_time;
109 }
110 });
111
112 // 启动服务器并等待Ctrl+C信号
113 let server_handle = tokio::spawn(async move {
114 if let Err(e) = server.run(shutdown_rx).await {
115 eprintln!("[Server] 运行时错误: {}", e);
116 }
117 });
118
119 println!("[Server] 按 Ctrl+C 停止服务器...");
120 tokio::signal::ctrl_c().await?;
121 println!("[Server] 接收到停止信号,正在关闭...");
122
123 // 发送关闭信号
124 let _ = shutdown_tx.send(());
125
126 // 等待服务器和统计任务完成
127 let _ = server_handle.await;
128 stats_handle.abort();
129
130 println!("[Server] 服务器已关闭");
131 Ok(())
132}More examples
examples/echo_server_v1.rs (line 47)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25 // ========================================
26 // 1. 创建关闭通道:用于外部控制服务器生命周期
27 // ========================================
28 // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29 // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32 // ========================================
33 // 2. 创建并配置路由器
34 // ========================================
35 let router = Arc::new(DefaultRouter::new());
36
37 // 注册 msg_id = 1 的回显处理函数
38 let router_clone = router.clone();
39 router_clone.add_route(1, |req| {
40 println!("Received echo request: {:?}", req.data());
41 Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42 });
43
44 // ========================================
45 // 3. 启动服务器(异步任务)
46 // ========================================
47 let server = Server::new("127.0.0.1:8000", router);
48 let server_handle = tokio::spawn(async move {
49 if let Err(e) = server.run(shutdown_rx).await {
50 eprintln!("[Zerust] Server runtime error: {}", e);
51 }
52 });
53
54 // ========================================
55 // 4. 等待服务器就绪(端口探测)
56 // ========================================
57 // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58 if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59 eprintln!("[Client] Failed to connect to server within 5 seconds.");
60 return Err("Server did not start in time".into());
61 }
62 println!("[Client] Server is ready. Proceeding with test...");
63
64 // ========================================
65 // 5. 运行客户端测试
66 // ========================================
67 match client().await {
68 Ok(()) => println!("✅ Client finished successfully."),
69 Err(e) => eprintln!("❌ Client error: {}", e),
70 }
71
72 // ========================================
73 // 6. 发送关闭信号
74 // ========================================
75 // 客户端完成,通知服务器关闭
76 let _ = shutdown_tx.send(());
77 println!("[Main] Shutdown signal sent.");
78
79 // ========================================
80 // 7. 等待服务器完全停止
81 // ========================================
82 // 确保 server.run() 任务完全结束,避免资源泄漏
83 let _ = server_handle.await;
84
85 println!("🎉 Program exited gracefully.");
86 Ok(())
87}Sourcepub async fn run(&self, shutdown: Receiver<()>) -> Result<(), ZerustError>
pub async fn run(&self, shutdown: Receiver<()>) -> Result<(), ZerustError>
启动服务器并监听指定地址的TCP连接
该函数会绑定到配置的地址并开始监听TCP连接,对于每个传入的连接, 都会创建一个异步任务来处理请求。如果在监听过程中发生IO错误, 函数会立即返回错误。
§参数
shutdown: 接收关闭信号的通道。当发送端被 drop 或发送消息时,服务器将关闭。
§返回值
Ok(())- 服务器正常启动并运行Err(ZerustError)- 服务器启动或运行过程中发生错误
Examples found in repository?
examples/benchmark_server.rs (line 114)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68 // 创建关闭通道
69 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71 // 创建路由器并注册回显处理函数
72 let router = Arc::new(DefaultRouter::new());
73 let router_clone = router.clone();
74
75 // 计数器,用于统计处理的请求数
76 let request_counter = Arc::new(AtomicUsize::new(0));
77 let counter_clone = request_counter.clone();
78
79 // 注册高性能回显处理函数 - 不打印日志,直接返回
80 router_clone.add_route(1, move |req| {
81 counter_clone.fetch_add(1, Ordering::Relaxed);
82 Response::new(req.msg_id(), req.data().to_vec())
83 });
84
85 // 启动服务器
86 let server_addr = "127.0.0.1:8888";
87 let server = Server::new(server_addr, router);
88 println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90 // 启动统计任务
91 let stats_handle = tokio::spawn(async move {
92 let mut last_count = 0;
93 let mut last_time = Instant::now();
94
95 loop {
96 sleep(Duration::from_secs(1)).await;
97 let current_count = request_counter.load(Ordering::Relaxed);
98 let current_time = Instant::now();
99 let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101 let rps = (current_count - last_count) as f64 / elapsed;
102 println!(
103 "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104 rps, current_count
105 );
106
107 last_count = current_count;
108 last_time = current_time;
109 }
110 });
111
112 // 启动服务器并等待Ctrl+C信号
113 let server_handle = tokio::spawn(async move {
114 if let Err(e) = server.run(shutdown_rx).await {
115 eprintln!("[Server] 运行时错误: {}", e);
116 }
117 });
118
119 println!("[Server] 按 Ctrl+C 停止服务器...");
120 tokio::signal::ctrl_c().await?;
121 println!("[Server] 接收到停止信号,正在关闭...");
122
123 // 发送关闭信号
124 let _ = shutdown_tx.send(());
125
126 // 等待服务器和统计任务完成
127 let _ = server_handle.await;
128 stats_handle.abort();
129
130 println!("[Server] 服务器已关闭");
131 Ok(())
132}More examples
examples/echo_server_v1.rs (line 49)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25 // ========================================
26 // 1. 创建关闭通道:用于外部控制服务器生命周期
27 // ========================================
28 // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29 // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32 // ========================================
33 // 2. 创建并配置路由器
34 // ========================================
35 let router = Arc::new(DefaultRouter::new());
36
37 // 注册 msg_id = 1 的回显处理函数
38 let router_clone = router.clone();
39 router_clone.add_route(1, |req| {
40 println!("Received echo request: {:?}", req.data());
41 Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42 });
43
44 // ========================================
45 // 3. 启动服务器(异步任务)
46 // ========================================
47 let server = Server::new("127.0.0.1:8000", router);
48 let server_handle = tokio::spawn(async move {
49 if let Err(e) = server.run(shutdown_rx).await {
50 eprintln!("[Zerust] Server runtime error: {}", e);
51 }
52 });
53
54 // ========================================
55 // 4. 等待服务器就绪(端口探测)
56 // ========================================
57 // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58 if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59 eprintln!("[Client] Failed to connect to server within 5 seconds.");
60 return Err("Server did not start in time".into());
61 }
62 println!("[Client] Server is ready. Proceeding with test...");
63
64 // ========================================
65 // 5. 运行客户端测试
66 // ========================================
67 match client().await {
68 Ok(()) => println!("✅ Client finished successfully."),
69 Err(e) => eprintln!("❌ Client error: {}", e),
70 }
71
72 // ========================================
73 // 6. 发送关闭信号
74 // ========================================
75 // 客户端完成,通知服务器关闭
76 let _ = shutdown_tx.send(());
77 println!("[Main] Shutdown signal sent.");
78
79 // ========================================
80 // 7. 等待服务器完全停止
81 // ========================================
82 // 确保 server.run() 任务完全结束,避免资源泄漏
83 let _ = server_handle.await;
84
85 println!("🎉 Program exited gracefully.");
86 Ok(())
87}Auto Trait Implementations§
impl Freeze for Server
impl !RefUnwindSafe for Server
impl Send for Server
impl Sync for Server
impl Unpin for Server
impl !UnwindSafe for Server
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more