1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
// Server entry point: parse config, bind sockets, drop privileges, then
// spawn listener tasks. Sockets are bound while still root (for ports
// below 1024); all further work runs as the configured unprivileged user.
// The status-page JSON builder assembles a large object literal via
// `serde_json::json!`, whose macro expansion exceeds the default
// recursion limit.
#![recursion_limit = "256"]
mod access;
mod access_log;
mod auth;
mod bootstrap;
mod cache;
mod cert;
mod compress;
mod config;
mod dns_provider;
mod error;
mod geoip;
mod handler;
mod headers;
#[cfg(unix)]
mod inherit;
mod jwt;
mod lb;
mod listener;
mod matcher;
mod metrics;
mod oidc;
#[cfg(unix)]
mod privdrop;
mod proxy_proto;
mod rate_limit;
mod reload;
mod router;
mod security;
mod task;
#[cfg(test)]
mod test;
use anyhow::Context;
use arc_swap::ArcSwap;
use cert::acme::ChallengeMap;
use clap::Parser;
use config::ErrorPageDef;
use error::{ErrorPageEntry, ErrorPages};
use listener::{AppState, BoundSocket};
use router::Router;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::sync::watch;
use tokio::task::JoinSet;
use bootstrap::{
build_authenticator, build_cert_registry, build_cert_source,
build_upstream_tls,
};
/// Crate-wide `Result<T>` alias for `anyhow::Result<T>`. Use this
/// instead of `anyhow::Result` directly so we can swap to a custom
/// error enum later without touching every signature in the codebase.
/// In scope of every crate module via `crate::Result`.
// NOTE(review): `dead_code` is presumably allowed because not every
// build configuration references the alias -- confirm it is still
// needed before removing the attribute.
#[allow(dead_code)]
pub(crate) type Result<T> = anyhow::Result<T>;
/// Collect the comma-separated `RUST_LOG` directives that are bare
/// words yet fail to parse as a log level.
///
/// `EnvFilter` does not reject such a word (for example the syslog
/// severity "notice", which tracing does not know); it silently reads
/// it as a *target name*. The resulting filter enables only that
/// non-existent target and switches everything else off, so every log
/// line disappears. Empty directives and deliberate `target=level`
/// directives are never reported.
///
/// The returned slices borrow from `raw`, trimmed of surrounding
/// whitespace, in the order they appear.
fn invalid_log_levels(raw: &str) -> Vec<&str> {
    use tracing_subscriber::filter::LevelFilter;

    let mut rejected = Vec::new();
    for piece in raw.split(',') {
        let directive = piece.trim();
        // Blank entries and explicit `target=level` pairs are out of scope.
        if directive.is_empty() || directive.contains('=') {
            continue;
        }
        // A bare word is only acceptable when it names a level.
        if directive.parse::<LevelFilter>().is_err() {
            rejected.push(directive);
        }
    }
    rejected
}
/// Process entry point.
///
/// In order: parse the command line, load the config, bind every
/// listener socket, optionally drop privileges, build the shared
/// application state (router, authenticator, GeoIP, error pages, rate
/// limiting, cache, OIDC, access log), build the certificate registry,
/// spawn one task per listener, install the SIGHUP / SIGUSR2 handlers
/// (Unix only), then block until a shutdown or upgrade-drain signal
/// arrives and wait for the listener tasks to finish.
///
/// With `--check-config` only the config is loaded and the function
/// returns immediately.
///
/// # Errors
/// Returns an error when the config cannot be loaded, a socket cannot
/// be bound, the state-dir preparation or privilege drop fails, any of
/// the router / authenticator / JWT manager / GeoIP / access-log /
/// certificate-registry / listener-future builders fails, or a signal
/// handler cannot be installed.
///
/// # Panics
/// Panics via `expect` if JWT auth is configured but `state_dir` is
/// absent, and via `unreachable!` if a listener has a
/// (byte-stream, tls, proxy) combination outside the six handled below.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
// --check-config: parse + validate without touching rustls, tracing,
// or any network resource. Errors flow up through anyhow and print
// to stderr with full context; success exits silently with code 0.
if args.check_config {
config::Config::load(&args.config).with_context(|| {
format!("loading config from {}", args.config.display())
})?;
return Ok(());
}
// Must be installed before any TLS work, including rcgen's
// self-signed cert generation which also calls into rustls.
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.ok(); // Err just means it was already installed.
// Disable ANSI escapes unconditionally: journald and fail2ban need
// plain text; journalctl adds its own colour when viewed in a terminal.
// When RUST_LOG is unset or unparsable the filter falls back to
// `hypershunt=info`.
tracing_subscriber::fmt()
.with_ansi(false)
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| "hypershunt=info".parse().unwrap()),
)
.init();
// Warn about RUST_LOG words that aren't log levels. This must go
// straight to stderr, not through `tracing`: an invalid filter like
// RUST_LOG=notice suppresses our own log targets, so a tracing
// warning here would be swallowed by the very mistake it reports.
if let Ok(raw) = std::env::var("RUST_LOG") {
for token in invalid_log_levels(&raw) {
eprintln!(
"warning: RUST_LOG directive {token:?} is not a log \
level (expected error, warn, info, debug, trace, or \
off); it is being treated as a logging target name, \
which may silence all other output"
);
}
}
let config_path = args.config;
let config = config::Config::load(&config_path).with_context(|| {
format!("loading config from {}", config_path.display())
})?;
let proxy_count = config
.listeners
.iter()
.filter(|l| l.proxy.is_some())
.count();
// `proxy_count` counts a subset of `config.listeners`, so the
// subtraction below cannot underflow.
tracing::info!(
path = %config_path.display(),
listeners = config.listeners.len() - proxy_count,
proxy_listeners = proxy_count,
vhosts = config.vhosts.len(),
"config loaded"
);
let tls_defaults = config.server.tls_defaults.clone();
let state_dir = config.server.state_dir.clone().map(PathBuf::from);
// -- Bind all sockets before dropping privileges ----------------
//
// Ports < 1024 (80, 443) require root on Linux. We bind them all
// here, then drop to an unprivileged user before accepting any
// connections or running application code.
//
// Inherited sockets (passed from a parent process) are matched by
// address and reused rather than rebound, enabling seamless upgrades.
#[cfg(unix)]
let mut inherited = inherit::InheritedSockets::scan();
// Collecting into `anyhow::Result<Vec<_>>` stops at the first bind
// failure and returns it (with the listener's name as context).
let bound: Vec<(config::ListenerConfig, BoundSocket)> = config
.listeners
.iter()
.map(|cfg| {
listener::bind_socket(
cfg,
#[cfg(unix)]
&mut inherited,
)
.with_context(|| format!("binding {}", cfg.local_name()))
.map(|sock| (cfg.clone(), sock))
})
.collect::<anyhow::Result<_>>()?;
#[cfg(unix)]
inherited.close_unclaimed();
// -- Privilege drop ---------------------------------------------
#[cfg(unix)]
{
if let Some(ref user) = config.server.user {
// Create and chown the state directory before dropping
// privileges -- StateDirectory= in the systemd unit creates
// it owned by root, and the unprivileged process cannot
// write ACME certificates there without this step.
if let Some(ref sd) = state_dir {
privdrop::prepare_state_dir(
sd,
user,
config.server.group.as_deref(),
)?;
}
privdrop::drop_privileges(
user,
config.server.group.as_deref(),
config.server.inherit_supplementary_groups,
)?;
} else if nix::unistd::getuid().is_root() {
tracing::warn!(
"running as root with no server.user configured; \
set server user=\"nobody\" to drop privileges \
after binding"
);
}
}
// Create metrics before the router so StatusHandler can hold a
// clone of the Arc, and AppState can record per-request data.
let metrics = Arc::new(metrics::Metrics::new());
let summary =
Arc::new(handler::status::ServerSummary::from_config(&config));
// Shared certificate state: written by each AcmeManager after
// renewal, read by StatusHandler for countdown timers.
let cert_state = cert::state::new_shared();
let router = Router::new(&config, &metrics, &summary, Some(&cert_state))
.context("building router")?;
// Phase 1: create shared ACME challenge map and app state.
let challenges: ChallengeMap = Arc::new(Mutex::new(HashMap::new()));
// When auth is `jwt`, the inner back-end (if any) becomes the
// credential authenticator; JWT issuance and validation are
// handled by the JwtManager in listener.rs.
let (authenticator, jwt_manager): (
Arc<dyn auth::Authenticator>,
Option<Arc<jwt::JwtManager>>,
) = if let Some(config::AuthBackend::Jwt {
ref cookie_name,
validity_secs,
ref inner,
}) = config.server.auth
{
let inner_auth: Option<Arc<dyn auth::Authenticator>> = inner
.as_deref()
.map(|b| build_authenticator(&Some(b.clone())))
.transpose()
.context("building jwt inner authenticator")?;
let sd = state_dir
.as_deref()
.expect("state_dir required for jwt (validated earlier)");
let mgr = jwt::JwtManager::load_or_generate(
sd,
jwt::JwtConfig {
cookie_name: cookie_name.clone(),
validity_secs,
},
inner_auth,
)
.context("initialising jwt manager")?;
tracing::info!(
kid = %mgr.kid,
session_mode = mgr.is_session_mode(),
"jwt: key loaded"
);
// The inner authenticator was handed to the JwtManager above, so
// the top-level authenticator slot gets the anonymous one.
(Arc::new(auth::AnonymousAuthenticator), Some(Arc::new(mgr)))
} else {
(
build_authenticator(&config.server.auth)
.context("building authenticator")?,
None,
)
};
let geoip: Option<Arc<geoip::CountryReader>> = config
.server
.geoip
.as_ref()
.map(|g| geoip::open(&g.db))
.transpose()
.context("opening GeoIP database")?
.map(Arc::new);
// Reached only when the open above succeeded (a failure returned
// through `?`), so the log line is accurate.
if let Some(ref g) = config.server.geoip {
tracing::info!(db = %g.db, "geoip: database loaded");
}
// Retain a clone for stream proxy listeners, which don't share AppState.
let tcp_geoip = geoip.clone();
// Build custom error pages map from config.
let mut ep_map = HashMap::new();
for (code, def) in &config.server.error_pages {
let entry = match def {
ErrorPageDef::File(path) => {
ErrorPageEntry::File(PathBuf::from(path))
}
ErrorPageDef::Inline(html) => {
ErrorPageEntry::Inline(bytes::Bytes::from(html.clone()))
}
};
ep_map.insert(*code, entry);
}
let error_pages = Arc::new(ErrorPages::new(ep_map));
let router = Arc::new(router);
// Rate-limit eviction: spawn a single background task that
// sweeps every configured rule's bucket map and refreshes the
// `rate_limit_active_keys` gauge. Returns None when no rule
// is configured, in which case nothing runs.
// Rate-limit rule set is wrapped in ArcSwap so SIGHUP can publish
// a fresh set without restarting the eviction task. Holds the
// rules from the freshly built router at startup; on reload, the
// SIGHUP handler stores a new Vec built from the new Router.
let rate_limit_rules: Arc<rate_limit::RuleSet> = Arc::new(
arc_swap::ArcSwap::from_pointee(router.all_rate_limit_rules()),
);
let _rl_eviction = rate_limit::spawn_eviction_task(
rate_limit_rules.clone(),
metrics.clone(),
);
// Response cache: build the single shared store only when a
// location opted in, and spawn the TTL sweeper. The store is
// carried forward across SIGHUP (see reload.rs) so cached entries
// survive a reload.
let cache_store = if router.any_cache_enabled() {
// Falls back to 256 MiB when no `server.cache` section is present.
let max_size = config
.server
.cache
.as_ref()
.map(|c| c.max_size)
.unwrap_or(256 * 1024 * 1024);
let store = cache::CacheStore::new(max_size, metrics.clone());
let _cache_eviction = cache::spawn_cache_eviction_task(
store.clone(),
metrics.clone(),
);
Some(store)
} else {
None
};
// Construct the OIDC provider in not-ready state and let it
// bootstrap itself in the background. Discovery failures do
// not block startup; the provider's endpoints serve 503 until
// the first successful discovery (controlled by
// `discovery-retry`).
// A provider is only built when auth is `jwt` with an OIDC inner
// back-end; every other configuration yields None.
let oidc: Option<Arc<oidc::OidcProvider>> = match &config.server.auth {
Some(config::AuthBackend::Jwt { inner: Some(b), .. }) => {
if let config::AuthBackend::Oidc(cfg) = b.as_ref() {
let p = oidc::OidcProvider::new(
(**cfg).clone(),
metrics.clone(),
);
tracing::info!(
issuer = %cfg.issuer,
"oidc: bootstrapping discovery in background"
);
Some(p)
} else {
None
}
}
_ => None,
};
let access_log = access_log::build_access_log(&config.server)
.context("building access logger")?;
// AppState is wrapped in ArcSwap so SIGHUP can atomically install a
// fresh state without disturbing live connections. Each listener
// accept loop calls `state.load_full()` per connection to pin its
// own snapshot for the connection's lifetime; in-flight requests
// never observe a mid-flight swap.
let state = Arc::new(ArcSwap::from_pointee(AppState {
router: router.clone(),
acme_challenges: challenges.clone(),
authenticator,
metrics: metrics.clone(),
geoip,
health: Arc::new(crate::handler::health::HealthState::from_config(
&config.server.health,
&config.listeners,
)),
error_pages,
jwt_manager,
oidc,
access_log,
cache: cache_store,
}));
// Background task: advance the request-rate ring buffer every 5 s.
// Not tracked in `handles` -- it carries no state worth draining.
crate::task::spawn_supervised("metrics.tick", metrics.clone().tick_loop());
// Shutdown channel: false = running, true = drain and exit.
let (shutdown_tx, shutdown_rx) = watch::channel(false);
let mut handles: JoinSet<()> = JoinSet::new();
// Per-listener stop-accept senders, kept alive for the lifetime of
// main so dropping them doesn't close the watch channels held by
// the listener tasks. SIGHUP reload (#6) and SIGUSR2 upgrade
// (#14) both look up listeners by bind here and flip the sender
// to drain a removed listener without disturbing live connections
// on the others. Wrapped in a Mutex so the SIGUSR2 task can
// borrow it without contending with the main bind loop.
let stop_accept_txs: Arc<
std::sync::Mutex<HashMap<String, watch::Sender<bool>>>,
> = Arc::new(std::sync::Mutex::new(HashMap::new()));
// Partition listeners by (kind family, tls, proxy) into six
// buckets. Plain stream proxies and plain HTTP listeners start
// first so ACME HTTP-01 challenges can be served before ACME
// flows begin.
let mut plain_http = Vec::new();
let mut tls_http = Vec::new();
let mut plain_stream = Vec::new();
let mut tls_stream = Vec::new();
let mut quic_http: Vec<(config::ListenerConfig, _)> = Vec::new();
let mut dgram_proxy: Vec<(config::ListenerConfig, _)> = Vec::new();
for (cfg, socket) in bound {
let kind = cfg.bind.kind;
let has_tls = cfg.tls.is_some();
let has_proxy = cfg.proxy.is_some();
// Validate accepts only six concrete combinations; every
// other arm is unreachable here (the parser rejected the
// input upstream). On a datagram listener `tls` means HTTP/3.
match (kind.is_byte_stream(), has_tls, has_proxy) {
(true, false, false) => plain_http.push((cfg, socket)),
(true, true, false) => tls_http.push((cfg, socket)),
(true, false, true) => plain_stream.push((cfg, socket)),
(true, true, true) => tls_stream.push((cfg, socket)),
(false, true, false) => quic_http.push((cfg, socket)),
(false, false, true) => dgram_proxy.push((cfg, socket)),
_ => unreachable!(
"validate() rejects this listener-layer combo: \
byte_stream={}, tls={has_tls}, proxy={has_proxy}",
kind.is_byte_stream()
),
}
}
// Build the named-certificate registry before any TLS listener
// spawns: one AcmeManager and one acceptor per top-level
// `certificate` definition, regardless of how many listeners refer
// to it. This is the single change that turns "each listener
// races on its own ACME directory" into "one shared renewal loop".
// Private-key file mode defaults to 0o600 when not configured.
let cert_key_mode = config.server.cert_key_mode.unwrap_or(0o600);
// NOTE(review): the two empty maps are presumably the previous
// registry and fingerprints (nothing to reuse at startup) -- confirm
// against build_cert_registry's signature.
let (cert_registry, initial_cert_handles) = build_cert_registry(
&config.certificates,
&tls_defaults,
state_dir.as_ref(),
&challenges,
&cert_state,
cert_key_mode,
&HashMap::new(),
&HashMap::new(),
&metrics,
)
.await
.context("building certificate registry")?;
// Compute the per-vhost ALPN overrides snapshot used by every
// TLS listener's VhostAlpnMap. Stored in an ArcSwap so reload
// can publish a fresh snapshot before adding new TLS listeners.
// Regex vhost names and regex aliases are skipped; each non-regex
// alias inherits its vhost's ALPN list.
let vhost_alpn_overrides: Vec<(String, Vec<String>)> = config
.vhosts
.iter()
.filter(|v| !v.name.regex)
.filter_map(|v| {
v.alpn.as_ref().map(|a| (v.name.value.clone(), a.clone()))
})
.chain(config.vhosts.iter().flat_map(|v| {
let alpn = v.alpn.as_ref();
v.aliases.iter().filter(|a| !a.regex).filter_map(
move |alias| alpn.map(|a| (alias.value.clone(), a.clone())),
)
}))
.collect();
let vhost_alpn_overrides_swap =
Arc::new(ArcSwap::from_pointee(vhost_alpn_overrides));
// Wrap cert_registry in ArcSwap so SIGHUP can publish a new map
// (with added / removed named certs). Parallel fingerprint map
// lets reload skip rebuilding unchanged entries.
// The fingerprint is the `Debug` rendering of each cert's source.
let initial_cert_fingerprints: HashMap<String, String> = config
.certificates
.iter()
.map(|d| (d.name.clone(), format!("{:?}", d.source)))
.collect();
let cert_registry_swap =
Arc::new(arc_swap::ArcSwap::from_pointee(cert_registry));
let cert_source_fingerprints_swap = Arc::new(
arc_swap::ArcSwap::from_pointee(initial_cert_fingerprints),
);
// Per-listener helpers and per-cert ACME helpers, tracked by
// bind / cert-name so SIGHUP can abort them in lock-step with
// the listener or cert being removed.
let listener_helpers = Arc::new(std::sync::Mutex::new(HashMap::<
String,
Vec<tokio::task::JoinHandle<()>>,
>::new()));
let cert_helpers = Arc::new(std::sync::Mutex::new({
let mut m: HashMap<String, Vec<tokio::task::JoinHandle<()>>> =
HashMap::new();
for (name, h) in initial_cert_handles {
m.insert(name, vec![h]);
}
m
}));
// Bundle of shared handles passed to every listener builder below
// and, further down, to the SIGHUP reload state.
let spawn_deps = Arc::new(reload::ListenerSpawnDeps {
tls_defaults: tls_defaults.clone(),
state_dir: state_dir.clone(),
challenges: challenges.clone(),
cert_state: cert_state.clone(),
cert_registry: cert_registry_swap.clone(),
cert_source_fingerprints: cert_source_fingerprints_swap.clone(),
cert_key_mode,
vhost_alpn_overrides: vhost_alpn_overrides_swap.clone(),
metrics: metrics.clone(),
tcp_geoip: tcp_geoip.clone(),
stop_accept_txs: stop_accept_txs.clone(),
shutdown_rx: shutdown_rx.clone(),
listener_helpers: listener_helpers.clone(),
cert_helpers: cert_helpers.clone(),
});
// Phase 2a: plain stream listeners (no TLS, no ACME dependency).
for (cfg, socket) in plain_stream {
let fut = reload::build_stream_listener_future(
&spawn_deps, &router, cfg, socket,
)
.await?;
handles.spawn(fut);
}
// Phase 2b: plain HTTP listeners first so that ACME HTTP-01
// challenge requests can be served before we start ACME flows.
for (cfg, socket) in plain_http {
let fut = reload::build_plain_listener_future(
&spawn_deps,
state.clone(),
cfg,
socket,
);
handles.spawn(fut);
}
// Phase 3: spawn TLS HTTP listeners via the shared builder.
// Cert source, ALPN map, mTLS verifier, cert-renewal watcher,
// CRL hot-reload, and OCSP refresh all happen inside
// build_tls_listener_future (ACME may do network I/O).
for (cfg, socket) in tls_http {
let fut = reload::build_tls_listener_future(
&spawn_deps,
state.clone(),
cfg,
socket,
)
.await?;
handles.spawn(fut);
}
// Phase 3c: QUIC/HTTP/3 listeners. (Despite the label, these are
// spawned before the "3b" stream listeners below.)
for (cfg, socket) in quic_http {
let fut = reload::build_quic_listener_future(
&spawn_deps,
state.clone(),
cfg,
socket,
)
.await?;
handles.spawn(fut);
}
// Phase 3b: TLS-terminating stream listeners.
for (cfg, socket) in tls_stream {
let fut = reload::build_stream_listener_future(
&spawn_deps, &router, cfg, socket,
)
.await?;
handles.spawn(fut);
}
// Phase 4: raw datagram proxies (no QUIC termination).
for (cfg, socket) in dgram_proxy {
let fut = reload::build_dgram_proxy_future(
&spawn_deps, &router, cfg, socket,
)
.await?;
handles.spawn(fut);
}
// SIGHUP reload (#6): the spawned task awaits SIGHUP and calls
// reload::reload() for each one.
#[cfg(unix)]
let _sighup_task = {
let reload_state = Arc::new(reload::ReloadState {
config_path: config_path.clone(),
current_listeners: Arc::new(arc_swap::ArcSwap::from_pointee(
config.listeners.clone(),
)),
spawn_deps: spawn_deps.clone(),
auth_fingerprint: Arc::new(arc_swap::ArcSwap::from_pointee(
format!("{:?}", config.server.auth),
)),
state: state.clone(),
rate_limit_rules: rate_limit_rules.clone(),
metrics: metrics.clone(),
cert_state: cert_state.clone(),
summary: summary.clone(),
});
reload::spawn_sighup_listener(reload_state)
};
// SIGUSR2 binary upgrade (#14): fork+exec the new binary with
// listening fds inherited. Child writes one byte to the
// HYPERSHUNT_UPGRADE_READY_FD pipe once accepting; parent then
// drains and exits. drain_tx wakes the shutdown-wait loop in
// upgrade mode, distinguishing "upgrade drain" (bounded by
// graceful-drain-timeout) from "SIGTERM shutdown" (bounded by
// the standard shutdown timeout).
#[cfg(unix)]
let (upgrade_drain_tx, mut upgrade_drain_rx) = watch::channel(false);
#[cfg(unix)]
let _sigusr2_task = {
let upgrade_state = Arc::new(reload::UpgradeState {
stop_accept_txs: stop_accept_txs.clone(),
startup_timeout_secs: config.server.upgrade_startup_timeout,
drain_signal: upgrade_drain_tx,
});
reload::spawn_sigusr2_listener(upgrade_state)
};
// Child of a SIGUSR2 upgrade: tell the parent we're now serving
// requests so it can begin draining its own listeners. No-op
// when the env var isn't set (i.e. fresh start, not an upgrade).
#[cfg(unix)]
reload::signal_upgrade_ready();
// -- Wait for a shutdown signal ---------------------------------
//
// On Unix we handle SIGTERM (systemd stop), SIGINT (ctrl-c), and
// the upgrade drain signal fired by the SIGUSR2 handler once the
// child reports ready. On other platforms only ctrl-c is
// available.
// Only ever set to true inside the Unix-only block below.
let mut via_upgrade = false;
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigterm = signal(SignalKind::terminate())
.context("failed to install SIGTERM handler")?;
tokio::select! {
_ = tokio::signal::ctrl_c() => {}
_ = sigterm.recv() => {}
// `changed()` also resolves (with an error) when the sender has
// been dropped; in that case, or if the value is still false,
// `via_upgrade` stays false and the standard shutdown path
// below runs. NOTE(review): confirm the SIGUSR2 task keeps
// `drain_signal` alive for the whole process lifetime.
_ = upgrade_drain_rx.changed() => {
if *upgrade_drain_rx.borrow() {
via_upgrade = true;
}
}
}
}
#[cfg(not(unix))]
tokio::signal::ctrl_c().await.context("ctrl-c signal")?;
// We are now draining. Flip readiness to 503 immediately so a load
// balancer / kubelet sees the instance as not-ready right away.
crate::handler::health::set_draining();
if via_upgrade {
// The child already took over accepting; drain the parent
// promptly (no lame-duck -- the child serves new traffic).
// `shutdown_tx` is not signalled on this path; presumably the
// SIGUSR2 handler has already stopped the accept loops through
// `stop_accept_txs` -- verify in reload.rs.
tracing::info!(
"upgrade: child took over; draining parent connections"
);
} else {
// Lame-duck window: keep accepting + serving (readiness already
// reports 503) so the LB deregisters this instance before we
// stop accepting and connections would be refused. We delay the
// shutdown signal rather than the accept loop so in-flight and
// fresh requests are both served normally during the window.
let lame_duck = config.server.lame_duck_timeout;
if lame_duck > 0 {
tracing::info!(
secs = lame_duck,
"shutdown: lame-duck (readiness 503, still serving)"
);
tokio::time::sleep(std::time::Duration::from_secs(
lame_duck as u64,
))
.await;
}
tracing::info!("shutdown: signalling listeners");
// A send error only means no receiver is left; ignored on purpose.
let _ = shutdown_tx.send(true);
}
// Wait for all listener tasks (each drains its own connections).
// Drain timeout differs by trigger:
// - Standard shutdown (SIGTERM/SIGINT): 30 s, the historical
// default; bounded so a wedged connection doesn't hold the
// process forever during a `systemctl stop`.
// - Upgrade drain (SIGUSR2 child took over): use the operator's
// `graceful-drain-timeout`. Default 0 means "wait forever"
// -- the parent stays alive until every connection completes
// naturally, matching nginx's behaviour.
let drain_secs: u64 = if via_upgrade {
config.server.graceful_drain_timeout as u64
} else {
30
};
if via_upgrade && drain_secs == 0 {
tracing::info!(
"upgrade: draining parent connections (no timeout)"
);
} else {
tracing::info!(drain_secs, "shutdown: draining");
}
// `join_next()` yields None once the set is empty, ending the loop
// when every listener task has finished.
let drain = async { while handles.join_next().await.is_some() {} };
if via_upgrade && drain_secs == 0 {
// Operator opted into "wait indefinitely" for the upgrade
// drain. No timeout wrapper; we sit here until every
// in-flight connection finishes naturally.
drain.await;
} else if tokio::time::timeout(
Duration::from_secs(drain_secs),
drain,
)
.await
.is_err()
{
tracing::warn!("shutdown: drain timeout; exiting");
}
tracing::info!("shutdown: complete");
Ok(())
}
// Command-line arguments, parsed with clap's derive API (`main` calls
// `Args::parse()`). The `///` comments on the fields below are picked
// up by clap as each option's help text, so editing them changes the
// program's `--help` output.
#[derive(Parser)]
#[command(
version,
about = "HTTP server and reverse proxy",
long_about = "hypershunt is an HTTP/1.1, HTTP/2 and HTTP/3 server \
and reverse proxy, configured with KDL. It serves \
static files and virtual hosts, terminates TLS \
(file certs, self-signed, or ACME/Let's Encrypt), \
and proxies to HTTP, FastCGI, SCGI and CGI backends \
with load balancing and health checks.\n\n\
With no options it reads ./hypershunt.kdl; the \
packaged service reads /etc/hypershunt.kdl. Send \
SIGHUP to hot-reload the configuration."
)]
struct Args {
/// Path to the KDL configuration file
// `-c` / `--config`; defaults to `hypershunt.kdl`, resolved relative
// to the current working directory.
#[arg(short, long, default_value = "hypershunt.kdl")]
config: PathBuf,
/// Validate the configuration and exit
// `--check-config` (long form only); `main` loads the config and
// returns before any logging, TLS or socket setup when it is set.
#[arg(
long,
long_help = "Parse and validate the configuration, then exit. \
Exit code 0 on success, non-zero with diagnostics \
on stderr if the config has parse or semantic \
errors. Useful for CI and as a pre-flight check \
before sending SIGHUP for hot reload."
)]
check_config: bool,
}
#[cfg(test)]
mod log_filter_tests {
    use super::invalid_log_levels;

    // Level names the tests below expect to be accepted as-is.
    const KNOWN_LEVELS: [&str; 6] =
        ["error", "warn", "info", "debug", "trace", "off"];

    #[test]
    fn flags_syslog_level_typo() {
        // A syslog severity is not a tracing level; EnvFilter would
        // read "notice" as a target name and silence everything else.
        let flagged = invalid_log_levels("notice");
        assert_eq!(flagged, vec!["notice"]);
    }

    #[test]
    fn accepts_valid_levels() {
        KNOWN_LEVELS.iter().for_each(|level| {
            assert!(
                invalid_log_levels(level).is_empty(),
                "{level} should be a valid level"
            );
        });
    }

    #[test]
    fn accepts_levels_case_insensitively() {
        let flagged = invalid_log_levels("INFO");
        assert!(flagged.is_empty());
    }

    #[test]
    fn ignores_target_level_directives() {
        // Explicit `target=level` pairs are intentional and must pass.
        for directive in ["hypershunt=info", "hypershunt::oidc=debug"] {
            assert!(invalid_log_levels(directive).is_empty());
        }
    }

    #[test]
    fn flags_only_the_bad_directive_in_a_list() {
        let flagged = invalid_log_levels("hypershunt=info,verbose");
        assert_eq!(flagged, vec!["verbose"]);
    }

    #[test]
    fn ignores_empty_and_whitespace_directives() {
        for input in ["", "info, ,"] {
            assert!(invalid_log_levels(input).is_empty());
        }
    }
}