1use std::{fmt, io, marker::PhantomData, net, thread, time};
3
4use ntex_io::{Io, IoConfig};
5use ntex_net::tcp_connect;
6use ntex_rt::System;
7use ntex_service::{ServiceFactory, cfg::SharedCfg};
8use socket2::{Domain, SockAddr, Socket, Type};
9use uuid::Uuid;
10
11use super::{Server, ServerBuilder};
12
13pub struct TestServerBuilder<F, R> {
15 id: Uuid,
16 factory: F,
17 config: SharedCfg,
18 client_config: SharedCfg,
19 _t: PhantomData<R>,
20}
21
22impl<F, R> fmt::Debug for TestServerBuilder<F, R> {
23 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
24 f.debug_struct("TestServerBuilder")
25 .field("id", &self.id)
26 .field("config", &self.config)
27 .field("client_config", &self.client_config)
28 .finish()
29 }
30}
31
32impl<F, R> TestServerBuilder<F, R>
33where
34 F: AsyncFn() -> R + Send + Clone + 'static,
35 R: ServiceFactory<Io, SharedCfg> + 'static,
36{
37 pub fn new(factory: F) -> Self {
38 Self {
39 factory,
40 id: Uuid::now_v7(),
41 config: SharedCfg::new("TEST-SERVER").into(),
42 client_config: SharedCfg::new("TEST-CLIENT").into(),
43 _t: PhantomData,
44 }
45 }
46
47 pub fn config<T: Into<SharedCfg>>(mut self, cfg: T) -> Self {
49 self.config = cfg.into();
50 self
51 }
52
53 pub fn client_config<T: Into<SharedCfg>>(mut self, cfg: T) -> Self {
55 self.client_config = cfg.into();
56 self
57 }
58
59 pub fn start(self) -> TestServer {
61 log::debug!("Starting test server {:?}", self.id);
62 let config = self.config;
63 let factory = self.factory;
64 let cfg = System::current().config();
65 let name = System::current().name().to_string();
66
67 let (tx, rx) = oneshot::channel();
68 thread::spawn(move || {
70 let sys = System::with_config(&name, cfg);
71 let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap();
72 let local_addr = tcp.local_addr().unwrap();
73 let system = sys.system();
74
75 sys.run(move || {
76 let server = Server::builder()
77 .listen("test", tcp, async move |_| factory().await)?
78 .config("test", config)
79 .workers(1)
80 .disable_signals()
81 .enable_affinity()
82 .run();
83
84 ntex_rt::spawn(async move {
85 tx.send((system, local_addr, server))
86 .expect("Failed to send Server to TestServer");
87 });
88
89 Ok(())
90 })
91 });
92 let (system, addr, server) = rx.recv().unwrap();
93 thread::sleep(time::Duration::from_millis(25));
94
95 TestServer {
96 addr,
97 server,
98 system,
99 id: self.id,
100 cfg: self.client_config,
101 }
102 }
103}
104
105pub fn test_server<F, R>(factory: F) -> TestServer
135where
136 F: AsyncFn() -> R + Send + Clone + 'static,
137 R: ServiceFactory<Io, SharedCfg> + 'static,
138{
139 TestServerBuilder::new(factory).start()
140}
141
142pub fn build_test_server<F>(factory: F) -> TestServer
144where
145 F: AsyncFnOnce(ServerBuilder) -> ServerBuilder + Send + 'static,
146{
147 let cfg = System::current().config();
148 let name = System::current().name().to_string();
149
150 let id = Uuid::now_v7();
151 log::debug!("Starting {:?} server {:?}", name, id);
152
153 let (tx, rx) = oneshot::channel();
154
155 thread::spawn(move || {
157 let sys = System::with_config(&name, cfg);
158 let system = sys.system();
159
160 sys.block_on(async move {
161 let server = factory(super::build())
162 .await
163 .workers(1)
164 .disable_signals()
165 .run();
166 tx.send((system, server.clone()))
167 .expect("Failed to send Server to TestServer");
168 let _ = server.await;
169 });
170 });
171 let (system, server) = rx.recv().unwrap();
172 thread::sleep(time::Duration::from_millis(25));
173
174 TestServer {
175 id,
176 system,
177 server,
178 addr: "127.0.0.1:0".parse().unwrap(),
179 cfg: SharedCfg::new("TEST-CLIENT").add(IoConfig::new()).into(),
180 }
181}
182
183#[derive(Debug)]
184pub struct TestServer {
186 id: Uuid,
187 addr: net::SocketAddr,
188 system: System,
189 server: Server,
190 cfg: SharedCfg,
191}
192
193impl TestServer {
194 pub fn addr(&self) -> net::SocketAddr {
196 self.addr
197 }
198
199 pub fn set_addr(mut self, addr: net::SocketAddr) -> Self {
200 self.addr = addr;
201 self
202 }
203
204 pub fn config(&self) -> SharedCfg {
206 self.cfg
207 }
208
209 pub async fn connect(&self) -> io::Result<Io> {
211 tcp_connect(self.addr, self.cfg).await
212 }
213
214 pub fn stop(&self) {
216 let _ = self.server.stop(true);
217 }
218
219 pub fn unused_addr() -> net::SocketAddr {
221 let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap();
222 let socket = Socket::new(Domain::IPV4, Type::STREAM, None).unwrap();
223 socket.set_reuse_address(true).unwrap();
224 socket.bind(&SockAddr::from(addr)).unwrap();
225 let tcp = net::TcpListener::from(socket);
226 tcp.local_addr().unwrap()
227 }
228
229 pub fn server(&self) -> Server {
231 self.server.clone()
232 }
233}
234
235impl Drop for TestServer {
236 fn drop(&mut self) {
237 log::debug!("Stopping test server {:?}", self.id);
238 let _ = self.server.stop(false);
239 thread::sleep(time::Duration::from_millis(75));
240 self.system.stop();
241 thread::sleep(time::Duration::from_millis(25));
242 }
243}