lynx_core/proxy/
websocket_proxy.rs1use anyhow::{anyhow, Error, Result};
2use futures_util::{SinkExt, StreamExt};
3use http_body_util::combinators::BoxBody;
4use http_body_util::BodyExt;
5use hyper::body::{Bytes, Incoming};
6use hyper::{Request, Response};
7use hyper_tungstenite::HyperWebsocket;
8use tokio_tungstenite::tungstenite::Message;
9
10pub struct WebsocketProxy {}
11
12impl WebsocketProxy {
13 pub async fn proxy(
14 &self,
15 req: Request<Incoming>,
16 ) -> anyhow::Result<Response<BoxBody<Bytes, Error>>> {
17 let mut req = req;
18 let (response, websocket) = hyper_tungstenite::upgrade(&mut req, None)?;
19
20 tokio::spawn(async move {
22 if let Err(e) = serve_websocket(websocket).await {
23 eprintln!("Error in websocket connection: {e}");
24 }
25 });
26 let (parts, body) = response.into_parts();
27
28 let body = body.boxed().map_err(|e| anyhow!(e)).boxed();
29
30 Ok(Response::from_parts(parts, body))
31 }
32}
33
34async fn serve_websocket(websocket: HyperWebsocket) -> Result<()> {
36 let mut websocket = websocket.await?;
37 while let Some(message) = websocket.next().await {
38 match message? {
39 Message::Text(msg) => {
40 println!("Received text message: {msg}");
41 websocket
42 .send(Message::text("Thank you, come again."))
43 .await?;
44 }
45 Message::Binary(msg) => {
46 println!("Received binary message: {msg:02X?}");
47 websocket
48 .send(Message::binary(b"Thank you, come again.".to_vec()))
49 .await?;
50 }
51 Message::Ping(msg) => {
52 println!("Received ping message: {msg:02X?}");
54 }
55 Message::Pong(msg) => {
56 println!("Received pong message: {msg:02X?}");
57 }
58 Message::Close(msg) => {
59 if let Some(msg) = &msg {
61 println!(
62 "Received close message with code {} and message: {}",
63 msg.code, msg.reason
64 );
65 } else {
66 println!("Received close message");
67 }
68 }
69 Message::Frame(_msg) => {
70 unreachable!();
71 }
72 }
73 }
74
75 Ok(())
76}