h2_ws_client/
lib.rs

//! Minimal client-side WebSocket over HTTP/2, built on top of hyper and tokio-tungstenite.
//!
//! This crate provides a small abstraction that:
//! - Establishes an HTTP/2 connection over an arbitrary async I/O stream
//!   (a plain-TCP helper is provided; other transports go through `from_io`)
//! - Issues an RFC 8441 extended CONNECT request with `:protocol = "websocket"`
//! - Returns a `tokio_tungstenite::WebSocketStream` that you can use like a normal WebSocket.
7
8use std::fmt;
9
10use bytes::Bytes;
11use http::{Method, Request, header};
12use http_body_util::Empty;
13use hyper::client::conn::http2;
14use hyper::ext::Protocol;
15use hyper_util::rt::{TokioExecutor, TokioIo};
16use thiserror::Error;
17use tokio::net::TcpStream;
18use tokio_tungstenite::{WebSocketStream, tungstenite::protocol::Role};
19use tracing::error;
20
/// Body type used for HTTP/2 CONNECT requests.
///
/// The WebSocket CONNECT request carries no request body, so this is always
/// the empty body type `Empty<Bytes>`.
type Body = Empty<Bytes>;
25
/// WebSocket stream type returned by `H2WsConnection::connect_websocket`.
///
/// The inner I/O is a hyper `Upgraded` stream wrapped in `TokioIo`, which
/// exposes it through tokio's `AsyncRead + AsyncWrite` traits so that
/// `tokio_tungstenite::WebSocketStream` can drive it.
pub type H2WebSocketStream = WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>;
31
/// Errors that can occur when establishing or using an HTTP/2 WebSocket connection.
///
/// The `Io` and `Hyper` variants are generated with `#[from]`, so the `?`
/// operator converts `std::io::Error` and `hyper::Error` into this type
/// automatically.
#[derive(Debug, Error)]
pub enum H2WsError {
    /// An I/O error, e.g. while creating the underlying TCP connection.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Error returned from hyper (handshake, request send, upgrade, etc.).
    #[error("hyper error: {0}")]
    Hyper(#[from] hyper::Error),

    /// The server responded with a non-success (non-2xx) HTTP status code;
    /// the offending status is carried in the variant.
    #[error("unexpected HTTP status: {0}")]
    Status(hyper::StatusCode),
}
47
// NOTE: Do not add `impl From<H2WsError> for Box<dyn std::error::Error + Send + Sync>`.
// It is unnecessary and would conflict with the standard library's blanket impl
// `impl<'a, E: Error + Send + Sync + 'a> From<E> for Box<dyn Error + Send + Sync + 'a>`,
// which already covers `H2WsError`. The manual impl was therefore removed.
52
/// An HTTP/2 connection that can open WebSocket streams via extended CONNECT.
///
/// Internally this wraps hyper's HTTP/2 client connection and keeps the
/// `SendRequest` handle alive. The HTTP/2 connection driver is spawned on a
/// background task by the constructors (`connect_tcp` / `from_io`).
pub struct H2WsConnection {
    /// Handle used to issue new requests (one HTTP/2 stream each) on the connection.
    send_request: http2::SendRequest<Body>,
}
61
62impl fmt::Debug for H2WsConnection {
63    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
64        f.debug_struct("H2WsConnection").finish()
65    }
66}
67
68impl H2WsConnection {
69    /// Establish a new HTTP/2 client connection over plain TCP (h2c).
70    ///
71    /// This helper is TCP-specific. If you need TLS, you can add a method like
72    /// `connect_tls` that wraps a TLS stream and passes it into `Self::from_io`.
73    pub async fn connect_tcp(addr: &str) -> Result<Self, H2WsError> {
74        // Connect the underlying TCP socket.
75        let tcp = TcpStream::connect(addr).await?;
76        let io = TokioIo::new(tcp);
77
78        // Perform the HTTP/2 handshake. This sets up the H2 state machine and
79        // returns:
80        // - a `SendRequest` handle for creating new streams
81        // - a connection future that drives the H2 state forward
82        let (send_request, conn) = http2::Builder::new(TokioExecutor::new())
83            .handshake(io)
84            .await?;
85
86        // Spawn the H2 connection driver on a background task. It is responsible
87        // for reading/writing frames, handling flow control, etc.
88        tokio::spawn(async move {
89            if let Err(e) = conn.await {
90                error!("h2 connection error: {e}");
91            }
92        });
93
94        Ok(Self { send_request })
95    }
96
97    /// Create an `H2WsConnection` from an arbitrary I/O stream.
98    ///
99    /// The `io` type must implement `AsyncRead + AsyncWrite + Unpin + Send + 'static`.
100    /// This allows integrating with custom transports (TLS, Unix sockets, etc).
101    pub async fn from_io<I>(io: I) -> Result<Self, H2WsError>
102    where
103        I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
104    {
105        let io = TokioIo::new(io);
106
107        let (send_request, conn) = http2::Builder::new(TokioExecutor::new())
108            .handshake(io)
109            .await?;
110
111        tokio::spawn(async move {
112            if let Err(e) = conn.await {
113                error!("h2 connection error: {e}");
114            }
115        });
116
117        Ok(Self { send_request })
118    }
119
120    /// Open a WebSocket over this HTTP/2 connection using RFC 8441 extended CONNECT.
121    ///
122    /// - `path`: HTTP request path, e.g. `"/echo"`.
123    /// - `host`: value for the `Host` header, e.g. `"localhost"`.
124    /// - `subprotocol`: optional WebSocket subprotocol, e.g. `"echo"` or `"graphql-ws"`.
125    ///
126    /// This method:
127    /// 1. Sends a `CONNECT` request with `:protocol = "websocket"`.
128    /// 2. Checks that the status code is 2xx.
129    /// 3. Calls `hyper::upgrade::on` to obtain the upgraded I/O stream.
130    /// 4. Wraps the upgraded stream in `tokio_tungstenite::WebSocketStream`.
131    pub async fn connect_websocket(
132        &mut self,
133        path: &str,
134        host: &str,
135        subprotocol: Option<&str>,
136    ) -> Result<H2WebSocketStream, H2WsError> {
137        // Build the CONNECT request with :protocol = "websocket".
138        let mut builder = Request::builder()
139            .method(Method::CONNECT)
140            .uri(path)
141            .extension(Protocol::from_static("websocket"))
142            .header("sec-websocket-version", "13")
143            .header(header::HOST, host);
144
145        if let Some(proto) = subprotocol {
146            builder = builder.header("sec-websocket-protocol", proto);
147        }
148
149        let req: Request<Body> = builder
150            .body(Empty::new())
151            .expect("request builder never fails");
152
153        // Send the request on a new HTTP/2 stream.
154        let mut response = self.send_request.send_request(req).await?;
155
156        // For WebSocket over CONNECT, the server should respond with a 2xx status code.
157        if !response.status().is_success() {
158            // Optionally collect the body for debugging. This is skipped for performance
159            // reasons, but you can uncomment if you want more details.
160            /*
161            let body_bytes = response
162                .into_body()
163                .collect()
164                .await
165                .map_err(H2WsError::Hyper)?
166                .to_bytes();
167            tracing::error!(
168                "WebSocket CONNECT failed: {} {}",
169                response.status(),
170                String::from_utf8_lossy(&body_bytes),
171            );
172            */
173            return Err(H2WsError::Status(response.status()));
174        }
175
176        // Upgrade this HTTP/2 stream into a raw I/O stream. For HTTP/2, this
177        // represents the DATA frame payloads of this CONNECT stream, abstracted
178        // as a bidirectional byte stream.
179        let upgraded = hyper::upgrade::on(&mut response).await?;
180        let upgraded = TokioIo::new(upgraded);
181
182        // Wrap the upgraded I/O in a WebSocketStream. From this point on we can
183        // use it like a normal WebSocket (send/recv messages).
184        let ws = WebSocketStream::from_raw_socket(upgraded, Role::Client, None).await;
185
186        Ok(ws)
187    }
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// The `Status` variant's display text names the problem and includes the
    /// numeric status code.
    #[test]
    fn error_display_is_reasonable() {
        let rendered = H2WsError::Status(hyper::StatusCode::BAD_REQUEST).to_string();
        for needle in ["unexpected HTTP status", "400"] {
            assert!(rendered.contains(needle));
        }
    }
}