pub struct DefaultRouter { /* private fields */ }Expand description
默认路由器实现
使用 DashMap 存储消息ID到处理函数的映射,支持并发访问。
DashMap 是一个线程安全的哈希表,适合在多线程环境中使用。
Implementations§
Source§impl DefaultRouter
impl DefaultRouter
Sourcepub fn new() -> Self
pub fn new() -> Self
创建一个新的默认路由器实例
§返回值
返回一个新的 DefaultRouter 实例,其中包含一个空的路由表
§示例
use zerust::{DefaultRouter, Response, Request};
use std::sync::Arc;
// 创建路由器
let router = Arc::new(DefaultRouter::new());
// 添加路由处理
router.add_route(1, |req| {
println!("处理消息ID为1的请求");
Response::new(req.msg_id(), b"Hello, World!".to_vec())
});
// 添加另一个路由处理
router.add_route(2, |req| {
println!("处理消息ID为2的请求");
Response::new(req.msg_id(), b"Echo: ".iter().chain(req.data().iter()).cloned().collect())
});Examples found in repository?
examples/benchmark_server.rs (line 72)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68 // 创建关闭通道
69 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71 // 创建路由器并注册回显处理函数
72 let router = Arc::new(DefaultRouter::new());
73 let router_clone = router.clone();
74
75 // 计数器,用于统计处理的请求数
76 let request_counter = Arc::new(AtomicUsize::new(0));
77 let counter_clone = request_counter.clone();
78
79 // 注册高性能回显处理函数 - 不打印日志,直接返回
80 router_clone.add_route(1, move |req| {
81 counter_clone.fetch_add(1, Ordering::Relaxed);
82 Response::new(req.msg_id(), req.data().to_vec())
83 });
84
85 // 启动服务器
86 let server_addr = "127.0.0.1:8888";
87 let server = Server::new(server_addr, router);
88 println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90 // 启动统计任务
91 let stats_handle = tokio::spawn(async move {
92 let mut last_count = 0;
93 let mut last_time = Instant::now();
94
95 loop {
96 sleep(Duration::from_secs(1)).await;
97 let current_count = request_counter.load(Ordering::Relaxed);
98 let current_time = Instant::now();
99 let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101 let rps = (current_count - last_count) as f64 / elapsed;
102 println!(
103 "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104 rps, current_count
105 );
106
107 last_count = current_count;
108 last_time = current_time;
109 }
110 });
111
112 // 启动服务器并等待Ctrl+C信号
113 let server_handle = tokio::spawn(async move {
114 if let Err(e) = server.run(shutdown_rx).await {
115 eprintln!("[Server] 运行时错误: {}", e);
116 }
117 });
118
119 println!("[Server] 按 Ctrl+C 停止服务器...");
120 tokio::signal::ctrl_c().await?;
121 println!("[Server] 接收到停止信号,正在关闭...");
122
123 // 发送关闭信号
124 let _ = shutdown_tx.send(());
125
126 // 等待服务器和统计任务完成
127 let _ = server_handle.await;
128 stats_handle.abort();
129
130 println!("[Server] 服务器已关闭");
131 Ok(())
132}More examples
examples/echo_server_v1.rs (line 35)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25 // ========================================
26 // 1. 创建关闭通道:用于外部控制服务器生命周期
27 // ========================================
28 // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29 // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32 // ========================================
33 // 2. 创建并配置路由器
34 // ========================================
35 let router = Arc::new(DefaultRouter::new());
36
37 // 注册 msg_id = 1 的回显处理函数
38 let router_clone = router.clone();
39 router_clone.add_route(1, |req| {
40 println!("Received echo request: {:?}", req.data());
41 Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42 });
43
44 // ========================================
45 // 3. 启动服务器(异步任务)
46 // ========================================
47 let server = Server::new("127.0.0.1:8000", router);
48 let server_handle = tokio::spawn(async move {
49 if let Err(e) = server.run(shutdown_rx).await {
50 eprintln!("[Zerust] Server runtime error: {}", e);
51 }
52 });
53
54 // ========================================
55 // 4. 等待服务器就绪(端口探测)
56 // ========================================
57 // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58 if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59 eprintln!("[Client] Failed to connect to server within 5 seconds.");
60 return Err("Server did not start in time".into());
61 }
62 println!("[Client] Server is ready. Proceeding with test...");
63
64 // ========================================
65 // 5. 运行客户端测试
66 // ========================================
67 match client().await {
68 Ok(()) => println!("✅ Client finished successfully."),
69 Err(e) => eprintln!("❌ Client error: {}", e),
70 }
71
72 // ========================================
73 // 6. 发送关闭信号
74 // ========================================
75 // 客户端完成,通知服务器关闭
76 let _ = shutdown_tx.send(());
77 println!("[Main] Shutdown signal sent.");
78
79 // ========================================
80 // 7. 等待服务器完全停止
81 // ========================================
82 // 确保 server.run() 任务完全结束,避免资源泄漏
83 let _ = server_handle.await;
84
85 println!("🎉 Program exited gracefully.");
86 Ok(())
87}Sourcepub fn add_route<F>(&self, msg_id: u32, handler: F)
pub fn add_route<F>(&self, msg_id: u32, handler: F)
添加路由规则
将指定的消息ID与处理函数关联起来,当收到对应消息ID的请求时, 会调用该处理函数生成响应。
§参数
msg_id- 消息IDhandler- 处理函数,接收请求对象的引用,返回响应对象
§类型参数
F- 处理函数的类型,必须实现Fn(&Request) -> Response + Send + Sync + 'static'static约束确保了闭包捕获的任何数据都拥有所有权或具有 ’static 生命周期, 使得 Handler 可以安全地在程序的整个生命周期内存在
Examples found in repository?
examples/benchmark_server.rs (lines 80-83)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68 // 创建关闭通道
69 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71 // 创建路由器并注册回显处理函数
72 let router = Arc::new(DefaultRouter::new());
73 let router_clone = router.clone();
74
75 // 计数器,用于统计处理的请求数
76 let request_counter = Arc::new(AtomicUsize::new(0));
77 let counter_clone = request_counter.clone();
78
79 // 注册高性能回显处理函数 - 不打印日志,直接返回
80 router_clone.add_route(1, move |req| {
81 counter_clone.fetch_add(1, Ordering::Relaxed);
82 Response::new(req.msg_id(), req.data().to_vec())
83 });
84
85 // 启动服务器
86 let server_addr = "127.0.0.1:8888";
87 let server = Server::new(server_addr, router);
88 println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90 // 启动统计任务
91 let stats_handle = tokio::spawn(async move {
92 let mut last_count = 0;
93 let mut last_time = Instant::now();
94
95 loop {
96 sleep(Duration::from_secs(1)).await;
97 let current_count = request_counter.load(Ordering::Relaxed);
98 let current_time = Instant::now();
99 let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101 let rps = (current_count - last_count) as f64 / elapsed;
102 println!(
103 "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104 rps, current_count
105 );
106
107 last_count = current_count;
108 last_time = current_time;
109 }
110 });
111
112 // 启动服务器并等待Ctrl+C信号
113 let server_handle = tokio::spawn(async move {
114 if let Err(e) = server.run(shutdown_rx).await {
115 eprintln!("[Server] 运行时错误: {}", e);
116 }
117 });
118
119 println!("[Server] 按 Ctrl+C 停止服务器...");
120 tokio::signal::ctrl_c().await?;
121 println!("[Server] 接收到停止信号,正在关闭...");
122
123 // 发送关闭信号
124 let _ = shutdown_tx.send(());
125
126 // 等待服务器和统计任务完成
127 let _ = server_handle.await;
128 stats_handle.abort();
129
130 println!("[Server] 服务器已关闭");
131 Ok(())
132}More examples
examples/echo_server_v1.rs (lines 39-42)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25 // ========================================
26 // 1. 创建关闭通道:用于外部控制服务器生命周期
27 // ========================================
28 // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29 // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30 let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32 // ========================================
33 // 2. 创建并配置路由器
34 // ========================================
35 let router = Arc::new(DefaultRouter::new());
36
37 // 注册 msg_id = 1 的回显处理函数
38 let router_clone = router.clone();
39 router_clone.add_route(1, |req| {
40 println!("Received echo request: {:?}", req.data());
41 Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42 });
43
44 // ========================================
45 // 3. 启动服务器(异步任务)
46 // ========================================
47 let server = Server::new("127.0.0.1:8000", router);
48 let server_handle = tokio::spawn(async move {
49 if let Err(e) = server.run(shutdown_rx).await {
50 eprintln!("[Zerust] Server runtime error: {}", e);
51 }
52 });
53
54 // ========================================
55 // 4. 等待服务器就绪(端口探测)
56 // ========================================
57 // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58 if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59 eprintln!("[Client] Failed to connect to server within 5 seconds.");
60 return Err("Server did not start in time".into());
61 }
62 println!("[Client] Server is ready. Proceeding with test...");
63
64 // ========================================
65 // 5. 运行客户端测试
66 // ========================================
67 match client().await {
68 Ok(()) => println!("✅ Client finished successfully."),
69 Err(e) => eprintln!("❌ Client error: {}", e),
70 }
71
72 // ========================================
73 // 6. 发送关闭信号
74 // ========================================
75 // 客户端完成,通知服务器关闭
76 let _ = shutdown_tx.send(());
77 println!("[Main] Shutdown signal sent.");
78
79 // ========================================
80 // 7. 等待服务器完全停止
81 // ========================================
82 // 确保 server.run() 任务完全结束,避免资源泄漏
83 let _ = server_handle.await;
84
85 println!("🎉 Program exited gracefully.");
86 Ok(())
87}Trait Implementations§
Source§impl Default for DefaultRouter
为 DefaultRouter 实现 Default trait
impl Default for DefaultRouter
为 DefaultRouter 实现 Default trait
Auto Trait Implementations§
impl Freeze for DefaultRouter
impl !RefUnwindSafe for DefaultRouter
impl Send for DefaultRouter
impl Sync for DefaultRouter
impl Unpin for DefaultRouter
impl !UnwindSafe for DefaultRouter
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more