1
2
3use crate::prelude::*;
4
5
6
7
/// Handle to a server instance.
///
/// The handle only stores a `ServerInternals` value, so cloning it yields
/// another handle to the same shared state.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
#[ derive (Clone) ]
pub struct Server {
	/// Shared state of this server.
	internals : ServerInternals,
}
14
/// State that `ServerInternals` keeps behind its lock.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
struct ServerInternals0 {
	/// Configuration the server was created with.
	configuration : Configuration,
}
20
/// Reference-counted, lock-protected `ServerInternals0`, as stored inside `Server`.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
type ServerInternals = Arc<RwLock<ServerInternals0>>;
24
25
26
27
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl Server {

	/// Creates a server handle that takes ownership of `_configuration`.
	///
	/// Nothing is started here; the configuration is only stored inside the
	/// shared internals.  The visible code path always returns `Ok`.
	pub fn new (_configuration : Configuration) -> ServerResult<Self> {
		let _internals = ServerInternals0 { configuration : _configuration };
		let _server = Server { internals : Arc::new (RwLock::new (_internals)) };
		Ok (_server)
	}
}
42
43
#[ cfg (all (feature = "hss-handler", feature = "hss-server-core", feature = "hyper--server")) ]
impl Server {

	/// Like `run_and_wait_with_handler`, but takes the handler from
	/// `_configuration.handler`.
	///
	/// Fails if the configuration carries no handler.
	pub fn run_and_wait (_configuration : Configuration) -> ServerResult {
		let _configured = Self::handler_0 (&_configuration) ?;
		Self::run_and_wait_with_handler (_configuration, _configured)
	}

	/// Like `run_with_handler`, but takes the handler from
	/// `_configuration.handler`.
	///
	/// Fails if the configuration carries no handler.
	pub async fn run (_configuration : Configuration) -> ServerResult {
		let _configured = Self::handler_0 (&_configuration) ?;
		Self::run_with_handler (_configuration, _configured) .await
	}

	/// Like `serve_and_wait_with_handler`, using the handler found in this
	/// server's configuration.
	pub fn serve_and_wait (&self) -> ServerResult {
		let _configured = self.handler () ?;
		self.serve_and_wait_with_handler (_configured)
	}

	/// Like `serve_with_handler`, using the handler found in this server's
	/// configuration.
	pub async fn serve (&self) -> ServerResult {
		let _configured = self.handler () ?;
		self.serve_with_handler (_configured) .await
	}

	/// Reads the handler out of the shared configuration.
	///
	/// A failure to take the read lock is reported as an error (code
	/// `0x0f9770a1`) instead of panicking.
	fn handler (&self) -> ServerResult<HandlerDynArc> {
		let _guard = self.internals.read () .or_wrap (0x0f9770a1) ?;
		Self::handler_0 (&_guard.configuration)
	}

	/// Returns a clone of `_configuration.handler`, or an error when it is `None`.
	fn handler_0 (_configuration : &Configuration) -> ServerResult<HandlerDynArc> {
		match _configuration.handler.clone () {
			Some (_configured) => Ok (_configured),
			None => Err (error_with_message (0x55a5104c, "no handler specified")),
		}
	}
}
82
83
#[ cfg (all (feature = "hss-handler", feature = "hss-server-core", feature = "hyper--server")) ]
impl Server {

	/// Creates a server from `_configuration` and blocks the calling thread
	/// while it serves requests with `_handler`.
	pub fn run_and_wait_with_handler <H, F> (_configuration : Configuration, _handler : H) -> ServerResult
		where
			H : Handler<Future = F> + Send + Sync + 'static + Clone,
			F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		let _instance = Server::new (_configuration) ?;
		_instance.serve_and_wait_with_handler (_handler)
	}

	/// Creates a server from `_configuration` and serves requests with
	/// `_handler` on the caller's async context.
	pub async fn run_with_handler <H, F> (_configuration : Configuration, _handler : H) -> ServerResult
		where
			H : Handler<Future = F> + Send + Sync + 'static + Clone,
			F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		let _instance = Server::new (_configuration) ?;
		_instance.serve_with_handler (_handler) .await
	}

	/// Builds a runtime via `serve_runtime` and blocks on `serve_with_handler`.
	///
	/// With the `hss-server-profiling` feature, a profiling session is started
	/// beforehand when `configuration.profiling` is set, and stopped once
	/// serving has returned.
	pub fn serve_and_wait_with_handler <H, F> (&self, _handler : H) -> ServerResult
		where
			H : Handler<Future = F> + Send + Sync + 'static + Clone,
			F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		#[ cfg (feature = "hss-server-profiling") ]
		let _session = {
			let _guard = self.internals.read () .or_panic (0x0a78cbe3);
			match &_guard.configuration.profiling {
				Some (_path) => Some (ProfilingSession::new_and_start (_path) ?),
				None => None,
			}
		};

		let _executor = self.serve_runtime () ?;
		let _result = _executor.block_on (self.serve_with_handler (_handler));

		#[ cfg (feature = "hss-server-profiling") ]
		if let Some (_session) = _session {
			_session.stop_and_drop () ?;
		}

		_result
	}

	/// Serves requests, giving every connection its own clone of `_handler`
	/// wrapped in a `ServiceWrapper`.
	pub async fn serve_with_handler <H, F> (&self, _handler : H) -> ServerResult
		where
			H : Handler<Future = F> + Send + Sync + 'static + Clone,
			F : Future<Output = ServerResult<Response<H::ResponseBody>>> + Send + 'static,
	{
		let _make_service = move |_ : &Connection| {
			let _wrapped = ServiceWrapper (_handler.clone () .wrap ());
			async move {
				ServerResult::Ok (_wrapped)
			}
		};

		self.serve_with_make_service_fn (_make_service) .await
	}
}
150
151
152
153
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl Server {

	/// Creates the `hyper` server builder for the configured endpoint.
	///
	/// Logs the endpoint URL, creates the `Accepter` for it, and combines it
	/// with the protocol settings from `serve_protocol` and a `ServerExecutor`.
	pub fn serve_builder (&self) -> ServerResult<hyper::Builder<Accepter, ServerExecutor>> {

		let _self = self.internals.read () .or_panic (0x62cbf380);

		eprintln! ("[ii] [83af6f05] server listening on `{}`;", _self.configuration.endpoint.url ());

		let _accepter = Accepter::new (&_self.configuration.endpoint) ?;

		// `serve_protocol` takes its own read lock on `internals`;  release
		// this guard first so that the same thread never holds two read locks
		// on one `RwLock` (which may panic or deadlock).
		drop (_self);

		let _protocol = self.serve_protocol () ?;
		let _builder = hyper::Builder::new (_accepter, _protocol) .executor (ServerExecutor ());

		Ok (_builder)
	}

	/// Serves requests with a plain request-to-response function;  every
	/// connection gets its own clone of `_service`.
	pub async fn serve_with_service_fn <S, SF, SB, SBD> (&self, _service : S) -> ServerResult
		where
			S : FnMut (Request<Body>) -> SF + Send + 'static + Clone,
			SF : Future<Output = Result<Response<SB>, io::Error>> + Send + 'static,
			SB : BodyTrait<Data = SBD, Error = io::Error> + Send + Sync + 'static,
			SBD : Buf + Send + 'static,
	{
		let _make_service = move |_ : &Connection| {
			let _wrapped = ServiceWrapper (hyper::service_fn (_service.clone ()));
			async move {
				ServerResult::Ok (_wrapped)
			}
		};

		self.serve_with_make_service_fn (_make_service) .await
	}

	/// Serves requests using `_make_service` to create one service per
	/// connection.
	///
	/// The server is given a graceful-shutdown future that completes once
	/// `tokio::ctrl_c ()` resolves;  an error from the server itself is wrapped
	/// with code `0x73080376`.
	pub async fn serve_with_make_service_fn <M, MF, ME, S, SF, SE, SB, SBD, SBE> (&self, _make_service : M) -> ServerResult
		where
			M : FnMut (&Connection) -> MF + Send + 'static,
			MF : Future<Output = Result<S, ME>> + Send + 'static,
			ME : Error + Send + Sync + 'static,
			S : hyper::Service<Request<Body>, Response = Response<SB>, Future = SF, Error = SE> + Send + 'static,
			SE : Error + Send + Sync + 'static,
			SF : Future<Output = Result<Response<SB>, SE>> + Send + 'static,
			SB : BodyTrait<Data = SBD, Error = SBE> + Send + Sync + 'static,
			SBD : Buf + Send + 'static,
			SBE : Error + Send + Sync + 'static,
	{

		let _service = hyper::make_service_fn (_make_service);
		let _builder = self.serve_builder () ?;

		let _shutdown = async { tokio::ctrl_c () .await .or_panic (0xa011830e); };
		let _serving = _builder.serve (_service) .with_graceful_shutdown (_shutdown);

		#[ cfg (debug_assertions) ]
		eprintln! ("[ii] [3aed0938] server initialized;");

		let _outcome = _serving.await;

		#[ cfg (debug_assertions) ]
		eprintln! ("[ii] [3eff9778] server terminated;");

		_outcome.or_wrap (0x73080376)
	}

	/// Builds the `hyper::Http` settings matching the configured endpoint
	/// protocol.
	///
	/// HTTP/1 (feature `hyper--server-http1`): keep-alive and half-close are
	/// enabled and the maximum buffer size is set to 16 KiB.
	/// HTTP/2 (feature `hyper--server-http2`): at most 128 concurrent streams
	/// and, with `hyper--runtime`, a 6 second keep-alive interval with a
	/// 30 second timeout.
	pub fn serve_protocol (&self) -> ServerResult<hyper::Http> {

		let _self = self.internals.read () .or_panic (0xdd5eec49);
		let _protocol = &_self.configuration.endpoint.protocol;

		let mut _http = hyper::Http::new ();

		#[ cfg (feature = "hyper--server-http1") ]
		if _protocol.supports_http1_only () {
			_http.http1_only (true);
		}
		#[ cfg (feature = "hyper--server-http1") ]
		if _protocol.supports_http1 () {
			_http.http1_keep_alive (true);
			_http.http1_half_close (true);
			_http.max_buf_size (16 * 1024);
		}

		#[ cfg (feature = "hyper--server-http2") ]
		if _protocol.supports_http2_only () {
			_http.http2_only (true);
		}
		#[ cfg (feature = "hyper--server-http2") ]
		if _protocol.supports_http2 () {
			_http.http2_max_concurrent_streams (128);
			#[ cfg (feature = "hyper--runtime") ]
			_http.http2_keep_alive_interval (Some (time::Duration::new (6, 0)));
			#[ cfg (feature = "hyper--runtime") ]
			_http.http2_keep_alive_timeout (time::Duration::new (30, 0));
		}

		Ok (_http)
	}

	/// Builds the tokio runtime used by the blocking entry points.
	///
	/// With the `hss-server-mt` feature and a configured thread count greater
	/// than zero a multi-threaded runtime is created;  in every other case a
	/// current-thread runtime is used.  Depending on the enabled debugging
	/// features this also starts the `jemalloc` statistics thread and the
	/// `strace` child process.
	pub fn serve_runtime (&self) -> ServerResult<Runtime> {

		let _self = self.internals.read () .or_panic (0xfc9b9ffb);

		#[ cfg (feature = "hss-jemalloc") ]
		if true {
			#[ cfg (debug_assertions) ]
			eprintln! ("[ii] [cecdcf1b] using `jemalloc` allocator;");
			#[ cfg (feature = "hss-server-debug-jemalloc") ]
			server_start_jemalloc_stats ();
		}

		#[ cfg (feature = "hss-server-debug-strace") ]
		if true {
			server_start_strace ();
		}

		// Stays `None` unless the multi-threaded runtime was requested (and is
		// compiled in).
		let mut _runtime_0 = None;

		#[ cfg (feature = "hss-server-mt") ]
		if let Some (_threads) = _self.configuration.threads {
			if _threads > 0 {
				#[ cfg (debug_assertions) ]
				eprintln! ("[ii] [cf4d96e6] using multi-threaded executor (with {} threads);", _threads);
				_runtime_0 = Some (runtime_multiple_threads (Some (_threads)) ?);
			}
		}

		let _runtime = match _runtime_0 {
			Some (_runtime) => _runtime,
			None => {
				#[ cfg (debug_assertions) ]
				eprintln! ("[ii] [25065ee8] using current-thread executor (with 1 thread);");
				runtime_current_thread () ?
			}
		};

		#[ cfg (feature = "hss-server-sanitize") ]
		#[ cfg (debug_assertions) ]
		eprintln! ("[ii] [3c1badd4] using URI sanitizer;");

		Ok (_runtime)
	}
}
303
304
305
306
/// Newtype around a `hyper` service whose error type is `io::Error`.
///
/// Its `hyper::Service` implementation forwards to the wrapped service and,
/// with the `hss-server-sanitize` feature, sanitizes the request URI first.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
struct ServiceWrapper <S> (S)
	where S : hyper::Service<Request<Body>, Error = io::Error>;
313
/// Future returned by `ServiceWrapper::call`.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
#[ allow (dead_code) ]
enum ServiceWrapperFuture <S>
	where S : hyper::Service<Request<Body>, Error = io::Error>,
{
	/// The wrapped service's own future, still to be driven.
	Future (S::Future),
	/// An error produced before the wrapped service was called.
	Error (io::Error),
	/// Terminal state;  polling in this state yields an error.
	Done,
}
325
326
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl <S> hyper::Service<Request<Body>> for ServiceWrapper<S>
	where S : hyper::Service<Request<Body>, Error = io::Error>,
{
	type Response = S::Response;
	type Error = io::Error;
	type Future = ServiceWrapperFuture<S>;

	/// Readiness is entirely that of the wrapped service.
	fn poll_ready (&mut self, _context : &mut Context<'_>) -> Poll<Result<(), io::Error>> {
		self.0.poll_ready (_context)
	}

	/// Forwards `_request` to the wrapped service.
	///
	/// With the `hss-server-sanitize` feature the URI goes through
	/// `sanitize_uri` first:  a rewritten URI replaces the original one (and
	/// is logged), while a sanitizer failure is logged and returned as the
	/// `Error` state without calling the wrapped service.
	fn call (&mut self, mut _request : Request<Body>) -> Self::Future {

		#[ cfg (feature = "hss-server-sanitize") ]
		match sanitize_uri (_request.uri ()) {
			Ok (None) => (),
			Ok (Some (_sanitized)) => {
				eprintln! ("[ww] [d1e356bc] URI sanitized to `{}` from `{}`;", _sanitized, _request.uri ());
				* _request.uri_mut () = _sanitized;
			}
			Err (_failure) => {
				eprintln! ("[ww] [aace2099] URI sanitize failed for `{}`: {}", _request.uri (), _failure);
				return ServiceWrapperFuture::Error (_failure);
			}
		}

		ServiceWrapperFuture::Future (self.0.call (_request))
	}
}
364
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl <S> Future for ServiceWrapperFuture<S>
	where
		S : hyper::Service<Request<Body>, Error = io::Error>,
{
	type Output = <S::Future as Future>::Output;

	/// Drives the wrapped future, or yields the stored error.
	///
	/// * `Future`:  polls the inner future;  once it completes (successfully
	///   or not) the state becomes `Done`, and a failure is logged;
	/// * `Error`:  yields the stored error and becomes `Done`;
	/// * `Done`:  yields an error with code `0x0722e578`.
	fn poll (self : Pin<&mut Self>, _context : &mut Context<'_>) -> Poll<Self::Output> {
		// SAFETY: the inner future is never moved out of `*self`;  it is only
		// polled in place and later dropped in place by assigning `Done`.
		#[ allow (unsafe_code) ]
		let _self_0 = unsafe { self.get_unchecked_mut () };
		match _self_0 {
			ServiceWrapperFuture::Future (_future) => {
				// SAFETY: `_future` lives inside the pinned `self` and is not
				// moved (see above), so re-pinning it is sound.
				#[ allow (unsafe_code) ]
				let _delegate = unsafe { Pin::new_unchecked (_future) };
				let _outcome = match _delegate.poll (_context) {
					Poll::Pending => return Poll::Pending,
					Poll::Ready (_outcome) => _outcome,
				};
				if let Err (_error) = &_outcome {
					eprintln! ("[ee] [540dc2bc] handler failed: {}", _error);
				}
				// The inner future has completed and must not be polled again,
				// whatever its outcome;  plain assignment drops it in place.
				*_self_0 = ServiceWrapperFuture::Done;
				Poll::Ready (_outcome)
			}
			ServiceWrapperFuture::Error (_) => {
				// Only an `io::Error` is moved out here, never the pinned future.
				match mem::replace (_self_0, ServiceWrapperFuture::Done) {
					ServiceWrapperFuture::Error (_error) => Poll::Ready (Err (_error)),
					_ => panic_with_code (0xd83566d8),
				}
			}
			ServiceWrapperFuture::Done =>
				Poll::Ready (Err (error_with_code (0x0722e578))),
		}
	}
}
408
409
410
411
/// Field-less executor handed to the `hyper` builder;  it spawns futures
/// through `tokio::spawn`.
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
#[ derive (Clone) ]
pub struct ServerExecutor ();
416
#[ cfg (all (feature = "hss-server-core", feature = "hyper--server")) ]
impl <F> hyper::Executor<F> for ServerExecutor
	where F : Future<Output = ()> + Send + 'static,
{
	/// Spawns `_future` through `tokio::spawn`;  the returned join handle is
	/// discarded, so the task is not awaited here.
	fn execute (&self, _future : F) {
		drop (tokio::spawn (_future));
	}
}
427
428
429
430
/// Spawns `strace -f -p <pid>` for the current process.
///
/// Panics (code `0xff87ffef`) if the command cannot be spawned;  the child
/// handle is discarded without waiting for it.
#[ cfg (all (feature = "hss-server-core", feature = "hss-server-debug-strace")) ]
fn server_start_strace () -> () {

	#[ cfg (debug_assertions) ]
	eprintln! ("[ii] [19f96abc] starting `strace` tracing...");

	let _pid = process::id () .to_string ();

	let mut _command = process::Command::new ("strace");
	_command.arg ("-f") .arg ("-p") .arg (&_pid);
	_command.spawn () .or_panic (0xff87ffef);
}
443
444
/// Starts a background thread that asks `jemalloc` for its statistics once
/// per second and echoes them, line by line, on standard error.
///
/// The begin/end banner lines and the all-zero background-threads line are
/// not echoed.  The thread loops forever and is never joined.
#[ cfg (all (feature = "hss-server-core", feature = "hss-jemalloc", feature = "hss-server-debug-jemalloc")) ]
fn server_start_jemalloc_stats () -> () {

	#[ cfg (debug_assertions) ]
	eprintln! ("[ii] [19f5dcf1] starting `jemalloc` tracing...");

	// Callback handed to `malloc_stats_print`.  It is entered from C code, so
	// it avoids panicking on unexpected input:  a null pointer is ignored and
	// invalid UTF-8 is converted lossily.
	extern "C" fn _write (_ : * mut os::raw::c_void, _message : * const os::raw::c_char) {
		if _message.is_null () {
			return;
		}
		// SAFETY: `_message` is non-null (checked above);  this assumes the
		// allocator passes a NUL-terminated string that stays valid for the
		// duration of the call.
		#[ allow (unsafe_code) ]
		let _message = unsafe { ffi::CStr::from_ptr (_message) };
		let _message = _message.to_string_lossy ();
		for _line in _message.split_terminator ("\n") {
			if (_line == "___ Begin jemalloc statistics ___") || (_line == "--- End jemalloc statistics ---") {
				continue;
			}
			if _line == "Background threads: 0, num_runs: 0, run_interval: 0 ns" {
				continue;
			}
			eprintln! ("[dd] [35256205] jemalloc statistics: {}", _line);
		}
	}

	thread::spawn (|| {
		// NUL-terminated options string for `malloc_stats_print`.
		let _options = &b"gmdablxe\0"[..];
		loop {
			// SAFETY: `_write` has the callback signature used here, the
			// opaque pointer is unused by it, and `_options` is a
			// NUL-terminated byte string that outlives the call.
			#[ allow (unsafe_code) ]
			unsafe { ::jemalloc_sys::malloc_stats_print (Some (_write), ptr::null_mut (), _options.as_ptr () as * const os::raw::c_char) };
			thread::sleep (time::Duration::from_secs (1));
		}
	});
}
476
477
478
479
/// Builds a multi-threaded tokio runtime with all drivers enabled.
///
/// `_threads` is the number of worker threads;  `None` and `Some (0)` are both
/// treated as one thread, because the runtime builder does not accept a zero
/// thread count.  The blocking pool is limited to four times the worker count
/// and idle pool threads are kept alive for 60 seconds.
///
/// A failure to build the runtime is wrapped with code `0x2692223a`.
#[ cfg (feature = "tokio--rt-multi-thread") ]
pub fn runtime_multiple_threads (_threads : Option<usize>) -> ServerResult<Runtime> {
	let _workers = _threads.unwrap_or (1) .max (1);
	// Saturate instead of overflowing on absurdly large thread counts.
	let _blocking = _workers.saturating_mul (4);
	let mut _builder = tokio::RuntimeBuilder::new_multi_thread ();
	_builder.worker_threads (_workers);
	_builder.max_blocking_threads (_blocking);
	_builder.thread_keep_alive (time::Duration::from_secs (60));
	_builder.enable_all ();
	_builder.build () .or_wrap (0x2692223a)
}
490
/// Builds a current-thread tokio runtime with all drivers enabled.
///
/// A failure to build the runtime is wrapped with code `0x280fcb72`.
#[ cfg (feature = "tokio--rt") ]
pub fn runtime_current_thread () -> ServerResult<Runtime> {
	tokio::RuntimeBuilder::new_current_thread ()
			.enable_all ()
			.build ()
			.or_wrap (0x280fcb72)
}
497