hyperstack_server/websocket/
server.rs

1use crate::bus::BusManager;
2use crate::view::ViewIndex;
3use crate::websocket::client_manager::ClientManager;
4use crate::websocket::frame::Mode;
5use crate::websocket::subscription::Subscription;
6use anyhow::Result;
7use futures_util::StreamExt;
8use std::net::SocketAddr;
9use std::sync::Arc;
10
11use tokio::net::{TcpListener, TcpStream};
12use tokio_tungstenite::accept_async;
13use tracing::{debug, error, info, warn};
14use uuid::Uuid;
15
16#[cfg(feature = "otel")]
17use crate::metrics::Metrics;
18
19pub struct WebSocketServer {
20    bind_addr: SocketAddr,
21    client_manager: ClientManager,
22    bus_manager: BusManager,
23    view_index: Arc<ViewIndex>,
24    max_clients: usize,
25    #[cfg(feature = "otel")]
26    metrics: Option<Arc<Metrics>>,
27}
28
29impl WebSocketServer {
30    #[cfg(feature = "otel")]
31    pub fn new(
32        bind_addr: SocketAddr,
33        bus_manager: BusManager,
34        view_index: Arc<ViewIndex>,
35        metrics: Option<Arc<Metrics>>,
36    ) -> Self {
37        Self {
38            bind_addr,
39            client_manager: ClientManager::new(),
40            bus_manager,
41            view_index,
42            max_clients: 10000,
43            metrics,
44        }
45    }
46
47    #[cfg(not(feature = "otel"))]
48    pub fn new(bind_addr: SocketAddr, bus_manager: BusManager, view_index: Arc<ViewIndex>) -> Self {
49        Self {
50            bind_addr,
51            client_manager: ClientManager::new(),
52            bus_manager,
53            view_index,
54            max_clients: 10000,
55        }
56    }
57
58    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
59        self.max_clients = max_clients;
60        self
61    }
62
63    pub async fn start(self) -> Result<()> {
64        info!(
65            "Starting WebSocket server on {} (max_clients: {})",
66            self.bind_addr, self.max_clients
67        );
68
69        let listener = TcpListener::bind(&self.bind_addr).await?;
70        info!("WebSocket server listening on {}", self.bind_addr);
71
72        // Start cleanup task
73        self.client_manager.start_cleanup_task().await;
74
75        // Accept incoming connections
76        loop {
77            match listener.accept().await {
78                Ok((stream, addr)) => {
79                    // Check if we've reached the maximum number of clients
80                    let client_count = self.client_manager.client_count().await;
81                    if client_count >= self.max_clients {
82                        warn!(
83                            "Rejecting connection from {} - max clients ({}) reached",
84                            addr, self.max_clients
85                        );
86                        // Accept the connection but immediately close it
87                        if let Ok(mut ws_stream) = accept_async(stream).await {
88                            let _ = ws_stream.close(None).await;
89                        }
90                        continue;
91                    }
92
93                    // Record connection metric
94                    #[cfg(feature = "otel")]
95                    if let Some(ref metrics) = self.metrics {
96                        metrics.record_ws_connection();
97                    }
98
99                    info!(
100                        "New WebSocket connection from {} ({}/{} clients)",
101                        addr,
102                        client_count + 1,
103                        self.max_clients
104                    );
105                    let client_manager = self.client_manager.clone();
106                    let bus_manager = self.bus_manager.clone();
107                    let view_index = self.view_index.clone();
108                    #[cfg(feature = "otel")]
109                    let metrics = self.metrics.clone();
110
111                    tokio::spawn(async move {
112                        #[cfg(feature = "otel")]
113                        let result = handle_connection(
114                            stream,
115                            client_manager,
116                            bus_manager,
117                            view_index,
118                            metrics,
119                        )
120                        .await;
121                        #[cfg(not(feature = "otel"))]
122                        let result =
123                            handle_connection(stream, client_manager, bus_manager, view_index)
124                                .await;
125
126                        if let Err(e) = result {
127                            error!("WebSocket connection error: {}", e);
128                        }
129                    });
130                }
131                Err(e) => {
132                    error!("Failed to accept connection: {}", e);
133                }
134            }
135        }
136    }
137}
138
/// Runs one client session from WebSocket handshake to disconnect
/// (metrics-enabled build).
///
/// Upgrades `stream`, registers the client under a fresh UUID, then reads
/// frames until the peer sends a close frame, the stream errors, or the stream
/// ends. Text frames that parse as a `Subscription` are stored on the client
/// and attached to the matching bus; other text frames are only logged.
/// On exit the client is removed and disconnection metrics are recorded.
///
/// # Errors
///
/// Returns an error if the WebSocket handshake fails or the client cannot be
/// registered. Read errors during the session end it but are not returned.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    // Fully qualified because `Instant` is not among this file's imports.
    let connection_start = std::time::Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    // Register client
    client_manager.add_client(client_id, ws_sender).await?;

    // Track active subscriptions for this client (for cleanup)
    let mut active_subscriptions: Vec<String> = Vec::new();

    // Handle incoming messages from client
    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        // Any non-close frame counts as activity for this client.
        client_manager.update_client_last_seen(client_id).await;

        if !msg.is_text() {
            continue;
        }

        // Record message received metric
        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        // Try to parse as subscription
        match serde_json::from_str::<Subscription>(text) {
            Ok(subscription) => {
                let view_id = subscription.view.clone();
                client_manager
                    .update_subscription(client_id, subscription.clone())
                    .await;

                // Record subscription metric
                // NOTE(review): this is recorded before the view id is checked
                // against the view index, so unknown views are counted too.
                if let Some(ref m) = metrics {
                    m.record_subscription_created(&view_id);
                }
                active_subscriptions.push(view_id);

                // Attach client to appropriate bus
                attach_client_to_bus(
                    client_id,
                    subscription,
                    &client_manager,
                    &bus_manager,
                    &view_index,
                    metrics.clone(),
                )
                .await;
            }
            Err(_) => {
                debug!(
                    "Received non-subscription message from client {}: {}",
                    client_id, text
                );
            }
        }
    }

    // Clean up client
    client_manager.remove_client(client_id).await;

    // Record disconnection metrics
    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        // Balance every "created" record made during the session.
        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}
240
241#[cfg(not(feature = "otel"))]
242async fn handle_connection(
243    stream: TcpStream,
244    client_manager: ClientManager,
245    bus_manager: BusManager,
246    view_index: Arc<ViewIndex>,
247) -> Result<()> {
248    let ws_stream = accept_async(stream).await?;
249    let client_id = Uuid::new_v4();
250
251    info!("WebSocket connection established for client {}", client_id);
252
253    let (ws_sender, mut ws_receiver) = ws_stream.split();
254
255    // Register client
256    client_manager.add_client(client_id, ws_sender).await?;
257
258    // Handle incoming messages from client
259    loop {
260        tokio::select! {
261            ws_msg = ws_receiver.next() => {
262                match ws_msg {
263                    Some(Ok(msg)) => {
264                        if msg.is_close() {
265                            info!("Client {} requested close", client_id);
266                            break;
267                        }
268
269                        client_manager.update_client_last_seen(client_id).await;
270
271                        if msg.is_text() {
272                            if let Ok(text) = msg.to_text() {
273                                debug!("Received text message from client {}: {}", client_id, text);
274
275                                // Try to parse as subscription
276                                if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
277                                    client_manager.update_subscription(client_id, subscription.clone()).await;
278
279                                    // Attach client to appropriate bus
280                                    attach_client_to_bus(client_id, subscription, &client_manager, &bus_manager, &view_index).await;
281                                } else {
282                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
283                                }
284                            }
285                        }
286                    }
287                    Some(Err(e)) => {
288                        warn!("WebSocket error for client {}: {}", client_id, e);
289                        break;
290                    }
291                    None => {
292                        debug!("WebSocket stream ended for client {}", client_id);
293                        break;
294                    }
295                }
296            }
297        }
298    }
299
300    // Clean up client
301    client_manager.remove_client(client_id).await;
302    info!("Client {} disconnected", client_id);
303
304    Ok(())
305}
306
/// Connects a client to the bus for the view named in `subscription`
/// (metrics-enabled build).
///
/// The view's mode, looked up in `view_index`, selects the bus:
/// - `Mode::State`: sends the bus's current value immediately if it is
///   non-empty, then forwards every subsequent change.
/// - `Mode::Kv` / `Mode::Append`: forwards envelopes from the view's kv bus
///   that match the subscription.
/// - `Mode::List`: forwards envelopes from the view's list bus that match the
///   subscription.
///
/// Unknown view ids are logged and ignored. Forwarding runs in a detached
/// task; each successfully forwarded message is counted in `metrics`.
///
/// NOTE(review): a kv/list forwarding task only notices a disconnected client
/// when sending a *matching* envelope fails, and any receive error ends the
/// task. Confirm both are intended for the channel types returned by
/// `BusManager`.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    client_id: Uuid,
    subscription: Subscription,
    client_manager: &ClientManager,
    bus_manager: &BusManager,
    view_index: &ViewIndex,
    metrics: Option<Arc<Metrics>>,
) {
    let view_id = &subscription.view;

    // The view spec tells us which delivery mode applies.
    let view_spec = if let Some(spec) = view_index.get_view(view_id) {
        spec
    } else {
        warn!("Unknown view ID: {}", view_id);
        return;
    };

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");
            let mut updates = bus_manager.get_or_create_state_bus(view_id, key).await;

            // Latest-only semantics: push the current value right away.
            let has_value = !updates.borrow().is_empty();
            if has_value {
                let snapshot = updates.borrow().clone();
                let _ = client_manager.send_to_client(client_id, snapshot).await;
                if let Some(m) = metrics.as_ref() {
                    m.record_ws_message_sent();
                }
            }

            // Forward later changes from a detached task.
            let forwarder = client_manager.clone();
            let sent_metrics = metrics.clone();
            tokio::spawn(async move {
                loop {
                    if updates.changed().await.is_err() {
                        break;
                    }
                    let latest = updates.borrow().clone();
                    if forwarder.send_to_client(client_id, latest).await.is_err() {
                        break; // Client disconnected
                    }
                    if let Some(m) = sent_metrics.as_ref() {
                        m.record_ws_message_sent();
                    }
                }
            });
        }
        Mode::Kv | Mode::Append => {
            let mut events = bus_manager.get_or_create_kv_bus(view_id).await;

            let forwarder = client_manager.clone();
            let filter = subscription.clone();
            let sent_metrics = metrics.clone();
            tokio::spawn(async move {
                while let Ok(envelope) = events.recv().await {
                    // Skip envelopes this subscription did not ask for.
                    if !filter.matches(&envelope.entity, &envelope.key) {
                        continue;
                    }
                    let delivery = forwarder
                        .send_to_client(client_id, envelope.payload.clone())
                        .await;
                    if delivery.is_err() {
                        break; // Client disconnected
                    }
                    if let Some(m) = sent_metrics.as_ref() {
                        m.record_ws_message_sent();
                    }
                }
            });
        }
        Mode::List => {
            let mut events = bus_manager.get_or_create_list_bus(view_id).await;

            let forwarder = client_manager.clone();
            let filter = subscription.clone();
            let sent_metrics = metrics.clone();
            tokio::spawn(async move {
                while let Ok(envelope) = events.recv().await {
                    // Skip envelopes this subscription did not ask for.
                    if !filter.matches(&envelope.entity, &envelope.key) {
                        continue;
                    }
                    let delivery = forwarder
                        .send_to_client(client_id, envelope.payload.clone())
                        .await;
                    if delivery.is_err() {
                        break; // Client disconnected
                    }
                    if let Some(m) = sent_metrics.as_ref() {
                        m.record_ws_message_sent();
                    }
                }
            });
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        client_id, view_id, view_spec.mode
    );
}
411
412#[cfg(not(feature = "otel"))]
413async fn attach_client_to_bus(
414    client_id: Uuid,
415    subscription: Subscription,
416    client_manager: &ClientManager,
417    bus_manager: &BusManager,
418    view_index: &ViewIndex,
419) {
420    let view_id = &subscription.view;
421
422    // Get the view spec to determine the mode
423    let view_spec = match view_index.get_view(view_id) {
424        Some(spec) => spec,
425        None => {
426            warn!("Unknown view ID: {}", view_id);
427            return;
428        }
429    };
430
431    match view_spec.mode {
432        Mode::State => {
433            let key = subscription.key.as_deref().unwrap_or("");
434            let mut rx = bus_manager.get_or_create_state_bus(view_id, key).await;
435
436            // Send current value immediately (latest-only semantics)
437            if !rx.borrow().is_empty() {
438                let data = rx.borrow().clone();
439                let _ = client_manager.send_to_client(client_id, data).await;
440            }
441
442            // Spawn task to listen for updates
443            let client_mgr = client_manager.clone();
444            tokio::spawn(async move {
445                while rx.changed().await.is_ok() {
446                    let data = rx.borrow().clone();
447                    if client_mgr.send_to_client(client_id, data).await.is_err() {
448                        break; // Client disconnected
449                    }
450                }
451            });
452        }
453        Mode::Kv | Mode::Append => {
454            let mut rx = bus_manager.get_or_create_kv_bus(view_id).await;
455
456            let client_mgr = client_manager.clone();
457            let sub = subscription.clone();
458            tokio::spawn(async move {
459                while let Ok(envelope) = rx.recv().await {
460                    // Filter messages based on subscription
461                    if sub.matches(&envelope.entity, &envelope.key)
462                        && client_mgr
463                            .send_to_client(client_id, envelope.payload.clone())
464                            .await
465                            .is_err()
466                    {
467                        break; // Client disconnected
468                    }
469                }
470            });
471        }
472        Mode::List => {
473            let mut rx = bus_manager.get_or_create_list_bus(view_id).await;
474
475            let client_mgr = client_manager.clone();
476            let sub = subscription.clone();
477            tokio::spawn(async move {
478                while let Ok(envelope) = rx.recv().await {
479                    // Filter messages based on subscription
480                    if sub.matches(&envelope.entity, &envelope.key)
481                        && client_mgr
482                            .send_to_client(client_id, envelope.payload.clone())
483                            .await
484                            .is_err()
485                    {
486                        break; // Client disconnected
487                    }
488                }
489            });
490        }
491    }
492
493    info!(
494        "Client {} subscribed to {} (mode: {:?})",
495        client_id, view_id, view_spec.mode
496    );
497}