1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::view::ViewIndex;
4use crate::websocket::client_manager::ClientManager;
5use crate::websocket::frame::{Mode, SnapshotEntity, SnapshotFrame};
6use crate::websocket::subscription::{ClientMessage, Subscription};
7use anyhow::Result;
8use bytes::Bytes;
9use futures_util::StreamExt;
10use std::net::SocketAddr;
11use std::sync::Arc;
12#[cfg(feature = "otel")]
13use std::time::Instant;
14
15use tokio::net::{TcpListener, TcpStream};
16use tokio_tungstenite::accept_async;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, info_span, warn, Instrument};
19use uuid::Uuid;
20
21#[cfg(feature = "otel")]
22use crate::metrics::Metrics;
23
24pub struct WebSocketServer {
25 bind_addr: SocketAddr,
26 client_manager: ClientManager,
27 bus_manager: BusManager,
28 entity_cache: EntityCache,
29 view_index: Arc<ViewIndex>,
30 max_clients: usize,
31 #[cfg(feature = "otel")]
32 metrics: Option<Arc<Metrics>>,
33}
34
35impl WebSocketServer {
36 #[cfg(feature = "otel")]
37 pub fn new(
38 bind_addr: SocketAddr,
39 bus_manager: BusManager,
40 entity_cache: EntityCache,
41 view_index: Arc<ViewIndex>,
42 metrics: Option<Arc<Metrics>>,
43 ) -> Self {
44 Self {
45 bind_addr,
46 client_manager: ClientManager::new(),
47 bus_manager,
48 entity_cache,
49 view_index,
50 max_clients: 10000,
51 metrics,
52 }
53 }
54
55 #[cfg(not(feature = "otel"))]
56 pub fn new(
57 bind_addr: SocketAddr,
58 bus_manager: BusManager,
59 entity_cache: EntityCache,
60 view_index: Arc<ViewIndex>,
61 ) -> Self {
62 Self {
63 bind_addr,
64 client_manager: ClientManager::new(),
65 bus_manager,
66 entity_cache,
67 view_index,
68 max_clients: 10000,
69 }
70 }
71
72 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
73 self.max_clients = max_clients;
74 self
75 }
76
77 pub async fn start(self) -> Result<()> {
78 info!(
79 "Starting WebSocket server on {} (max_clients: {})",
80 self.bind_addr, self.max_clients
81 );
82
83 let listener = TcpListener::bind(&self.bind_addr).await?;
84 info!("WebSocket server listening on {}", self.bind_addr);
85
86 self.client_manager.start_cleanup_task();
87
88 loop {
89 match listener.accept().await {
90 Ok((stream, addr)) => {
91 let client_count = self.client_manager.client_count();
92 if client_count >= self.max_clients {
93 warn!(
94 "Rejecting connection from {} - max clients ({}) reached",
95 addr, self.max_clients
96 );
97 drop(stream);
98 continue;
99 }
100
101 #[cfg(feature = "otel")]
102 if let Some(ref metrics) = self.metrics {
103 metrics.record_ws_connection();
104 }
105
106 info!(
107 "New WebSocket connection from {} ({}/{} clients)",
108 addr,
109 client_count + 1,
110 self.max_clients
111 );
112 let client_manager = self.client_manager.clone();
113 let bus_manager = self.bus_manager.clone();
114 let entity_cache = self.entity_cache.clone();
115 let view_index = self.view_index.clone();
116 #[cfg(feature = "otel")]
117 let metrics = self.metrics.clone();
118
119 tokio::spawn(
120 async move {
121 #[cfg(feature = "otel")]
122 let result = handle_connection(
123 stream,
124 client_manager,
125 bus_manager,
126 entity_cache,
127 view_index,
128 metrics,
129 )
130 .await;
131 #[cfg(not(feature = "otel"))]
132 let result = handle_connection(
133 stream,
134 client_manager,
135 bus_manager,
136 entity_cache,
137 view_index,
138 )
139 .await;
140
141 if let Err(e) = result {
142 error!("WebSocket connection error: {}", e);
143 }
144 }
145 .instrument(info_span!("ws.connection", %addr)),
146 );
147 }
148 Err(e) => {
149 error!("Failed to accept connection: {}", e);
150 }
151 }
152 }
153 }
154}
155
156#[cfg(feature = "otel")]
157async fn handle_connection(
158 stream: TcpStream,
159 client_manager: ClientManager,
160 bus_manager: BusManager,
161 entity_cache: EntityCache,
162 view_index: Arc<ViewIndex>,
163 metrics: Option<Arc<Metrics>>,
164) -> Result<()> {
165 let ws_stream = accept_async(stream).await?;
166 let client_id = Uuid::new_v4();
167 let connection_start = Instant::now();
168
169 info!("WebSocket connection established for client {}", client_id);
170
171 let (ws_sender, mut ws_receiver) = ws_stream.split();
172
173 client_manager.add_client(client_id, ws_sender);
174
175 let mut active_subscriptions: Vec<String> = Vec::new();
176
177 loop {
178 tokio::select! {
179 ws_msg = ws_receiver.next() => {
180 match ws_msg {
181 Some(Ok(msg)) => {
182 if msg.is_close() {
183 info!("Client {} requested close", client_id);
184 break;
185 }
186
187 client_manager.update_client_last_seen(client_id);
188
189 if msg.is_text() {
190 if let Some(ref m) = metrics {
191 m.record_ws_message_received();
192 }
193
194 if let Ok(text) = msg.to_text() {
195 debug!("Received text message from client {}: {}", client_id, text);
196
197 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
198 match client_msg {
199 ClientMessage::Subscribe(subscription) => {
200 let view_id = subscription.view.clone();
201 let sub_key = subscription.sub_key();
202 client_manager.update_subscription(client_id, subscription.clone());
203
204 if let Some(ref m) = metrics {
205 m.record_subscription_created(&view_id);
206 }
207 active_subscriptions.push(view_id);
208
209 let cancel_token = CancellationToken::new();
210 client_manager.add_client_subscription(
211 client_id,
212 sub_key,
213 cancel_token.clone(),
214 ).await;
215
216 attach_client_to_bus(
217 client_id,
218 subscription,
219 &client_manager,
220 &bus_manager,
221 &entity_cache,
222 &view_index,
223 cancel_token,
224 metrics.clone(),
225 ).await;
226 }
227 ClientMessage::Unsubscribe(unsub) => {
228 let sub_key = unsub.sub_key();
229 let removed = client_manager
230 .remove_client_subscription(client_id, &sub_key)
231 .await;
232
233 if removed {
234 info!("Client {} unsubscribed from {}", client_id, sub_key);
235 if let Some(ref m) = metrics {
236 m.record_subscription_removed(&unsub.view);
237 }
238 }
239 }
240 ClientMessage::Ping => {
241 debug!("Received ping from client {}", client_id);
242 }
243 }
244 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
245 let view_id = subscription.view.clone();
246 let sub_key = subscription.sub_key();
247 client_manager.update_subscription(client_id, subscription.clone());
248
249 if let Some(ref m) = metrics {
250 m.record_subscription_created(&view_id);
251 }
252 active_subscriptions.push(view_id);
253
254 let cancel_token = CancellationToken::new();
255 client_manager.add_client_subscription(
256 client_id,
257 sub_key,
258 cancel_token.clone(),
259 ).await;
260
261 attach_client_to_bus(
262 client_id,
263 subscription,
264 &client_manager,
265 &bus_manager,
266 &entity_cache,
267 &view_index,
268 cancel_token,
269 metrics.clone(),
270 ).await;
271 } else {
272 debug!("Received non-subscription message from client {}: {}", client_id, text);
273 }
274 }
275 }
276 }
277 Some(Err(e)) => {
278 warn!("WebSocket error for client {}: {}", client_id, e);
279 break;
280 }
281 None => {
282 debug!("WebSocket stream ended for client {}", client_id);
283 break;
284 }
285 }
286 }
287 }
288 }
289
290 client_manager
291 .cancel_all_client_subscriptions(client_id)
292 .await;
293 client_manager.remove_client(client_id);
294
295 if let Some(ref m) = metrics {
296 let duration_secs = connection_start.elapsed().as_secs_f64();
297 m.record_ws_disconnection(duration_secs);
298
299 for view_id in active_subscriptions {
300 m.record_subscription_removed(&view_id);
301 }
302 }
303
304 info!("Client {} disconnected", client_id);
305
306 Ok(())
307}
308
309#[cfg(not(feature = "otel"))]
310async fn handle_connection(
311 stream: TcpStream,
312 client_manager: ClientManager,
313 bus_manager: BusManager,
314 entity_cache: EntityCache,
315 view_index: Arc<ViewIndex>,
316) -> Result<()> {
317 let ws_stream = accept_async(stream).await?;
318 let client_id = Uuid::new_v4();
319
320 info!("WebSocket connection established for client {}", client_id);
321
322 let (ws_sender, mut ws_receiver) = ws_stream.split();
323
324 client_manager.add_client(client_id, ws_sender);
325
326 loop {
327 tokio::select! {
328 ws_msg = ws_receiver.next() => {
329 match ws_msg {
330 Some(Ok(msg)) => {
331 if msg.is_close() {
332 info!("Client {} requested close", client_id);
333 break;
334 }
335
336 client_manager.update_client_last_seen(client_id);
337
338 if msg.is_text() {
339 if let Ok(text) = msg.to_text() {
340 debug!("Received text message from client {}: {}", client_id, text);
341
342 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
343 match client_msg {
344 ClientMessage::Subscribe(subscription) => {
345 let sub_key = subscription.sub_key();
346 client_manager.update_subscription(client_id, subscription.clone());
347
348 let cancel_token = CancellationToken::new();
349 client_manager.add_client_subscription(
350 client_id,
351 sub_key,
352 cancel_token.clone(),
353 ).await;
354
355 attach_client_to_bus(
356 client_id,
357 subscription,
358 &client_manager,
359 &bus_manager,
360 &entity_cache,
361 &view_index,
362 cancel_token,
363 ).await;
364 }
365 ClientMessage::Unsubscribe(unsub) => {
366 let sub_key = unsub.sub_key();
367 let removed = client_manager
368 .remove_client_subscription(client_id, &sub_key)
369 .await;
370
371 if removed {
372 info!("Client {} unsubscribed from {}", client_id, sub_key);
373 }
374 }
375 ClientMessage::Ping => {
376 debug!("Received ping from client {}", client_id);
377 }
378 }
379 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
380 let sub_key = subscription.sub_key();
381 client_manager.update_subscription(client_id, subscription.clone());
382
383 let cancel_token = CancellationToken::new();
384 client_manager.add_client_subscription(
385 client_id,
386 sub_key,
387 cancel_token.clone(),
388 ).await;
389
390 attach_client_to_bus(
391 client_id,
392 subscription,
393 &client_manager,
394 &bus_manager,
395 &entity_cache,
396 &view_index,
397 cancel_token,
398 ).await;
399 } else {
400 debug!("Received non-subscription message from client {}: {}", client_id, text);
401 }
402 }
403 }
404 }
405 Some(Err(e)) => {
406 warn!("WebSocket error for client {}: {}", client_id, e);
407 break;
408 }
409 None => {
410 debug!("WebSocket stream ended for client {}", client_id);
411 break;
412 }
413 }
414 }
415 }
416 }
417
418 client_manager
419 .cancel_all_client_subscriptions(client_id)
420 .await;
421 client_manager.remove_client(client_id);
422 info!("Client {} disconnected", client_id);
423
424 Ok(())
425}
426
427#[cfg(feature = "otel")]
428async fn attach_client_to_bus(
429 client_id: Uuid,
430 subscription: Subscription,
431 client_manager: &ClientManager,
432 bus_manager: &BusManager,
433 entity_cache: &EntityCache,
434 view_index: &ViewIndex,
435 cancel_token: CancellationToken,
436 metrics: Option<Arc<Metrics>>,
437) {
438 let view_id = &subscription.view;
439
440 let view_spec = match view_index.get_view(view_id) {
441 Some(spec) => spec,
442 None => {
443 warn!("Unknown view ID: {}", view_id);
444 return;
445 }
446 };
447
448 match view_spec.mode {
449 Mode::State => {
450 let key = subscription.key.as_deref().unwrap_or("");
451 let mut rx = bus_manager.get_or_create_state_bus(view_id, key).await;
452
453 if !rx.borrow().is_empty() {
454 let data = rx.borrow().clone();
455 let _ = client_manager.send_to_client(client_id, data);
456 if let Some(ref m) = metrics {
457 m.record_ws_message_sent();
458 }
459 }
460
461 let client_mgr = client_manager.clone();
462 let metrics_clone = metrics.clone();
463 let view_id_clone = view_id.clone();
464 let key_clone = key.to_string();
465 tokio::spawn(
466 async move {
467 loop {
468 tokio::select! {
469 _ = cancel_token.cancelled() => {
470 debug!("State subscription cancelled for client {}", client_id);
471 break;
472 }
473 result = rx.changed() => {
474 if result.is_err() {
475 break;
476 }
477 let data = rx.borrow().clone();
478 if client_mgr.send_to_client(client_id, data).is_err() {
479 break;
480 }
481 if let Some(ref m) = metrics_clone {
482 m.record_ws_message_sent();
483 }
484 }
485 }
486 }
487 }
488 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
489 );
490 }
491 Mode::List | Mode::Append => {
492 let mut rx = bus_manager.get_or_create_list_bus(view_id).await;
493
494 let snapshots = entity_cache.get_all(view_id).await;
495 let snapshot_entities: Vec<SnapshotEntity> = snapshots
496 .into_iter()
497 .filter(|(key, _)| subscription.matches_key(key))
498 .map(|(key, data)| SnapshotEntity { key, data })
499 .collect();
500
501 if !snapshot_entities.is_empty() {
502 let snapshot_frame = SnapshotFrame {
503 mode: view_spec.mode,
504 export: view_id.clone(),
505 op: "snapshot",
506 data: snapshot_entities,
507 };
508 if let Ok(payload) = serde_json::to_vec(&snapshot_frame) {
509 if client_manager
510 .send_to_client_async(client_id, Arc::new(Bytes::from(payload)))
511 .await
512 .is_err()
513 {
514 return;
515 }
516 if let Some(ref m) = metrics {
517 m.record_ws_message_sent();
518 }
519 }
520 }
521
522 let client_mgr = client_manager.clone();
523 let sub = subscription.clone();
524 let metrics_clone = metrics.clone();
525 let view_id_clone = view_id.clone();
526 let mode = view_spec.mode;
527 tokio::spawn(
528 async move {
529 loop {
530 tokio::select! {
531 _ = cancel_token.cancelled() => {
532 debug!("List subscription cancelled for client {}", client_id);
533 break;
534 }
535 result = rx.recv() => {
536 match result {
537 Ok(envelope) => {
538 if sub.matches(&envelope.entity, &envelope.key) {
539 if client_mgr
540 .send_to_client(client_id, envelope.payload.clone())
541 .is_err()
542 {
543 break;
544 }
545 if let Some(ref m) = metrics_clone {
546 m.record_ws_message_sent();
547 }
548 }
549 }
550 Err(_) => break,
551 }
552 }
553 }
554 }
555 }
556 .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
557 );
558 }
559 }
560
561 info!(
562 "Client {} subscribed to {} (mode: {:?})",
563 client_id, view_id, view_spec.mode
564 );
565}
566
567#[cfg(not(feature = "otel"))]
568async fn attach_client_to_bus(
569 client_id: Uuid,
570 subscription: Subscription,
571 client_manager: &ClientManager,
572 bus_manager: &BusManager,
573 entity_cache: &EntityCache,
574 view_index: &ViewIndex,
575 cancel_token: CancellationToken,
576) {
577 let view_id = &subscription.view;
578
579 let view_spec = match view_index.get_view(view_id) {
580 Some(spec) => spec,
581 None => {
582 warn!("Unknown view ID: {}", view_id);
583 return;
584 }
585 };
586
587 match view_spec.mode {
588 Mode::State => {
589 let key = subscription.key.as_deref().unwrap_or("");
590 let mut rx = bus_manager.get_or_create_state_bus(view_id, key).await;
591
592 if !rx.borrow().is_empty() {
593 let data = rx.borrow().clone();
594 let _ = client_manager.send_to_client(client_id, data);
595 }
596
597 let client_mgr = client_manager.clone();
598 let view_id_clone = view_id.clone();
599 let key_clone = key.to_string();
600 tokio::spawn(
601 async move {
602 loop {
603 tokio::select! {
604 _ = cancel_token.cancelled() => {
605 debug!("State subscription cancelled for client {}", client_id);
606 break;
607 }
608 result = rx.changed() => {
609 if result.is_err() {
610 break;
611 }
612 let data = rx.borrow().clone();
613 if client_mgr.send_to_client(client_id, data).is_err() {
614 break;
615 }
616 }
617 }
618 }
619 }
620 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
621 );
622 }
623 Mode::List | Mode::Append => {
624 let mut rx = bus_manager.get_or_create_list_bus(view_id).await;
625
626 let snapshots = entity_cache.get_all(view_id).await;
627 let snapshot_entities: Vec<SnapshotEntity> = snapshots
628 .into_iter()
629 .filter(|(key, _)| subscription.matches_key(key))
630 .map(|(key, data)| SnapshotEntity { key, data })
631 .collect();
632
633 if !snapshot_entities.is_empty() {
634 let snapshot_frame = SnapshotFrame {
635 mode: view_spec.mode,
636 export: view_id.clone(),
637 op: "snapshot",
638 data: snapshot_entities,
639 };
640 if let Ok(payload) = serde_json::to_vec(&snapshot_frame) {
641 if client_manager
642 .send_to_client_async(client_id, Arc::new(Bytes::from(payload)))
643 .await
644 .is_err()
645 {
646 return;
647 }
648 }
649 }
650
651 let client_mgr = client_manager.clone();
652 let sub = subscription.clone();
653 let view_id_clone = view_id.clone();
654 let mode = view_spec.mode;
655 tokio::spawn(
656 async move {
657 loop {
658 tokio::select! {
659 _ = cancel_token.cancelled() => {
660 debug!("List subscription cancelled for client {}", client_id);
661 break;
662 }
663 result = rx.recv() => {
664 match result {
665 Ok(envelope) => {
666 if sub.matches(&envelope.entity, &envelope.key)
667 && client_mgr
668 .send_to_client(client_id, envelope.payload.clone())
669 .is_err()
670 {
671 break;
672 }
673 }
674 Err(_) => break,
675 }
676 }
677 }
678 }
679 }
680 .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
681 );
682 }
683 }
684
685 info!(
686 "Client {} subscribed to {} (mode: {:?})",
687 client_id, view_id, view_spec.mode
688 );
689}