cloudflare_quick_tunnel/
stream.rs1use capnp::message::ReaderOptions;
24use capnp_futures::serialize;
25use futures::{AsyncReadExt, AsyncWriteExt};
26use tokio_util::compat::{Compat, TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
27use tracing::debug;
28
29use crate::error::TunnelError;
30use crate::quic_metadata_protocol_capnp;
31
32pub const DATA_STREAM_SIGNATURE: [u8; 6] = [0x0A, 0x36, 0xCD, 0x12, 0xA1, 0x3E];
38pub const RPC_STREAM_SIGNATURE: [u8; 6] = [0x52, 0xBB, 0x82, 0x5C, 0xDB, 0x65];
39
40pub const PROTOCOL_V1: [u8; 2] = [b'0', b'1'];
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq)]
48pub enum ConnectionType {
49 Http,
50 Websocket,
51 Tcp,
52}
53
54#[derive(Debug, Clone)]
58pub struct ConnectRequest {
59 pub dest: String,
60 pub conn_type: ConnectionType,
61 pub metadata: Vec<(String, String)>,
65}
66
67impl ConnectRequest {
68 pub fn meta(&self, key: &str) -> Option<&str> {
70 self.metadata
71 .iter()
72 .find(|(k, _)| k == key)
73 .map(|(_, v)| v.as_str())
74 }
75}
76
77pub const HTTP_METHOD_KEY: &str = "HttpMethod";
80pub const HTTP_HOST_KEY: &str = "HttpHost";
81pub const HTTP_HEADER_KEY: &str = "HttpHeader";
82pub const HTTP_STATUS_KEY: &str = "HttpStatus";
83
84pub async fn read_connect_request<R>(reader: &mut R) -> Result<ConnectRequest, TunnelError>
89where
90 R: futures::io::AsyncRead + Unpin,
91{
92 let mut sig = [0u8; 6];
93 reader
94 .read_exact(&mut sig)
95 .await
96 .map_err(|e| TunnelError::Internal(format!("read signature: {e}")))?;
97 if sig != DATA_STREAM_SIGNATURE {
98 return Err(TunnelError::Internal(format!(
99 "unexpected stream signature: {sig:02x?}"
100 )));
101 }
102 let mut ver = [0u8; 2];
103 reader
104 .read_exact(&mut ver)
105 .await
106 .map_err(|e| TunnelError::Internal(format!("read version: {e}")))?;
107 debug!(version = %String::from_utf8_lossy(&ver), "stream preamble");
108
109 let msg = serialize::read_message(reader, ReaderOptions::new())
110 .await
111 .map_err(|e| TunnelError::Internal(format!("read capnp message: {e}")))?;
112 let root: quic_metadata_protocol_capnp::connect_request::Reader = msg
113 .get_root()
114 .map_err(|e| TunnelError::Internal(format!("capnp root: {e}")))?;
115
116 let dest = root
117 .get_dest()
118 .map_err(|e| TunnelError::Internal(format!("dest: {e}")))?
119 .to_string()
120 .map_err(|e| TunnelError::Internal(format!("dest utf-8: {e}")))?;
121 let conn_type = match root
122 .get_type()
123 .map_err(|e| TunnelError::Internal(format!("type: {e}")))?
124 {
125 quic_metadata_protocol_capnp::ConnectionType::Http => ConnectionType::Http,
126 quic_metadata_protocol_capnp::ConnectionType::Websocket => ConnectionType::Websocket,
127 quic_metadata_protocol_capnp::ConnectionType::Tcp => ConnectionType::Tcp,
128 };
129 let mut metadata = Vec::new();
130 if let Ok(list) = root.get_metadata() {
131 for i in 0..list.len() {
132 let m = list.get(i);
133 let k = m
134 .get_key()
135 .ok()
136 .and_then(|t| t.to_string().ok())
137 .unwrap_or_default();
138 let v = m
139 .get_val()
140 .ok()
141 .and_then(|t| t.to_string().ok())
142 .unwrap_or_default();
143 metadata.push((k, v));
144 }
145 }
146 Ok(ConnectRequest {
147 dest,
148 conn_type,
149 metadata,
150 })
151}
152
153pub type MetaPair<'a> = (&'a str, &'a str);
157
158pub async fn write_connect_response<W>(
165 writer: &mut W,
166 error: &str,
167 metadata: &[MetaPair<'_>],
168) -> Result<(), TunnelError>
169where
170 W: futures::io::AsyncWrite + Unpin,
171{
172 writer
173 .write_all(&DATA_STREAM_SIGNATURE)
174 .await
175 .map_err(|e| TunnelError::Internal(format!("write signature: {e}")))?;
176 writer
177 .write_all(&PROTOCOL_V1)
178 .await
179 .map_err(|e| TunnelError::Internal(format!("write version: {e}")))?;
180
181 let mut message = ::capnp::message::Builder::new_default();
182 {
183 let mut root: quic_metadata_protocol_capnp::connect_response::Builder = message.init_root();
184 root.set_error(error);
185 let mut meta = root.init_metadata(metadata.len() as u32);
186 for (i, (k, v)) in metadata.iter().enumerate() {
187 let mut entry = meta.reborrow().get(i as u32);
188 entry.set_key(*k);
189 entry.set_val(*v);
190 }
191 }
192 serialize::write_message(&mut *writer, &message)
193 .await
194 .map_err(|e| TunnelError::Internal(format!("write capnp: {e}")))?;
195 writer
196 .flush()
197 .await
198 .map_err(|e| TunnelError::Internal(format!("flush: {e}")))?;
199 Ok(())
200}
201
202pub fn split(
207 send: quinn::SendStream,
208 recv: quinn::RecvStream,
209) -> (Compat<quinn::RecvStream>, Compat<quinn::SendStream>) {
210 (recv.compat(), send.compat_write())
211}
212
213#[cfg(test)]
214mod tests {
215 use super::*;
216 use futures::io::Cursor;
217
218 #[tokio::test]
219 async fn roundtrip_response_through_buffer() {
220 let mut buf: Vec<u8> = Vec::new();
221 {
222 let mut cursor = Cursor::new(&mut buf);
223 write_connect_response(
224 &mut cursor,
225 "",
226 &[
227 ("HttpStatus", "200"),
228 ("HttpHeader:Content-Type", "text/plain"),
229 ],
230 )
231 .await
232 .unwrap();
233 }
234 assert_eq!(&buf[0..6], &DATA_STREAM_SIGNATURE);
236 assert_eq!(&buf[6..8], &PROTOCOL_V1);
237 assert!(buf.len() > 8 + 8, "capnp body present");
238 }
239
240 #[tokio::test]
241 async fn rejects_wrong_signature() {
242 let mut buf = vec![0u8; 16];
243 let mut r = Cursor::new(buf.as_mut_slice());
245 let err = read_connect_request(&mut r).await.unwrap_err();
246 assert!(matches!(err, TunnelError::Internal(s) if s.contains("signature")));
247 }
248}