h2_ws_client/lib.rs
1//! Minimal client-side WebSocket over HTTP/2 on top of hyper and tokio-tungstenite.
2//!
3//! This crate provides a small abstraction that:
//! - Establishes an HTTP/2 connection over an arbitrary async I/O stream (a plain-TCP helper is included)
5//! - Issues an RFC 8441 extended CONNECT request with `:protocol = "websocket"`
6//! - Returns a `tokio_tungstenite::WebSocketStream` you can use as a normal WebSocket.
7
8use std::fmt;
9
10use bytes::Bytes;
11use http::{Method, Request, header};
12use http_body_util::Empty;
13use hyper::client::conn::http2;
14use hyper::ext::Protocol;
15use hyper_util::rt::{TokioExecutor, TokioIo};
16use thiserror::Error;
17use tokio::net::TcpStream;
18use tokio_tungstenite::{WebSocketStream, tungstenite::protocol::Role};
19use tracing::error;
20
21/// Body type used for HTTP/2 CONNECT requests.
22///
23/// We do not send a request body for WebSocket CONNECT, so this is always empty.
24type Body = Empty<Bytes>;
25
26/// Type alias for the WebSocket stream used by this crate.
27///
28/// The inner I/O is a hyper `Upgraded` stream wrapped in `TokioIo`,
29/// which implements `AsyncRead + AsyncWrite`.
30pub type H2WebSocketStream = WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>;
31
32/// Errors that can occur when establishing or using an HTTP/2 WebSocket connection.
33#[derive(Debug, Error)]
34pub enum H2WsError {
35 /// I/O error while creating the underlying TCP connection.
36 #[error("I/O error: {0}")]
37 Io(#[from] std::io::Error),
38
39 /// Error returned from hyper (handshake, request send, upgrade, etc.).
40 #[error("hyper error: {0}")]
41 Hyper(#[from] hyper::Error),
42
43 /// The server responded with a non-success HTTP status code.
44 #[error("unexpected HTTP status: {0}")]
45 Status(hyper::StatusCode),
46}
47
// NOTE: There is intentionally no
// `impl From<H2WsError> for Box<dyn std::error::Error + Send + Sync>` here.
// Such an impl is unnecessary and would conflict with the standard library's blanket impl
// `impl<'a, E: Error + Send + Sync + 'a> From<E> for Box<dyn Error + Send + Sync + 'a>`,
// which already covers `H2WsError`, so it was removed.
52
53/// An HTTP/2 connection that can open WebSocket streams via extended CONNECT.
54///
55/// Internally this wraps hyper's HTTP/2 client connection and keeps the
56/// `SendRequest` handle alive. The HTTP/2 connection driver is spawned on a
57/// background task.
58pub struct H2WsConnection {
59 send_request: http2::SendRequest<Body>,
60}
61
62impl fmt::Debug for H2WsConnection {
63 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64 f.debug_struct("H2WsConnection").finish()
65 }
66}
67
68impl H2WsConnection {
69 /// Establish a new HTTP/2 client connection over plain TCP (h2c).
70 ///
71 /// This helper is TCP-specific. If you need TLS, you can add a method like
72 /// `connect_tls` that wraps a TLS stream and passes it into `Self::from_io`.
73 pub async fn connect_tcp(addr: &str) -> Result<Self, H2WsError> {
74 // Connect the underlying TCP socket.
75 let tcp = TcpStream::connect(addr).await?;
76 let io = TokioIo::new(tcp);
77
78 // Perform the HTTP/2 handshake. This sets up the H2 state machine and
79 // returns:
80 // - a `SendRequest` handle for creating new streams
81 // - a connection future that drives the H2 state forward
82 let (send_request, conn) = http2::Builder::new(TokioExecutor::new())
83 .handshake(io)
84 .await?;
85
86 // Spawn the H2 connection driver on a background task. It is responsible
87 // for reading/writing frames, handling flow control, etc.
88 tokio::spawn(async move {
89 if let Err(e) = conn.await {
90 error!("h2 connection error: {e}");
91 }
92 });
93
94 Ok(Self { send_request })
95 }
96
97 /// Create an `H2WsConnection` from an arbitrary I/O stream.
98 ///
99 /// The `io` type must implement `AsyncRead + AsyncWrite + Unpin + Send + 'static`.
100 /// This allows integrating with custom transports (TLS, Unix sockets, etc).
101 pub async fn from_io<I>(io: I) -> Result<Self, H2WsError>
102 where
103 I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
104 {
105 let io = TokioIo::new(io);
106
107 let (send_request, conn) = http2::Builder::new(TokioExecutor::new())
108 .handshake(io)
109 .await?;
110
111 tokio::spawn(async move {
112 if let Err(e) = conn.await {
113 error!("h2 connection error: {e}");
114 }
115 });
116
117 Ok(Self { send_request })
118 }
119
120 /// Open a WebSocket over this HTTP/2 connection using RFC 8441 extended CONNECT.
121 ///
122 /// - `path`: HTTP request path, e.g. `"/echo"`.
123 /// - `host`: value for the `Host` header, e.g. `"localhost"`.
124 /// - `subprotocol`: optional WebSocket subprotocol, e.g. `"echo"` or `"graphql-ws"`.
125 ///
126 /// This method:
127 /// 1. Sends a `CONNECT` request with `:protocol = "websocket"`.
128 /// 2. Checks that the status code is 2xx.
129 /// 3. Calls `hyper::upgrade::on` to obtain the upgraded I/O stream.
130 /// 4. Wraps the upgraded stream in `tokio_tungstenite::WebSocketStream`.
131 pub async fn connect_websocket(
132 &mut self,
133 path: &str,
134 host: &str,
135 subprotocol: Option<&str>,
136 ) -> Result<H2WebSocketStream, H2WsError> {
137 // Build the CONNECT request with :protocol = "websocket".
138 let mut builder = Request::builder()
139 .method(Method::CONNECT)
140 .uri(path)
141 .extension(Protocol::from_static("websocket"))
142 .header("sec-websocket-version", "13")
143 .header(header::HOST, host);
144
145 if let Some(proto) = subprotocol {
146 builder = builder.header("sec-websocket-protocol", proto);
147 }
148
149 let req: Request<Body> = builder
150 .body(Empty::new())
151 .expect("request builder never fails");
152
153 // Send the request on a new HTTP/2 stream.
154 let mut response = self.send_request.send_request(req).await?;
155
156 // For WebSocket over CONNECT, the server should respond with a 2xx status code.
157 if !response.status().is_success() {
158 // Optionally collect the body for debugging. This is skipped for performance
159 // reasons, but you can uncomment if you want more details.
160 /*
161 let body_bytes = response
162 .into_body()
163 .collect()
164 .await
165 .map_err(H2WsError::Hyper)?
166 .to_bytes();
167 tracing::error!(
168 "WebSocket CONNECT failed: {} {}",
169 response.status(),
170 String::from_utf8_lossy(&body_bytes),
171 );
172 */
173 return Err(H2WsError::Status(response.status()));
174 }
175
176 // Upgrade this HTTP/2 stream into a raw I/O stream. For HTTP/2, this
177 // represents the DATA frame payloads of this CONNECT stream, abstracted
178 // as a bidirectional byte stream.
179 let upgraded = hyper::upgrade::on(&mut response).await?;
180 let upgraded = TokioIo::new(upgraded);
181
182 // Wrap the upgraded I/O in a WebSocketStream. From this point on we can
183 // use it like a normal WebSocket (send/recv messages).
184 let ws = WebSocketStream::from_raw_socket(upgraded, Role::Client, None).await;
185
186 Ok(ws)
187 }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Status` variant must render both its fixed prefix and the numeric code.
    #[test]
    fn error_display_is_reasonable() {
        let rendered = H2WsError::Status(hyper::StatusCode::BAD_REQUEST).to_string();

        assert!(rendered.contains("unexpected HTTP status"));
        assert!(rendered.contains("400"));
    }
}
202}