Struct DefaultRouter

Source
pub struct DefaultRouter { /* private fields */ }
Expand description

默认路由器实现

使用 DashMap 存储消息ID到处理函数的映射,支持并发访问。 DashMap 是一个线程安全的哈希表,适合在多线程环境中使用。

Implementations§

Source§

impl DefaultRouter

Source

pub fn new() -> Self

创建一个新的默认路由器实例

§返回值

返回一个新的 DefaultRouter 实例,其中包含一个空的路由表

§示例
use zerust::{DefaultRouter, Response, Request};
use std::sync::Arc;

// 创建路由器
let router = Arc::new(DefaultRouter::new());

// 添加路由处理
router.add_route(1, |req| {
    println!("处理消息ID为1的请求");
    Response::new(req.msg_id(), b"Hello, World!".to_vec())
});

// 添加另一个路由处理
router.add_route(2, |req| {
    println!("处理消息ID为2的请求");
    Response::new(req.msg_id(), b"Echo: ".iter().chain(req.data().iter()).cloned().collect())
});
Examples found in repository?
examples/benchmark_server.rs (line 72)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68    // 创建关闭通道
69    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71    // 创建路由器并注册回显处理函数
72    let router = Arc::new(DefaultRouter::new());
73    let router_clone = router.clone();
74
75    // 计数器,用于统计处理的请求数
76    let request_counter = Arc::new(AtomicUsize::new(0));
77    let counter_clone = request_counter.clone();
78
79    // 注册高性能回显处理函数 - 不打印日志,直接返回
80    router_clone.add_route(1, move |req| {
81        counter_clone.fetch_add(1, Ordering::Relaxed);
82        Response::new(req.msg_id(), req.data().to_vec())
83    });
84
85    // 启动服务器
86    let server_addr = "127.0.0.1:8888";
87    let server = Server::new(server_addr, router);
88    println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90    // 启动统计任务
91    let stats_handle = tokio::spawn(async move {
92        let mut last_count = 0;
93        let mut last_time = Instant::now();
94
95        loop {
96            sleep(Duration::from_secs(1)).await;
97            let current_count = request_counter.load(Ordering::Relaxed);
98            let current_time = Instant::now();
99            let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101            let rps = (current_count - last_count) as f64 / elapsed;
102            println!(
103                "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104                rps, current_count
105            );
106
107            last_count = current_count;
108            last_time = current_time;
109        }
110    });
111
112    // 启动服务器并等待Ctrl+C信号
113    let server_handle = tokio::spawn(async move {
114        if let Err(e) = server.run(shutdown_rx).await {
115            eprintln!("[Server] 运行时错误: {}", e);
116        }
117    });
118
119    println!("[Server] 按 Ctrl+C 停止服务器...");
120    tokio::signal::ctrl_c().await?;
121    println!("[Server] 接收到停止信号,正在关闭...");
122
123    // 发送关闭信号
124    let _ = shutdown_tx.send(());
125
126    // 等待服务器和统计任务完成
127    let _ = server_handle.await;
128    stats_handle.abort();
129
130    println!("[Server] 服务器已关闭");
131    Ok(())
132}
More examples
Hide additional examples
examples/echo_server_v1.rs (line 35)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25    // ========================================
26    // 1. 创建关闭通道:用于外部控制服务器生命周期
27    // ========================================
28    // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29    // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32    // ========================================
33    // 2. 创建并配置路由器
34    // ========================================
35    let router = Arc::new(DefaultRouter::new());
36
37    // 注册 msg_id = 1 的回显处理函数
38    let router_clone = router.clone();
39    router_clone.add_route(1, |req| {
40        println!("Received echo request: {:?}", req.data());
41        Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42    });
43
44    // ========================================
45    // 3. 启动服务器(异步任务)
46    // ========================================
47    let server = Server::new("127.0.0.1:8000", router);
48    let server_handle = tokio::spawn(async move {
49        if let Err(e) = server.run(shutdown_rx).await {
50            eprintln!("[Zerust] Server runtime error: {}", e);
51        }
52    });
53
54    // ========================================
55    // 4. 等待服务器就绪(端口探测)
56    // ========================================
57    // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58    if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59        eprintln!("[Client] Failed to connect to server within 5 seconds.");
60        return Err("Server did not start in time".into());
61    }
62    println!("[Client] Server is ready. Proceeding with test...");
63
64    // ========================================
65    // 5. 运行客户端测试
66    // ========================================
67    match client().await {
68        Ok(()) => println!("✅ Client finished successfully."),
69        Err(e) => eprintln!("❌ Client error: {}", e),
70    }
71
72    // ========================================
73    // 6. 发送关闭信号
74    // ========================================
75    // 客户端完成,通知服务器关闭
76    let _ = shutdown_tx.send(());
77    println!("[Main] Shutdown signal sent.");
78
79    // ========================================
80    // 7. 等待服务器完全停止
81    // ========================================
82    // 确保 server.run() 任务完全结束,避免资源泄漏
83    let _ = server_handle.await;
84
85    println!("🎉 Program exited gracefully.");
86    Ok(())
87}
Source

pub fn add_route<F>(&self, msg_id: u32, handler: F)
where F: Fn(&Request) -> Response + Send + Sync + 'static,

添加路由规则

将指定的消息ID与处理函数关联起来,当收到对应消息ID的请求时, 会调用该处理函数生成响应。

§参数
  • msg_id - 消息ID
  • handler - 处理函数,接收请求对象的引用,返回响应对象
§类型参数
  • F - 处理函数的类型,必须实现 Fn(&Request) -> Response + Send + Sync + 'static
    • 'static 约束确保了闭包捕获的任何数据都拥有所有权或具有 ’static 生命周期, 使得 Handler 可以安全地在程序的整个生命周期内存在
Examples found in repository?
examples/benchmark_server.rs (lines 80-83)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68    // 创建关闭通道
69    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71    // 创建路由器并注册回显处理函数
72    let router = Arc::new(DefaultRouter::new());
73    let router_clone = router.clone();
74
75    // 计数器,用于统计处理的请求数
76    let request_counter = Arc::new(AtomicUsize::new(0));
77    let counter_clone = request_counter.clone();
78
79    // 注册高性能回显处理函数 - 不打印日志,直接返回
80    router_clone.add_route(1, move |req| {
81        counter_clone.fetch_add(1, Ordering::Relaxed);
82        Response::new(req.msg_id(), req.data().to_vec())
83    });
84
85    // 启动服务器
86    let server_addr = "127.0.0.1:8888";
87    let server = Server::new(server_addr, router);
88    println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90    // 启动统计任务
91    let stats_handle = tokio::spawn(async move {
92        let mut last_count = 0;
93        let mut last_time = Instant::now();
94
95        loop {
96            sleep(Duration::from_secs(1)).await;
97            let current_count = request_counter.load(Ordering::Relaxed);
98            let current_time = Instant::now();
99            let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101            let rps = (current_count - last_count) as f64 / elapsed;
102            println!(
103                "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104                rps, current_count
105            );
106
107            last_count = current_count;
108            last_time = current_time;
109        }
110    });
111
112    // 启动服务器并等待Ctrl+C信号
113    let server_handle = tokio::spawn(async move {
114        if let Err(e) = server.run(shutdown_rx).await {
115            eprintln!("[Server] 运行时错误: {}", e);
116        }
117    });
118
119    println!("[Server] 按 Ctrl+C 停止服务器...");
120    tokio::signal::ctrl_c().await?;
121    println!("[Server] 接收到停止信号,正在关闭...");
122
123    // 发送关闭信号
124    let _ = shutdown_tx.send(());
125
126    // 等待服务器和统计任务完成
127    let _ = server_handle.await;
128    stats_handle.abort();
129
130    println!("[Server] 服务器已关闭");
131    Ok(())
132}
More examples
Hide additional examples
examples/echo_server_v1.rs (lines 39-42)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25    // ========================================
26    // 1. 创建关闭通道:用于外部控制服务器生命周期
27    // ========================================
28    // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29    // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32    // ========================================
33    // 2. 创建并配置路由器
34    // ========================================
35    let router = Arc::new(DefaultRouter::new());
36
37    // 注册 msg_id = 1 的回显处理函数
38    let router_clone = router.clone();
39    router_clone.add_route(1, |req| {
40        println!("Received echo request: {:?}", req.data());
41        Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42    });
43
44    // ========================================
45    // 3. 启动服务器(异步任务)
46    // ========================================
47    let server = Server::new("127.0.0.1:8000", router);
48    let server_handle = tokio::spawn(async move {
49        if let Err(e) = server.run(shutdown_rx).await {
50            eprintln!("[Zerust] Server runtime error: {}", e);
51        }
52    });
53
54    // ========================================
55    // 4. 等待服务器就绪(端口探测)
56    // ========================================
57    // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58    if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59        eprintln!("[Client] Failed to connect to server within 5 seconds.");
60        return Err("Server did not start in time".into());
61    }
62    println!("[Client] Server is ready. Proceeding with test...");
63
64    // ========================================
65    // 5. 运行客户端测试
66    // ========================================
67    match client().await {
68        Ok(()) => println!("✅ Client finished successfully."),
69        Err(e) => eprintln!("❌ Client error: {}", e),
70    }
71
72    // ========================================
73    // 6. 发送关闭信号
74    // ========================================
75    // 客户端完成,通知服务器关闭
76    let _ = shutdown_tx.send(());
77    println!("[Main] Shutdown signal sent.");
78
79    // ========================================
80    // 7. 等待服务器完全停止
81    // ========================================
82    // 确保 server.run() 任务完全结束,避免资源泄漏
83    let _ = server_handle.await;
84
85    println!("🎉 Program exited gracefully.");
86    Ok(())
87}

Trait Implementations§

Source§

impl Default for DefaultRouter

DefaultRouter 实现 Default trait

Source§

fn default() -> Self

Returns the “default value” for a type. Read more
Source§

impl Router for DefaultRouter

DefaultRouter 实现 Router trait

Source§

fn handle(&self, req: &Request) -> Response

处理请求并生成响应

根据请求的消息ID查找对应的处理函数,如果找到则调用该函数处理请求, 否则返回一个表示路由未找到的响应。

§参数
  • req - 请求对象的引用
§返回值

返回对应的响应对象

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.