hyperstack_server/websocket/
server.rs1use crate::bus::BusManager;
2use crate::view::ViewIndex;
3use crate::websocket::client_manager::ClientManager;
4use crate::websocket::frame::Mode;
5use crate::websocket::subscription::Subscription;
6use anyhow::Result;
7use futures_util::StreamExt;
8use std::net::SocketAddr;
9use std::sync::Arc;
10
11use tokio::net::{TcpListener, TcpStream};
12use tokio_tungstenite::accept_async;
13use tracing::{debug, error, info, warn};
14use uuid::Uuid;
15
16#[cfg(feature = "otel")]
17use crate::metrics::Metrics;
18
19pub struct WebSocketServer {
20 bind_addr: SocketAddr,
21 client_manager: ClientManager,
22 bus_manager: BusManager,
23 view_index: Arc<ViewIndex>,
24 max_clients: usize,
25 #[cfg(feature = "otel")]
26 metrics: Option<Arc<Metrics>>,
27}
28
29impl WebSocketServer {
30 #[cfg(feature = "otel")]
31 pub fn new(
32 bind_addr: SocketAddr,
33 bus_manager: BusManager,
34 view_index: Arc<ViewIndex>,
35 metrics: Option<Arc<Metrics>>,
36 ) -> Self {
37 Self {
38 bind_addr,
39 client_manager: ClientManager::new(),
40 bus_manager,
41 view_index,
42 max_clients: 10000,
43 metrics,
44 }
45 }
46
47 #[cfg(not(feature = "otel"))]
48 pub fn new(bind_addr: SocketAddr, bus_manager: BusManager, view_index: Arc<ViewIndex>) -> Self {
49 Self {
50 bind_addr,
51 client_manager: ClientManager::new(),
52 bus_manager,
53 view_index,
54 max_clients: 10000,
55 }
56 }
57
58 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
59 self.max_clients = max_clients;
60 self
61 }
62
63 pub async fn start(self) -> Result<()> {
64 info!(
65 "Starting WebSocket server on {} (max_clients: {})",
66 self.bind_addr, self.max_clients
67 );
68
69 let listener = TcpListener::bind(&self.bind_addr).await?;
70 info!("WebSocket server listening on {}", self.bind_addr);
71
72 self.client_manager.start_cleanup_task().await;
74
75 loop {
77 match listener.accept().await {
78 Ok((stream, addr)) => {
79 let client_count = self.client_manager.client_count().await;
81 if client_count >= self.max_clients {
82 warn!(
83 "Rejecting connection from {} - max clients ({}) reached",
84 addr, self.max_clients
85 );
86 if let Ok(mut ws_stream) = accept_async(stream).await {
88 let _ = ws_stream.close(None).await;
89 }
90 continue;
91 }
92
93 #[cfg(feature = "otel")]
95 if let Some(ref metrics) = self.metrics {
96 metrics.record_ws_connection();
97 }
98
99 info!(
100 "New WebSocket connection from {} ({}/{} clients)",
101 addr,
102 client_count + 1,
103 self.max_clients
104 );
105 let client_manager = self.client_manager.clone();
106 let bus_manager = self.bus_manager.clone();
107 let view_index = self.view_index.clone();
108 #[cfg(feature = "otel")]
109 let metrics = self.metrics.clone();
110
111 tokio::spawn(async move {
112 #[cfg(feature = "otel")]
113 let result = handle_connection(
114 stream,
115 client_manager,
116 bus_manager,
117 view_index,
118 metrics,
119 )
120 .await;
121 #[cfg(not(feature = "otel"))]
122 let result =
123 handle_connection(stream, client_manager, bus_manager, view_index)
124 .await;
125
126 if let Err(e) = result {
127 error!("WebSocket connection error: {}", e);
128 }
129 });
130 }
131 Err(e) => {
132 error!("Failed to accept connection: {}", e);
133 }
134 }
135 }
136 }
137}
138
139#[cfg(feature = "otel")]
140async fn handle_connection(
141 stream: TcpStream,
142 client_manager: ClientManager,
143 bus_manager: BusManager,
144 view_index: Arc<ViewIndex>,
145 metrics: Option<Arc<Metrics>>,
146) -> Result<()> {
147 let ws_stream = accept_async(stream).await?;
148 let client_id = Uuid::new_v4();
149 let connection_start = Instant::now();
150
151 info!("WebSocket connection established for client {}", client_id);
152
153 let (ws_sender, mut ws_receiver) = ws_stream.split();
154
155 client_manager.add_client(client_id, ws_sender).await?;
157
158 let mut active_subscriptions: Vec<String> = Vec::new();
160
161 loop {
163 tokio::select! {
164 ws_msg = ws_receiver.next() => {
165 match ws_msg {
166 Some(Ok(msg)) => {
167 if msg.is_close() {
168 info!("Client {} requested close", client_id);
169 break;
170 }
171
172 client_manager.update_client_last_seen(client_id).await;
173
174 if msg.is_text() {
175 if let Some(ref m) = metrics {
177 m.record_ws_message_received();
178 }
179
180 if let Ok(text) = msg.to_text() {
181 debug!("Received text message from client {}: {}", client_id, text);
182
183 if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
185 let view_id = subscription.view.clone();
186 client_manager.update_subscription(client_id, subscription.clone()).await;
187
188 if let Some(ref m) = metrics {
190 m.record_subscription_created(&view_id);
191 }
192 active_subscriptions.push(view_id);
193
194 attach_client_to_bus(
196 client_id,
197 subscription,
198 &client_manager,
199 &bus_manager,
200 &view_index,
201 metrics.clone(),
202 ).await;
203 } else {
204 debug!("Received non-subscription message from client {}: {}", client_id, text);
205 }
206 }
207 }
208 }
209 Some(Err(e)) => {
210 warn!("WebSocket error for client {}: {}", client_id, e);
211 break;
212 }
213 None => {
214 debug!("WebSocket stream ended for client {}", client_id);
215 break;
216 }
217 }
218 }
219 }
220 }
221
222 client_manager.remove_client(client_id).await;
224
225 if let Some(ref m) = metrics {
227 let duration_secs = connection_start.elapsed().as_secs_f64();
228 m.record_ws_disconnection(duration_secs);
229
230 for view_id in active_subscriptions {
232 m.record_subscription_removed(&view_id);
233 }
234 }
235
236 info!("Client {} disconnected", client_id);
237
238 Ok(())
239}
240
241#[cfg(not(feature = "otel"))]
242async fn handle_connection(
243 stream: TcpStream,
244 client_manager: ClientManager,
245 bus_manager: BusManager,
246 view_index: Arc<ViewIndex>,
247) -> Result<()> {
248 let ws_stream = accept_async(stream).await?;
249 let client_id = Uuid::new_v4();
250
251 info!("WebSocket connection established for client {}", client_id);
252
253 let (ws_sender, mut ws_receiver) = ws_stream.split();
254
255 client_manager.add_client(client_id, ws_sender).await?;
257
258 loop {
260 tokio::select! {
261 ws_msg = ws_receiver.next() => {
262 match ws_msg {
263 Some(Ok(msg)) => {
264 if msg.is_close() {
265 info!("Client {} requested close", client_id);
266 break;
267 }
268
269 client_manager.update_client_last_seen(client_id).await;
270
271 if msg.is_text() {
272 if let Ok(text) = msg.to_text() {
273 debug!("Received text message from client {}: {}", client_id, text);
274
275 if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
277 client_manager.update_subscription(client_id, subscription.clone()).await;
278
279 attach_client_to_bus(client_id, subscription, &client_manager, &bus_manager, &view_index).await;
281 } else {
282 debug!("Received non-subscription message from client {}: {}", client_id, text);
283 }
284 }
285 }
286 }
287 Some(Err(e)) => {
288 warn!("WebSocket error for client {}: {}", client_id, e);
289 break;
290 }
291 None => {
292 debug!("WebSocket stream ended for client {}", client_id);
293 break;
294 }
295 }
296 }
297 }
298 }
299
300 client_manager.remove_client(client_id).await;
302 info!("Client {} disconnected", client_id);
303
304 Ok(())
305}
306
307#[cfg(feature = "otel")]
308async fn attach_client_to_bus(
309 client_id: Uuid,
310 subscription: Subscription,
311 client_manager: &ClientManager,
312 bus_manager: &BusManager,
313 view_index: &ViewIndex,
314 metrics: Option<Arc<Metrics>>,
315) {
316 let view_id = &subscription.view;
317
318 let view_spec = match view_index.get_view(view_id) {
320 Some(spec) => spec,
321 None => {
322 warn!("Unknown view ID: {}", view_id);
323 return;
324 }
325 };
326
327 match view_spec.mode {
328 Mode::State => {
329 let key = subscription.key.as_deref().unwrap_or("");
330 let mut rx = bus_manager.get_or_create_state_bus(view_id, key).await;
331
332 if !rx.borrow().is_empty() {
334 let data = rx.borrow().clone();
335 let _ = client_manager.send_to_client(client_id, data).await;
336 if let Some(ref m) = metrics {
337 m.record_ws_message_sent();
338 }
339 }
340
341 let client_mgr = client_manager.clone();
343 let metrics_clone = metrics.clone();
344 tokio::spawn(async move {
345 while rx.changed().await.is_ok() {
346 let data = rx.borrow().clone();
347 if client_mgr.send_to_client(client_id, data).await.is_err() {
348 break; }
350 if let Some(ref m) = metrics_clone {
351 m.record_ws_message_sent();
352 }
353 }
354 });
355 }
356 Mode::Kv | Mode::Append => {
357 let mut rx = bus_manager.get_or_create_kv_bus(view_id).await;
358
359 let client_mgr = client_manager.clone();
360 let sub = subscription.clone();
361 let metrics_clone = metrics.clone();
362 tokio::spawn(async move {
363 while let Ok(envelope) = rx.recv().await {
364 if sub.matches(&envelope.entity, &envelope.key) {
366 if client_mgr
367 .send_to_client(client_id, envelope.payload.clone())
368 .await
369 .is_err()
370 {
371 break; }
373 if let Some(ref m) = metrics_clone {
374 m.record_ws_message_sent();
375 }
376 }
377 }
378 });
379 }
380 Mode::List => {
381 let mut rx = bus_manager.get_or_create_list_bus(view_id).await;
382
383 let client_mgr = client_manager.clone();
384 let sub = subscription.clone();
385 let metrics_clone = metrics.clone();
386 tokio::spawn(async move {
387 while let Ok(envelope) = rx.recv().await {
388 if sub.matches(&envelope.entity, &envelope.key) {
390 if client_mgr
391 .send_to_client(client_id, envelope.payload.clone())
392 .await
393 .is_err()
394 {
395 break; }
397 if let Some(ref m) = metrics_clone {
398 m.record_ws_message_sent();
399 }
400 }
401 }
402 });
403 }
404 }
405
406 info!(
407 "Client {} subscribed to {} (mode: {:?})",
408 client_id, view_id, view_spec.mode
409 );
410}
411
412#[cfg(not(feature = "otel"))]
413async fn attach_client_to_bus(
414 client_id: Uuid,
415 subscription: Subscription,
416 client_manager: &ClientManager,
417 bus_manager: &BusManager,
418 view_index: &ViewIndex,
419) {
420 let view_id = &subscription.view;
421
422 let view_spec = match view_index.get_view(view_id) {
424 Some(spec) => spec,
425 None => {
426 warn!("Unknown view ID: {}", view_id);
427 return;
428 }
429 };
430
431 match view_spec.mode {
432 Mode::State => {
433 let key = subscription.key.as_deref().unwrap_or("");
434 let mut rx = bus_manager.get_or_create_state_bus(view_id, key).await;
435
436 if !rx.borrow().is_empty() {
438 let data = rx.borrow().clone();
439 let _ = client_manager.send_to_client(client_id, data).await;
440 }
441
442 let client_mgr = client_manager.clone();
444 tokio::spawn(async move {
445 while rx.changed().await.is_ok() {
446 let data = rx.borrow().clone();
447 if client_mgr.send_to_client(client_id, data).await.is_err() {
448 break; }
450 }
451 });
452 }
453 Mode::Kv | Mode::Append => {
454 let mut rx = bus_manager.get_or_create_kv_bus(view_id).await;
455
456 let client_mgr = client_manager.clone();
457 let sub = subscription.clone();
458 tokio::spawn(async move {
459 while let Ok(envelope) = rx.recv().await {
460 if sub.matches(&envelope.entity, &envelope.key)
462 && client_mgr
463 .send_to_client(client_id, envelope.payload.clone())
464 .await
465 .is_err()
466 {
467 break; }
469 }
470 });
471 }
472 Mode::List => {
473 let mut rx = bus_manager.get_or_create_list_bus(view_id).await;
474
475 let client_mgr = client_manager.clone();
476 let sub = subscription.clone();
477 tokio::spawn(async move {
478 while let Ok(envelope) = rx.recv().await {
479 if sub.matches(&envelope.entity, &envelope.key)
481 && client_mgr
482 .send_to_client(client_id, envelope.payload.clone())
483 .await
484 .is_err()
485 {
486 break; }
488 }
489 });
490 }
491 }
492
493 info!(
494 "Client {} subscribed to {} (mode: {:?})",
495 client_id, view_id, view_spec.mode
496 );
497}