Skip to main content

hyperstack_server/websocket/
server.rs

1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7    transform_large_u64_to_strings, Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig,
8    SortOrder, SubscribedFrame,
9};
10use crate::websocket::subscription::{ClientMessage, Subscription};
11use anyhow::Result;
12use bytes::Bytes;
13use futures_util::StreamExt;
14use std::collections::HashSet;
15use std::net::SocketAddr;
16use std::sync::Arc;
17#[cfg(feature = "otel")]
18use std::time::Instant;
19
20use tokio::net::{TcpListener, TcpStream};
21use tokio_tungstenite::accept_async;
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, error, info, info_span, warn, Instrument};
24use uuid::Uuid;
25
26#[cfg(feature = "otel")]
27use crate::metrics::Metrics;
28
/// Borrowed handles that the subscription helpers need for one connected client.
///
/// Bundles the connection's client id with references to the shared managers so
/// they can be passed to the helpers as a single argument.
struct SubscriptionContext<'a> {
    /// Id of the client connection this context was built for.
    client_id: Uuid,
    /// Used to send frames to the client.
    client_manager: &'a ClientManager,
    /// Provides the state and list buses a subscription listens on.
    bus_manager: &'a BusManager,
    /// Read to build the initial snapshot and to obtain the snapshot batch config.
    entity_cache: &'a EntityCache,
    /// Looked up to resolve a view id to its spec and to reach the sorted caches.
    view_index: &'a ViewIndex,
    /// Optional metrics sink; the field only exists when the `otel` feature is on.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
38
/// WebSocket front end: accepts TCP connections and hands each one to its own task.
pub struct WebSocketServer {
    /// Address the listener binds to in `start`.
    bind_addr: SocketAddr,
    /// Registry of connected clients; created fresh by `new`.
    client_manager: ClientManager,
    /// Cloned into every connection task.
    bus_manager: BusManager,
    /// Cloned into every connection task.
    entity_cache: EntityCache,
    /// Shared view index, cloned (by `Arc`) into every connection task.
    view_index: Arc<ViewIndex>,
    /// New connections are rejected once the client count reaches this value.
    max_clients: usize,
    /// Optional metrics sink; the field only exists when the `otel` feature is on.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
49
50impl WebSocketServer {
51    #[cfg(feature = "otel")]
52    pub fn new(
53        bind_addr: SocketAddr,
54        bus_manager: BusManager,
55        entity_cache: EntityCache,
56        view_index: Arc<ViewIndex>,
57        metrics: Option<Arc<Metrics>>,
58    ) -> Self {
59        Self {
60            bind_addr,
61            client_manager: ClientManager::new(),
62            bus_manager,
63            entity_cache,
64            view_index,
65            max_clients: 10000,
66            metrics,
67        }
68    }
69
70    #[cfg(not(feature = "otel"))]
71    pub fn new(
72        bind_addr: SocketAddr,
73        bus_manager: BusManager,
74        entity_cache: EntityCache,
75        view_index: Arc<ViewIndex>,
76    ) -> Self {
77        Self {
78            bind_addr,
79            client_manager: ClientManager::new(),
80            bus_manager,
81            entity_cache,
82            view_index,
83            max_clients: 10000,
84        }
85    }
86
87    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
88        self.max_clients = max_clients;
89        self
90    }
91
92    pub async fn start(self) -> Result<()> {
93        info!(
94            "Starting WebSocket server on {} (max_clients: {})",
95            self.bind_addr, self.max_clients
96        );
97
98        let listener = TcpListener::bind(&self.bind_addr).await?;
99        info!("WebSocket server listening on {}", self.bind_addr);
100
101        self.client_manager.start_cleanup_task();
102
103        loop {
104            match listener.accept().await {
105                Ok((stream, addr)) => {
106                    let client_count = self.client_manager.client_count();
107                    if client_count >= self.max_clients {
108                        warn!(
109                            "Rejecting connection from {} - max clients ({}) reached",
110                            addr, self.max_clients
111                        );
112                        drop(stream);
113                        continue;
114                    }
115
116                    #[cfg(feature = "otel")]
117                    if let Some(ref metrics) = self.metrics {
118                        metrics.record_ws_connection();
119                    }
120
121                    info!(
122                        "New WebSocket connection from {} ({}/{} clients)",
123                        addr,
124                        client_count + 1,
125                        self.max_clients
126                    );
127                    let client_manager = self.client_manager.clone();
128                    let bus_manager = self.bus_manager.clone();
129                    let entity_cache = self.entity_cache.clone();
130                    let view_index = self.view_index.clone();
131                    #[cfg(feature = "otel")]
132                    let metrics = self.metrics.clone();
133
134                    tokio::spawn(
135                        async move {
136                            #[cfg(feature = "otel")]
137                            let result = handle_connection(
138                                stream,
139                                client_manager,
140                                bus_manager,
141                                entity_cache,
142                                view_index,
143                                metrics,
144                            )
145                            .await;
146                            #[cfg(not(feature = "otel"))]
147                            let result = handle_connection(
148                                stream,
149                                client_manager,
150                                bus_manager,
151                                entity_cache,
152                                view_index,
153                            )
154                            .await;
155
156                            if let Err(e) = result {
157                                error!("WebSocket connection error: {}", e);
158                            }
159                        }
160                        .instrument(info_span!("ws.connection", %addr)),
161                    );
162                }
163                Err(e) => {
164                    error!("Failed to accept connection: {}", e);
165                }
166            }
167        }
168    }
169}
170
/// Serves one client from WebSocket handshake to disconnect (`otel` build).
///
/// Registers the client, then processes incoming text messages: a
/// `ClientMessage` (or, failing that, a bare `Subscription`) drives
/// subscribe / unsubscribe / ping handling. When the loop ends, all of the
/// client's subscriptions are cancelled, the client is removed and the
/// disconnection metrics are recorded.
///
/// # Errors
///
/// Returns an error if the WebSocket handshake fails. Errors on an established
/// connection are logged and end the loop normally.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        metrics: metrics.clone(),
    };

    // View ids of subscriptions whose "removed" metric has not been recorded
    // yet; whatever is left here at disconnect is recorded then.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        // A payload that is not a `ClientMessage` may still be a bare
        // `Subscription`; both forms are handled as a subscribe request.
        let parsed = serde_json::from_str::<ClientMessage>(text)
            .ok()
            .or_else(|| {
                serde_json::from_str::<Subscription>(text)
                    .ok()
                    .map(ClientMessage::Subscribe)
            });

        match parsed {
            Some(ClientMessage::Subscribe(subscription)) => {
                let view_id = subscription.view.clone();
                let sub_key = subscription.sub_key();
                client_manager.update_subscription(client_id, subscription.clone());

                let cancel_token = CancellationToken::new();
                let is_new = client_manager
                    .add_client_subscription(client_id, sub_key.clone(), cancel_token.clone())
                    .await;

                if !is_new {
                    debug!(
                        "Client {} already subscribed to {}, ignoring duplicate",
                        client_id, sub_key
                    );
                    continue;
                }

                if let Some(ref m) = metrics {
                    m.record_subscription_created(&view_id);
                }
                active_subscriptions.push(view_id);

                attach_client_to_bus(&ctx, subscription, cancel_token).await;
            }
            Some(ClientMessage::Unsubscribe(unsub)) => {
                let sub_key = unsub.sub_key();
                let removed = client_manager
                    .remove_client_subscription(client_id, &sub_key)
                    .await;

                if removed {
                    info!("Client {} unsubscribed from {}", client_id, sub_key);
                    if let Some(ref m) = metrics {
                        m.record_subscription_removed(&unsub.view);
                    }
                    // The removal has just been recorded, so drop one matching
                    // entry; otherwise the disconnect path below would record
                    // the same subscription as removed a second time.
                    if let Some(pos) = active_subscriptions
                        .iter()
                        .position(|v| v == &unsub.view)
                    {
                        active_subscriptions.swap_remove(pos);
                    }
                }
            }
            Some(ClientMessage::Ping) => {
                debug!("Received ping from client {}", client_id);
            }
            None => {
                debug!(
                    "Received non-subscription message from client {}: {}",
                    client_id, text
                );
            }
        }
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        // Subscriptions still open at disconnect are recorded as removed here.
        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}
324
325#[cfg(not(feature = "otel"))]
326async fn handle_connection(
327    stream: TcpStream,
328    client_manager: ClientManager,
329    bus_manager: BusManager,
330    entity_cache: EntityCache,
331    view_index: Arc<ViewIndex>,
332) -> Result<()> {
333    let ws_stream = accept_async(stream).await?;
334    let client_id = Uuid::new_v4();
335
336    info!("WebSocket connection established for client {}", client_id);
337
338    let (ws_sender, mut ws_receiver) = ws_stream.split();
339
340    client_manager.add_client(client_id, ws_sender);
341
342    let ctx = SubscriptionContext {
343        client_id,
344        client_manager: &client_manager,
345        bus_manager: &bus_manager,
346        entity_cache: &entity_cache,
347        view_index: &view_index,
348    };
349
350    loop {
351        tokio::select! {
352            ws_msg = ws_receiver.next() => {
353                match ws_msg {
354                    Some(Ok(msg)) => {
355                        if msg.is_close() {
356                            info!("Client {} requested close", client_id);
357                            break;
358                        }
359
360                        client_manager.update_client_last_seen(client_id);
361
362                        if msg.is_text() {
363                            if let Ok(text) = msg.to_text() {
364                                debug!("Received text message from client {}: {}", client_id, text);
365
366                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
367                                    match client_msg {
368                                        ClientMessage::Subscribe(subscription) => {
369                                            let sub_key = subscription.sub_key();
370                                            client_manager.update_subscription(client_id, subscription.clone());
371
372                                            let cancel_token = CancellationToken::new();
373                                            let is_new = client_manager.add_client_subscription(
374                                                client_id,
375                                                sub_key.clone(),
376                                                cancel_token.clone(),
377                                            ).await;
378
379                                            if !is_new {
380                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
381                                                continue;
382                                            }
383
384                                            attach_client_to_bus(&ctx, subscription, cancel_token).await;
385                                        }
386                                        ClientMessage::Unsubscribe(unsub) => {
387                                            let sub_key = unsub.sub_key();
388                                            let removed = client_manager
389                                                .remove_client_subscription(client_id, &sub_key)
390                                                .await;
391
392                                            if removed {
393                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
394                                            }
395                                        }
396                                        ClientMessage::Ping => {
397                                            debug!("Received ping from client {}", client_id);
398                                        }
399                                    }
400                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
401                                    let sub_key = subscription.sub_key();
402                                    client_manager.update_subscription(client_id, subscription.clone());
403
404                                    let cancel_token = CancellationToken::new();
405                                    let is_new = client_manager.add_client_subscription(
406                                        client_id,
407                                        sub_key.clone(),
408                                        cancel_token.clone(),
409                                    ).await;
410
411                                    if !is_new {
412                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
413                                        continue;
414                                    }
415
416                                    attach_client_to_bus(&ctx, subscription, cancel_token).await;
417                                } else {
418                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
419                                }
420                            }
421                        }
422                    }
423                    Some(Err(e)) => {
424                        warn!("WebSocket error for client {}: {}", client_id, e);
425                        break;
426                    }
427                    None => {
428                        debug!("WebSocket stream ended for client {}", client_id);
429                        break;
430                    }
431                }
432            }
433        }
434    }
435
436    client_manager
437        .cancel_all_client_subscriptions(client_id)
438        .await;
439    client_manager.remove_client(client_id);
440    info!("Client {} disconnected", client_id);
441
442    Ok(())
443}
444
445async fn send_snapshot_batches(
446    client_id: Uuid,
447    entities: &[SnapshotEntity],
448    mode: Mode,
449    view_id: &str,
450    client_manager: &ClientManager,
451    batch_config: &SnapshotBatchConfig,
452    #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
453) -> Result<()> {
454    let total = entities.len();
455    if total == 0 {
456        return Ok(());
457    }
458
459    let mut offset = 0;
460    let mut batch_num = 0;
461
462    while offset < total {
463        let batch_size = if batch_num == 0 {
464            batch_config.initial_batch_size
465        } else {
466            batch_config.subsequent_batch_size
467        };
468
469        let end = (offset + batch_size).min(total);
470        let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
471        let is_complete = end >= total;
472
473        let snapshot_frame = SnapshotFrame {
474            mode,
475            export: view_id.to_string(),
476            op: "snapshot",
477            data: batch_data,
478            complete: is_complete,
479        };
480
481        if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
482            let payload = maybe_compress(&json_payload);
483            if client_manager
484                .send_compressed_async(client_id, payload)
485                .await
486                .is_err()
487            {
488                return Err(anyhow::anyhow!("Failed to send snapshot batch"));
489            }
490            #[cfg(feature = "otel")]
491            if let Some(m) = metrics {
492                m.record_ws_message_sent();
493            }
494        }
495
496        offset = end;
497        batch_num += 1;
498    }
499
500    debug!(
501        "Sent {} snapshot batches ({} entities) for {} to client {}",
502        batch_num, total, view_id, client_id
503    );
504
505    Ok(())
506}
507
508fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
509    if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
510        return Some(SortConfig {
511            field: sort.field_path.clone(),
512            order: match sort.order {
513                crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
514                crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
515            },
516        });
517    }
518
519    if view_spec.mode == Mode::List {
520        return Some(SortConfig {
521            field: vec!["_seq".to_string()],
522            order: SortOrder::Desc,
523        });
524    }
525
526    None
527}
528
529fn send_subscribed_frame(
530    client_id: Uuid,
531    view_id: &str,
532    view_spec: &ViewSpec,
533    client_manager: &ClientManager,
534) -> Result<()> {
535    let sort_config = extract_sort_config(view_spec);
536    let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
537
538    let json_payload = serde_json::to_vec(&subscribed_frame)?;
539    let payload = Arc::new(Bytes::from(json_payload));
540    client_manager
541        .send_to_client(client_id, payload)
542        .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
543}
544
/// Connects a newly registered subscription to its data source (`otel` build).
///
/// Steps, in order:
/// 1. Resolve the view; an unknown view id is logged and nothing else happens.
/// 2. Send the subscribed frame; on failure, log and stop.
/// 3. Derived views that declare a pipeline sort are delegated to
///    `attach_derived_view_subscription_otel`.
/// 4. Otherwise send the initial snapshot and spawn a task that forwards
///    updates until `cancel_token` fires, the bus closes or a send fails.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;

    let view_spec = if let Some(spec) = ctx.view_index.get_view(view_id) {
        spec.clone()
    } else {
        warn!("Unknown view ID: {}", view_id);
        return;
    };

    let announced = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager);
    if let Err(e) = announced {
        warn!("Failed to send subscribed frame: {}", e);
        return;
    }

    // Sorted derived views are served from the sorted cache by a dedicated path.
    if view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map_or(false, |p| p.sort.is_some())
    {
        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
        return;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");

            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            match ctx.entity_cache.get(view_id, key).await {
                Some(mut cached_entity) => {
                    transform_large_u64_to_strings(&mut cached_entity);
                    let snapshot_entities = vec![SnapshotEntity {
                        key: key.to_string(),
                        data: cached_entity,
                    }];
                    let batch_config = ctx.entity_cache.snapshot_config();
                    // A failed snapshot send is ignored here on purpose; the
                    // forwarding task below is still started.
                    let _ = send_snapshot_batches(
                        ctx.client_id,
                        &snapshot_entities,
                        view_spec.mode,
                        view_id,
                        ctx.client_manager,
                        &batch_config,
                        ctx.metrics.as_ref(),
                    )
                    .await;
                    // Mark the bus value as seen now that the snapshot was sent.
                    rx.borrow_and_update();
                }
                None => {
                    // Nothing cached: fall back to whatever the bus holds, if anything.
                    if !rx.borrow().is_empty() {
                        let data = rx.borrow_and_update().clone();
                        let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
                    }
                }
            }

            let client_id = ctx.client_id;
            let sender = ctx.client_manager.clone();
            let task_metrics = ctx.metrics.clone();
            let span_view = view_id.clone();
            let span_key = key.to_string();
            let forward = async move {
                loop {
                    tokio::select! {
                        _ = cancel_token.cancelled() => {
                            debug!("State subscription cancelled for client {}", client_id);
                            break;
                        }
                        changed = rx.changed() => {
                            if changed.is_err() {
                                break;
                            }
                            let latest = rx.borrow().clone();
                            if sender.send_to_client(client_id, latest).is_err() {
                                break;
                            }
                            if let Some(m) = task_metrics.as_ref() {
                                m.record_ws_message_sent();
                            }
                        }
                    }
                }
            };
            tokio::spawn(forward.instrument(
                info_span!("ws.subscribe.state", %client_id, view = %span_view, key = %span_key),
            ));
        }
        Mode::List | Mode::Append => {
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            // Build the initial snapshot from cached entities this subscription accepts.
            let mut snapshot_entities: Vec<SnapshotEntity> = Vec::new();
            for (key, mut data) in ctx.entity_cache.get_all(view_id).await {
                if subscription.matches_key(&key) {
                    transform_large_u64_to_strings(&mut data);
                    snapshot_entities.push(SnapshotEntity { key, data });
                }
            }

            if !snapshot_entities.is_empty() {
                let batch_config = ctx.entity_cache.snapshot_config();
                let sent = send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await;
                // Unlike the state path, a failed snapshot aborts the subscription.
                if sent.is_err() {
                    return;
                }
            }

            let client_id = ctx.client_id;
            let sender = ctx.client_manager.clone();
            let filter = subscription.clone();
            let task_metrics = ctx.metrics.clone();
            let span_view = view_id.clone();
            let span_mode = view_spec.mode;
            let forward = async move {
                loop {
                    tokio::select! {
                        _ = cancel_token.cancelled() => {
                            debug!("List subscription cancelled for client {}", client_id);
                            break;
                        }
                        received = rx.recv() => {
                            let envelope = match received {
                                Ok(envelope) => envelope,
                                Err(_) => break,
                            };
                            if !filter.matches(&envelope.entity, &envelope.key) {
                                continue;
                            }
                            if sender
                                .send_to_client(client_id, envelope.payload.clone())
                                .is_err()
                            {
                                break;
                            }
                            if let Some(m) = task_metrics.as_ref() {
                                m.record_ws_message_sent();
                            }
                        }
                    }
                }
            };
            tokio::spawn(forward.instrument(
                info_span!("ws.subscribe.list", %client_id, view = %span_view, mode = ?span_mode),
            ));
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );
}
716
717#[cfg(feature = "otel")]
718async fn attach_derived_view_subscription_otel(
719    ctx: &SubscriptionContext<'_>,
720    subscription: Subscription,
721    view_spec: ViewSpec,
722    cancel_token: CancellationToken,
723) {
724    let view_id = &subscription.view;
725    let pipeline_limit = view_spec
726        .pipeline
727        .as_ref()
728        .and_then(|p| p.limit)
729        .unwrap_or(100);
730    let take = subscription.take.unwrap_or(pipeline_limit);
731    let skip = subscription.skip.unwrap_or(0);
732    let is_single = take == 1;
733
734    let source_view_id = match &view_spec.source_view {
735        Some(s) => s.clone(),
736        None => {
737            warn!("Derived view {} has no source_view", view_id);
738            return;
739        }
740    };
741
742    let sorted_caches = ctx.view_index.sorted_caches();
743    let initial_window: Vec<(String, serde_json::Value)> = {
744        let mut caches = sorted_caches.write().await;
745        if let Some(cache) = caches.get_mut(view_id) {
746            cache.get_window(skip, take)
747        } else {
748            warn!("No sorted cache for derived view {}", view_id);
749            vec![]
750        }
751    };
752
753    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
754
755    if !initial_window.is_empty() {
756        let snapshot_entities: Vec<SnapshotEntity> = initial_window
757            .into_iter()
758            .map(|(key, mut data)| {
759                transform_large_u64_to_strings(&mut data);
760                SnapshotEntity { key, data }
761            })
762            .collect();
763
764        let batch_config = ctx.entity_cache.snapshot_config();
765        if send_snapshot_batches(
766            ctx.client_id,
767            &snapshot_entities,
768            view_spec.mode,
769            view_id,
770            ctx.client_manager,
771            &batch_config,
772            ctx.metrics.as_ref(),
773        )
774        .await
775        .is_err()
776        {
777            return;
778        }
779    }
780
781    let mut rx = ctx
782        .bus_manager
783        .get_or_create_list_bus(&source_view_id)
784        .await;
785
786    let client_id = ctx.client_id;
787    let client_mgr = ctx.client_manager.clone();
788    let view_id_clone = view_id.clone();
789    let view_id_span = view_id.clone();
790    let sorted_caches_clone = sorted_caches;
791    let metrics_clone = ctx.metrics.clone();
792    let frame_mode = view_spec.mode;
793
794    tokio::spawn(
795        async move {
796            let mut current_window_keys = initial_keys;
797
798            loop {
799                tokio::select! {
800                    _ = cancel_token.cancelled() => {
801                        debug!("Derived view subscription cancelled for client {}", client_id);
802                        break;
803                    }
804                    result = rx.recv() => {
805                        match result {
806                            Ok(_envelope) => {
807                                let new_window: Vec<(String, serde_json::Value)> = {
808                                    let mut caches = sorted_caches_clone.write().await;
809                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
810                                        cache.get_window(skip, take)
811                                    } else {
812                                        continue;
813                                    }
814                                };
815
816                                let new_keys: HashSet<String> =
817                                    new_window.iter().map(|(k, _)| k.clone()).collect();
818
819                                if is_single {
820                                    if let Some((new_key, data)) = new_window.first() {
821                                        for old_key in current_window_keys.difference(&new_keys) {
822                                            let delete_frame = Frame {
823                                                mode: frame_mode,
824                                                export: view_id_clone.clone(),
825                                                op: "delete",
826                                                key: old_key.clone(),
827                                                data: serde_json::Value::Null,
828                                                append: vec![],
829                                            };
830                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
831                                                let payload = Arc::new(Bytes::from(json));
832                                                if client_mgr.send_to_client(client_id, payload).is_err() {
833                                                    return;
834                                                }
835                                                if let Some(ref m) = metrics_clone {
836                                                    m.record_ws_message_sent();
837                                                }
838                                            }
839                                        }
840
841                                        let mut transformed_data = data.clone();
842                                        transform_large_u64_to_strings(&mut transformed_data);
843                                        let frame = Frame {
844                                            mode: frame_mode,
845                                            export: view_id_clone.clone(),
846                                            op: "upsert",
847                                            key: new_key.clone(),
848                                            data: transformed_data,
849                                            append: vec![],
850                                        };
851                                        
852                                        if let Ok(json) = serde_json::to_vec(&frame) {
853                                            let payload = Arc::new(Bytes::from(json));
854                                            if client_mgr.send_to_client(client_id, payload).is_err() {
855                                                return;
856                                            }
857                                            if let Some(ref m) = metrics_clone {
858                                                m.record_ws_message_sent();
859                                            }
860                                        }
861                                    }
862                                } else {
863                                    for key in current_window_keys.difference(&new_keys) {
864                                        let delete_frame = Frame {
865                                            mode: frame_mode,
866                                            export: view_id_clone.clone(),
867                                            op: "delete",
868                                            key: key.clone(),
869                                            data: serde_json::Value::Null,
870                                            append: vec![],
871                                        };
872                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
873                                            let payload = Arc::new(Bytes::from(json));
874                                            if client_mgr.send_to_client(client_id, payload).is_err() {
875                                                return;
876                                            }
877                                            if let Some(ref m) = metrics_clone {
878                                                m.record_ws_message_sent();
879                                            }
880                                        }
881                                    }
882
883                                    for (key, data) in &new_window {
884                                        let mut transformed_data = data.clone();
885                                        transform_large_u64_to_strings(&mut transformed_data);
886                                        let frame = Frame {
887                                            mode: frame_mode,
888                                            export: view_id_clone.clone(),
889                                            op: "upsert",
890                                            key: key.clone(),
891                                            data: transformed_data,
892                                            append: vec![],
893                                        };
894                                        if let Ok(json) = serde_json::to_vec(&frame) {
895                                            let payload = Arc::new(Bytes::from(json));
896                                            if client_mgr.send_to_client(client_id, payload).is_err() {
897                                                return;
898                                            }
899                                            if let Some(ref m) = metrics_clone {
900                                                m.record_ws_message_sent();
901                                            }
902                                        }
903                                    }
904                                }
905
906                                current_window_keys = new_keys;
907                            }
908                            Err(_) => break,
909                        }
910                    }
911                }
912            }
913        }
914        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
915    );
916
917    info!(
918        "Client {} subscribed to derived view {} (take={}, skip={})",
919        ctx.client_id, view_id, take, skip
920    );
921}
922
923#[cfg(not(feature = "otel"))]
924async fn attach_client_to_bus(
925    ctx: &SubscriptionContext<'_>,
926    subscription: Subscription,
927    cancel_token: CancellationToken,
928) {
929    let view_id = &subscription.view;
930
931    let view_spec = match ctx.view_index.get_view(view_id) {
932        Some(spec) => spec.clone(),
933        None => {
934            warn!("Unknown view ID: {}", view_id);
935            return;
936        }
937    };
938
939    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
940        warn!("Failed to send subscribed frame: {}", e);
941        return;
942    }
943
944    let is_derived_with_sort = view_spec.is_derived()
945        && view_spec
946            .pipeline
947            .as_ref()
948            .map(|p| p.sort.is_some())
949            .unwrap_or(false);
950
951    if is_derived_with_sort {
952        attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
953        return;
954    }
955
956    match view_spec.mode {
957        Mode::State => {
958            let key = subscription.key.as_deref().unwrap_or("");
959
960            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
961
962            if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
963                transform_large_u64_to_strings(&mut cached_entity);
964                let snapshot_entities = vec![SnapshotEntity {
965                    key: key.to_string(),
966                    data: cached_entity,
967                }];
968                let batch_config = ctx.entity_cache.snapshot_config();
969                let _ = send_snapshot_batches(
970                    ctx.client_id,
971                    &snapshot_entities,
972                    view_spec.mode,
973                    view_id,
974                    ctx.client_manager,
975                    &batch_config,
976                )
977                .await;
978                rx.borrow_and_update();
979            } else if !rx.borrow().is_empty() {
980                let data = rx.borrow_and_update().clone();
981                let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
982            }
983
984            let client_id = ctx.client_id;
985            let client_mgr = ctx.client_manager.clone();
986            let view_id_clone = view_id.clone();
987            let key_clone = key.to_string();
988            tokio::spawn(
989                async move {
990                    loop {
991                        tokio::select! {
992                            _ = cancel_token.cancelled() => {
993                                debug!("State subscription cancelled for client {}", client_id);
994                                break;
995                            }
996                            result = rx.changed() => {
997                                if result.is_err() {
998                                    break;
999                                }
1000                                let data = rx.borrow().clone();
1001                                if client_mgr.send_to_client(client_id, data).is_err() {
1002                                    break;
1003                                }
1004                            }
1005                        }
1006                    }
1007                }
1008                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
1009            );
1010        }
1011        Mode::List | Mode::Append => {
1012            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1013
1014            let snapshots = ctx.entity_cache.get_all(view_id).await;
1015            let snapshot_entities: Vec<SnapshotEntity> = snapshots
1016                .into_iter()
1017                .filter(|(key, _)| subscription.matches_key(key))
1018                .map(|(key, mut data)| {
1019                    transform_large_u64_to_strings(&mut data);
1020                    SnapshotEntity { key, data }
1021                })
1022                .collect();
1023
1024            if !snapshot_entities.is_empty() {
1025                let batch_config = ctx.entity_cache.snapshot_config();
1026                if send_snapshot_batches(
1027                    ctx.client_id,
1028                    &snapshot_entities,
1029                    view_spec.mode,
1030                    view_id,
1031                    ctx.client_manager,
1032                    &batch_config,
1033                )
1034                .await
1035                .is_err()
1036                {
1037                    return;
1038                }
1039            }
1040
1041            let client_id = ctx.client_id;
1042            let client_mgr = ctx.client_manager.clone();
1043            let sub = subscription.clone();
1044            let view_id_clone = view_id.clone();
1045            let mode = view_spec.mode;
1046            tokio::spawn(
1047                async move {
1048                    loop {
1049                        tokio::select! {
1050                            _ = cancel_token.cancelled() => {
1051                                debug!("List subscription cancelled for client {}", client_id);
1052                                break;
1053                            }
1054                            result = rx.recv() => {
1055                                match result {
1056                                    Ok(envelope) => {
1057                                        if sub.matches(&envelope.entity, &envelope.key)
1058                                            && client_mgr
1059                                                .send_to_client(client_id, envelope.payload.clone())
1060                                                .is_err()
1061                                        {
1062                                            break;
1063                                        }
1064                                    }
1065                                    Err(_) => break,
1066                                }
1067                            }
1068                        }
1069                    }
1070                }
1071                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1072            );
1073        }
1074    }
1075
1076    info!(
1077        "Client {} subscribed to {} (mode: {:?})",
1078        ctx.client_id, view_id, view_spec.mode
1079    );
1080}
1081
1082#[cfg(not(feature = "otel"))]
1083async fn attach_derived_view_subscription(
1084    ctx: &SubscriptionContext<'_>,
1085    subscription: Subscription,
1086    view_spec: ViewSpec,
1087    cancel_token: CancellationToken,
1088) {
1089    let view_id = &subscription.view;
1090    let pipeline_limit = view_spec
1091        .pipeline
1092        .as_ref()
1093        .and_then(|p| p.limit)
1094        .unwrap_or(100);
1095    let take = subscription.take.unwrap_or(pipeline_limit);
1096    let skip = subscription.skip.unwrap_or(0);
1097    let is_single = take == 1;
1098
1099    let source_view_id = match &view_spec.source_view {
1100        Some(s) => s.clone(),
1101        None => {
1102            warn!("Derived view {} has no source_view", view_id);
1103            return;
1104        }
1105    };
1106
1107    let sorted_caches = ctx.view_index.sorted_caches();
1108    let initial_window: Vec<(String, serde_json::Value)> = {
1109        let mut caches = sorted_caches.write().await;
1110        if let Some(cache) = caches.get_mut(view_id) {
1111            cache.get_window(skip, take)
1112        } else {
1113            warn!("No sorted cache for derived view {}", view_id);
1114            vec![]
1115        }
1116    };
1117
1118    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1119
1120    if !initial_window.is_empty() {
1121        let snapshot_entities: Vec<SnapshotEntity> = initial_window
1122            .into_iter()
1123            .map(|(key, mut data)| {
1124                transform_large_u64_to_strings(&mut data);
1125                SnapshotEntity { key, data }
1126            })
1127            .collect();
1128
1129        let batch_config = ctx.entity_cache.snapshot_config();
1130        if send_snapshot_batches(
1131            ctx.client_id,
1132            &snapshot_entities,
1133            view_spec.mode,
1134            view_id,
1135            ctx.client_manager,
1136            &batch_config,
1137        )
1138        .await
1139        .is_err()
1140        {
1141            return;
1142        }
1143    }
1144
1145    let mut rx = ctx
1146        .bus_manager
1147        .get_or_create_list_bus(&source_view_id)
1148        .await;
1149
1150    let client_id = ctx.client_id;
1151    let client_mgr = ctx.client_manager.clone();
1152    let view_id_clone = view_id.clone();
1153    let view_id_span = view_id.clone();
1154    let sorted_caches_clone = sorted_caches;
1155    let frame_mode = view_spec.mode;
1156
1157    tokio::spawn(
1158        async move {
1159            let mut current_window_keys = initial_keys;
1160
1161            loop {
1162                tokio::select! {
1163                    _ = cancel_token.cancelled() => {
1164                        debug!("Derived view subscription cancelled for client {}", client_id);
1165                        break;
1166                    }
1167                    result = rx.recv() => {
1168                        match result {
1169                            Ok(_envelope) => {
1170                                let new_window: Vec<(String, serde_json::Value)> = {
1171                                    let mut caches = sorted_caches_clone.write().await;
1172                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
1173                                        cache.get_window(skip, take)
1174                                    } else {
1175                                        continue;
1176                                    }
1177                                };
1178
1179                                let new_keys: HashSet<String> =
1180                                    new_window.iter().map(|(k, _)| k.clone()).collect();
1181
1182                                if is_single {
1183                                    if let Some((new_key, data)) = new_window.first() {
1184                                        for old_key in current_window_keys.difference(&new_keys) {
1185                                            let delete_frame = Frame {
1186                                                mode: frame_mode,
1187                                                export: view_id_clone.clone(),
1188                                                op: "delete",
1189                                                key: old_key.clone(),
1190                                                data: serde_json::Value::Null,
1191                                                append: vec![],
1192                                            };
1193                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
1194                                                let payload = Arc::new(Bytes::from(json));
1195                                                if client_mgr.send_to_client(client_id, payload).is_err() {
1196                                                    return;
1197                                                }
1198                                            }
1199                                        }
1200
1201                                        let mut transformed_data = data.clone();
1202                                        transform_large_u64_to_strings(&mut transformed_data);
1203                                        let frame = Frame {
1204                                            mode: frame_mode,
1205                                            export: view_id_clone.clone(),
1206                                            op: "upsert",
1207                                            key: new_key.clone(),
1208                                            data: transformed_data,
1209                                            append: vec![],
1210                                        };
1211                                        if let Ok(json) = serde_json::to_vec(&frame) {
1212                                            let payload = Arc::new(Bytes::from(json));
1213                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1214                                                return;
1215                                            }
1216                                        }
1217                                    }
1218                                } else {
1219                                    for key in current_window_keys.difference(&new_keys) {
1220                                        let delete_frame = Frame {
1221                                            mode: frame_mode,
1222                                            export: view_id_clone.clone(),
1223                                            op: "delete",
1224                                            key: key.clone(),
1225                                            data: serde_json::Value::Null,
1226                                            append: vec![],
1227                                        };
1228                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
1229                                            let payload = Arc::new(Bytes::from(json));
1230                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1231                                                return;
1232                                            }
1233                                        }
1234                                    }
1235
1236                                    for (key, data) in &new_window {
1237                                        let mut transformed_data = data.clone();
1238                                        transform_large_u64_to_strings(&mut transformed_data);
1239                                        let frame = Frame {
1240                                            mode: frame_mode,
1241                                            export: view_id_clone.clone(),
1242                                            op: "upsert",
1243                                            key: key.clone(),
1244                                            data: transformed_data,
1245                                            append: vec![],
1246                                        };
1247                                        if let Ok(json) = serde_json::to_vec(&frame) {
1248                                            let payload = Arc::new(Bytes::from(json));
1249                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1250                                                return;
1251                                            }
1252                                        }
1253                                    }
1254                                }
1255
1256                                current_window_keys = new_keys;
1257                            }
1258                            Err(_) => break,
1259                        }
1260                    }
1261                }
1262            }
1263        }
1264        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1265    );
1266
1267    info!(
1268        "Client {} subscribed to derived view {} (take={}, skip={})",
1269        ctx.client_id, view_id, take, skip
1270    );
1271}