Skip to main content

cloudflare_quick_tunnel/
stream.rs

1//! Per-request stream framing on the edge ↔ tunnel side.
2//!
3//! When the edge receives a request bound for our quick-tunnel
4//! hostname it opens a new bidi QUIC stream to us and writes:
5//!
6//! ```text
7//!   [ 6-byte protocolSignature ][ 2-byte protocolVersion ][ capnp ConnectRequest ]
8//! ```
9//!
10//! We respond with the analogous frame carrying a `ConnectResponse`
11//! (status + headers as `metadata`), and then the stream becomes a
12//! byte-pumped channel for the HTTP body (or arbitrary TCP, for
13//! `ConnectionType::Tcp`).
14//!
15//! Constants + helpers here mirror
16//! `cloudflared/tunnelrpc/quic/protocol.go` +
17//! `request_server_stream.go`. We are the **server** side of this
18//! framing because the edge is the one opening the stream — even
19//! though "server" is confusing given we initiated the QUIC
20//! connection. cloudflared calls it `RequestServerStream` for
21//! exactly the same reason.
22
23use capnp::message::ReaderOptions;
24use capnp_futures::serialize;
25use futures::{AsyncReadExt, AsyncWriteExt};
26use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
27use tracing::debug;
28
29use crate::error::TunnelError;
30use crate::quic_metadata_protocol_capnp;
31
/// Six-byte tag the edge writes at the very start of a per-request
/// (data) stream so that stream kinds can be told apart. This is the
/// only signature we ever expect to see on per-request streams.
pub const DATA_STREAM_SIGNATURE: [u8; 6] = [0x0a, 0x36, 0xcd, 0x12, 0xa1, 0x3e];

/// Six-byte tag reserved for the cloudflared-server RPC stream
/// (session manager / config). The edge does NOT open that stream
/// against quick tunnels.
pub const RPC_STREAM_SIGNATURE: [u8; 6] = [0x52, 0xbb, 0x82, 0x5c, 0xdb, 0x65];
39
/// Protocol version field of the preamble: the two ASCII bytes
/// `"01"`. cloudflared treats `readVersion` as a NO-OP for now; the
/// value is kept verbatim so that a future bump shows up loudly.
pub const PROTOCOL_V1: [u8; 2] = *b"01";
44
/// Kind of payload the edge asks us to serve on a stream.
///
/// Mirrors `quic_metadata_protocol.ConnectionType`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ConnectionType {
    /// HTTP request; the request's `dest` is a full URL.
    Http,
    /// Websocket request; the request's `dest` is a full URL.
    Websocket,
    /// Arbitrary TCP; the request's `dest` is a `host:port`.
    Tcp,
}
53
54/// Parsed `ConnectRequest`. `dest` is a full URL on HTTP/Websocket
55/// streams (e.g. `https://abc.trycloudflare.com/path?q=1`) and a
56/// `host:port` on TCP streams.
57#[derive(Debug, Clone)]
58pub struct ConnectRequest {
59    pub dest: String,
60    pub conn_type: ConnectionType,
61    /// Free-form key/value map the edge attaches to the stream.
62    /// For HTTP requests this carries `HttpMethod`, `HttpHost`,
63    /// and one `HttpHeader:<Name>` entry per request header.
64    pub metadata: Vec<(String, String)>,
65}
66
67impl ConnectRequest {
68    /// O(n) lookup — usually fewer than a dozen entries, fine.
69    pub fn meta(&self, key: &str) -> Option<&str> {
70        self.metadata
71            .iter()
72            .find(|(k, _)| k == key)
73            .map(|(_, v)| v.as_str())
74    }
75}
76
// ── Well-known metadata keys (mirror connection/quic_connection.go) ──

/// Request metadata key for the HTTP method.
pub const HTTP_METHOD_KEY: &str = "HttpMethod";

/// Request metadata key for the HTTP host.
pub const HTTP_HOST_KEY: &str = "HttpHost";

/// Prefix of per-header metadata entries, which are keyed
/// `HttpHeader:<Name>`.
pub const HTTP_HEADER_KEY: &str = "HttpHeader";

/// Response metadata key for the HTTP status.
pub const HTTP_STATUS_KEY: &str = "HttpStatus";
83
84// ── Read side ────────────────────────────────────────────────────────────────
85
86/// Read the preamble (signature + version), assert it's the data
87/// stream, and decode the capnp `ConnectRequest` that follows.
88pub async fn read_connect_request<R>(reader: &mut R) -> Result<ConnectRequest, TunnelError>
89where
90    R: futures::io::AsyncRead + Unpin,
91{
92    let mut sig = [0u8; 6];
93    reader
94        .read_exact(&mut sig)
95        .await
96        .map_err(|e| TunnelError::Internal(format!("read signature: {e}")))?;
97    if sig != DATA_STREAM_SIGNATURE {
98        return Err(TunnelError::Internal(format!(
99            "unexpected stream signature: {sig:02x?}"
100        )));
101    }
102    let mut ver = [0u8; 2];
103    reader
104        .read_exact(&mut ver)
105        .await
106        .map_err(|e| TunnelError::Internal(format!("read version: {e}")))?;
107    debug!(version = %String::from_utf8_lossy(&ver), "stream preamble");
108
109    let msg = serialize::read_message(reader, ReaderOptions::new())
110        .await
111        .map_err(|e| TunnelError::Internal(format!("read capnp message: {e}")))?;
112    let root: quic_metadata_protocol_capnp::connect_request::Reader = msg
113        .get_root()
114        .map_err(|e| TunnelError::Internal(format!("capnp root: {e}")))?;
115
116    let dest = root
117        .get_dest()
118        .map_err(|e| TunnelError::Internal(format!("dest: {e}")))?
119        .to_string()
120        .map_err(|e| TunnelError::Internal(format!("dest utf-8: {e}")))?;
121    let conn_type = match root
122        .get_type()
123        .map_err(|e| TunnelError::Internal(format!("type: {e}")))?
124    {
125        quic_metadata_protocol_capnp::ConnectionType::Http => ConnectionType::Http,
126        quic_metadata_protocol_capnp::ConnectionType::Websocket => ConnectionType::Websocket,
127        quic_metadata_protocol_capnp::ConnectionType::Tcp => ConnectionType::Tcp,
128    };
129    let mut metadata = Vec::new();
130    if let Ok(list) = root.get_metadata() {
131        for i in 0..list.len() {
132            let m = list.get(i);
133            let k = m
134                .get_key()
135                .ok()
136                .and_then(|t| t.to_string().ok())
137                .unwrap_or_default();
138            let v = m
139                .get_val()
140                .ok()
141                .and_then(|t| t.to_string().ok())
142                .unwrap_or_default();
143            metadata.push((k, v));
144        }
145    }
146    Ok(ConnectRequest {
147        dest,
148        conn_type,
149        metadata,
150    })
151}
152
153// ── Write side ───────────────────────────────────────────────────────────────
154
/// A borrowed `(key, value)` metadata entry, as accepted by
/// `write_connect_response`.
pub type MetaPair<'m> = (&'m str, &'m str);
157
158/// Write a `ConnectResponse` preceded by the data-stream preamble.
159///
160/// `error` is set when the local upstream refused / errored before
161/// any body bytes flow. On the happy path it's empty and `metadata`
162/// carries `HttpStatus` + each `HttpHeader:<Name>` the local
163/// origin emitted.
164pub async fn write_connect_response<W>(
165    writer: &mut W,
166    error: &str,
167    metadata: &[MetaPair<'_>],
168) -> Result<(), TunnelError>
169where
170    W: futures::io::AsyncWrite + Unpin,
171{
172    writer
173        .write_all(&DATA_STREAM_SIGNATURE)
174        .await
175        .map_err(|e| TunnelError::Internal(format!("write signature: {e}")))?;
176    writer
177        .write_all(&PROTOCOL_V1)
178        .await
179        .map_err(|e| TunnelError::Internal(format!("write version: {e}")))?;
180
181    let mut message = ::capnp::message::Builder::new_default();
182    {
183        let mut root: quic_metadata_protocol_capnp::connect_response::Builder = message.init_root();
184        root.set_error(error);
185        let mut meta = root.init_metadata(metadata.len() as u32);
186        for (i, (k, v)) in metadata.iter().enumerate() {
187            let mut entry = meta.reborrow().get(i as u32);
188            entry.set_key(*k);
189            entry.set_val(*v);
190        }
191    }
192    serialize::write_message(&mut *writer, &message)
193        .await
194        .map_err(|e| TunnelError::Internal(format!("write capnp: {e}")))?;
195    writer
196        .flush()
197        .await
198        .map_err(|e| TunnelError::Internal(format!("flush: {e}")))?;
199    Ok(())
200}
201
202// ── Quinn stream → futures-io bridge ─────────────────────────────────────────
203
204/// Wrap a quinn pair into the futures-io halves capnp + our framing
205/// code expects.
206pub fn split(
207    send: quinn::SendStream,
208    recv: quinn::RecvStream,
209) -> (Compat<quinn::RecvStream>, Compat<quinn::SendStream>) {
210    (recv.compat(), send.compat_write())
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::Cursor;

    /// Bytes taken by the preamble: signature followed by version.
    const PREAMBLE_LEN: usize = DATA_STREAM_SIGNATURE.len() + PROTOCOL_V1.len();

    /// A written response starts with the preamble and is followed by
    /// a non-trivial capnp body.
    #[tokio::test]
    async fn roundtrip_response_through_buffer() {
        let response_meta = [
            ("HttpStatus", "200"),
            ("HttpHeader:Content-Type", "text/plain"),
        ];
        let mut sink = Cursor::new(Vec::<u8>::new());
        write_connect_response(&mut sink, "", &response_meta)
            .await
            .unwrap();
        let frame = sink.into_inner();

        // Sanity: signature, then version, then at least 8 capnp bytes.
        assert_eq!(&frame[..DATA_STREAM_SIGNATURE.len()], &DATA_STREAM_SIGNATURE);
        assert_eq!(
            &frame[DATA_STREAM_SIGNATURE.len()..PREAMBLE_LEN],
            &PROTOCOL_V1
        );
        assert!(frame.len() > PREAMBLE_LEN + 8, "capnp body present");
    }

    /// An all-zero preamble is not the data-stream signature and must
    /// be rejected with a signature error.
    #[tokio::test]
    async fn rejects_wrong_signature() {
        // Intentional garbage: sixteen zero bytes.
        let mut reader = Cursor::new([0u8; 16]);
        let err = read_connect_request(&mut reader).await.unwrap_err();
        let is_signature_error =
            matches!(err, TunnelError::Internal(s) if s.contains("signature"));
        assert!(is_signature_error);
    }
}