Skip to main content

cloudflare_quick_tunnel/
supervisor.rs

1//! Long-running task that owns one QUIC connection after register.
2//!
//! Two concurrent arms in a `biased` `select!`. The shutdown watcher is
//! polled first, so a pending shutdown always wins over a newly
//! accepted stream:
//!
//!   1. Inbound-stream acceptor — `conn.accept_bi()` in a loop;
//!      every new stream is the edge wanting us to serve a single
//!      HTTP request, so we hand it to `proxy::handle_inbound_stream`
//!      on a spawned task.
//!   2. Shutdown watcher — receives on the shutdown channel; on
//!      signal (or when the sender is dropped), closes the connection
//!      with an application code so the edge sees a graceful close
//!      instead of an idle timeout.
12//!
13//! Returns a [`SupervisorExit`] tag so the reactor in `manager.rs`
14//! can tell apart "the operator asked to stop" from "the edge dropped
15//! us and we should reconnect".
16//!
17//! QUIC-level keepalive (`keep_alive_interval = 1s` set in
18//! `quic_dial::build_client_config`) handles the connection-liveness
19//! side — without it the edge's `MaxIdleTimeout = 5s` would terminate
20//! us once the control-stream RPC drains.
21
22use std::sync::atomic::{AtomicU64, Ordering};
23use std::sync::Arc;
24
25use tokio::sync::oneshot;
26use tracing::{debug, info, warn};
27
28use crate::pool::Pool;
29use crate::proxy::StreamCounters;
30
31#[derive(Debug, Default, Clone)]
32pub struct SupervisorMetrics {
33    pub streams_total: Arc<AtomicU64>,
34    pub bytes_in: Arc<AtomicU64>,
35    pub bytes_out: Arc<AtomicU64>,
36}
37
38impl SupervisorMetrics {
39    fn stream_counters(&self) -> StreamCounters {
40        StreamCounters {
41            bytes_in: self.bytes_in.clone(),
42            bytes_out: self.bytes_out.clone(),
43        }
44    }
45
46    pub fn snapshot(&self) -> (u64, u64, u64) {
47        (
48            self.streams_total.load(Ordering::Relaxed),
49            self.bytes_in.load(Ordering::Relaxed),
50            self.bytes_out.load(Ordering::Relaxed),
51        )
52    }
53}
54
/// Reason a supervisor accept loop ended.
///
/// The reactor branches on this: stop for good, or reconnect with backoff.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SupervisorExit {
    /// Shutdown was requested through the supervisor's shutdown channel.
    Shutdown,
    /// The QUIC connection was closed out from under us, either by the
    /// edge or by the local stack; the caller may try to reconnect.
    ConnectionLost,
}
65
66/// Run one accept-loop cycle on `conn`. Returns when the connection
67/// closes (for any reason) or when `shutdown_rx` fires.
68///
69/// Callers own the shutdown channel so the reactor can stitch
70/// multiple supervisor cycles together with the same QuickTunnelHandle.
71pub async fn run(
72    conn: quinn::Connection,
73    local_port: u16,
74    metrics: SupervisorMetrics,
75    pool: Arc<Pool>,
76    mut shutdown_rx: oneshot::Receiver<()>,
77) -> SupervisorExit {
78    info!(local_port, "tunnel supervisor running");
79    let exit = loop {
80        tokio::select! {
81            biased;
82            _ = &mut shutdown_rx => {
83                debug!("supervisor: shutdown signal");
84                conn.close(0u32.into(), b"client shutdown");
85                break SupervisorExit::Shutdown;
86            }
87            accepted = conn.accept_bi() => {
88                match accepted {
89                    Ok((send, recv)) => {
90                        metrics.streams_total.fetch_add(1, Ordering::Relaxed);
91                        let counters = metrics.stream_counters();
92                        let pool = pool.clone();
93                        tokio::spawn(async move {
94                            if let Err(e) = crate::proxy::handle_inbound_stream(
95                                local_port, send, recv, counters, pool,
96                            )
97                            .await
98                            {
99                                warn!(error = %e, "stream proxy failed");
100                            }
101                        });
102                    }
103                    Err(quinn::ConnectionError::ApplicationClosed(_))
104                    | Err(quinn::ConnectionError::LocallyClosed) => {
105                        debug!("connection closed cleanly");
106                        // Locally closed is almost always us, but
107                        // accept loops can race with the close; if
108                        // we got here without seeing the shutdown
109                        // signal, treat as a lost connection.
110                        break SupervisorExit::ConnectionLost;
111                    }
112                    Err(e) => {
113                        warn!(error = %e, "accept_bi failed; supervisor cycling");
114                        break SupervisorExit::ConnectionLost;
115                    }
116                }
117            }
118        }
119    };
120    info!(?exit, "tunnel supervisor exited");
121    exit
122}