1#![allow(clippy::missing_panics_doc)]
3use std::{fmt, io, marker::PhantomData, net, thread, time};
4
5use ntex_io::{Io, IoConfig};
6use ntex_net::tcp_connect;
7use ntex_rt::System;
8use ntex_service::{ServiceFactory, cfg::SharedCfg};
9use socket2::{Domain, SockAddr, Socket, Type};
10use uuid::Uuid;
11
12use super::{Server, ServerBuilder};
13
14pub struct TestServerBuilder<F, R> {
16 id: Uuid,
17 factory: F,
18 config: SharedCfg,
19 client_config: SharedCfg,
20 _t: PhantomData<R>,
21}
22
23impl<F, R> fmt::Debug for TestServerBuilder<F, R> {
24 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
25 f.debug_struct("TestServerBuilder")
26 .field("id", &self.id)
27 .field("config", &self.config)
28 .field("client_config", &self.client_config)
29 .finish()
30 }
31}
32
33impl<F, R> TestServerBuilder<F, R>
34where
35 F: AsyncFn() -> R + Send + Clone + 'static,
36 R: ServiceFactory<Io, SharedCfg> + 'static,
37{
38 #[must_use]
39 pub fn new(factory: F) -> Self {
41 Self {
42 factory,
43 id: Uuid::now_v7(),
44 config: SharedCfg::new("TEST-SERVER").into(),
45 client_config: SharedCfg::new("TEST-CLIENT").into(),
46 _t: PhantomData,
47 }
48 }
49
50 #[must_use]
51 pub fn config<T: Into<SharedCfg>>(mut self, cfg: T) -> Self {
53 self.config = cfg.into();
54 self
55 }
56
57 #[must_use]
58 pub fn client_config<T: Into<SharedCfg>>(mut self, cfg: T) -> Self {
60 self.client_config = cfg.into();
61 self
62 }
63
64 pub fn start(self) -> TestServer {
66 log::debug!("Starting test server {:?}", self.id);
67 let config = self.config;
68 let factory = self.factory;
69 let cfg = System::current().config();
70 let name = System::current().name().to_string();
71
72 let (tx, rx) = oneshot::channel();
73 thread::spawn(move || {
75 let sys = System::with_config(&name, cfg);
76 let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
77 let local_addr = tcp.local_addr().unwrap();
78 let system = sys.system();
79
80 sys.run(move || {
81 let server = Server::builder()
82 .listen("test", tcp, async move |_| factory().await)?
83 .config("test", config)
84 .workers(1)
85 .disable_signals()
86 .enable_affinity()
87 .run();
88
89 ntex_rt::spawn(async move {
90 tx.send((system, local_addr, server))
91 .expect("Failed to send Server to TestServer");
92 });
93
94 Ok(())
95 })
96 });
97 let (system, addr, server) = rx.recv().unwrap();
98 thread::sleep(time::Duration::from_millis(25));
99
100 TestServer {
101 addr,
102 server,
103 system,
104 id: self.id,
105 cfg: self.client_config,
106 }
107 }
108}
109
110pub fn test_server<F, R>(factory: F) -> TestServer
140where
141 F: AsyncFn() -> R + Send + Clone + 'static,
142 R: ServiceFactory<Io, SharedCfg> + 'static,
143{
144 TestServerBuilder::new(factory).start()
145}
146
147pub fn build_test_server<F>(factory: F) -> TestServer
149where
150 F: AsyncFnOnce(ServerBuilder) -> ServerBuilder + Send + 'static,
151{
152 let cfg = System::current().config();
153 let name = System::current().name().to_string();
154
155 let id = Uuid::now_v7();
156 log::debug!("Starting {name:?} server {id:?}");
157
158 let (tx, rx) = oneshot::channel();
159
160 thread::spawn(move || {
162 let sys = System::with_config(&name, cfg);
163 let system = sys.system();
164
165 sys.block_on(async move {
166 let server = factory(super::build())
167 .await
168 .workers(1)
169 .disable_signals()
170 .run();
171 tx.send((system, server.clone()))
172 .expect("Failed to send Server to TestServer");
173 let _ = server.await;
174 });
175 });
176 let (system, server) = rx.recv().unwrap();
177 thread::sleep(time::Duration::from_millis(25));
178
179 TestServer {
180 id,
181 system,
182 server,
183 addr: "127.0.0.1:0".parse().unwrap(),
184 cfg: SharedCfg::new("TEST-CLIENT").add(IoConfig::new()).into(),
185 }
186}
187
188#[derive(Debug)]
189pub struct TestServer {
191 id: Uuid,
192 addr: net::SocketAddr,
193 system: System,
194 server: Server,
195 cfg: SharedCfg,
196}
197
198impl TestServer {
199 pub fn addr(&self) -> net::SocketAddr {
201 self.addr
202 }
203
204 #[must_use]
205 pub fn set_addr(mut self, addr: net::SocketAddr) -> Self {
206 self.addr = addr;
207 self
208 }
209
210 pub fn config(&self) -> SharedCfg {
212 self.cfg
213 }
214
215 pub async fn connect(&self) -> io::Result<Io> {
217 tcp_connect(self.addr, self.cfg).await
218 }
219
220 pub fn stop(&self) {
222 drop(self.server.stop(true));
223 }
224
225 pub fn unused_addr() -> net::SocketAddr {
227 let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
228 let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
229 socket.set_reuse_address(true).unwrap();
230 socket.bind(&SockAddr::from(addr)).unwrap();
231 let tcp = net::TcpListener::from(socket);
232 tcp.local_addr().unwrap()
233 }
234
235 pub fn server(&self) -> Server {
237 self.server.clone()
238 }
239}
240
241impl Drop for TestServer {
242 fn drop(&mut self) {
243 log::debug!("Stopping test server {:?}", self.id);
244 drop(self.server.stop(false));
245 thread::sleep(time::Duration::from_millis(75));
246 self.system.stop();
247 thread::sleep(time::Duration::from_millis(25));
248 }
249}