1use core::fmt;
2use ngyn_shared::core::engine::{NgynPlatform, PlatformData, RouteInstance};
3use ngyn_shared::core::handler::RouteHandler;
4use ngyn_shared::server::response::ReadBytes;
5use ngyn_shared::server::NgynRequest;
6use std::io::ErrorKind;
7use std::net::ToSocketAddrs;
8use std::sync::{Arc, Mutex};
9use websocket::sync::Writer;
10use websocket::Message;
11use websocket::{sync::Server, OwnedMessage};
12
13#[derive(Default)]
14pub struct WebsocketApplication {
15 data: PlatformData,
16 clients: Arc<Mutex<Vec<Writer<std::net::TcpStream>>>>,
17}
18
19impl NgynPlatform for WebsocketApplication {
20 fn data_mut(&mut self) -> &mut PlatformData {
21 &mut self.data
22 }
23}
24
25impl WebsocketApplication {
26 pub fn route(&mut self, path: &str, handler: impl Into<RouteHandler>) {
28 self.add_route(path, None, handler.into());
29 }
30
31 pub fn broadcast(&self, message: &str) -> Result<(), websocket::WebSocketError> {
33 let mut clients = self
34 .clients
35 .lock()
36 .map_err(|_| websocket::WebSocketError::IoError(ErrorKind::InvalidData.into()))?;
37
38 for client in clients.iter_mut() {
39 client.send_message(&OwnedMessage::Text(message.to_string()))?;
40 }
41
42 Ok(())
43 }
44
45 pub fn listen<A: ToSocketAddrs + fmt::Debug>(
55 self,
56 addr: A,
57 ) -> Result<(), Box<dyn std::error::Error>> {
58 let server = Server::bind(addr)?;
59 let data_handler = Arc::new(self.data);
60
61 for request in server.filter_map(Result::ok) {
62 let path = request.uri();
63 let clients = Arc::clone(&self.clients);
64 let data_handler = data_handler.clone();
65
66 tokio::spawn(async move {
67 if let Ok(client) = request.accept() {
68 let (mut receiver, mut sender) = client.split().unwrap();
69 for message in receiver.incoming_messages() {
70 match message {
71 Ok(OwnedMessage::Text(_)) | Ok(OwnedMessage::Binary(_)) => {
72 let body = match message.unwrap() {
74 OwnedMessage::Binary(data) => data,
75 OwnedMessage::Text(data) => data.into(),
76 _ => return,
77 };
78 let mut req = NgynRequest::new(body);
79 *req.uri_mut() = path.parse().unwrap_or_default();
81
82 let mut response = data_handler.respond(req).await;
83
84 if let Ok(data) = response.read_bytes().await {
85 let message =
86 if response.headers().get("Content-Type").is_none() {
87 Message::text(String::from_utf8_lossy(&data))
88 } else {
89 Message::binary(data.to_vec())
90 };
91 sender.send_message(&message).unwrap();
92 }
93 }
94 Ok(OwnedMessage::Close(_)) => {
95 let message = Message::close();
96 sender.send_message(&message).unwrap();
97 break;
98 }
99 Ok(OwnedMessage::Ping(data)) => {
100 let message = Message::pong(data);
101 sender.send_message(&message).unwrap();
102 break;
103 }
104 Err(_) => break,
105 _ => {}
106 }
107 }
108
109 if let Ok(mut client_list) = clients.lock() {
111 client_list.push(sender);
112 }
113 }
114 });
115 }
116
117 Ok(())
118 }
119}