hyperstack_server/websocket/server.rs

1use crate::bus::BusManager;
2use crate::cache::EntityCache;
3use crate::view::ViewIndex;
4use crate::websocket::client_manager::ClientManager;
5use crate::websocket::frame::{Mode, SnapshotEntity, SnapshotFrame};
6use crate::websocket::subscription::{ClientMessage, Subscription};
7use anyhow::Result;
8use bytes::Bytes;
9use futures_util::StreamExt;
10use std::net::SocketAddr;
11use std::sync::Arc;
12#[cfg(feature = "otel")]
13use std::time::Instant;
14
15use tokio::net::{TcpListener, TcpStream};
16use tokio_tungstenite::accept_async;
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, info_span, warn, Instrument};
19use uuid::Uuid;
20
21#[cfg(feature = "otel")]
22use crate::metrics::Metrics;
23
24pub struct WebSocketServer {
25    bind_addr: SocketAddr,
26    client_manager: ClientManager,
27    bus_manager: BusManager,
28    entity_cache: EntityCache,
29    view_index: Arc<ViewIndex>,
30    max_clients: usize,
31    #[cfg(feature = "otel")]
32    metrics: Option<Arc<Metrics>>,
33}
34
35impl WebSocketServer {
36    #[cfg(feature = "otel")]
37    pub fn new(
38        bind_addr: SocketAddr,
39        bus_manager: BusManager,
40        entity_cache: EntityCache,
41        view_index: Arc<ViewIndex>,
42        metrics: Option<Arc<Metrics>>,
43    ) -> Self {
44        Self {
45            bind_addr,
46            client_manager: ClientManager::new(),
47            bus_manager,
48            entity_cache,
49            view_index,
50            max_clients: 10000,
51            metrics,
52        }
53    }
54
55    #[cfg(not(feature = "otel"))]
56    pub fn new(
57        bind_addr: SocketAddr,
58        bus_manager: BusManager,
59        entity_cache: EntityCache,
60        view_index: Arc<ViewIndex>,
61    ) -> Self {
62        Self {
63            bind_addr,
64            client_manager: ClientManager::new(),
65            bus_manager,
66            entity_cache,
67            view_index,
68            max_clients: 10000,
69        }
70    }
71
72    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
73        self.max_clients = max_clients;
74        self
75    }
76
77    pub async fn start(self) -> Result<()> {
78        info!(
79            "Starting WebSocket server on {} (max_clients: {})",
80            self.bind_addr, self.max_clients
81        );
82
83        let listener = TcpListener::bind(&self.bind_addr).await?;
84        info!("WebSocket server listening on {}", self.bind_addr);
85
86        self.client_manager.start_cleanup_task();
87
88        loop {
89            match listener.accept().await {
90                Ok((stream, addr)) => {
91                    let client_count = self.client_manager.client_count();
92                    if client_count >= self.max_clients {
93                        warn!(
94                            "Rejecting connection from {} - max clients ({}) reached",
95                            addr, self.max_clients
96                        );
97                        drop(stream);
98                        continue;
99                    }
100
101                    #[cfg(feature = "otel")]
102                    if let Some(ref metrics) = self.metrics {
103                        metrics.record_ws_connection();
104                    }
105
106                    info!(
107                        "New WebSocket connection from {} ({}/{} clients)",
108                        addr,
109                        client_count + 1,
110                        self.max_clients
111                    );
112                    let client_manager = self.client_manager.clone();
113                    let bus_manager = self.bus_manager.clone();
114                    let entity_cache = self.entity_cache.clone();
115                    let view_index = self.view_index.clone();
116                    #[cfg(feature = "otel")]
117                    let metrics = self.metrics.clone();
118
119                    tokio::spawn(
120                        async move {
121                            #[cfg(feature = "otel")]
122                            let result = handle_connection(
123                                stream,
124                                client_manager,
125                                bus_manager,
126                                entity_cache,
127                                view_index,
128                                metrics,
129                            )
130                            .await;
131                            #[cfg(not(feature = "otel"))]
132                            let result = handle_connection(
133                                stream,
134                                client_manager,
135                                bus_manager,
136                                entity_cache,
137                                view_index,
138                            )
139                            .await;
140
141                            if let Err(e) = result {
142                                error!("WebSocket connection error: {}", e);
143                            }
144                        }
145                        .instrument(info_span!("ws.connection", %addr)),
146                    );
147                }
148                Err(e) => {
149                    error!("Failed to accept connection: {}", e);
150                }
151            }
152        }
153    }
154}
155
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let handshake_start = Instant::now();
    let ws_stream = match accept_async(stream).await {
        Ok(ws_stream) => ws_stream,
        Err(e) => {
            // `WebSocketServer::start` already called `record_ws_connection` for this
            // socket; report the matching disconnection so the pair stays balanced when
            // the handshake fails.
            if let Some(ref m) = metrics {
                m.record_ws_disconnection(handshake_start.elapsed().as_secs_f64());
            }
            return Err(e.into());
        }
    };
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    // View id of every subscription still open on this connection. An entry is dropped on
    // a successful unsubscribe; whatever remains is reported as removed on disconnect.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        // Only text frames carry protocol messages; everything else just refreshes last-seen.
        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
            match client_msg {
                ClientMessage::Subscribe(subscription) => {
                    subscribe_client(
                        client_id,
                        subscription,
                        &client_manager,
                        &bus_manager,
                        &entity_cache,
                        &view_index,
                        metrics.as_ref(),
                        &mut active_subscriptions,
                    )
                    .await;
                }
                ClientMessage::Unsubscribe(unsub) => {
                    let sub_key = unsub.sub_key();
                    let removed = client_manager
                        .remove_client_subscription(client_id, &sub_key)
                        .await;

                    if removed {
                        info!("Client {} unsubscribed from {}", client_id, sub_key);
                        if let Some(ref m) = metrics {
                            m.record_subscription_removed(&unsub.view);
                        }
                        // Forget one matching entry; otherwise the disconnect cleanup below
                        // would report this subscription as removed a second time.
                        if let Some(pos) = active_subscriptions
                            .iter()
                            .position(|view_id| *view_id == unsub.view)
                        {
                            active_subscriptions.swap_remove(pos);
                        }
                    }
                }
                ClientMessage::Ping => {
                    debug!("Received ping from client {}", client_id);
                }
            }
        } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
            // A bare `Subscription` object is accepted as an implicit subscribe.
            subscribe_client(
                client_id,
                subscription,
                &client_manager,
                &bus_manager,
                &entity_cache,
                &view_index,
                metrics.as_ref(),
                &mut active_subscriptions,
            )
            .await;
        } else {
            debug!("Received non-subscription message from client {}: {}", client_id, text);
        }
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}

/// Registers `subscription` for `client_id` and attaches it to the matching bus.
///
/// The subscription is recorded in `metrics` (when present) and its view id is pushed
/// onto `active_subscriptions` so the connection cleanup can report it later. A fresh
/// cancellation token is registered with the client manager and handed to the bus task.
#[cfg(feature = "otel")]
// Mirrors the dependencies `attach_client_to_bus` needs plus the per-connection tracking
// list; bundling them into a struct would only move the same fields around.
#[allow(clippy::too_many_arguments)]
async fn subscribe_client(
    client_id: Uuid,
    subscription: Subscription,
    client_manager: &ClientManager,
    bus_manager: &BusManager,
    entity_cache: &EntityCache,
    view_index: &ViewIndex,
    metrics: Option<&Arc<Metrics>>,
    active_subscriptions: &mut Vec<String>,
) {
    let view_id = subscription.view.clone();
    let sub_key = subscription.sub_key();
    client_manager.update_subscription(client_id, subscription.clone());

    if let Some(m) = metrics {
        m.record_subscription_created(&view_id);
    }
    active_subscriptions.push(view_id);

    let cancel_token = CancellationToken::new();
    client_manager
        .add_client_subscription(client_id, sub_key, cancel_token.clone())
        .await;

    attach_client_to_bus(
        client_id,
        subscription,
        client_manager,
        bus_manager,
        entity_cache,
        view_index,
        cancel_token,
        metrics.cloned(),
    )
    .await;
}
308
309#[cfg(not(feature = "otel"))]
310async fn handle_connection(
311    stream: TcpStream,
312    client_manager: ClientManager,
313    bus_manager: BusManager,
314    entity_cache: EntityCache,
315    view_index: Arc<ViewIndex>,
316) -> Result<()> {
317    let ws_stream = accept_async(stream).await?;
318    let client_id = Uuid::new_v4();
319
320    info!("WebSocket connection established for client {}", client_id);
321
322    let (ws_sender, mut ws_receiver) = ws_stream.split();
323
324    client_manager.add_client(client_id, ws_sender);
325
326    loop {
327        tokio::select! {
328            ws_msg = ws_receiver.next() => {
329                match ws_msg {
330                    Some(Ok(msg)) => {
331                        if msg.is_close() {
332                            info!("Client {} requested close", client_id);
333                            break;
334                        }
335
336                        client_manager.update_client_last_seen(client_id);
337
338                        if msg.is_text() {
339                            if let Ok(text) = msg.to_text() {
340                                debug!("Received text message from client {}: {}", client_id, text);
341
342                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
343                                    match client_msg {
344                                        ClientMessage::Subscribe(subscription) => {
345                                            let sub_key = subscription.sub_key();
346                                            client_manager.update_subscription(client_id, subscription.clone());
347
348                                            let cancel_token = CancellationToken::new();
349                                            client_manager.add_client_subscription(
350                                                client_id,
351                                                sub_key,
352                                                cancel_token.clone(),
353                                            ).await;
354
355                                            attach_client_to_bus(
356                                                client_id,
357                                                subscription,
358                                                &client_manager,
359                                                &bus_manager,
360                                                &entity_cache,
361                                                &view_index,
362                                                cancel_token,
363                                            ).await;
364                                        }
365                                        ClientMessage::Unsubscribe(unsub) => {
366                                            let sub_key = unsub.sub_key();
367                                            let removed = client_manager
368                                                .remove_client_subscription(client_id, &sub_key)
369                                                .await;
370
371                                            if removed {
372                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
373                                            }
374                                        }
375                                        ClientMessage::Ping => {
376                                            debug!("Received ping from client {}", client_id);
377                                        }
378                                    }
379                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
380                                    let sub_key = subscription.sub_key();
381                                    client_manager.update_subscription(client_id, subscription.clone());
382
383                                    let cancel_token = CancellationToken::new();
384                                    client_manager.add_client_subscription(
385                                        client_id,
386                                        sub_key,
387                                        cancel_token.clone(),
388                                    ).await;
389
390                                    attach_client_to_bus(
391                                        client_id,
392                                        subscription,
393                                        &client_manager,
394                                        &bus_manager,
395                                        &entity_cache,
396                                        &view_index,
397                                        cancel_token,
398                                    ).await;
399                                } else {
400                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
401                                }
402                            }
403                        }
404                    }
405                    Some(Err(e)) => {
406                        warn!("WebSocket error for client {}: {}", client_id, e);
407                        break;
408                    }
409                    None => {
410                        debug!("WebSocket stream ended for client {}", client_id);
411                        break;
412                    }
413                }
414            }
415        }
416    }
417
418    client_manager
419        .cancel_all_client_subscriptions(client_id)
420        .await;
421    client_manager.remove_client(client_id);
422    info!("Client {} disconnected", client_id);
423
424    Ok(())
425}
426
/// Attaches `client_id` to the bus backing `subscription.view` and spawns a forwarding task.
///
/// * `Mode::State`: the current value of the `(view, key)` state bus is sent right away
///   when it is non-empty, then every change is forwarded. A missing key uses `""`.
/// * `Mode::List` / `Mode::Append`: a snapshot frame of cached entities accepted by
///   `matches_key` is sent first, then list-bus envelopes accepted by `matches` are
///   forwarded.
///
/// The forwarding task ends when `cancel_token` is cancelled, the bus closes, or a send to
/// the client fails. A list receiver that lags behind keeps running after a warning.
/// Unknown view ids are logged and ignored. If the snapshot cannot be delivered, no task
/// is spawned. Successful sends are recorded in `metrics` when present.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    client_id: Uuid,
    subscription: Subscription,
    client_manager: &ClientManager,
    bus_manager: &BusManager,
    entity_cache: &EntityCache,
    view_index: &ViewIndex,
    cancel_token: CancellationToken,
    metrics: Option<Arc<Metrics>>,
) {
    // Needed to tell a lagging list receiver apart from a closed bus.
    use tokio::sync::broadcast::error::RecvError;

    let view_id = &subscription.view;

    let view_spec = match view_index.get_view(view_id) {
        Some(spec) => spec,
        None => {
            warn!("Unknown view ID: {}", view_id);
            return;
        }
    };

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");
            let mut rx = bus_manager.get_or_create_state_bus(view_id, key).await;

            // Take a single borrow so the emptiness check and the value sent are the same state.
            let initial = {
                let current = rx.borrow();
                (!current.is_empty()).then(|| current.clone())
            };
            if let Some(data) = initial {
                // Count the message only if it was actually handed to the client,
                // matching the list branch below.
                if client_manager.send_to_client(client_id, data).is_ok() {
                    if let Some(ref m) = metrics {
                        m.record_ws_message_sent();
                    }
                }
            }

            let client_mgr = client_manager.clone();
            let metrics_clone = metrics.clone();
            let view_id_clone = view_id.clone();
            let key_clone = key.to_string();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("State subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.changed() => {
                                if result.is_err() {
                                    break;
                                }
                                let data = rx.borrow().clone();
                                if client_mgr.send_to_client(client_id, data).is_err() {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
            );
        }
        Mode::List | Mode::Append => {
            // Subscribe before reading the cache so updates racing the snapshot are not lost.
            let mut rx = bus_manager.get_or_create_list_bus(view_id).await;

            let snapshots = entity_cache.get_all(view_id).await;
            let snapshot_entities: Vec<SnapshotEntity> = snapshots
                .into_iter()
                .filter(|(key, _)| subscription.matches_key(key))
                .map(|(key, data)| SnapshotEntity { key, data })
                .collect();

            if !snapshot_entities.is_empty() {
                let snapshot_frame = SnapshotFrame {
                    mode: view_spec.mode,
                    export: view_id.clone(),
                    op: "snapshot",
                    data: snapshot_entities,
                };
                if let Ok(payload) = serde_json::to_vec(&snapshot_frame) {
                    if client_manager
                        .send_to_client_async(client_id, Arc::new(Bytes::from(payload)))
                        .await
                        .is_err()
                    {
                        return;
                    }
                    if let Some(ref m) = metrics {
                        m.record_ws_message_sent();
                    }
                }
            }

            let client_mgr = client_manager.clone();
            let sub = subscription.clone();
            let metrics_clone = metrics.clone();
            let view_id_clone = view_id.clone();
            let mode = view_spec.mode;
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("List subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.recv() => {
                                match result {
                                    Ok(envelope) => {
                                        if sub.matches(&envelope.entity, &envelope.key) {
                                            if client_mgr
                                                .send_to_client(client_id, envelope.payload.clone())
                                                .is_err()
                                            {
                                                break;
                                            }
                                            if let Some(ref m) = metrics_clone {
                                                m.record_ws_message_sent();
                                            }
                                        }
                                    }
                                    Err(RecvError::Lagged(skipped)) => {
                                        // The bus overwrote messages this receiver had not read
                                        // yet. Keep the subscription alive instead of silently
                                        // ending it; the next recv yields the oldest retained one.
                                        warn!(
                                            "List subscription for client {} lagged; skipped {} messages",
                                            client_id, skipped
                                        );
                                    }
                                    Err(_) => break,
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
            );
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        client_id, view_id, view_spec.mode
    );
}
566
567#[cfg(not(feature = "otel"))]
568async fn attach_client_to_bus(
569    client_id: Uuid,
570    subscription: Subscription,
571    client_manager: &ClientManager,
572    bus_manager: &BusManager,
573    entity_cache: &EntityCache,
574    view_index: &ViewIndex,
575    cancel_token: CancellationToken,
576) {
577    let view_id = &subscription.view;
578
579    let view_spec = match view_index.get_view(view_id) {
580        Some(spec) => spec,
581        None => {
582            warn!("Unknown view ID: {}", view_id);
583            return;
584        }
585    };
586
587    match view_spec.mode {
588        Mode::State => {
589            let key = subscription.key.as_deref().unwrap_or("");
590            let mut rx = bus_manager.get_or_create_state_bus(view_id, key).await;
591
592            if !rx.borrow().is_empty() {
593                let data = rx.borrow().clone();
594                let _ = client_manager.send_to_client(client_id, data);
595            }
596
597            let client_mgr = client_manager.clone();
598            let view_id_clone = view_id.clone();
599            let key_clone = key.to_string();
600            tokio::spawn(
601                async move {
602                    loop {
603                        tokio::select! {
604                            _ = cancel_token.cancelled() => {
605                                debug!("State subscription cancelled for client {}", client_id);
606                                break;
607                            }
608                            result = rx.changed() => {
609                                if result.is_err() {
610                                    break;
611                                }
612                                let data = rx.borrow().clone();
613                                if client_mgr.send_to_client(client_id, data).is_err() {
614                                    break;
615                                }
616                            }
617                        }
618                    }
619                }
620                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
621            );
622        }
623        Mode::List | Mode::Append => {
624            let mut rx = bus_manager.get_or_create_list_bus(view_id).await;
625
626            let snapshots = entity_cache.get_all(view_id).await;
627            let snapshot_entities: Vec<SnapshotEntity> = snapshots
628                .into_iter()
629                .filter(|(key, _)| subscription.matches_key(key))
630                .map(|(key, data)| SnapshotEntity { key, data })
631                .collect();
632
633            if !snapshot_entities.is_empty() {
634                let snapshot_frame = SnapshotFrame {
635                    mode: view_spec.mode,
636                    export: view_id.clone(),
637                    op: "snapshot",
638                    data: snapshot_entities,
639                };
640                if let Ok(payload) = serde_json::to_vec(&snapshot_frame) {
641                    if client_manager
642                        .send_to_client_async(client_id, Arc::new(Bytes::from(payload)))
643                        .await
644                        .is_err()
645                    {
646                        return;
647                    }
648                }
649            }
650
651            let client_mgr = client_manager.clone();
652            let sub = subscription.clone();
653            let view_id_clone = view_id.clone();
654            let mode = view_spec.mode;
655            tokio::spawn(
656                async move {
657                    loop {
658                        tokio::select! {
659                            _ = cancel_token.cancelled() => {
660                                debug!("List subscription cancelled for client {}", client_id);
661                                break;
662                            }
663                            result = rx.recv() => {
664                                match result {
665                                    Ok(envelope) => {
666                                        if sub.matches(&envelope.entity, &envelope.key)
667                                            && client_mgr
668                                                .send_to_client(client_id, envelope.payload.clone())
669                                                .is_err()
670                                        {
671                                            break;
672                                        }
673                                    }
674                                    Err(_) => break,
675                                }
676                            }
677                        }
678                    }
679                }
680                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
681            );
682        }
683    }
684
685    info!(
686        "Client {} subscribed to {} (mode: {:?})",
687        client_id, view_id, view_spec.mode
688    );
689}