ngyn_websocket/
lib.rs

1use core::fmt;
2use ngyn_shared::core::engine::{NgynPlatform, PlatformData, RouteInstance};
3use ngyn_shared::core::handler::RouteHandler;
4use ngyn_shared::server::response::ReadBytes;
5use ngyn_shared::server::NgynRequest;
6use std::io::ErrorKind;
7use std::net::ToSocketAddrs;
8use std::sync::{Arc, Mutex};
9use websocket::sync::Writer;
10use websocket::Message;
11use websocket::{sync::Server, OwnedMessage};
12
13#[derive(Default)]
14pub struct WebsocketApplication {
15    data: PlatformData,
16    clients: Arc<Mutex<Vec<Writer<std::net::TcpStream>>>>,
17}
18
19impl NgynPlatform for WebsocketApplication {
20    fn data_mut(&mut self) -> &mut PlatformData {
21        &mut self.data
22    }
23}
24
25impl WebsocketApplication {
26    /// add a route to handle
27    pub fn route(&mut self, path: &str, handler: impl Into<RouteHandler>) {
28        self.add_route(path, None, handler.into());
29    }
30
31    // Broadcast message to all connected clients
32    pub fn broadcast(&self, message: &str) -> Result<(), websocket::WebSocketError> {
33        let mut clients = self
34            .clients
35            .lock()
36            .map_err(|_| websocket::WebSocketError::IoError(ErrorKind::InvalidData.into()))?;
37
38        for client in clients.iter_mut() {
39            client.send_message(&OwnedMessage::Text(message.to_string()))?;
40        }
41
42        Ok(())
43    }
44
45    /// Listens for incoming connections and serves the application.
46    ///
47    /// ### Arguments
48    ///
49    /// * `addr` - The address to listen on.
50    ///
51    /// ### Returns
52    ///
53    /// A `Result` indicating success or failure.
54    pub fn listen<A: ToSocketAddrs + fmt::Debug>(
55        self,
56        addr: A,
57    ) -> Result<(), Box<dyn std::error::Error>> {
58        let server = Server::bind(addr)?;
59        let data_handler = Arc::new(self.data);
60
61        for request in server.filter_map(Result::ok) {
62            let path = request.uri();
63            let clients = Arc::clone(&self.clients);
64            let data_handler = data_handler.clone();
65
66            tokio::spawn(async move {
67                if let Ok(client) = request.accept() {
68                    let (mut receiver, mut sender) = client.split().unwrap();
69                    for message in receiver.incoming_messages() {
70                        match message {
71                            Ok(OwnedMessage::Text(_)) | Ok(OwnedMessage::Binary(_)) => {
72                                // Infallible at this point, so we can safely call `unwrap`
73                                let body = match message.unwrap() {
74                                    OwnedMessage::Binary(data) => data,
75                                    OwnedMessage::Text(data) => data.into(),
76                                    _ => return,
77                                };
78                                let mut req = NgynRequest::new(body);
79                                // default to index url if parsing fails
80                                *req.uri_mut() = path.parse().unwrap_or_default();
81
82                                let mut response = data_handler.respond(req).await;
83
84                                if let Ok(data) = response.read_bytes().await {
85                                    let message =
86                                        if response.headers().get("Content-Type").is_none() {
87                                            Message::text(String::from_utf8_lossy(&data))
88                                        } else {
89                                            Message::binary(data.to_vec())
90                                        };
91                                    sender.send_message(&message).unwrap();
92                                }
93                            }
94                            Ok(OwnedMessage::Close(_)) => {
95                                let message = Message::close();
96                                sender.send_message(&message).unwrap();
97                                break;
98                            }
99                            Ok(OwnedMessage::Ping(data)) => {
100                                let message = Message::pong(data);
101                                sender.send_message(&message).unwrap();
102                                break;
103                            }
104                            Err(_) => break,
105                            _ => {}
106                        }
107                    }
108
109                    // Add client to the list of connected clients
110                    if let Ok(mut client_list) = clients.lock() {
111                        client_list.push(sender);
112                    }
113                }
114            });
115        }
116
117        Ok(())
118    }
119}