hyperlane-plugin-websocket 3.1.2

A WebSocket plugin for the Hyperlane framework, providing robust WebSocket communication capabilities and integrating with hyperlane-broadcast for efficient message dissemination.
Documentation

hyperlane-plugin-websocket

Official Documentation

API Docs

A WebSocket plugin for the Hyperlane framework, providing robust WebSocket communication capabilities and integrating with hyperlane-broadcast for efficient message dissemination.

Installation

To use this crate, run the following command:

cargo add hyperlane-plugin-websocket

Usage

use hyperlane::*;
use hyperlane_plugin_websocket::*;

/// Hook registered in `main` as request middleware.
///
/// `new` captures the socket address string from the context and `handle`
/// writes it into the `SocketAddr` response header alongside the other
/// default response headers.
struct RequestMiddleware {
    /// Socket address string obtained from `Context::get_socket_addr_string`.
    socket_addr: String,
}
/// Stateless hook registered in `main` as request middleware; its `handle`
/// answers WebSocket requests with a `101` handshake response.
struct UpgradeHook;
/// Stateless route hook for `/{group_name}`; its `handle` builds the
/// `WebSocketConfig` for the group and runs it on the shared broadcast map.
struct GroupChat;
/// Route hook for `/{my_name}/{your_name}`.
///
/// The configuration is built in `new` and consumed by `handle`.
struct PrivateChat {
    /// WebSocket configuration keyed by the point-to-point broadcast type.
    config: WebSocketConfig<String>,
}
/// Hook installed through `set_connected_hook` by both chat routes.
///
/// `handle` sends `data` to the group broadcast type and to the private
/// broadcast type, then prints `receiver_count`.
struct ConnectedHook {
    /// Receiver count queried for the group broadcast type in `new`.
    receiver_count: ReceiverCount,
    /// Message text of the form `receiver_count => <count>`.
    data: String,
    /// Group target built from the `group_name` route parameter.
    group_broadcast_type: BroadcastType<String>,
    /// Point-to-point target built from the `my_name` / `your_name` route parameters.
    private_broadcast_type: BroadcastType<String>,
}
/// Hook installed through `set_closed_hook` by the private chat route.
struct PrivateClosedHook {
    /// Response body text of the form `receiver_count => <count>`.
    body: String,
    /// Value returned by `receiver_count_after_closed` for the private key.
    receiver_count: ReceiverCount,
}
/// Hook installed through `set_sended_hook` by both chat routes; it prints the
/// response body that was captured in `new`.
struct SendedHook {
    /// Response body string read from the context.
    msg: String,
}
/// Hook installed through `set_request_hook` by the group chat route.
struct GroupChatRequestHook {
    /// Request body to echo, or a `receiver_count => <count>` message when the
    /// request body was empty.
    body: RequestBody,
    /// Receiver count for the group key, as computed in `new`.
    receiver_count: ReceiverCount,
}
/// Hook installed through `set_closed_hook` by the group chat route.
struct GroupClosedHook {
    /// Response body text of the form `receiver_count => <count>`.
    body: String,
    /// Value returned by `receiver_count_after_closed` for the group key.
    receiver_count: ReceiverCount,
}
/// Hook installed through `set_request_hook` by the private chat route.
struct PrivateChatRequestHook {
    /// Request body to echo, or a `receiver_count => <count>` message when the
    /// request body was empty.
    body: RequestBody,
    /// Receiver count for the point-to-point key, as computed in `new`.
    receiver_count: ReceiverCount,
}

/// Process-wide `WebSocket` instance; starts empty and is filled on the first
/// call to `get_broadcast_map`.
static BROADCAST_MAP: OnceLock<WebSocket> = OnceLock::new();

/// Returns the shared `WebSocket` stored in `BROADCAST_MAP`, constructing it
/// with `WebSocket::new` the first time it is requested.
fn get_broadcast_map() -> &'static WebSocket {
    BROADCAST_MAP.get_or_init(WebSocket::new)
}

/// Middleware that prepares the default response metadata.
impl ServerHook for RequestMiddleware {
    /// Captures the socket address string exposed by the context.
    async fn new(ctx: &Context) -> Self {
        Self {
            socket_addr: ctx.get_socket_addr_string().await,
        }
    }

    /// Sets the response version to HTTP/1.1, the status code to `200`, and the
    /// `SERVER`, `CONNECTION`, `CONTENT_TYPE`, `ACCESS_CONTROL_ALLOW_ORIGIN`
    /// and `SocketAddr` headers.
    async fn handle(self, ctx: &Context) {
        let Self { socket_addr } = self;
        ctx.set_response_version(HttpVersion::HTTP1_1)
            .await
            .set_response_status_code(200)
            .await
            .set_response_header(SERVER, HYPERLANE)
            .await
            .set_response_header(CONNECTION, KEEP_ALIVE)
            .await
            .set_response_header(CONTENT_TYPE, TEXT_PLAIN)
            .await
            .set_response_header(ACCESS_CONTROL_ALLOW_ORIGIN, WILDCARD_ANY)
            .await
            // Echo the captured address back to the client.
            .set_response_header("SocketAddr", &socket_addr)
            .await;
    }
}

impl ServerHook for UpgradeHook {
    async fn new(_ctx: &Context) -> Self {
        Self
    }

    async fn handle(self, ctx: &Context) {
        if !ctx.get_request().await.is_ws() {
            return;
        }
        if let Some(key) = &ctx.try_get_request_header_back(SEC_WEBSOCKET_KEY).await {
            let accept_key: String = WebSocketFrame::generate_accept_key(key);
            ctx.set_response_status_code(101)
                .await
                .set_response_header(UPGRADE, WEBSOCKET)
                .await
                .set_response_header(CONNECTION, UPGRADE)
                .await
                .set_response_header(SEC_WEBSOCKET_ACCEPT, &accept_key)
                .await
                .set_response_body(&vec![])
                .await
                .send()
                .await
                .unwrap();
        }
    }
}

/// Hook run when a WebSocket connection is established.
impl ServerHook for ConnectedHook {
    /// Builds both broadcast targets from the route parameters.
    ///
    /// Missing route parameters fall back to their default (empty) value, so
    /// the same hook can be installed on the group route and on the private
    /// route. The receiver count is queried for the group target only.
    async fn new(ctx: &Context) -> Self {
        let group_name: String = ctx
            .try_get_route_param("group_name")
            .await
            .unwrap_or_default();
        let group_broadcast_type: BroadcastType<String> =
            BroadcastType::PointToGroup(group_name);
        let receiver_count: ReceiverCount =
            get_broadcast_map().receiver_count(group_broadcast_type.clone());
        let my_name: String = ctx.try_get_route_param("my_name").await.unwrap_or_default();
        let your_name: String = ctx
            .try_get_route_param("your_name")
            .await
            .unwrap_or_default();
        let private_broadcast_type: BroadcastType<String> =
            BroadcastType::PointToPoint(my_name, your_name);
        // `format!` already yields a `String`; no further conversion is needed.
        let data: String = format!("receiver_count => {:?}", receiver_count);
        Self {
            receiver_count,
            data,
            group_broadcast_type,
            private_broadcast_type,
        }
    }

    /// Sends `data` to the group target and then to the private target,
    /// printing any send error instead of propagating it, and finally prints
    /// the receiver count and flushes stdout.
    async fn handle(self, _ctx: &Context) {
        // The message is sent twice, so the first send gets a copy.
        if let Err(err) = get_broadcast_map().send(self.group_broadcast_type, self.data.clone()) {
            println!("[connected_hook]send group error => {:?}", err.to_string());
        }
        if let Err(err) = get_broadcast_map().send(self.private_broadcast_type, self.data) {
            println!(
                "[connected_hook]send private error => {:?}",
                err.to_string()
            );
        }
        println!(
            "[connected_hook]receiver_count => {:?}",
            self.receiver_count
        );
        // Best-effort flush; a failure to flush stdout is deliberately ignored.
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Hook run for each request received on a group chat connection.
impl ServerHook for GroupChatRequestHook {
    /// Reads the request body and the receiver count for the group.
    ///
    /// A non-empty body is kept as-is together with the current receiver
    /// count. An empty body is replaced by a `receiver_count => <count>`
    /// message built from `receiver_count_after_closed`.
    ///
    /// # Panics
    ///
    /// Panics if the `group_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let group_name: String = ctx.try_get_route_param("group_name").await.unwrap();
        let broadcast_type: BroadcastType<String> = BroadcastType::PointToGroup(group_name);
        let current_count: ReceiverCount =
            get_broadcast_map().receiver_count(broadcast_type.clone());
        let request_body: RequestBody = ctx.get_request_body().await;
        if !request_body.is_empty() {
            return Self {
                body: request_body,
                receiver_count: current_count,
            };
        }
        let receiver_count: ReceiverCount =
            get_broadcast_map().receiver_count_after_closed(broadcast_type);
        Self {
            body: format!("receiver_count => {:?}", receiver_count).into(),
            receiver_count,
        }
    }

    /// Sets the prepared body as the response body, then prints the receiver
    /// count and flushes stdout.
    async fn handle(self, ctx: &Context) {
        let Self {
            body,
            receiver_count,
        } = self;
        ctx.set_response_body(&body).await;
        println!("[group_chat]receiver_count => {:?}", receiver_count);
        let mut stdout = std::io::stdout();
        let _ = std::io::Write::flush(&mut stdout);
    }
}

/// Hook run when a group chat connection is closed.
impl ServerHook for GroupClosedHook {
    /// Queries `receiver_count_after_closed` for the group and formats it into
    /// the `receiver_count => <count>` body.
    ///
    /// # Panics
    ///
    /// Panics if the `group_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let group_name: String = ctx.try_get_route_param("group_name").await.unwrap();
        let key: BroadcastType<String> = BroadcastType::PointToGroup(group_name);
        // `key` is not needed afterwards, so it is moved rather than cloned.
        let receiver_count: ReceiverCount = get_broadcast_map().receiver_count_after_closed(key);
        let body: String = format!("receiver_count => {:?}", receiver_count);
        Self {
            body,
            receiver_count,
        }
    }

    /// Sets the prepared body as the response body, then prints the receiver
    /// count and flushes stdout.
    async fn handle(self, ctx: &Context) {
        ctx.set_response_body(&self.body).await;
        println!("[group_closed]receiver_count => {:?}", self.receiver_count);
        // Best-effort flush; a failure to flush stdout is deliberately ignored.
        let _ = std::io::Write::flush(&mut std::io::stdout());
    }
}

/// Hook run for each request received on a private chat connection.
impl ServerHook for PrivateChatRequestHook {
    /// Reads the request body and the receiver count for the point-to-point
    /// pair.
    ///
    /// A non-empty body is kept as-is together with the current receiver
    /// count. An empty body is replaced by a `receiver_count => <count>`
    /// message built from `receiver_count_after_closed`.
    ///
    /// # Panics
    ///
    /// Panics if the `my_name` or `your_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let my_name: String = ctx.try_get_route_param("my_name").await.unwrap();
        let your_name: String = ctx.try_get_route_param("your_name").await.unwrap();
        let broadcast_type: BroadcastType<String> =
            BroadcastType::PointToPoint(my_name, your_name);
        let current_count: ReceiverCount =
            get_broadcast_map().receiver_count(broadcast_type.clone());
        let request_body: RequestBody = ctx.get_request_body().await;
        if !request_body.is_empty() {
            return Self {
                body: request_body,
                receiver_count: current_count,
            };
        }
        let receiver_count: ReceiverCount =
            get_broadcast_map().receiver_count_after_closed(broadcast_type);
        Self {
            body: format!("receiver_count => {:?}", receiver_count).into(),
            receiver_count,
        }
    }

    /// Sets the prepared body as the response body, then prints the receiver
    /// count and flushes stdout.
    async fn handle(self, ctx: &Context) {
        let Self {
            body,
            receiver_count,
        } = self;
        ctx.set_response_body(&body).await;
        println!("[private_chat]receiver_count => {:?}", receiver_count);
        let mut stdout = std::io::stdout();
        let _ = std::io::Write::flush(&mut stdout);
    }
}

/// Hook run when a private chat connection is closed.
impl ServerHook for PrivateClosedHook {
    /// Queries `receiver_count_after_closed` for the point-to-point pair and
    /// formats it into the `receiver_count => <count>` body.
    ///
    /// # Panics
    ///
    /// Panics if the `my_name` or `your_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let my_name: String = ctx.try_get_route_param("my_name").await.unwrap();
        let your_name: String = ctx.try_get_route_param("your_name").await.unwrap();
        let broadcast_type: BroadcastType<String> =
            BroadcastType::PointToPoint(my_name, your_name);
        let receiver_count: ReceiverCount =
            get_broadcast_map().receiver_count_after_closed(broadcast_type);
        Self {
            body: format!("receiver_count => {:?}", receiver_count),
            receiver_count,
        }
    }

    /// Sets the prepared body as the response body, then prints the receiver
    /// count and flushes stdout.
    async fn handle(self, ctx: &Context) {
        let Self {
            body,
            receiver_count,
        } = self;
        ctx.set_response_body(&body).await;
        println!("[private_closed]receiver_count => {:?}", receiver_count);
        let mut stdout = std::io::stdout();
        let _ = std::io::Write::flush(&mut stdout);
    }
}

/// Hook installed via `set_sended_hook`; logs the response body.
impl ServerHook for SendedHook {
    /// Captures the response body string from the context.
    async fn new(ctx: &Context) -> Self {
        Self {
            msg: ctx.get_response_body_string().await,
        }
    }

    /// Prints the captured message and flushes stdout.
    async fn handle(self, _ctx: &Context) {
        let Self { msg } = self;
        println!("[sended_hook]msg => {}", msg);
        let mut stdout = std::io::stdout();
        let _ = std::io::Write::flush(&mut stdout);
    }
}

/// Route hook for the `/{my_name}/{your_name}` private chat endpoint.
impl ServerHook for PrivateChat {
    /// Builds the WebSocket configuration for the point-to-point pair taken
    /// from the route parameters, wiring in the connected, request, sended and
    /// closed hooks.
    ///
    /// # Panics
    ///
    /// Panics if the `my_name` or `your_name` route parameter is absent.
    async fn new(ctx: &Context) -> Self {
        let sender: String = ctx.try_get_route_param("my_name").await.unwrap();
        let recipient: String = ctx.try_get_route_param("your_name").await.unwrap();
        let broadcast_type: BroadcastType<String> = BroadcastType::PointToPoint(sender, recipient);
        Self {
            config: WebSocketConfig::new()
                .set_context(ctx.clone())
                .set_broadcast_type(broadcast_type)
                .set_buffer_size(4096)
                .set_capacity(1024)
                .set_connected_hook::<ConnectedHook>()
                .set_request_hook::<PrivateChatRequestHook>()
                .set_sended_hook::<SendedHook>()
                .set_closed_hook::<PrivateClosedHook>(),
        }
    }

    /// Runs the prepared configuration on the shared broadcast map.
    async fn handle(self, _ctx: &Context) {
        let Self { config } = self;
        get_broadcast_map().run(config).await;
    }
}

impl ServerHook for GroupChat {
    async fn new(_ctx: &Context) -> Self {
        Self
    }

    async fn handle(self, ctx: &Context) {
        let group_name: String = ctx.try_get_route_param("group_name").await.unwrap();
        let key: BroadcastType<String> = BroadcastType::PointToGroup(group_name);
        let config: WebSocketConfig<String> = WebSocketConfig::new()
            .set_context(ctx.clone())
            .set_broadcast_type(key)
            .set_buffer_size(4096)
            .set_capacity(1024)
            .set_connected_hook::<ConnectedHook>()
            .set_request_hook::<GroupChatRequestHook>()
            .set_sended_hook::<SendedHook>()
            .set_closed_hook::<GroupClosedHook>();
        get_broadcast_map().run(config).await;
    }
}

/// Example entry point: configures the server for `0.0.0.0:60000`, registers
/// the two request middlewares and the two chat routes, starts the server and
/// waits on the returned control hook.
///
/// If `run` does not yield a control hook, a default one is used instead
/// (`unwrap_or_default`), so a start-up failure is not reported here.
#[tokio::main]
async fn main() {
    let app: Server = Server::new().await;

    // Listener settings.
    let settings: ServerConfig = ServerConfig::new().await;
    settings.host("0.0.0.0").await;
    settings.port(60000).await;
    settings.buffer(4096).await;
    settings.disable_linger().await;
    settings.disable_nodelay().await;
    app.config(settings).await;

    // Middleware is registered before the routes, in the original order.
    app.request_middleware::<RequestMiddleware>().await;
    app.request_middleware::<UpgradeHook>().await;
    app.route::<GroupChat>("/{group_name}").await;
    app.route::<PrivateChat>("/{my_name}/{your_name}").await;

    let control: ServerControlHook = app.run().await.unwrap_or_default();
    control.wait().await;
}

License

This project is licensed under the MIT License. See the LICENSE file for details.

Contributing

Contributions are welcome! Please open an issue or submit a pull request.

Contact

For any inquiries, please reach out to the author at root@ltpp.vip.