Struct Server

Source
pub struct Server { /* private fields */ }
Expand description

表示一个TCP服务器

Server 是框架的主要入口点,负责监听TCP连接并处理客户端请求。 它使用 Router 来分发请求,使用 Connection 来管理客户端连接。

Implementations§

Source§

impl Server

Source

pub fn new(addr: &str, router: Arc<dyn Router + Send + Sync>) -> Self

创建一个新的服务器实例

§参数
  • addr - 服务器监听的地址,格式为 “IP:端口”
  • router - 路由器实例,用于分发请求到对应的处理函数
§返回值

返回一个新的 Server 实例

Examples found in repository?
examples/benchmark_server.rs (line 87)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68    // 创建关闭通道
69    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71    // 创建路由器并注册回显处理函数
72    let router = Arc::new(DefaultRouter::new());
73    let router_clone = router.clone();
74
75    // 计数器,用于统计处理的请求数
76    let request_counter = Arc::new(AtomicUsize::new(0));
77    let counter_clone = request_counter.clone();
78
79    // 注册高性能回显处理函数 - 不打印日志,直接返回
80    router_clone.add_route(1, move |req| {
81        counter_clone.fetch_add(1, Ordering::Relaxed);
82        Response::new(req.msg_id(), req.data().to_vec())
83    });
84
85    // 启动服务器
86    let server_addr = "127.0.0.1:8888";
87    let server = Server::new(server_addr, router);
88    println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90    // 启动统计任务
91    let stats_handle = tokio::spawn(async move {
92        let mut last_count = 0;
93        let mut last_time = Instant::now();
94
95        loop {
96            sleep(Duration::from_secs(1)).await;
97            let current_count = request_counter.load(Ordering::Relaxed);
98            let current_time = Instant::now();
99            let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101            let rps = (current_count - last_count) as f64 / elapsed;
102            println!(
103                "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104                rps, current_count
105            );
106
107            last_count = current_count;
108            last_time = current_time;
109        }
110    });
111
112    // 启动服务器并等待Ctrl+C信号
113    let server_handle = tokio::spawn(async move {
114        if let Err(e) = server.run(shutdown_rx).await {
115            eprintln!("[Server] 运行时错误: {}", e);
116        }
117    });
118
119    println!("[Server] 按 Ctrl+C 停止服务器...");
120    tokio::signal::ctrl_c().await?;
121    println!("[Server] 接收到停止信号,正在关闭...");
122
123    // 发送关闭信号
124    let _ = shutdown_tx.send(());
125
126    // 等待服务器和统计任务完成
127    let _ = server_handle.await;
128    stats_handle.abort();
129
130    println!("[Server] 服务器已关闭");
131    Ok(())
132}
More examples
Hide additional examples
examples/echo_server_v1.rs (line 47)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25    // ========================================
26    // 1. 创建关闭通道:用于外部控制服务器生命周期
27    // ========================================
28    // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29    // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32    // ========================================
33    // 2. 创建并配置路由器
34    // ========================================
35    let router = Arc::new(DefaultRouter::new());
36
37    // 注册 msg_id = 1 的回显处理函数
38    let router_clone = router.clone();
39    router_clone.add_route(1, |req| {
40        println!("Received echo request: {:?}", req.data());
41        Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42    });
43
44    // ========================================
45    // 3. 启动服务器(异步任务)
46    // ========================================
47    let server = Server::new("127.0.0.1:8000", router);
48    let server_handle = tokio::spawn(async move {
49        if let Err(e) = server.run(shutdown_rx).await {
50            eprintln!("[Zerust] Server runtime error: {}", e);
51        }
52    });
53
54    // ========================================
55    // 4. 等待服务器就绪(端口探测)
56    // ========================================
57    // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58    if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59        eprintln!("[Client] Failed to connect to server within 5 seconds.");
60        return Err("Server did not start in time".into());
61    }
62    println!("[Client] Server is ready. Proceeding with test...");
63
64    // ========================================
65    // 5. 运行客户端测试
66    // ========================================
67    match client().await {
68        Ok(()) => println!("✅ Client finished successfully."),
69        Err(e) => eprintln!("❌ Client error: {}", e),
70    }
71
72    // ========================================
73    // 6. 发送关闭信号
74    // ========================================
75    // 客户端完成,通知服务器关闭
76    let _ = shutdown_tx.send(());
77    println!("[Main] Shutdown signal sent.");
78
79    // ========================================
80    // 7. 等待服务器完全停止
81    // ========================================
82    // 确保 server.run() 任务完全结束,避免资源泄漏
83    let _ = server_handle.await;
84
85    println!("🎉 Program exited gracefully.");
86    Ok(())
87}
Source

pub async fn run(&self, shutdown: Receiver<()>) -> Result<(), ZerustError>

启动服务器并监听指定地址的TCP连接

该函数会绑定到配置的地址并开始监听TCP连接,对于每个传入的连接, 都会创建一个异步任务来处理请求。如果在监听过程中发生IO错误, 函数会立即返回错误。

§参数
  • shutdown: 接收关闭信号的通道。当发送端被 drop 或发送消息时,服务器将关闭。
§返回值
  • Ok(()) - 服务器正常启动并运行
  • Err(ZerustError) - 服务器启动或运行过程中发生错误
Examples found in repository?
examples/benchmark_server.rs (line 114)
67async fn run_server() -> Result<(), Box<dyn std::error::Error>> {
68    // 创建关闭通道
69    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
70
71    // 创建路由器并注册回显处理函数
72    let router = Arc::new(DefaultRouter::new());
73    let router_clone = router.clone();
74
75    // 计数器,用于统计处理的请求数
76    let request_counter = Arc::new(AtomicUsize::new(0));
77    let counter_clone = request_counter.clone();
78
79    // 注册高性能回显处理函数 - 不打印日志,直接返回
80    router_clone.add_route(1, move |req| {
81        counter_clone.fetch_add(1, Ordering::Relaxed);
82        Response::new(req.msg_id(), req.data().to_vec())
83    });
84
85    // 启动服务器
86    let server_addr = "127.0.0.1:8888";
87    let server = Server::new(server_addr, router);
88    println!("[Server] 基准测试服务器启动在 {}", server_addr);
89
90    // 启动统计任务
91    let stats_handle = tokio::spawn(async move {
92        let mut last_count = 0;
93        let mut last_time = Instant::now();
94
95        loop {
96            sleep(Duration::from_secs(1)).await;
97            let current_count = request_counter.load(Ordering::Relaxed);
98            let current_time = Instant::now();
99            let elapsed = current_time.duration_since(last_time).as_secs_f64();
100
101            let rps = (current_count - last_count) as f64 / elapsed;
102            println!(
103                "[Stats] 当前RPS: {:.2} req/s, 总请求数: {}",
104                rps, current_count
105            );
106
107            last_count = current_count;
108            last_time = current_time;
109        }
110    });
111
112    // 启动服务器并等待Ctrl+C信号
113    let server_handle = tokio::spawn(async move {
114        if let Err(e) = server.run(shutdown_rx).await {
115            eprintln!("[Server] 运行时错误: {}", e);
116        }
117    });
118
119    println!("[Server] 按 Ctrl+C 停止服务器...");
120    tokio::signal::ctrl_c().await?;
121    println!("[Server] 接收到停止信号,正在关闭...");
122
123    // 发送关闭信号
124    let _ = shutdown_tx.send(());
125
126    // 等待服务器和统计任务完成
127    let _ = server_handle.await;
128    stats_handle.abort();
129
130    println!("[Server] 服务器已关闭");
131    Ok(())
132}
More examples
Hide additional examples
examples/echo_server_v1.rs (line 49)
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25    // ========================================
26    // 1. 创建关闭通道:用于外部控制服务器生命周期
27    // ========================================
28    // 当 shutdown_tx 被 drop 或 send(()) 时,shutdown_rx 将完成
29    // server.run() 中通过 tokio::select! 监听该信号,实现优雅退出
30    let (shutdown_tx, shutdown_rx) = oneshot::channel::<()>();
31
32    // ========================================
33    // 2. 创建并配置路由器
34    // ========================================
35    let router = Arc::new(DefaultRouter::new());
36
37    // 注册 msg_id = 1 的回显处理函数
38    let router_clone = router.clone();
39    router_clone.add_route(1, |req| {
40        println!("Received echo request: {:?}", req.data());
41        Response::new(req.msg_id(), req.data().to_vec()) // 原样返回
42    });
43
44    // ========================================
45    // 3. 启动服务器(异步任务)
46    // ========================================
47    let server = Server::new("127.0.0.1:8000", router);
48    let server_handle = tokio::spawn(async move {
49        if let Err(e) = server.run(shutdown_rx).await {
50            eprintln!("[Zerust] Server runtime error: {}", e);
51        }
52    });
53
54    // ========================================
55    // 4. 等待服务器就绪(端口探测)
56    // ========================================
57    // 替代 sleep(),更可靠:最多等待 5 秒,每 10ms 尝试一次连接
58    if let Err(_) = wait_for_server(8000, Duration::from_secs(5)).await {
59        eprintln!("[Client] Failed to connect to server within 5 seconds.");
60        return Err("Server did not start in time".into());
61    }
62    println!("[Client] Server is ready. Proceeding with test...");
63
64    // ========================================
65    // 5. 运行客户端测试
66    // ========================================
67    match client().await {
68        Ok(()) => println!("✅ Client finished successfully."),
69        Err(e) => eprintln!("❌ Client error: {}", e),
70    }
71
72    // ========================================
73    // 6. 发送关闭信号
74    // ========================================
75    // 客户端完成,通知服务器关闭
76    let _ = shutdown_tx.send(());
77    println!("[Main] Shutdown signal sent.");
78
79    // ========================================
80    // 7. 等待服务器完全停止
81    // ========================================
82    // 确保 server.run() 任务完全结束,避免资源泄漏
83    let _ = server_handle.await;
84
85    println!("🎉 Program exited gracefully.");
86    Ok(())
87}

Auto Trait Implementations§

§

impl Freeze for Server

§

impl !RefUnwindSafe for Server

§

impl Send for Server

§

impl Sync for Server

§

impl Unpin for Server

§

impl !UnwindSafe for Server

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.