myko_rs/server.rs
1use std::{collections::HashMap, sync::Arc};
2
3use futures_util::{stream::StreamExt, SinkExt};
4use tokio::{
5 net::TcpListener,
6 sync::{
7 broadcast,
8 mpsc::{self, Receiver},
9 Mutex,
10 },
11};
12use tokio_tungstenite::{accept_async, tungstenite::protocol::Message};
13
14use crate::{event::MEvent, module::Module};
15
/// Lifecycle phase of a [`Server`], used to reject out-of-order calls.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StartupState {
    /// Freshly constructed; no modules have been registered yet.
    Off,
    /// Modules have been registered via `add_modules`; the server may be started.
    HasModules,
    /// `start` obtained a listening TCP socket.
    Bound,
}
22
23pub struct Server {
24 startup_state: StartupState,
25 config: ServerConfig,
26 modules_map: Arc<Mutex<HashMap<String, Box<dyn Module + Send>>>>,
27}
28
/// Static configuration handed to `Server::new`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ServerConfig {
    /// Kafka broker strings, stored as a `'static` slice.
    // NOTE(review): presumably `host:port` addresses — confirm against the Kafka client in use.
    pub kafka_brokers: &'static [&'static str],
}
32
33impl Server {
34 pub fn new(config: ServerConfig) -> Server {
35 Server {
36 startup_state: StartupState::Off,
37 config,
38 modules_map: Arc::new(Mutex::new(HashMap::new())),
39 }
40 }
41
42 pub async fn add_modules(mut self, mut modules: Vec<Box<dyn Module + Send>>) -> Server {
43 if self.startup_state != StartupState::Off {
44 panic!("Cannot add modules after startup");
45 }
46
47 let (from_kafka_tx, from_kafka_rx) = mpsc::channel::<MEvent>(100);
48
49 // for module in modules.iter_mut() {
50 // module
51 // .start_kafka(self.config.kafka_brokers, from_kafka_tx.clone())
52 // .await;
53 // }
54
55 self.modules_map = Arc::new(Mutex::new(
56 modules.into_iter().map(|m| (m.entity_name(), m)).collect(),
57 ));
58
59 self.startup_state = StartupState::HasModules;
60
61 listen_kafka(from_kafka_rx, self.modules_map.clone());
62
63 self
64 }
65
66 pub async fn start(mut self) {
67 if self.startup_state != StartupState::HasModules
68 || self.modules_map.lock().await.is_empty()
69 {
70 panic!("Cannot start without modules");
71 }
72
73 let (broadcast_tx, _) = tokio::sync::broadcast::channel::<Message>(100);
74
75 let mut port = 5156;
76 let max_port = 5255;
77
78 loop {
79 let address = format!("0.0.0.0:{}", port);
80
81 match TcpListener::bind(&address).await {
82 Ok(listener) => {
83 println!("WebSocket server listening on {}", address);
84 while let Ok((stream, _)) = listener.accept().await {
85 let r = self.modules_map.clone();
86 tokio::spawn(handle_connection(
87 stream,
88 r,
89 broadcast_tx.clone(),
90 broadcast_tx.subscribe(),
91 ));
92 }
93 self.startup_state = StartupState::Bound;
94 break; // Exit loop if successfully bound
95 }
96 Err(e) => {
97 println!("Failed to bind to port {}: {}", port, e);
98 port += 1;
99 if port > max_port {
100 eprintln!("Exceeded maximum port limit");
101 return;
102 }
103 }
104 }
105 }
106 }
107}
108
109fn listen_kafka(
110 mut from_kafka_rx: Receiver<MEvent>,
111 modules: Arc<Mutex<HashMap<String, Box<dyn Module + Send>>>>,
112) {
113 tokio::spawn(async move {
114 while let Some(event) = from_kafka_rx.recv().await {
115 let mut modules = modules.lock().await;
116
117 let module = modules.get_mut(&event.item_type());
118
119 match module {
120 Some(module) => {
121 module.process_event(event.clone(), false).await;
122 }
123 None => {
124 println!("No module found for event type: {}", event.item_type());
125 }
126 }
127 }
128 });
129}
130
131async fn handle_connection(
132 stream: tokio::net::TcpStream,
133 modules: Arc<Mutex<HashMap<String, Box<dyn Module + Send>>>>,
134 broadcast_tx: broadcast::Sender<Message>,
135 mut broadcast_rx: broadcast::Receiver<Message>,
136) {
137 println!("New WebSocket connection");
138 let ws_stream = accept_async(stream)
139 .await
140 .expect("Error during WebSocket handshake");
141 let (mut ws_write, mut ws_read) = ws_stream.split();
142
143 let (to_ws_tx, mut to_ws_rx) = mpsc::channel::<Message>(100);
144
145 tokio::spawn(async move {
146 while let Some(message) = to_ws_rx.recv().await {
147 if (ws_write.send(message).await).is_err() {
148 println!("Failed to send message to WebSocket");
149 break;
150 }
151 }
152 });
153
154 let broadcast_ws_tx = to_ws_tx.clone();
155
156 tokio::spawn(async move {
157 while let Ok(message) = broadcast_rx.recv().await {
158 if (broadcast_ws_tx.send(message).await).is_err() {
159 println!("Failed to send message to WebSocket");
160 break;
161 }
162 }
163 });
164
165 while let Some(message) = ws_read.next().await {
166 match message {
167 Ok(message) => {
168 if message.is_text() {
169 let text = match message.to_text() {
170 Ok(t) => t,
171 Err(e) => {
172 println!("Failed to convert message into text: {}", e);
173 continue;
174 }
175 };
176
177 match MEvent::from_str_trim(text) {
178 Ok(event) => {
179 let mut modules = modules.lock().await;
180
181 let module = modules.get_mut(&event.item_type());
182
183 match module {
184 Some(module) => {
185 module.process_event(event.clone(), true).await;
186 }
187 None => {
188 println!(
189 "No module found for event type: {}",
190 event.item_type()
191 );
192 }
193 }
194
195 continue;
196 // todo!("Process Event to all modules, and continue");
197 }
198 Err(_e) => {}
199 };
200
201 // match Query::from_str_trim(text) {
202 // Ok(query) => {
203 // println!("Received query: {:?}", query);
204
205 // let mut modules = modules.lock().await;
206
207 // let item_type = match query.clone() {
208 // Query::Watch(q) => q.item_type.clone(),
209 // Query::WatchId(q) => q.item_type.clone(),
210 // };
211
212 // let module = modules.get_mut(&item_type);
213
214 // if module.is_none() {
215 // println!("No module found for item type: {}", item_type);
216 // continue;
217 // }
218
219 // let module = module.unwrap();
220
221 // if let Some(mut rx) = module.handle_query(query.clone()).await {
222 // let tx_clone = to_ws_tx.clone();
223
224 // tokio::spawn(async move {
225 // while let Some(response) = rx.recv().await {
226 // let response_str = match response.to_string() {
227 // Ok(s) => s,
228 // Err(e) => {
229 // println!(
230 // "Failed to convert response to string: {}",
231 // e
232 // );
233 // continue;
234 // }
235 // };
236
237 // if (tx_clone
238 // .clone()
239 // .send(Message::from(response_str))
240 // .await)
241 // .is_err()
242 // {
243 // break;
244 // }
245 // }
246 // });
247 // };
248
249 // continue;
250 // }
251 // Err(_e) => {
252 // println!("Failed to parse query: {}", _e);
253 // }
254 // };
255
256 println!("Received other message, broadcasting to all connections");
257 broadcast_tx.send(message).unwrap();
258 }
259 }
260 Err(e) => {
261 println!("Failed to receive message from WebSocket: {}", e);
262 break;
263 }
264 }
265 }
266}