hyper_simple_server/
server.rs

1
2
3use crate::prelude::*;
4
5
6
7
/// Handle to an HTTP server; cloning it shares the same internal state
/// (the internals are an `Arc`, see `ServerInternals`).
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
#[ derive (Clone) ]
pub struct Server {
	/// Shared, lock-protected state (currently only the configuration).
	internals : ServerInternals,
}
14
/// State owned by a `Server`, stored behind the shared lock.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
struct ServerInternals0 {
	/// Configuration the server was created with.
	configuration : Configuration,
}
20
/// Reference-counted, read/write-locked server state shared by all `Server` clones.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
type ServerInternals = Arc<RwLock<ServerInternals0>>;
24
25
26
27
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl Server {
	
	/// Wraps the given configuration into a new server handle.
	///
	/// Nothing is started here; the visible code path always returns `Ok`.
	pub fn new (_configuration : Configuration) -> ServerResult<Self> {
		let _internals = ServerInternals0 { configuration : _configuration };
		let _shared = Arc::new (RwLock::new (_internals));
		Ok (Server { internals : _shared })
	}
}
42
43
#[ cfg (all (feature = "hss-handler", feature = "hss-server-core", feature = "hyper--server")) ]
impl Server {
	
	/// Creates a server from the configuration and serves its configured
	/// handler, blocking the calling thread until serving ends.
	///
	/// Fails if the configuration carries no handler.
	pub fn run_and_wait (_configuration : Configuration) -> ServerResult {
		// The handler must be extracted before the configuration is moved away.
		let _resolved = Self::handler_0 (&_configuration) ?;
		Self::run_and_wait_with_handler (_configuration, _resolved)
	}
	
	/// Asynchronous counterpart of `run_and_wait`.
	pub async fn run (_configuration : Configuration) -> ServerResult {
		// The handler must be extracted before the configuration is moved away.
		let _resolved = Self::handler_0 (&_configuration) ?;
		Self::run_with_handler (_configuration, _resolved) .await
	}
	
	/// Serves the handler found in this server's configuration, blocking
	/// the calling thread until serving ends.
	pub fn serve_and_wait (&self) -> ServerResult {
		let _resolved = self.handler () ?;
		self.serve_and_wait_with_handler (_resolved)
	}
	
	/// Asynchronous counterpart of `serve_and_wait`.
	pub async fn serve (&self) -> ServerResult {
		let _resolved = self.handler () ?;
		self.serve_with_handler (_resolved) .await
	}
	
	/// Reads the handler out of the shared configuration; a failure to
	/// take the read lock is reported as an error (code `0x0f9770a1`).
	fn handler (&self) -> ServerResult<HandlerDynArc> {
		let _guard = self.internals.read () .or_wrap (0x0f9770a1) ?;
		Self::handler_0 (&_guard.configuration)
	}
	
	/// Returns a clone of the configured handler, or an error
	/// (code `0x55a5104c`) when none was configured.
	fn handler_0 (_configuration : &Configuration) -> ServerResult<HandlerDynArc> {
		match _configuration.handler.clone () {
			Some (_resolved) => Ok (_resolved),
			None => Err (error_with_message (0x55a5104c, "no handler specified")),
		}
	}
}
82
83
#[ cfg (all (feature = "hss-handler", feature = "hss-server-core", feature = "hyper--server")) ]
impl Server
{
	/// Creates a server from the configuration and serves the given handler,
	/// blocking the calling thread until serving ends.
	pub fn run_and_wait_with_handler <H, F> (_configuration : Configuration, _handler : H) -> ServerResult
			where
				H : Handler<Future = F> + Send + Sync + 'static + Clone,
				F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		let _instance = Server::new (_configuration) ?;
		_instance.serve_and_wait_with_handler (_handler)
	}
	
	/// Asynchronous counterpart of `run_and_wait_with_handler`.
	pub async fn run_with_handler <H, F> (_configuration : Configuration, _handler : H) -> ServerResult
			where
				H : Handler<Future = F> + Send + Sync + 'static + Clone,
				F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		let _instance = Server::new (_configuration) ?;
		_instance.serve_with_handler (_handler) .await
	}
	
	/// Builds a runtime (see `serve_runtime`) and drives `serve_with_handler`
	/// to completion on it.
	///
	/// With the `hss-server-profiling` feature, a profiling session is started
	/// beforehand when the configuration names a profiling path, and stopped
	/// once serving has ended.
	pub fn serve_and_wait_with_handler <H, F> (&self, _handler : H) -> ServerResult
			where
				H : Handler<Future = F> + Send + Sync + 'static + Clone,
				F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		#[ cfg (feature = "hss-server-profiling") ]
		let _session = {
			// The read guard lives only for the duration of this block.
			let _guard = self.internals.read () .or_panic (0x0a78cbe3);
			match &_guard.configuration.profiling {
				Some (_path) => Some (ProfilingSession::new_and_start (_path) ?),
				None => None,
			}
		};
		
		let _runtime = self.serve_runtime () ?;
		let _outcome = _runtime.block_on (self.serve_with_handler (_handler));
		
		#[ cfg (feature = "hss-server-profiling") ]
		if let Some (_session) = _session {
			_session.stop_and_drop () ?;
		}
		
		_outcome
	}
	
	/// Serves the given handler: for each connection the handler is cloned,
	/// wrapped into a service, and that service is put into a `ServiceWrapper`.
	pub async fn serve_with_handler <H, F> (&self, _handler : H) -> ServerResult
			where
				H : Handler<Future = F> + Send + Sync + 'static + Clone,
				F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		let _make_service = move |_ : &Connection| {
				let _wrapped = ServiceWrapper (_handler.clone () .wrap ());
				async {
					ServerResult::Ok (_wrapped)
				}
			};
		
		self.serve_with_make_service_fn (_make_service) .await
	}
}
150
151
152
153
#[ cfg (feature = "hss-server-core") ]
#[ cfg (feature = "hyper--server") ]
impl Server {
	
	/// Creates the `hyper` server builder for the configured endpoint.
	///
	/// Logs the endpoint URL, creates the `Accepter` for the endpoint, fetches
	/// the protocol settings from `serve_protocol` and installs a
	/// `ServerExecutor` as the executor.
	pub fn serve_builder (&self) -> ServerResult<hyper::Builder<Accepter, ServerExecutor>> {
		
		// The read guard is confined to this block so that it is released before
		// `serve_protocol` takes its own read lock;  re-acquiring a read lock that
		// the current thread already holds may panic or deadlock.
		let _accepter = {
			let _self = self.internals.read () .or_panic (0x62cbf380);
			
			eprintln! ("[ii] [83af6f05]  server listening on `{}`;", _self.configuration.endpoint.url ());
			
			let _accepter = Accepter::new (&_self.configuration.endpoint) ?;
			_accepter
		};
		
		let _protocol = self.serve_protocol () ?;
		let _executor = ServerExecutor ();
		
		let _builder = hyper::Builder::new (_accepter, _protocol);
		let _builder = _builder.executor (_executor);
		
		Ok (_builder)
	}
	
	/// Serves requests with a plain request-to-response function.
	///
	/// The function is cloned for every connection, turned into a service and
	/// put into a `ServiceWrapper`.
	pub async fn serve_with_service_fn <S, SF, SB, SBD> (&self, _service : S) -> ServerResult
			where
				S : FnMut (Request<Body>) -> SF + Send + 'static + Clone,
				SF : Future<Output = Result<Response<SB>, io::Error>> + Send + 'static,
				SB : BodyTrait<Data = SBD, Error = io::Error> + Send + Sync + 'static,
				SBD : Buf + Send + 'static,
	{
		let _make_service = move |_ : &Connection| {
				let _service = _service.clone ();
				let _service = hyper::service_fn (_service);
				let _service = ServiceWrapper (_service);
				async {
					ServerResult::Ok (_service)
				}
			};
		
		self.serve_with_make_service_fn (_make_service).await
	}
	
	/// Serves connections using the given per-connection service factory.
	///
	/// Serving ends gracefully once the `ctrl_c` future resolves;  a failure of
	/// that future panics (code `0xa011830e`).  An error from the server future
	/// is wrapped (code `0x73080376`) and returned.
	pub async fn serve_with_make_service_fn <M, MF, ME, S, SF, SE, SB, SBD, SBE> (&self, _make_service : M) -> ServerResult
			where
				M : FnMut (&Connection) -> MF + Send + 'static,
				MF : Future<Output = Result<S, ME>> + Send + 'static,
				ME : Error + Send + Sync + 'static,
				S : hyper::Service<Request<Body>, Response = Response<SB>, Future = SF, Error = SE> + Send + 'static,
				SE : Error + Send + Sync + 'static,
				SF : Future<Output = Result<Response<SB>, SE>> + Send + 'static,
				SB : BodyTrait<Data = SBD, Error = SBE> + Send + Sync + 'static,
				SBD : Buf + Send + 'static,
				SBE : Error + Send + Sync + 'static,
	{
		
		let _service = hyper::make_service_fn (_make_service);
		let _builder = self.serve_builder () ?;
		
		let _future = _builder.serve (_service);
		let _future = _future.with_graceful_shutdown (async { tokio::ctrl_c () .await .or_panic (0xa011830e); });
		
		#[ cfg (debug_assertions) ]
		eprintln! ("[ii] [3aed0938]  server initialized;");
		
		let _outcome = _future.await;
		
		#[ cfg (debug_assertions) ]
		eprintln! ("[ii] [3eff9778]  server terminated;");
		
		let _outcome = _outcome.or_wrap (0x73080376);
		_outcome
	}
	
	/// Builds the HTTP protocol settings from the endpoint's protocol.
	///
	/// * HTTP/1 (feature `hyper--server-http1`):  keep-alive and half-close are
	///   enabled and the maximum buffer size is set to 16 KiB;
	/// * HTTP/2 (feature `hyper--server-http2`):  at most 128 concurrent streams;
	///   with feature `hyper--runtime` also a 6 second keep-alive interval and a
	///   30 second keep-alive timeout.
	pub fn serve_protocol (&self) -> ServerResult<hyper::Http> {
		
		let _self = self.internals.read () .or_panic (0xdd5eec49);
		let _protocol = &_self.configuration.endpoint.protocol;
		
		let mut _http = hyper::Http::new ();
		
		#[ cfg (feature = "hyper--server-http1") ]
		if _protocol.supports_http1_only () {
			_http.http1_only (true);
		}
		#[ cfg (feature = "hyper--server-http1") ]
		if _protocol.supports_http1 () {
			_http.http1_keep_alive (true);
			_http.http1_half_close (true);
			_http.max_buf_size (16 * 1024);
		}
		
		#[ cfg (feature = "hyper--server-http2") ]
		if _protocol.supports_http2_only () {
			_http.http2_only (true);
		}
		#[ cfg (feature = "hyper--server-http2") ]
		if _protocol.supports_http2 () {
			_http.http2_max_concurrent_streams (128);
			#[ cfg (feature = "hyper--runtime") ]
			_http.http2_keep_alive_interval (Some (time::Duration::new (6, 0)));
			#[ cfg (feature = "hyper--runtime") ]
			_http.http2_keep_alive_timeout (time::Duration::new (30, 0));
		}
		
		Ok (_http)
	}
	
	/// Builds the runtime on which the server is driven.
	///
	/// With feature `hss-server-mt` and a configured thread count greater than
	/// zero a multi-threaded runtime is created;  in every other case a
	/// current-thread runtime is used.  The optional debugging helpers
	/// (`jemalloc` statistics, `strace`) are started here when their features
	/// are enabled.
	pub fn serve_runtime (&self) -> ServerResult<Runtime> {
		
		let _self = self.internals.read () .or_panic (0xfc9b9ffb);
		
		#[ cfg (feature = "hss-jemalloc") ]
		if true {
			#[ cfg (debug_assertions) ]
			eprintln! ("[ii] [cecdcf1b]  using `jemalloc` allocator;");
			#[ cfg (feature = "hss-server-debug-jemalloc") ]
			server_start_jemalloc_stats ();
		}
		
		#[ cfg (feature = "hss-server-debug-strace") ]
		if true {
			server_start_strace ();
		}
		
		let mut _runtime_0 = None;
		
		#[ cfg (feature = "hss-server-mt") ]
		if let Some (_threads) = _self.configuration.threads {
			if _threads > 0 {
				#[ cfg (debug_assertions) ]
				eprintln! ("[ii] [cf4d96e6]  using multi-threaded executor (with {} threads);", _threads);
				let _runtime = runtime_multiple_threads (Some (_threads)) ?;
				_runtime_0 = Some (_runtime);
			}
		}
		
		// Fallback when no multi-threaded runtime was requested (or the feature is disabled).
		if _runtime_0.is_none () {
			#[ cfg (debug_assertions) ]
			eprintln! ("[ii] [25065ee8]  using current-thread executor (with 1 thread);");
			let _runtime = runtime_current_thread () ?;
			_runtime_0 = Some (_runtime);
		};
		
		let _runtime = _runtime_0.infallible (0xfb2d7cfb);
		
		#[ cfg (feature = "hss-server-sanitize") ]
		#[ cfg (debug_assertions) ]
		eprintln! ("[ii] [3c1badd4]  using URI sanitizer;");
		
		Ok (_runtime)
	}
}
303
304
305
306
/// Newtype around a service whose error type is `io::Error`.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
struct ServiceWrapper <S> (S)
	where S : hyper::Service<Request<Body>, Error = io::Error>;
313
/// Future returned by `ServiceWrapper::call`.
// NOTE:  within this file the `Error` variant is only constructed when the
// `hss-server-sanitize` feature is enabled, hence the `dead_code` allowance.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
#[ allow (dead_code) ]
enum ServiceWrapperFuture <S>
	where S : hyper::Service<Request<Body>, Error = io::Error>,
{
	/// The wrapped service's own future, still to be driven.
	Future (S::Future),
	/// An error to be yielded on the next poll.
	Error (io::Error),
	/// The outcome was already yielded.
	Done,
}
325
326
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl <S> hyper::Service<Request<Body>> for ServiceWrapper<S>
	where S : hyper::Service<Request<Body>, Error = io::Error>,
{
	type Response = S::Response;
	type Error = io::Error;
	type Future = ServiceWrapperFuture<S>;
	
	/// Readiness is delegated to the wrapped service.
	fn poll_ready (&mut self, _context : &mut Context<'_>) -> Poll<Result<(), io::Error>> {
		let ServiceWrapper (_inner) = self;
		_inner.poll_ready (_context)
	}
	
	/// Forwards the request to the wrapped service.
	///
	/// With the `hss-server-sanitize` feature the URI is sanitized first:  a
	/// rewritten URI replaces the request's URI, and a sanitize failure yields
	/// an error future without calling the wrapped service.
	fn call (&mut self, mut _request : Request<Body>) -> Self::Future {
		
		#[ cfg (feature = "hss-server-sanitize") ]
		match sanitize_uri (_request.uri ()) {
			Ok (None) => (),
			Ok (Some (_sanitized)) => {
				eprintln! ("[ww] [d1e356bc]  URI sanitized to `{}` from `{}`;", _sanitized, _request.uri ());
				* _request.uri_mut () = _sanitized;
			}
			Err (_failure) => {
				eprintln! ("[ww] [aace2099]  URI sanitize failed for `{}`:  {}", _request.uri (), _failure);
				return ServiceWrapperFuture::Error (_failure);
			}
		}
		
		ServiceWrapperFuture::Future (self.0.call (_request))
	}
}
364
#[ cfg (feature = "hss-server-core") ]
#[ cfg (feature = "hyper--server") ]
impl <S> Future for ServiceWrapperFuture<S>
	where
		S : hyper::Service<Request<Body>, Error = io::Error>,
{
	type Output = <S::Future as Future>::Output;
	
	/// Drives the wrapped future, or yields the stored error.
	///
	/// Once an outcome (success or failure) was yielded the state becomes
	/// `Done`;  polling again then yields an error (code `0x0722e578`)
	/// instead of re-polling a completed future.  Handler failures are
	/// logged before being returned.
	fn poll (self : Pin<&mut Self>, _context : &mut Context<'_>) -> Poll<Self::Output> {
		// SAFETY:  the inner future is never moved out of the `Future` variant;
		// it is only polled in place and later overwritten by assignment, which
		// drops it in place.
		#[ allow (unsafe_code) ]
		let _self_0 = unsafe { self.get_unchecked_mut () };
		match _self_0 {
			ServiceWrapperFuture::Future (_future) => {
				// SAFETY:  `_future` lives inside the pinned `self` and is not
				// moved for as long as this variant is active (see above).
				#[ allow (unsafe_code) ]
				let _delegate = unsafe { Pin::new_unchecked (_future) };
				let _outcome = match _delegate.poll (_context) {
					Poll::Pending =>
						return Poll::Pending,
					Poll::Ready (_outcome) =>
						_outcome,
				};
				if let Err (_error) = &_outcome {
					eprintln! ("[ee] [540dc2bc]  handler failed:  {}", _error);
				}
				// Assignment (unlike `mem::replace`) drops the completed future in
				// place, and it is done for both successes and failures.
				* _self_0 = ServiceWrapperFuture::Done;
				Poll::Ready (_outcome)
			}
			ServiceWrapperFuture::Error (_error) => {
				// No pinned future is active in this state, thus moving the
				// value out to take ownership of the error is fine.
				let _self_1 = mem::replace (_self_0, ServiceWrapperFuture::Done);
				if let ServiceWrapperFuture::Error (_error) = _self_1 {
					Poll::Ready (Err (_error))
				} else {
					panic_with_code (0xd83566d8);
				}
			}
			ServiceWrapperFuture::Done =>
				Poll::Ready (Err (error_with_code (0x0722e578))),
		}
	}
}
408
409
410
411
/// Stateless executor handed to `hyper`; see its `hyper::Executor` implementation.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
#[ derive (Clone) ]
pub struct ServerExecutor ();
416
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl <F> hyper::Executor<F> for ServerExecutor
		where F : Future<Output = ()> + Send + 'static,
{
	/// Hands the future over to `tokio::spawn`; the returned handle is discarded.
	fn execute (&self, _future : F) {
		let _ = tokio::spawn (_future);
	}
}
427
428
429
430
/// Spawns `strace -f -p <pid>` for the current process.
///
/// The child handle is discarded;  a failure to spawn panics (code `0xff87ffef`).
#[ cfg (all (feature = "hss-server-core", feature = "hss-server-debug-strace")) ]
fn server_start_strace () {
	
	#[ cfg (debug_assertions) ]
	eprintln! ("[ii] [19f96abc]  starting `strace` tracing...");
	
	let _pid = process::id () .to_string ();
	let mut _command = process::Command::new ("strace");
	_command.args (&["-f", "-p", &_pid]);
	_command.spawn () .or_panic (0xff87ffef);
}
443
444
/// Starts a background thread that prints `jemalloc` statistics to `stderr`
/// once per second.
///
/// The thread never terminates and its handle is discarded.
#[ cfg (all (feature = "hss-server-core", feature = "hss-jemalloc", feature = "hss-server-debug-jemalloc")) ]
fn server_start_jemalloc_stats () {
	
	#[ cfg (debug_assertions) ]
	eprintln! ("[ii] [19f5dcf1]  starting `jemalloc` tracing...");
	
	/// Output callback passed to `malloc_stats_print`:  prints every line of
	/// the received text, except for the begin / end banners and the idle
	/// background-threads line.
	extern "C" fn _write (_ : * mut os::raw::c_void, _message : * const os::raw::c_char) {
		// NOTE(review):  assumes `_message` is a valid NUL-terminated string, as
		// handed over by `malloc_stats_print` -- confirm against the jemalloc API.
		#[ allow (unsafe_code) ]
		let _text = unsafe { ffi::CStr::from_ptr (_message) };
		let _text = _text.to_str () .or_panic (0x2d88d281);
		for _line in _text.split_terminator ("\n") {
			match _line {
				"___ Begin jemalloc statistics ___"
				| "--- End jemalloc statistics ---"
				| "Background threads: 0, num_runs: 0, run_interval: 0 ns" =>
					continue,
				_ =>
					eprintln! ("[dd] [35256205]  jemalloc statistics:  {}", _line),
			}
		}
	}
	
	thread::spawn (|| {
		// NUL-terminated options string passed to `malloc_stats_print`.
		let _options : &[u8] = b"gmdablxe\0";
		let _period = time::Duration::from_secs (1);
		loop {
			#[ allow (unsafe_code) ]
			unsafe { ::jemalloc_sys::malloc_stats_print (Some (_write), ptr::null_mut (), _options.as_ptr () as * const os::raw::c_char) };
			thread::sleep (_period);
		}
	});
}
476
477
478
479
/// Builds a multi-threaded runtime with all drivers enabled.
///
/// `_threads` is the number of worker threads;  `None` and `Some (0)` both
/// mean a single worker thread.  The blocking pool is capped at four times
/// the number of workers, and idle blocking threads are kept for 60 seconds.
///
/// A failure to build the runtime is wrapped (code `0x2692223a`) and returned.
#[ cfg (feature = "tokio--rt-multi-thread") ]
pub fn runtime_multiple_threads (_threads : Option<usize>) -> ServerResult<Runtime> {
	// The builder panics when asked for zero worker (or blocking) threads,
	// thus a zero is treated like a missing value.
	let _threads = match _threads {
		Some (_threads) if _threads > 0 => _threads,
		_ => 1,
	};
	let mut _builder = tokio::RuntimeBuilder::new_multi_thread ();
	_builder.worker_threads (_threads);
	_builder.max_blocking_threads (_threads * 4);
	_builder.thread_keep_alive (time::Duration::from_secs (60));
	_builder.enable_all ();
	_builder.build () .or_wrap (0x2692223a)
}
490
/// Builds a current-thread runtime with all drivers enabled.
///
/// A failure to build the runtime is wrapped (code `0x280fcb72`) and returned.
#[ cfg (feature = "tokio--rt") ]
pub fn runtime_current_thread () -> ServerResult<Runtime> {
	tokio::RuntimeBuilder::new_current_thread ()
			.enable_all ()
			.build ()
			.or_wrap (0x280fcb72)
}
497