hyperstack_server/websocket/
server.rs

1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7    Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig, SortOrder, SubscribedFrame,
8};
9use crate::websocket::subscription::{ClientMessage, Subscription};
10use anyhow::Result;
11use bytes::Bytes;
12use futures_util::StreamExt;
13use std::collections::HashSet;
14use std::net::SocketAddr;
15use std::sync::Arc;
16#[cfg(feature = "otel")]
17use std::time::Instant;
18
19use tokio::net::{TcpListener, TcpStream};
20use tokio_tungstenite::accept_async;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, info_span, warn, Instrument};
23use uuid::Uuid;
24
25#[cfg(feature = "otel")]
26use crate::metrics::Metrics;
27
28struct SubscriptionContext<'a> {
29    client_id: Uuid,
30    client_manager: &'a ClientManager,
31    bus_manager: &'a BusManager,
32    entity_cache: &'a EntityCache,
33    view_index: &'a ViewIndex,
34    #[cfg(feature = "otel")]
35    metrics: Option<Arc<Metrics>>,
36}
37
38pub struct WebSocketServer {
39    bind_addr: SocketAddr,
40    client_manager: ClientManager,
41    bus_manager: BusManager,
42    entity_cache: EntityCache,
43    view_index: Arc<ViewIndex>,
44    max_clients: usize,
45    #[cfg(feature = "otel")]
46    metrics: Option<Arc<Metrics>>,
47}
48
49impl WebSocketServer {
50    #[cfg(feature = "otel")]
51    pub fn new(
52        bind_addr: SocketAddr,
53        bus_manager: BusManager,
54        entity_cache: EntityCache,
55        view_index: Arc<ViewIndex>,
56        metrics: Option<Arc<Metrics>>,
57    ) -> Self {
58        Self {
59            bind_addr,
60            client_manager: ClientManager::new(),
61            bus_manager,
62            entity_cache,
63            view_index,
64            max_clients: 10000,
65            metrics,
66        }
67    }
68
69    #[cfg(not(feature = "otel"))]
70    pub fn new(
71        bind_addr: SocketAddr,
72        bus_manager: BusManager,
73        entity_cache: EntityCache,
74        view_index: Arc<ViewIndex>,
75    ) -> Self {
76        Self {
77            bind_addr,
78            client_manager: ClientManager::new(),
79            bus_manager,
80            entity_cache,
81            view_index,
82            max_clients: 10000,
83        }
84    }
85
86    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
87        self.max_clients = max_clients;
88        self
89    }
90
91    pub async fn start(self) -> Result<()> {
92        info!(
93            "Starting WebSocket server on {} (max_clients: {})",
94            self.bind_addr, self.max_clients
95        );
96
97        let listener = TcpListener::bind(&self.bind_addr).await?;
98        info!("WebSocket server listening on {}", self.bind_addr);
99
100        self.client_manager.start_cleanup_task();
101
102        loop {
103            match listener.accept().await {
104                Ok((stream, addr)) => {
105                    let client_count = self.client_manager.client_count();
106                    if client_count >= self.max_clients {
107                        warn!(
108                            "Rejecting connection from {} - max clients ({}) reached",
109                            addr, self.max_clients
110                        );
111                        drop(stream);
112                        continue;
113                    }
114
115                    #[cfg(feature = "otel")]
116                    if let Some(ref metrics) = self.metrics {
117                        metrics.record_ws_connection();
118                    }
119
120                    info!(
121                        "New WebSocket connection from {} ({}/{} clients)",
122                        addr,
123                        client_count + 1,
124                        self.max_clients
125                    );
126                    let client_manager = self.client_manager.clone();
127                    let bus_manager = self.bus_manager.clone();
128                    let entity_cache = self.entity_cache.clone();
129                    let view_index = self.view_index.clone();
130                    #[cfg(feature = "otel")]
131                    let metrics = self.metrics.clone();
132
133                    tokio::spawn(
134                        async move {
135                            #[cfg(feature = "otel")]
136                            let result = handle_connection(
137                                stream,
138                                client_manager,
139                                bus_manager,
140                                entity_cache,
141                                view_index,
142                                metrics,
143                            )
144                            .await;
145                            #[cfg(not(feature = "otel"))]
146                            let result = handle_connection(
147                                stream,
148                                client_manager,
149                                bus_manager,
150                                entity_cache,
151                                view_index,
152                            )
153                            .await;
154
155                            if let Err(e) = result {
156                                error!("WebSocket connection error: {}", e);
157                            }
158                        }
159                        .instrument(info_span!("ws.connection", %addr)),
160                    );
161                }
162                Err(e) => {
163                    error!("Failed to accept connection: {}", e);
164                }
165            }
166        }
167    }
168}
169
/// Serves one client connection (build with the `otel` feature).
///
/// Performs the WebSocket handshake on `stream`, registers the client with
/// `client_manager`, and then processes incoming frames until the client
/// sends a close frame, the stream errors, or the stream ends. Text frames
/// are parsed first as `ClientMessage` and, failing that, as a bare
/// `Subscription`; anything else is only logged. On exit all of the
/// client's subscriptions are cancelled, the client is removed, and the
/// disconnection plus every still-active subscription is reported to
/// `metrics`.
///
/// # Errors
///
/// Returns an error only when the WebSocket handshake fails.
// NOTE(review): a failed handshake returns before `record_ws_disconnection`
// is reached, while the caller has already recorded the connection — confirm
// whether the metrics are expected to balance in that case.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        metrics: metrics.clone(),
    };

    // One view id per subscription that has been counted as "created" and
    // not yet counted as "removed"; drained into the metrics on disconnect.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
            match client_msg {
                ClientMessage::Subscribe(subscription) => {
                    register_client_subscription_otel(
                        &ctx,
                        subscription,
                        &mut active_subscriptions,
                    )
                    .await;
                }
                ClientMessage::Unsubscribe(unsub) => {
                    let sub_key = unsub.sub_key();
                    let removed = client_manager
                        .remove_client_subscription(client_id, &sub_key)
                        .await;

                    if removed {
                        info!("Client {} unsubscribed from {}", client_id, sub_key);
                        if let Some(ref m) = metrics {
                            m.record_subscription_removed(&unsub.view);
                        }
                        // The removal has just been recorded; forget one
                        // matching entry so the disconnect path below does
                        // not record a second removal for it.
                        if let Some(pos) = active_subscriptions
                            .iter()
                            .position(|view| view == &unsub.view)
                        {
                            active_subscriptions.swap_remove(pos);
                        }
                    }
                }
                ClientMessage::Ping => {
                    debug!("Received ping from client {}", client_id);
                }
            }
        } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
            // Bare `Subscription` payload without the `ClientMessage` wrapper.
            register_client_subscription_otel(&ctx, subscription, &mut active_subscriptions)
                .await;
        } else {
            debug!("Received non-subscription message from client {}: {}", client_id, text);
        }
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}

/// Registers `subscription` for the context's client and, unless the client
/// manager reports it as a duplicate, records it in the metrics, remembers
/// its view id in `active_subscriptions` and attaches the client to the bus.
#[cfg(feature = "otel")]
async fn register_client_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    active_subscriptions: &mut Vec<String>,
) {
    let view_id = subscription.view.clone();
    let sub_key = subscription.sub_key();
    ctx.client_manager
        .update_subscription(ctx.client_id, subscription.clone());

    let cancel_token = CancellationToken::new();
    let is_new = ctx
        .client_manager
        .add_client_subscription(ctx.client_id, sub_key.clone(), cancel_token.clone())
        .await;

    if !is_new {
        debug!("Client {} already subscribed to {}, ignoring duplicate", ctx.client_id, sub_key);
        return;
    }

    if let Some(ref m) = ctx.metrics {
        m.record_subscription_created(&view_id);
    }
    active_subscriptions.push(view_id);

    attach_client_to_bus(ctx, subscription, cancel_token).await;
}
323
324#[cfg(not(feature = "otel"))]
325async fn handle_connection(
326    stream: TcpStream,
327    client_manager: ClientManager,
328    bus_manager: BusManager,
329    entity_cache: EntityCache,
330    view_index: Arc<ViewIndex>,
331) -> Result<()> {
332    let ws_stream = accept_async(stream).await?;
333    let client_id = Uuid::new_v4();
334
335    info!("WebSocket connection established for client {}", client_id);
336
337    let (ws_sender, mut ws_receiver) = ws_stream.split();
338
339    client_manager.add_client(client_id, ws_sender);
340
341    let ctx = SubscriptionContext {
342        client_id,
343        client_manager: &client_manager,
344        bus_manager: &bus_manager,
345        entity_cache: &entity_cache,
346        view_index: &view_index,
347    };
348
349    loop {
350        tokio::select! {
351            ws_msg = ws_receiver.next() => {
352                match ws_msg {
353                    Some(Ok(msg)) => {
354                        if msg.is_close() {
355                            info!("Client {} requested close", client_id);
356                            break;
357                        }
358
359                        client_manager.update_client_last_seen(client_id);
360
361                        if msg.is_text() {
362                            if let Ok(text) = msg.to_text() {
363                                debug!("Received text message from client {}: {}", client_id, text);
364
365                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
366                                    match client_msg {
367                                        ClientMessage::Subscribe(subscription) => {
368                                            let sub_key = subscription.sub_key();
369                                            client_manager.update_subscription(client_id, subscription.clone());
370
371                                            let cancel_token = CancellationToken::new();
372                                            let is_new = client_manager.add_client_subscription(
373                                                client_id,
374                                                sub_key.clone(),
375                                                cancel_token.clone(),
376                                            ).await;
377
378                                            if !is_new {
379                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
380                                                continue;
381                                            }
382
383                                            attach_client_to_bus(&ctx, subscription, cancel_token).await;
384                                        }
385                                        ClientMessage::Unsubscribe(unsub) => {
386                                            let sub_key = unsub.sub_key();
387                                            let removed = client_manager
388                                                .remove_client_subscription(client_id, &sub_key)
389                                                .await;
390
391                                            if removed {
392                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
393                                            }
394                                        }
395                                        ClientMessage::Ping => {
396                                            debug!("Received ping from client {}", client_id);
397                                        }
398                                    }
399                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
400                                    let sub_key = subscription.sub_key();
401                                    client_manager.update_subscription(client_id, subscription.clone());
402
403                                    let cancel_token = CancellationToken::new();
404                                    let is_new = client_manager.add_client_subscription(
405                                        client_id,
406                                        sub_key.clone(),
407                                        cancel_token.clone(),
408                                    ).await;
409
410                                    if !is_new {
411                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
412                                        continue;
413                                    }
414
415                                    attach_client_to_bus(&ctx, subscription, cancel_token).await;
416                                } else {
417                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
418                                }
419                            }
420                        }
421                    }
422                    Some(Err(e)) => {
423                        warn!("WebSocket error for client {}: {}", client_id, e);
424                        break;
425                    }
426                    None => {
427                        debug!("WebSocket stream ended for client {}", client_id);
428                        break;
429                    }
430                }
431            }
432        }
433    }
434
435    client_manager
436        .cancel_all_client_subscriptions(client_id)
437        .await;
438    client_manager.remove_client(client_id);
439    info!("Client {} disconnected", client_id);
440
441    Ok(())
442}
443
444async fn send_snapshot_batches(
445    client_id: Uuid,
446    entities: &[SnapshotEntity],
447    mode: Mode,
448    view_id: &str,
449    client_manager: &ClientManager,
450    batch_config: &SnapshotBatchConfig,
451    #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
452) -> Result<()> {
453    let total = entities.len();
454    if total == 0 {
455        return Ok(());
456    }
457
458    let mut offset = 0;
459    let mut batch_num = 0;
460
461    while offset < total {
462        let batch_size = if batch_num == 0 {
463            batch_config.initial_batch_size
464        } else {
465            batch_config.subsequent_batch_size
466        };
467
468        let end = (offset + batch_size).min(total);
469        let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
470        let is_complete = end >= total;
471
472        let snapshot_frame = SnapshotFrame {
473            mode,
474            export: view_id.to_string(),
475            op: "snapshot",
476            data: batch_data,
477            complete: is_complete,
478        };
479
480        if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
481            let payload = maybe_compress(&json_payload);
482            if client_manager
483                .send_compressed_async(client_id, payload)
484                .await
485                .is_err()
486            {
487                return Err(anyhow::anyhow!("Failed to send snapshot batch"));
488            }
489            #[cfg(feature = "otel")]
490            if let Some(m) = metrics {
491                m.record_ws_message_sent();
492            }
493        }
494
495        offset = end;
496        batch_num += 1;
497    }
498
499    debug!(
500        "Sent {} snapshot batches ({} entities) for {} to client {}",
501        batch_num, total, view_id, client_id
502    );
503
504    Ok(())
505}
506
507fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
508    if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
509        return Some(SortConfig {
510            field: sort.field_path.clone(),
511            order: match sort.order {
512                crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
513                crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
514            },
515        });
516    }
517
518    if view_spec.mode == Mode::List {
519        return Some(SortConfig {
520            field: vec!["_seq".to_string()],
521            order: SortOrder::Desc,
522        });
523    }
524
525    None
526}
527
528fn send_subscribed_frame(
529    client_id: Uuid,
530    view_id: &str,
531    view_spec: &ViewSpec,
532    client_manager: &ClientManager,
533) -> Result<()> {
534    let sort_config = extract_sort_config(view_spec);
535    let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
536
537    let json_payload = serde_json::to_vec(&subscribed_frame)?;
538    let payload = Arc::new(Bytes::from(json_payload));
539    client_manager
540        .send_to_client(client_id, payload)
541        .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
542}
543
/// Wires a freshly registered subscription to its data source (build with
/// the `otel` feature).
///
/// Steps:
/// 1. Looks up the view spec for `subscription.view`; an unknown view is
///    logged and nothing else happens.
/// 2. Sends the "subscribed" frame to the client.
/// 3. Derived views that declare a pipeline sort are delegated to
///    `attach_derived_view_subscription_otel`.
/// 4. `Mode::State`: sends the current bus value if it is non-empty, then
///    spawns a task that forwards every change.
/// 5. `Mode::List` / `Mode::Append`: sends the cached entities matching the
///    subscription as snapshot batches, then spawns a task that forwards
///    matching bus envelopes.
///
/// Spawned tasks stop when `cancel_token` is cancelled, when the bus
/// receiver reports an error, or when sending to the client fails.
// NOTE(review): on the early returns (unknown view, failed subscribed frame,
// failed snapshot) the subscription stays registered with the client manager
// by the caller — confirm whether it should be rolled back.
// NOTE(review): the list task stops on any receive error; if the bus is a
// broadcast channel that would include a lagging receiver — confirm that
// ending the subscription silently is intended in that case.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;

    let view_spec = match ctx.view_index.get_view(view_id) {
        Some(spec) => spec.clone(),
        None => {
            warn!("Unknown view ID: {}", view_id);
            return;
        }
    };

    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
        warn!("Failed to send subscribed frame: {}", e);
        return;
    }

    let is_derived_with_sort = view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map(|p| p.sort.is_some())
            .unwrap_or(false);

    if is_derived_with_sort {
        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
        return;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");
            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            // Take one snapshot of the current value so the emptiness check
            // and the payload that is sent refer to the same data.
            let current = rx.borrow().clone();
            if !current.is_empty() {
                let delivered = ctx
                    .client_manager
                    .send_to_client(ctx.client_id, current)
                    .is_ok();
                // Count the message only when it was actually handed over,
                // matching what the forwarding loops below do.
                if delivered {
                    if let Some(ref m) = ctx.metrics {
                        m.record_ws_message_sent();
                    }
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let key_clone = key.to_string();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("State subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.changed() => {
                                if result.is_err() {
                                    break;
                                }
                                let data = rx.borrow().clone();
                                if client_mgr.send_to_client(client_id, data).is_err() {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
            );
        }
        Mode::List | Mode::Append => {
            // Obtain the receiver before reading the cache so updates that
            // arrive while the snapshot is being sent are not missed.
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let snapshots = ctx.entity_cache.get_all(view_id).await;
            let snapshot_entities: Vec<SnapshotEntity> = snapshots
                .into_iter()
                .filter(|(key, _)| subscription.matches_key(key))
                .map(|(key, data)| SnapshotEntity { key, data })
                .collect();

            if !snapshot_entities.is_empty() {
                let batch_config = ctx.entity_cache.snapshot_config();
                if send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await
                .is_err()
                {
                    return;
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let sub = subscription.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let mode = view_spec.mode;
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("List subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.recv() => {
                                match result {
                                    Ok(envelope) => {
                                        if sub.matches(&envelope.entity, &envelope.key) {
                                            if client_mgr
                                                .send_to_client(client_id, envelope.payload.clone())
                                                .is_err()
                                            {
                                                break;
                                            }
                                            if let Some(ref m) = metrics_clone {
                                                m.record_ws_message_sent();
                                            }
                                        }
                                    }
                                    Err(_) => break,
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
            );
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );
}
695
696#[cfg(feature = "otel")]
697async fn attach_derived_view_subscription_otel(
698    ctx: &SubscriptionContext<'_>,
699    subscription: Subscription,
700    view_spec: ViewSpec,
701    cancel_token: CancellationToken,
702) {
703    let view_id = &subscription.view;
704    let pipeline_limit = view_spec
705        .pipeline
706        .as_ref()
707        .and_then(|p| p.limit)
708        .unwrap_or(100);
709    let take = subscription.take.unwrap_or(pipeline_limit);
710    let skip = subscription.skip.unwrap_or(0);
711    let is_single = take == 1;
712
713    let source_view_id = match &view_spec.source_view {
714        Some(s) => s.clone(),
715        None => {
716            warn!("Derived view {} has no source_view", view_id);
717            return;
718        }
719    };
720
721    let sorted_caches = ctx.view_index.sorted_caches();
722    let initial_window: Vec<(String, serde_json::Value)> = {
723        let mut caches = sorted_caches.write().await;
724        if let Some(cache) = caches.get_mut(view_id) {
725            cache.get_window(skip, take)
726        } else {
727            warn!("No sorted cache for derived view {}", view_id);
728            vec![]
729        }
730    };
731
732    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
733
734    if !initial_window.is_empty() {
735        let snapshot_entities: Vec<SnapshotEntity> = initial_window
736            .into_iter()
737            .map(|(key, data)| SnapshotEntity { key, data })
738            .collect();
739
740        let batch_config = ctx.entity_cache.snapshot_config();
741        if send_snapshot_batches(
742            ctx.client_id,
743            &snapshot_entities,
744            view_spec.mode,
745            view_id,
746            ctx.client_manager,
747            &batch_config,
748            ctx.metrics.as_ref(),
749        )
750        .await
751        .is_err()
752        {
753            return;
754        }
755    }
756
757    let mut rx = ctx
758        .bus_manager
759        .get_or_create_list_bus(&source_view_id)
760        .await;
761
762    let client_id = ctx.client_id;
763    let client_mgr = ctx.client_manager.clone();
764    let view_id_clone = view_id.clone();
765    let view_id_span = view_id.clone();
766    let sorted_caches_clone = sorted_caches;
767    let metrics_clone = ctx.metrics.clone();
768    let frame_mode = view_spec.mode;
769
770    tokio::spawn(
771        async move {
772            let mut current_window_keys = initial_keys;
773
774            loop {
775                tokio::select! {
776                    _ = cancel_token.cancelled() => {
777                        debug!("Derived view subscription cancelled for client {}", client_id);
778                        break;
779                    }
780                    result = rx.recv() => {
781                        match result {
782                            Ok(_envelope) => {
783                                let new_window: Vec<(String, serde_json::Value)> = {
784                                    let mut caches = sorted_caches_clone.write().await;
785                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
786                                        cache.get_window(skip, take)
787                                    } else {
788                                        continue;
789                                    }
790                                };
791
792                                let new_keys: HashSet<String> =
793                                    new_window.iter().map(|(k, _)| k.clone()).collect();
794
795                                if is_single {
796                                    if let Some((new_key, data)) = new_window.first() {
797                                        for old_key in current_window_keys.difference(&new_keys) {
798                                            let delete_frame = Frame {
799                                                mode: frame_mode,
800                                                export: view_id_clone.clone(),
801                                                op: "delete",
802                                                key: old_key.clone(),
803                                                data: serde_json::Value::Null,
804                                                append: vec![],
805                                            };
806                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
807                                                let payload = Arc::new(Bytes::from(json));
808                                                if client_mgr.send_to_client(client_id, payload).is_err() {
809                                                    return;
810                                                }
811                                                if let Some(ref m) = metrics_clone {
812                                                    m.record_ws_message_sent();
813                                                }
814                                            }
815                                        }
816
817                                        let frame = Frame {
818                                            mode: frame_mode,
819                                            export: view_id_clone.clone(),
820                                            op: "upsert",
821                                            key: new_key.clone(),
822                                            data: data.clone(),
823                                            append: vec![],
824                                        };
825                                        if let Ok(json) = serde_json::to_vec(&frame) {
826                                            let payload = Arc::new(Bytes::from(json));
827                                            if client_mgr.send_to_client(client_id, payload).is_err() {
828                                                return;
829                                            }
830                                            if let Some(ref m) = metrics_clone {
831                                                m.record_ws_message_sent();
832                                            }
833                                        }
834                                    }
835                                } else {
836                                    for key in current_window_keys.difference(&new_keys) {
837                                        let delete_frame = Frame {
838                                            mode: frame_mode,
839                                            export: view_id_clone.clone(),
840                                            op: "delete",
841                                            key: key.clone(),
842                                            data: serde_json::Value::Null,
843                                            append: vec![],
844                                        };
845                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
846                                            let payload = Arc::new(Bytes::from(json));
847                                            if client_mgr.send_to_client(client_id, payload).is_err() {
848                                                return;
849                                            }
850                                            if let Some(ref m) = metrics_clone {
851                                                m.record_ws_message_sent();
852                                            }
853                                        }
854                                    }
855
856                                    for (key, data) in &new_window {
857                                        let frame = Frame {
858                                            mode: frame_mode,
859                                            export: view_id_clone.clone(),
860                                            op: "upsert",
861                                            key: key.clone(),
862                                            data: data.clone(),
863                                            append: vec![],
864                                        };
865                                        if let Ok(json) = serde_json::to_vec(&frame) {
866                                            let payload = Arc::new(Bytes::from(json));
867                                            if client_mgr.send_to_client(client_id, payload).is_err() {
868                                                return;
869                                            }
870                                            if let Some(ref m) = metrics_clone {
871                                                m.record_ws_message_sent();
872                                            }
873                                        }
874                                    }
875                                }
876
877                                current_window_keys = new_keys;
878                            }
879                            Err(_) => break,
880                        }
881                    }
882                }
883            }
884        }
885        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
886    );
887
888    info!(
889        "Client {} subscribed to derived view {} (take={}, skip={})",
890        ctx.client_id, view_id, take, skip
891    );
892}
893
894#[cfg(not(feature = "otel"))]
895async fn attach_client_to_bus(
896    ctx: &SubscriptionContext<'_>,
897    subscription: Subscription,
898    cancel_token: CancellationToken,
899) {
900    let view_id = &subscription.view;
901
902    let view_spec = match ctx.view_index.get_view(view_id) {
903        Some(spec) => spec.clone(),
904        None => {
905            warn!("Unknown view ID: {}", view_id);
906            return;
907        }
908    };
909
910    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
911        warn!("Failed to send subscribed frame: {}", e);
912        return;
913    }
914
915    let is_derived_with_sort = view_spec.is_derived()
916        && view_spec
917            .pipeline
918            .as_ref()
919            .map(|p| p.sort.is_some())
920            .unwrap_or(false);
921
922    if is_derived_with_sort {
923        attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
924        return;
925    }
926
927    match view_spec.mode {
928        Mode::State => {
929            let key = subscription.key.as_deref().unwrap_or("");
930            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
931
932            if !rx.borrow().is_empty() {
933                let data = rx.borrow().clone();
934                let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
935            }
936
937            let client_id = ctx.client_id;
938            let client_mgr = ctx.client_manager.clone();
939            let view_id_clone = view_id.clone();
940            let key_clone = key.to_string();
941            tokio::spawn(
942                async move {
943                    loop {
944                        tokio::select! {
945                            _ = cancel_token.cancelled() => {
946                                debug!("State subscription cancelled for client {}", client_id);
947                                break;
948                            }
949                            result = rx.changed() => {
950                                if result.is_err() {
951                                    break;
952                                }
953                                let data = rx.borrow().clone();
954                                if client_mgr.send_to_client(client_id, data).is_err() {
955                                    break;
956                                }
957                            }
958                        }
959                    }
960                }
961                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
962            );
963        }
964        Mode::List | Mode::Append => {
965            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
966
967            let snapshots = ctx.entity_cache.get_all(view_id).await;
968            let snapshot_entities: Vec<SnapshotEntity> = snapshots
969                .into_iter()
970                .filter(|(key, _)| subscription.matches_key(key))
971                .map(|(key, data)| SnapshotEntity { key, data })
972                .collect();
973
974            if !snapshot_entities.is_empty() {
975                let batch_config = ctx.entity_cache.snapshot_config();
976                if send_snapshot_batches(
977                    ctx.client_id,
978                    &snapshot_entities,
979                    view_spec.mode,
980                    view_id,
981                    ctx.client_manager,
982                    &batch_config,
983                )
984                .await
985                .is_err()
986                {
987                    return;
988                }
989            }
990
991            let client_id = ctx.client_id;
992            let client_mgr = ctx.client_manager.clone();
993            let sub = subscription.clone();
994            let view_id_clone = view_id.clone();
995            let mode = view_spec.mode;
996            tokio::spawn(
997                async move {
998                    loop {
999                        tokio::select! {
1000                            _ = cancel_token.cancelled() => {
1001                                debug!("List subscription cancelled for client {}", client_id);
1002                                break;
1003                            }
1004                            result = rx.recv() => {
1005                                match result {
1006                                    Ok(envelope) => {
1007                                        if sub.matches(&envelope.entity, &envelope.key)
1008                                            && client_mgr
1009                                                .send_to_client(client_id, envelope.payload.clone())
1010                                                .is_err()
1011                                        {
1012                                            break;
1013                                        }
1014                                    }
1015                                    Err(_) => break,
1016                                }
1017                            }
1018                        }
1019                    }
1020                }
1021                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1022            );
1023        }
1024    }
1025
1026    info!(
1027        "Client {} subscribed to {} (mode: {:?})",
1028        ctx.client_id, view_id, view_spec.mode
1029    );
1030}
1031
1032#[cfg(not(feature = "otel"))]
1033async fn attach_derived_view_subscription(
1034    ctx: &SubscriptionContext<'_>,
1035    subscription: Subscription,
1036    view_spec: ViewSpec,
1037    cancel_token: CancellationToken,
1038) {
1039    let view_id = &subscription.view;
1040    let pipeline_limit = view_spec
1041        .pipeline
1042        .as_ref()
1043        .and_then(|p| p.limit)
1044        .unwrap_or(100);
1045    let take = subscription.take.unwrap_or(pipeline_limit);
1046    let skip = subscription.skip.unwrap_or(0);
1047    let is_single = take == 1;
1048
1049    let source_view_id = match &view_spec.source_view {
1050        Some(s) => s.clone(),
1051        None => {
1052            warn!("Derived view {} has no source_view", view_id);
1053            return;
1054        }
1055    };
1056
1057    let sorted_caches = ctx.view_index.sorted_caches();
1058    let initial_window: Vec<(String, serde_json::Value)> = {
1059        let mut caches = sorted_caches.write().await;
1060        if let Some(cache) = caches.get_mut(view_id) {
1061            cache.get_window(skip, take)
1062        } else {
1063            warn!("No sorted cache for derived view {}", view_id);
1064            vec![]
1065        }
1066    };
1067
1068    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1069
1070    if !initial_window.is_empty() {
1071        let snapshot_entities: Vec<SnapshotEntity> = initial_window
1072            .into_iter()
1073            .map(|(key, data)| SnapshotEntity { key, data })
1074            .collect();
1075
1076        let batch_config = ctx.entity_cache.snapshot_config();
1077        if send_snapshot_batches(
1078            ctx.client_id,
1079            &snapshot_entities,
1080            view_spec.mode,
1081            view_id,
1082            ctx.client_manager,
1083            &batch_config,
1084        )
1085        .await
1086        .is_err()
1087        {
1088            return;
1089        }
1090    }
1091
1092    let mut rx = ctx
1093        .bus_manager
1094        .get_or_create_list_bus(&source_view_id)
1095        .await;
1096
1097    let client_id = ctx.client_id;
1098    let client_mgr = ctx.client_manager.clone();
1099    let view_id_clone = view_id.clone();
1100    let view_id_span = view_id.clone();
1101    let sorted_caches_clone = sorted_caches;
1102    let frame_mode = view_spec.mode;
1103
1104    tokio::spawn(
1105        async move {
1106            let mut current_window_keys = initial_keys;
1107
1108            loop {
1109                tokio::select! {
1110                    _ = cancel_token.cancelled() => {
1111                        debug!("Derived view subscription cancelled for client {}", client_id);
1112                        break;
1113                    }
1114                    result = rx.recv() => {
1115                        match result {
1116                            Ok(_envelope) => {
1117                                let new_window: Vec<(String, serde_json::Value)> = {
1118                                    let mut caches = sorted_caches_clone.write().await;
1119                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
1120                                        cache.get_window(skip, take)
1121                                    } else {
1122                                        continue;
1123                                    }
1124                                };
1125
1126                                let new_keys: HashSet<String> =
1127                                    new_window.iter().map(|(k, _)| k.clone()).collect();
1128
1129                                if is_single {
1130                                    if let Some((new_key, data)) = new_window.first() {
1131                                        for old_key in current_window_keys.difference(&new_keys) {
1132                                            let delete_frame = Frame {
1133                                                mode: frame_mode,
1134                                                export: view_id_clone.clone(),
1135                                                op: "delete",
1136                                                key: old_key.clone(),
1137                                                data: serde_json::Value::Null,
1138                                                append: vec![],
1139                                            };
1140                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
1141                                                let payload = Arc::new(Bytes::from(json));
1142                                                if client_mgr.send_to_client(client_id, payload).is_err() {
1143                                                    return;
1144                                                }
1145                                            }
1146                                        }
1147
1148                                        let frame = Frame {
1149                                            mode: frame_mode,
1150                                            export: view_id_clone.clone(),
1151                                            op: "upsert",
1152                                            key: new_key.clone(),
1153                                            data: data.clone(),
1154                                            append: vec![],
1155                                        };
1156                                        if let Ok(json) = serde_json::to_vec(&frame) {
1157                                            let payload = Arc::new(Bytes::from(json));
1158                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1159                                                return;
1160                                            }
1161                                        }
1162                                    }
1163                                } else {
1164                                    for key in current_window_keys.difference(&new_keys) {
1165                                        let delete_frame = Frame {
1166                                            mode: frame_mode,
1167                                            export: view_id_clone.clone(),
1168                                            op: "delete",
1169                                            key: key.clone(),
1170                                            data: serde_json::Value::Null,
1171                                            append: vec![],
1172                                        };
1173                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
1174                                            let payload = Arc::new(Bytes::from(json));
1175                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1176                                                return;
1177                                            }
1178                                        }
1179                                    }
1180
1181                                    for (key, data) in &new_window {
1182                                        let frame = Frame {
1183                                            mode: frame_mode,
1184                                            export: view_id_clone.clone(),
1185                                            op: "upsert",
1186                                            key: key.clone(),
1187                                            data: data.clone(),
1188                                            append: vec![],
1189                                        };
1190                                        if let Ok(json) = serde_json::to_vec(&frame) {
1191                                            let payload = Arc::new(Bytes::from(json));
1192                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1193                                                return;
1194                                            }
1195                                        }
1196                                    }
1197                                }
1198
1199                                current_window_keys = new_keys;
1200                            }
1201                            Err(_) => break,
1202                        }
1203                    }
1204                }
1205            }
1206        }
1207        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1208    );
1209
1210    info!(
1211        "Client {} subscribed to derived view {} (take={}, skip={})",
1212        ctx.client_id, view_id, take, skip
1213    );
1214}