cloudflare_quick_tunnel/supervisor.rs
//! Long-running task that owns one QUIC connection after registration.
//!
//! Two concurrent arms in a biased `select!`:
//!
//! 1. Inbound-stream acceptor — `conn.accept_bi()` in a loop;
//!    every new stream is the edge wanting us to serve a single
//!    HTTP request, so we hand it to `proxy::handle_inbound_stream`
//!    on a spawned task.
//! 2. Shutdown watcher — receives on the shutdown channel; on
//!    signal, closes the connection with an application code so
//!    the edge sees a graceful close instead of an idle-timeout.
//!
//! The select is biased and the shutdown arm is listed first in the
//! code, so a pending shutdown wins over a stream that is ready to be
//! accepted.
//!
//! Returns a [`SupervisorExit`] tag so the reactor in `manager.rs`
//! can tell apart "the operator asked to stop" from "the edge dropped
//! us and we should reconnect".
//!
//! QUIC-level keepalive (`keep_alive_interval = 1s` set in
//! `quic_dial::build_client_config`) handles the connection-liveness
//! side — without it the edge's `MaxIdleTimeout = 5s` would terminate
//! us once the control-stream RPC drains.
21
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24
25use tokio::sync::oneshot;
26use tracing::{debug, info, warn};
27
28use crate::proxy::StreamCounters;
29
/// Shared traffic counters for one tunnel.
///
/// Every field is an `Arc<AtomicU64>`, so cloning this struct yields a
/// handle onto the *same* counters rather than an independent copy.
#[derive(Debug, Default, Clone)]
pub struct SupervisorMetrics {
    /// Number of inbound streams accepted; the accept loop in `run`
    /// increments it once per accepted stream.
    pub streams_total: Arc<AtomicU64>,
    /// Byte counter shared with each stream's `StreamCounters::bytes_in`.
    pub bytes_in: Arc<AtomicU64>,
    /// Byte counter shared with each stream's `StreamCounters::bytes_out`.
    pub bytes_out: Arc<AtomicU64>,
}
36
37impl SupervisorMetrics {
38 fn stream_counters(&self) -> StreamCounters {
39 StreamCounters {
40 bytes_in: self.bytes_in.clone(),
41 bytes_out: self.bytes_out.clone(),
42 }
43 }
44
45 pub fn snapshot(&self) -> (u64, u64, u64) {
46 (
47 self.streams_total.load(Ordering::Relaxed),
48 self.bytes_in.load(Ordering::Relaxed),
49 self.bytes_out.load(Ordering::Relaxed),
50 )
51 }
52}
53
/// Why the supervisor's accept loop returned. The reactor uses this
/// to decide between "exit cleanly" and "reconnect with backoff".
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SupervisorExit {
    /// The operator signalled shutdown via the channel.
    Shutdown,
    /// The edge or local stack closed the QUIC connection out
    /// from under us. Reconnect candidate.
    ConnectionLost,
}
64
65/// Run one accept-loop cycle on `conn`. Returns when the connection
66/// closes (for any reason) or when `shutdown_rx` fires.
67///
68/// Callers own the shutdown channel so the reactor can stitch
69/// multiple supervisor cycles together with the same QuickTunnelHandle.
70pub async fn run(
71 conn: quinn::Connection,
72 local_port: u16,
73 metrics: SupervisorMetrics,
74 mut shutdown_rx: oneshot::Receiver<()>,
75) -> SupervisorExit {
76 info!(local_port, "tunnel supervisor running");
77 let exit = loop {
78 tokio::select! {
79 biased;
80 _ = &mut shutdown_rx => {
81 debug!("supervisor: shutdown signal");
82 conn.close(0u32.into(), b"client shutdown");
83 break SupervisorExit::Shutdown;
84 }
85 accepted = conn.accept_bi() => {
86 match accepted {
87 Ok((send, recv)) => {
88 metrics.streams_total.fetch_add(1, Ordering::Relaxed);
89 let counters = metrics.stream_counters();
90 tokio::spawn(async move {
91 if let Err(e) =
92 crate::proxy::handle_inbound_stream(local_port, send, recv, counters).await
93 {
94 warn!(error = %e, "stream proxy failed");
95 }
96 });
97 }
98 Err(quinn::ConnectionError::ApplicationClosed(_))
99 | Err(quinn::ConnectionError::LocallyClosed) => {
100 debug!("connection closed cleanly");
101 // Locally closed is almost always us, but
102 // accept loops can race with the close; if
103 // we got here without seeing the shutdown
104 // signal, treat as a lost connection.
105 break SupervisorExit::ConnectionLost;
106 }
107 Err(e) => {
108 warn!(error = %e, "accept_bi failed; supervisor cycling");
109 break SupervisorExit::ConnectionLost;
110 }
111 }
112 }
113 }
114 };
115 info!(?exit, "tunnel supervisor exited");
116 exit
117}