1#![allow(clippy::missing_panics_doc)]
3use std::{fmt, io, marker::PhantomData, net, thread, time};
4
5use ntex_io::{Io, IoConfig};
6use ntex_net::tcp_connect;
7use ntex_rt::System;
8use ntex_service::{ServiceFactory, cfg::SharedCfg};
9use socket2::{Domain, SockAddr, Socket, Type};
10use uuid::Uuid;
11
12use super::{Server, ServerBuilder};
13
14pub struct TestServerBuilder<F, R> {
16 id: Uuid,
17 factory: F,
18 config: SharedCfg,
19 client_config: SharedCfg,
20 _t: PhantomData<R>,
21}
22
23impl<F, R> fmt::Debug for TestServerBuilder<F, R> {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 f.debug_struct("TestServerBuilder")
26 .field("id", &self.id)
27 .field("config", &self.config)
28 .field("client_config", &self.client_config)
29 .finish()
30 }
31}
32
33impl<F, R> TestServerBuilder<F, R>
34where
35 F: AsyncFn() -> R + Send + Clone + 'static,
36 R: ServiceFactory<Io, SharedCfg> + 'static,
37{
38 #[must_use]
39 pub fn new(factory: F) -> Self {
41 Self {
42 factory,
43 id: Uuid::now_v7(),
44 config: SharedCfg::new("TEST-SERVER").into(),
45 client_config: SharedCfg::new("TEST-CLIENT").into(),
46 _t: PhantomData,
47 }
48 }
49
50 #[must_use]
51 pub fn config<T: Into<SharedCfg>>(mut self, cfg: T) -> Self {
53 self.config = cfg.into();
54 self
55 }
56
57 #[must_use]
58 pub fn client_config<T: Into<SharedCfg>>(mut self, cfg: T) -> Self {
60 self.client_config = cfg.into();
61 self
62 }
63
64 pub fn start(self) -> TestServer {
66 log::debug!("Starting test server {:?}", self.id);
67 let config = self.config;
68 let factory = self.factory;
69 let cfg = System::current().config();
70 let name = System::current().name().to_string();
71
72 let (tx, rx) = oneshot::channel();
73 thread::spawn(move || {
75 let sys = System::with_config(&name, cfg);
76 let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
77 let local_addr = tcp.local_addr().unwrap();
78
79 sys.run(move || {
80 let server = Server::builder()
81 .listen("test", tcp, async move |_| factory().await)?
82 .config("test", config)
83 .workers(1)
84 .disable_signals()
85 .enable_affinity()
86 .run();
87
88 ntex_rt::spawn(async move {
89 tx.send((System::current(), local_addr, server))
90 .expect("Failed to send Server to TestServer");
91 });
92
93 Ok(())
94 })
95 });
96 let (system, addr, server) = rx.recv().unwrap();
97 thread::sleep(time::Duration::from_millis(25));
98
99 TestServer {
100 addr,
101 server,
102 system,
103 id: self.id,
104 cfg: self.client_config,
105 }
106 }
107}
108
109pub fn test_server<F, R>(factory: F) -> TestServer
139where
140 F: AsyncFn() -> R + Send + Clone + 'static,
141 R: ServiceFactory<Io, SharedCfg> + 'static,
142{
143 TestServerBuilder::new(factory).start()
144}
145
146pub fn build_test_server<F>(factory: F) -> TestServer
148where
149 F: AsyncFnOnce(ServerBuilder) -> ServerBuilder + Send + 'static,
150{
151 let cfg = System::current().config();
152 let name = System::current().name().to_string();
153
154 let id = Uuid::now_v7();
155 log::debug!("Starting {name:?} server {id:?}");
156
157 let (tx, rx) = oneshot::channel();
158
159 thread::spawn(move || {
161 let sys = System::with_config(&name, cfg);
162
163 sys.block_on(async move {
164 let server = factory(super::build())
165 .await
166 .workers(1)
167 .disable_signals()
168 .run();
169 tx.send((System::current(), server.clone()))
170 .expect("Failed to send Server to TestServer");
171 let _ = server.await;
172 });
173 });
174 let (system, server) = rx.recv().unwrap();
175 thread::sleep(time::Duration::from_millis(25));
176
177 TestServer {
178 id,
179 system,
180 server,
181 addr: "127.0.0.1:0".parse().unwrap(),
182 cfg: SharedCfg::new("TEST-CLIENT").add(IoConfig::new()).into(),
183 }
184}
185
186#[derive(Debug)]
187pub struct TestServer {
189 id: Uuid,
190 addr: net::SocketAddr,
191 system: System,
192 server: Server,
193 cfg: SharedCfg,
194}
195
196impl TestServer {
197 pub fn addr(&self) -> net::SocketAddr {
199 self.addr
200 }
201
202 #[must_use]
203 pub fn set_addr(mut self, addr: net::SocketAddr) -> Self {
204 self.addr = addr;
205 self
206 }
207
208 pub fn config(&self) -> SharedCfg {
210 self.cfg.clone()
211 }
212
213 pub async fn connect(&self) -> io::Result<Io> {
215 tcp_connect(self.addr, self.cfg.clone()).await
216 }
217
218 pub fn stop(&self) {
220 drop(self.server.stop(true));
221 }
222
223 pub fn unused_addr() -> net::SocketAddr {
225 let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
226 let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
227 socket.set_reuse_address(true).unwrap();
228 socket.bind(&SockAddr::from(addr)).unwrap();
229 let tcp = net::TcpListener::from(socket);
230 tcp.local_addr().unwrap()
231 }
232
233 pub fn server(&self) -> Server {
235 self.server.clone()
236 }
237}
238
239impl Drop for TestServer {
240 fn drop(&mut self) {
241 log::debug!("Stopping test server {:?}", self.id);
242 drop(self.server.stop(false));
243 thread::sleep(time::Duration::from_millis(75));
244 self.system.stop();
245 thread::sleep(time::Duration::from_millis(25));
246 }
247}