myko_rs/
server.rs

1use std::{collections::HashMap, sync::Arc};
2
3use futures_util::{stream::StreamExt, SinkExt};
4use tokio::{
5    net::TcpListener,
6    sync::{
7        broadcast,
8        mpsc::{self, Receiver},
9        Mutex,
10    },
11};
12use tokio_tungstenite::{accept_async, tungstenite::protocol::Message};
13
14use crate::{event::MEvent, module::Module};
15
16#[derive(PartialEq)]
17enum StartupState {
18    Off,
19    HasModules,
20    Bound,
21}
22
23pub struct Server {
24    startup_state: StartupState,
25    config: ServerConfig,
26    modules_map: Arc<Mutex<HashMap<String, Box<dyn Module + Send>>>>,
27}
28
29pub struct ServerConfig {
30    pub kafka_brokers: &'static [&'static str],
31}
32
33impl Server {
34    pub fn new(config: ServerConfig) -> Server {
35        Server {
36            startup_state: StartupState::Off,
37            config,
38            modules_map: Arc::new(Mutex::new(HashMap::new())),
39        }
40    }
41
42    pub async fn add_modules(mut self, mut modules: Vec<Box<dyn Module + Send>>) -> Server {
43        if self.startup_state != StartupState::Off {
44            panic!("Cannot add modules after startup");
45        }
46
47        let (from_kafka_tx, from_kafka_rx) = mpsc::channel::<MEvent>(100);
48
49        // for module in modules.iter_mut() {
50        //     module
51        //         .start_kafka(self.config.kafka_brokers, from_kafka_tx.clone())
52        //         .await;
53        // }
54
55        self.modules_map = Arc::new(Mutex::new(
56            modules.into_iter().map(|m| (m.entity_name(), m)).collect(),
57        ));
58
59        self.startup_state = StartupState::HasModules;
60
61        listen_kafka(from_kafka_rx, self.modules_map.clone());
62
63        self
64    }
65
66    pub async fn start(mut self) {
67        if self.startup_state != StartupState::HasModules
68            || self.modules_map.lock().await.is_empty()
69        {
70            panic!("Cannot start without modules");
71        }
72
73        let (broadcast_tx, _) = tokio::sync::broadcast::channel::<Message>(100);
74
75        let mut port = 5156;
76        let max_port = 5255;
77
78        loop {
79            let address = format!("0.0.0.0:{}", port);
80
81            match TcpListener::bind(&address).await {
82                Ok(listener) => {
83                    println!("WebSocket server listening on {}", address);
84                    while let Ok((stream, _)) = listener.accept().await {
85                        let r = self.modules_map.clone();
86                        tokio::spawn(handle_connection(
87                            stream,
88                            r,
89                            broadcast_tx.clone(),
90                            broadcast_tx.subscribe(),
91                        ));
92                    }
93                    self.startup_state = StartupState::Bound;
94                    break; // Exit loop if successfully bound
95                }
96                Err(e) => {
97                    println!("Failed to bind to port {}: {}", port, e);
98                    port += 1;
99                    if port > max_port {
100                        eprintln!("Exceeded maximum port limit");
101                        return;
102                    }
103                }
104            }
105        }
106    }
107}
108
109fn listen_kafka(
110    mut from_kafka_rx: Receiver<MEvent>,
111    modules: Arc<Mutex<HashMap<String, Box<dyn Module + Send>>>>,
112) {
113    tokio::spawn(async move {
114        while let Some(event) = from_kafka_rx.recv().await {
115            let mut modules = modules.lock().await;
116
117            let module = modules.get_mut(&event.item_type());
118
119            match module {
120                Some(module) => {
121                    module.process_event(event.clone(), false).await;
122                }
123                None => {
124                    println!("No module found for event type: {}", event.item_type());
125                }
126            }
127        }
128    });
129}
130
131async fn handle_connection(
132    stream: tokio::net::TcpStream,
133    modules: Arc<Mutex<HashMap<String, Box<dyn Module + Send>>>>,
134    broadcast_tx: broadcast::Sender<Message>,
135    mut broadcast_rx: broadcast::Receiver<Message>,
136) {
137    println!("New WebSocket connection");
138    let ws_stream = accept_async(stream)
139        .await
140        .expect("Error during WebSocket handshake");
141    let (mut ws_write, mut ws_read) = ws_stream.split();
142
143    let (to_ws_tx, mut to_ws_rx) = mpsc::channel::<Message>(100);
144
145    tokio::spawn(async move {
146        while let Some(message) = to_ws_rx.recv().await {
147            if (ws_write.send(message).await).is_err() {
148                println!("Failed to send message to WebSocket");
149                break;
150            }
151        }
152    });
153
154    let broadcast_ws_tx = to_ws_tx.clone();
155
156    tokio::spawn(async move {
157        while let Ok(message) = broadcast_rx.recv().await {
158            if (broadcast_ws_tx.send(message).await).is_err() {
159                println!("Failed to send message to WebSocket");
160                break;
161            }
162        }
163    });
164
165    while let Some(message) = ws_read.next().await {
166        match message {
167            Ok(message) => {
168                if message.is_text() {
169                    let text = match message.to_text() {
170                        Ok(t) => t,
171                        Err(e) => {
172                            println!("Failed to convert message into text: {}", e);
173                            continue;
174                        }
175                    };
176
177                    match MEvent::from_str_trim(text) {
178                        Ok(event) => {
179                            let mut modules = modules.lock().await;
180
181                            let module = modules.get_mut(&event.item_type());
182
183                            match module {
184                                Some(module) => {
185                                    module.process_event(event.clone(), true).await;
186                                }
187                                None => {
188                                    println!(
189                                        "No module found for event type: {}",
190                                        event.item_type()
191                                    );
192                                }
193                            }
194
195                            continue;
196                            // todo!("Process Event to all modules, and continue");
197                        }
198                        Err(_e) => {}
199                    };
200
201                    // match Query::from_str_trim(text) {
202                    //     Ok(query) => {
203                    //         println!("Received query: {:?}", query);
204
205                    //         let mut modules = modules.lock().await;
206
207                    //         let item_type = match query.clone() {
208                    //             Query::Watch(q) => q.item_type.clone(),
209                    //             Query::WatchId(q) => q.item_type.clone(),
210                    //         };
211
212                    //         let module = modules.get_mut(&item_type);
213
214                    //         if module.is_none() {
215                    //             println!("No module found for item type: {}", item_type);
216                    //             continue;
217                    //         }
218
219                    //         let module = module.unwrap();
220
221                    //         if let Some(mut rx) = module.handle_query(query.clone()).await {
222                    //             let tx_clone = to_ws_tx.clone();
223
224                    //             tokio::spawn(async move {
225                    //                 while let Some(response) = rx.recv().await {
226                    //                     let response_str = match response.to_string() {
227                    //                         Ok(s) => s,
228                    //                         Err(e) => {
229                    //                             println!(
230                    //                                 "Failed to convert response to string: {}",
231                    //                                 e
232                    //                             );
233                    //                             continue;
234                    //                         }
235                    //                     };
236
237                    //                     if (tx_clone
238                    //                         .clone()
239                    //                         .send(Message::from(response_str))
240                    //                         .await)
241                    //                         .is_err()
242                    //                     {
243                    //                         break;
244                    //                     }
245                    //                 }
246                    //             });
247                    //         };
248
249                    //         continue;
250                    //     }
251                    //     Err(_e) => {
252                    //         println!("Failed to parse query: {}", _e);
253                    //     }
254                    // };
255
256                    println!("Received other message, broadcasting to all connections");
257                    broadcast_tx.send(message).unwrap();
258                }
259            }
260            Err(e) => {
261                println!("Failed to receive message from WebSocket: {}", e);
262                break;
263            }
264        }
265    }
266}