Skip to main content

hyperstack_server/websocket/
server.rs

1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7    Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig, SortOrder, SubscribedFrame,
8};
9use crate::websocket::subscription::{ClientMessage, Subscription};
10use anyhow::Result;
11use bytes::Bytes;
12use futures_util::StreamExt;
13use std::collections::HashSet;
14use std::net::SocketAddr;
15use std::sync::Arc;
16#[cfg(feature = "otel")]
17use std::time::Instant;
18
19use tokio::net::{TcpListener, TcpStream};
20use tokio_tungstenite::accept_async;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, info_span, warn, Instrument};
23use uuid::Uuid;
24
25#[cfg(feature = "otel")]
26use crate::metrics::Metrics;
27
28struct SubscriptionContext<'a> {
29    client_id: Uuid,
30    client_manager: &'a ClientManager,
31    bus_manager: &'a BusManager,
32    entity_cache: &'a EntityCache,
33    view_index: &'a ViewIndex,
34    #[cfg(feature = "otel")]
35    metrics: Option<Arc<Metrics>>,
36}
37
38pub struct WebSocketServer {
39    bind_addr: SocketAddr,
40    client_manager: ClientManager,
41    bus_manager: BusManager,
42    entity_cache: EntityCache,
43    view_index: Arc<ViewIndex>,
44    max_clients: usize,
45    #[cfg(feature = "otel")]
46    metrics: Option<Arc<Metrics>>,
47}
48
49impl WebSocketServer {
50    #[cfg(feature = "otel")]
51    pub fn new(
52        bind_addr: SocketAddr,
53        bus_manager: BusManager,
54        entity_cache: EntityCache,
55        view_index: Arc<ViewIndex>,
56        metrics: Option<Arc<Metrics>>,
57    ) -> Self {
58        Self {
59            bind_addr,
60            client_manager: ClientManager::new(),
61            bus_manager,
62            entity_cache,
63            view_index,
64            max_clients: 10000,
65            metrics,
66        }
67    }
68
69    #[cfg(not(feature = "otel"))]
70    pub fn new(
71        bind_addr: SocketAddr,
72        bus_manager: BusManager,
73        entity_cache: EntityCache,
74        view_index: Arc<ViewIndex>,
75    ) -> Self {
76        Self {
77            bind_addr,
78            client_manager: ClientManager::new(),
79            bus_manager,
80            entity_cache,
81            view_index,
82            max_clients: 10000,
83        }
84    }
85
86    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
87        self.max_clients = max_clients;
88        self
89    }
90
91    pub async fn start(self) -> Result<()> {
92        info!(
93            "Starting WebSocket server on {} (max_clients: {})",
94            self.bind_addr, self.max_clients
95        );
96
97        let listener = TcpListener::bind(&self.bind_addr).await?;
98        info!("WebSocket server listening on {}", self.bind_addr);
99
100        self.client_manager.start_cleanup_task();
101
102        loop {
103            match listener.accept().await {
104                Ok((stream, addr)) => {
105                    let client_count = self.client_manager.client_count();
106                    if client_count >= self.max_clients {
107                        warn!(
108                            "Rejecting connection from {} - max clients ({}) reached",
109                            addr, self.max_clients
110                        );
111                        drop(stream);
112                        continue;
113                    }
114
115                    #[cfg(feature = "otel")]
116                    if let Some(ref metrics) = self.metrics {
117                        metrics.record_ws_connection();
118                    }
119
120                    info!(
121                        "New WebSocket connection from {} ({}/{} clients)",
122                        addr,
123                        client_count + 1,
124                        self.max_clients
125                    );
126                    let client_manager = self.client_manager.clone();
127                    let bus_manager = self.bus_manager.clone();
128                    let entity_cache = self.entity_cache.clone();
129                    let view_index = self.view_index.clone();
130                    #[cfg(feature = "otel")]
131                    let metrics = self.metrics.clone();
132
133                    tokio::spawn(
134                        async move {
135                            #[cfg(feature = "otel")]
136                            let result = handle_connection(
137                                stream,
138                                client_manager,
139                                bus_manager,
140                                entity_cache,
141                                view_index,
142                                metrics,
143                            )
144                            .await;
145                            #[cfg(not(feature = "otel"))]
146                            let result = handle_connection(
147                                stream,
148                                client_manager,
149                                bus_manager,
150                                entity_cache,
151                                view_index,
152                            )
153                            .await;
154
155                            if let Err(e) = result {
156                                error!("WebSocket connection error: {}", e);
157                            }
158                        }
159                        .instrument(info_span!("ws.connection", %addr)),
160                    );
161                }
162                Err(e) => {
163                    error!("Failed to accept connection: {}", e);
164                }
165            }
166        }
167    }
168}
169
170#[cfg(feature = "otel")]
171async fn handle_connection(
172    stream: TcpStream,
173    client_manager: ClientManager,
174    bus_manager: BusManager,
175    entity_cache: EntityCache,
176    view_index: Arc<ViewIndex>,
177    metrics: Option<Arc<Metrics>>,
178) -> Result<()> {
179    let ws_stream = accept_async(stream).await?;
180    let client_id = Uuid::new_v4();
181    let connection_start = Instant::now();
182
183    info!("WebSocket connection established for client {}", client_id);
184
185    let (ws_sender, mut ws_receiver) = ws_stream.split();
186
187    client_manager.add_client(client_id, ws_sender);
188
189    let ctx = SubscriptionContext {
190        client_id,
191        client_manager: &client_manager,
192        bus_manager: &bus_manager,
193        entity_cache: &entity_cache,
194        view_index: &view_index,
195        metrics: metrics.clone(),
196    };
197
198    let mut active_subscriptions: Vec<String> = Vec::new();
199
200    loop {
201        tokio::select! {
202            ws_msg = ws_receiver.next() => {
203                match ws_msg {
204                    Some(Ok(msg)) => {
205                        if msg.is_close() {
206                            info!("Client {} requested close", client_id);
207                            break;
208                        }
209
210                        client_manager.update_client_last_seen(client_id);
211
212                        if msg.is_text() {
213                            if let Some(ref m) = metrics {
214                                m.record_ws_message_received();
215                            }
216
217                            if let Ok(text) = msg.to_text() {
218                                debug!("Received text message from client {}: {}", client_id, text);
219
220                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
221                                    match client_msg {
222                                        ClientMessage::Subscribe(subscription) => {
223                                            let view_id = subscription.view.clone();
224                                            let sub_key = subscription.sub_key();
225                                            client_manager.update_subscription(client_id, subscription.clone());
226
227                                            let cancel_token = CancellationToken::new();
228                                            let is_new = client_manager.add_client_subscription(
229                                                client_id,
230                                                sub_key.clone(),
231                                                cancel_token.clone(),
232                                            ).await;
233
234                                            if !is_new {
235                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
236                                                continue;
237                                            }
238
239                                            if let Some(ref m) = metrics {
240                                                m.record_subscription_created(&view_id);
241                                            }
242                                            active_subscriptions.push(view_id);
243
244                                            attach_client_to_bus(&ctx, subscription, cancel_token).await;
245                                        }
246                                        ClientMessage::Unsubscribe(unsub) => {
247                                            let sub_key = unsub.sub_key();
248                                            let removed = client_manager
249                                                .remove_client_subscription(client_id, &sub_key)
250                                                .await;
251
252                                            if removed {
253                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
254                                                if let Some(ref m) = metrics {
255                                                    m.record_subscription_removed(&unsub.view);
256                                                }
257                                            }
258                                        }
259                                        ClientMessage::Ping => {
260                                            debug!("Received ping from client {}", client_id);
261                                        }
262                                    }
263                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
264                                    let view_id = subscription.view.clone();
265                                    let sub_key = subscription.sub_key();
266                                    client_manager.update_subscription(client_id, subscription.clone());
267
268                                    let cancel_token = CancellationToken::new();
269                                    let is_new = client_manager.add_client_subscription(
270                                        client_id,
271                                        sub_key.clone(),
272                                        cancel_token.clone(),
273                                    ).await;
274
275                                    if !is_new {
276                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
277                                        continue;
278                                    }
279
280                                    if let Some(ref m) = metrics {
281                                        m.record_subscription_created(&view_id);
282                                    }
283                                    active_subscriptions.push(view_id);
284
285                                    attach_client_to_bus(&ctx, subscription, cancel_token).await;
286                                } else {
287                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
288                                }
289                            }
290                        }
291                    }
292                    Some(Err(e)) => {
293                        warn!("WebSocket error for client {}: {}", client_id, e);
294                        break;
295                    }
296                    None => {
297                        debug!("WebSocket stream ended for client {}", client_id);
298                        break;
299                    }
300                }
301            }
302        }
303    }
304
305    client_manager
306        .cancel_all_client_subscriptions(client_id)
307        .await;
308    client_manager.remove_client(client_id);
309
310    if let Some(ref m) = metrics {
311        let duration_secs = connection_start.elapsed().as_secs_f64();
312        m.record_ws_disconnection(duration_secs);
313
314        for view_id in active_subscriptions {
315            m.record_subscription_removed(&view_id);
316        }
317    }
318
319    info!("Client {} disconnected", client_id);
320
321    Ok(())
322}
323
324#[cfg(not(feature = "otel"))]
325async fn handle_connection(
326    stream: TcpStream,
327    client_manager: ClientManager,
328    bus_manager: BusManager,
329    entity_cache: EntityCache,
330    view_index: Arc<ViewIndex>,
331) -> Result<()> {
332    let ws_stream = accept_async(stream).await?;
333    let client_id = Uuid::new_v4();
334
335    info!("WebSocket connection established for client {}", client_id);
336
337    let (ws_sender, mut ws_receiver) = ws_stream.split();
338
339    client_manager.add_client(client_id, ws_sender);
340
341    let ctx = SubscriptionContext {
342        client_id,
343        client_manager: &client_manager,
344        bus_manager: &bus_manager,
345        entity_cache: &entity_cache,
346        view_index: &view_index,
347    };
348
349    loop {
350        tokio::select! {
351            ws_msg = ws_receiver.next() => {
352                match ws_msg {
353                    Some(Ok(msg)) => {
354                        if msg.is_close() {
355                            info!("Client {} requested close", client_id);
356                            break;
357                        }
358
359                        client_manager.update_client_last_seen(client_id);
360
361                        if msg.is_text() {
362                            if let Ok(text) = msg.to_text() {
363                                debug!("Received text message from client {}: {}", client_id, text);
364
365                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
366                                    match client_msg {
367                                        ClientMessage::Subscribe(subscription) => {
368                                            let sub_key = subscription.sub_key();
369                                            client_manager.update_subscription(client_id, subscription.clone());
370
371                                            let cancel_token = CancellationToken::new();
372                                            let is_new = client_manager.add_client_subscription(
373                                                client_id,
374                                                sub_key.clone(),
375                                                cancel_token.clone(),
376                                            ).await;
377
378                                            if !is_new {
379                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
380                                                continue;
381                                            }
382
383                                            attach_client_to_bus(&ctx, subscription, cancel_token).await;
384                                        }
385                                        ClientMessage::Unsubscribe(unsub) => {
386                                            let sub_key = unsub.sub_key();
387                                            let removed = client_manager
388                                                .remove_client_subscription(client_id, &sub_key)
389                                                .await;
390
391                                            if removed {
392                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
393                                            }
394                                        }
395                                        ClientMessage::Ping => {
396                                            debug!("Received ping from client {}", client_id);
397                                        }
398                                    }
399                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
400                                    let sub_key = subscription.sub_key();
401                                    client_manager.update_subscription(client_id, subscription.clone());
402
403                                    let cancel_token = CancellationToken::new();
404                                    let is_new = client_manager.add_client_subscription(
405                                        client_id,
406                                        sub_key.clone(),
407                                        cancel_token.clone(),
408                                    ).await;
409
410                                    if !is_new {
411                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
412                                        continue;
413                                    }
414
415                                    attach_client_to_bus(&ctx, subscription, cancel_token).await;
416                                } else {
417                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
418                                }
419                            }
420                        }
421                    }
422                    Some(Err(e)) => {
423                        warn!("WebSocket error for client {}: {}", client_id, e);
424                        break;
425                    }
426                    None => {
427                        debug!("WebSocket stream ended for client {}", client_id);
428                        break;
429                    }
430                }
431            }
432        }
433    }
434
435    client_manager
436        .cancel_all_client_subscriptions(client_id)
437        .await;
438    client_manager.remove_client(client_id);
439    info!("Client {} disconnected", client_id);
440
441    Ok(())
442}
443
444async fn send_snapshot_batches(
445    client_id: Uuid,
446    entities: &[SnapshotEntity],
447    mode: Mode,
448    view_id: &str,
449    client_manager: &ClientManager,
450    batch_config: &SnapshotBatchConfig,
451    #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
452) -> Result<()> {
453    let total = entities.len();
454    if total == 0 {
455        return Ok(());
456    }
457
458    let mut offset = 0;
459    let mut batch_num = 0;
460
461    while offset < total {
462        let batch_size = if batch_num == 0 {
463            batch_config.initial_batch_size
464        } else {
465            batch_config.subsequent_batch_size
466        };
467
468        let end = (offset + batch_size).min(total);
469        let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
470        let is_complete = end >= total;
471
472        let snapshot_frame = SnapshotFrame {
473            mode,
474            export: view_id.to_string(),
475            op: "snapshot",
476            data: batch_data,
477            complete: is_complete,
478        };
479
480        if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
481            let payload = maybe_compress(&json_payload);
482            if client_manager
483                .send_compressed_async(client_id, payload)
484                .await
485                .is_err()
486            {
487                return Err(anyhow::anyhow!("Failed to send snapshot batch"));
488            }
489            #[cfg(feature = "otel")]
490            if let Some(m) = metrics {
491                m.record_ws_message_sent();
492            }
493        }
494
495        offset = end;
496        batch_num += 1;
497    }
498
499    debug!(
500        "Sent {} snapshot batches ({} entities) for {} to client {}",
501        batch_num, total, view_id, client_id
502    );
503
504    Ok(())
505}
506
507fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
508    if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
509        return Some(SortConfig {
510            field: sort.field_path.clone(),
511            order: match sort.order {
512                crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
513                crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
514            },
515        });
516    }
517
518    if view_spec.mode == Mode::List {
519        return Some(SortConfig {
520            field: vec!["_seq".to_string()],
521            order: SortOrder::Desc,
522        });
523    }
524
525    None
526}
527
528fn send_subscribed_frame(
529    client_id: Uuid,
530    view_id: &str,
531    view_spec: &ViewSpec,
532    client_manager: &ClientManager,
533) -> Result<()> {
534    let sort_config = extract_sort_config(view_spec);
535    let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
536
537    let json_payload = serde_json::to_vec(&subscribed_frame)?;
538    let payload = Arc::new(Bytes::from(json_payload));
539    client_manager
540        .send_to_client(client_id, payload)
541        .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
542}
543
544#[cfg(feature = "otel")]
545async fn attach_client_to_bus(
546    ctx: &SubscriptionContext<'_>,
547    subscription: Subscription,
548    cancel_token: CancellationToken,
549) {
550    let view_id = &subscription.view;
551
552    let view_spec = match ctx.view_index.get_view(view_id) {
553        Some(spec) => spec.clone(),
554        None => {
555            warn!("Unknown view ID: {}", view_id);
556            return;
557        }
558    };
559
560    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
561        warn!("Failed to send subscribed frame: {}", e);
562        return;
563    }
564
565    let is_derived_with_sort = view_spec.is_derived()
566        && view_spec
567            .pipeline
568            .as_ref()
569            .map(|p| p.sort.is_some())
570            .unwrap_or(false);
571
572    if is_derived_with_sort {
573        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
574        return;
575    }
576
577    match view_spec.mode {
578        Mode::State => {
579            let key = subscription.key.as_deref().unwrap_or("");
580
581            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
582
583            if let Some(cached_entity) = ctx.entity_cache.get(view_id, key).await {
584                let snapshot_entities = vec![SnapshotEntity {
585                    key: key.to_string(),
586                    data: cached_entity,
587                }];
588                let batch_config = ctx.entity_cache.snapshot_config();
589                let _ = send_snapshot_batches(
590                    ctx.client_id,
591                    &snapshot_entities,
592                    view_spec.mode,
593                    view_id,
594                    ctx.client_manager,
595                    &batch_config,
596                    #[cfg(feature = "otel")]
597                    ctx.metrics.as_ref(),
598                )
599                .await;
600                rx.borrow_and_update();
601            } else if !rx.borrow().is_empty() {
602                let data = rx.borrow_and_update().clone();
603                let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
604            }
605
606            let client_id = ctx.client_id;
607            let client_mgr = ctx.client_manager.clone();
608            let metrics_clone = ctx.metrics.clone();
609            let view_id_clone = view_id.clone();
610            let key_clone = key.to_string();
611            tokio::spawn(
612                async move {
613                    loop {
614                        tokio::select! {
615                            _ = cancel_token.cancelled() => {
616                                debug!("State subscription cancelled for client {}", client_id);
617                                break;
618                            }
619                            result = rx.changed() => {
620                                if result.is_err() {
621                                    break;
622                                }
623                                let data = rx.borrow().clone();
624                                if client_mgr.send_to_client(client_id, data).is_err() {
625                                    break;
626                                }
627                                if let Some(ref m) = metrics_clone {
628                                    m.record_ws_message_sent();
629                                }
630                            }
631                        }
632                    }
633                }
634                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
635            );
636        }
637        Mode::List | Mode::Append => {
638            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
639
640            let snapshots = ctx.entity_cache.get_all(view_id).await;
641            let snapshot_entities: Vec<SnapshotEntity> = snapshots
642                .into_iter()
643                .filter(|(key, _)| subscription.matches_key(key))
644                .map(|(key, data)| SnapshotEntity { key, data })
645                .collect();
646
647            if !snapshot_entities.is_empty() {
648                let batch_config = ctx.entity_cache.snapshot_config();
649                if send_snapshot_batches(
650                    ctx.client_id,
651                    &snapshot_entities,
652                    view_spec.mode,
653                    view_id,
654                    ctx.client_manager,
655                    &batch_config,
656                    #[cfg(feature = "otel")]
657                    ctx.metrics.as_ref(),
658                )
659                .await
660                .is_err()
661                {
662                    return;
663                }
664            }
665
666            let client_id = ctx.client_id;
667            let client_mgr = ctx.client_manager.clone();
668            let sub = subscription.clone();
669            let metrics_clone = ctx.metrics.clone();
670            let view_id_clone = view_id.clone();
671            let mode = view_spec.mode;
672            tokio::spawn(
673                async move {
674                    loop {
675                        tokio::select! {
676                            _ = cancel_token.cancelled() => {
677                                debug!("List subscription cancelled for client {}", client_id);
678                                break;
679                            }
680                            result = rx.recv() => {
681                                match result {
682                                    Ok(envelope) => {
683                                        if sub.matches(&envelope.entity, &envelope.key) {
684                                            if client_mgr
685                                                .send_to_client(client_id, envelope.payload.clone())
686                                                .is_err()
687                                            {
688                                                break;
689                                            }
690                                            if let Some(ref m) = metrics_clone {
691                                                m.record_ws_message_sent();
692                                            }
693                                        }
694                                    }
695                                    Err(_) => break,
696                                }
697                            }
698                        }
699                    }
700                }
701                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
702            );
703        }
704    }
705
706    info!(
707        "Client {} subscribed to {} (mode: {:?})",
708        ctx.client_id, view_id, view_spec.mode
709    );
710}
711
712#[cfg(feature = "otel")]
713async fn attach_derived_view_subscription_otel(
714    ctx: &SubscriptionContext<'_>,
715    subscription: Subscription,
716    view_spec: ViewSpec,
717    cancel_token: CancellationToken,
718) {
719    let view_id = &subscription.view;
720    let pipeline_limit = view_spec
721        .pipeline
722        .as_ref()
723        .and_then(|p| p.limit)
724        .unwrap_or(100);
725    let take = subscription.take.unwrap_or(pipeline_limit);
726    let skip = subscription.skip.unwrap_or(0);
727    let is_single = take == 1;
728
729    let source_view_id = match &view_spec.source_view {
730        Some(s) => s.clone(),
731        None => {
732            warn!("Derived view {} has no source_view", view_id);
733            return;
734        }
735    };
736
737    let sorted_caches = ctx.view_index.sorted_caches();
738    let initial_window: Vec<(String, serde_json::Value)> = {
739        let mut caches = sorted_caches.write().await;
740        if let Some(cache) = caches.get_mut(view_id) {
741            cache.get_window(skip, take)
742        } else {
743            warn!("No sorted cache for derived view {}", view_id);
744            vec![]
745        }
746    };
747
748    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
749
750    if !initial_window.is_empty() {
751        let snapshot_entities: Vec<SnapshotEntity> = initial_window
752            .into_iter()
753            .map(|(key, data)| SnapshotEntity { key, data })
754            .collect();
755
756        let batch_config = ctx.entity_cache.snapshot_config();
757        if send_snapshot_batches(
758            ctx.client_id,
759            &snapshot_entities,
760            view_spec.mode,
761            view_id,
762            ctx.client_manager,
763            &batch_config,
764            ctx.metrics.as_ref(),
765        )
766        .await
767        .is_err()
768        {
769            return;
770        }
771    }
772
773    let mut rx = ctx
774        .bus_manager
775        .get_or_create_list_bus(&source_view_id)
776        .await;
777
778    let client_id = ctx.client_id;
779    let client_mgr = ctx.client_manager.clone();
780    let view_id_clone = view_id.clone();
781    let view_id_span = view_id.clone();
782    let sorted_caches_clone = sorted_caches;
783    let metrics_clone = ctx.metrics.clone();
784    let frame_mode = view_spec.mode;
785
786    tokio::spawn(
787        async move {
788            let mut current_window_keys = initial_keys;
789
790            loop {
791                tokio::select! {
792                    _ = cancel_token.cancelled() => {
793                        debug!("Derived view subscription cancelled for client {}", client_id);
794                        break;
795                    }
796                    result = rx.recv() => {
797                        match result {
798                            Ok(_envelope) => {
799                                let new_window: Vec<(String, serde_json::Value)> = {
800                                    let mut caches = sorted_caches_clone.write().await;
801                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
802                                        cache.get_window(skip, take)
803                                    } else {
804                                        continue;
805                                    }
806                                };
807
808                                let new_keys: HashSet<String> =
809                                    new_window.iter().map(|(k, _)| k.clone()).collect();
810
811                                if is_single {
812                                    if let Some((new_key, data)) = new_window.first() {
813                                        for old_key in current_window_keys.difference(&new_keys) {
814                                            let delete_frame = Frame {
815                                                mode: frame_mode,
816                                                export: view_id_clone.clone(),
817                                                op: "delete",
818                                                key: old_key.clone(),
819                                                data: serde_json::Value::Null,
820                                                append: vec![],
821                                            };
822                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
823                                                let payload = Arc::new(Bytes::from(json));
824                                                if client_mgr.send_to_client(client_id, payload).is_err() {
825                                                    return;
826                                                }
827                                                if let Some(ref m) = metrics_clone {
828                                                    m.record_ws_message_sent();
829                                                }
830                                            }
831                                        }
832
833                                        let frame = Frame {
834                                            mode: frame_mode,
835                                            export: view_id_clone.clone(),
836                                            op: "upsert",
837                                            key: new_key.clone(),
838                                            data: data.clone(),
839                                            append: vec![],
840                                        };
841                                        if let Ok(json) = serde_json::to_vec(&frame) {
842                                            let payload = Arc::new(Bytes::from(json));
843                                            if client_mgr.send_to_client(client_id, payload).is_err() {
844                                                return;
845                                            }
846                                            if let Some(ref m) = metrics_clone {
847                                                m.record_ws_message_sent();
848                                            }
849                                        }
850                                    }
851                                } else {
852                                    for key in current_window_keys.difference(&new_keys) {
853                                        let delete_frame = Frame {
854                                            mode: frame_mode,
855                                            export: view_id_clone.clone(),
856                                            op: "delete",
857                                            key: key.clone(),
858                                            data: serde_json::Value::Null,
859                                            append: vec![],
860                                        };
861                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
862                                            let payload = Arc::new(Bytes::from(json));
863                                            if client_mgr.send_to_client(client_id, payload).is_err() {
864                                                return;
865                                            }
866                                            if let Some(ref m) = metrics_clone {
867                                                m.record_ws_message_sent();
868                                            }
869                                        }
870                                    }
871
872                                    for (key, data) in &new_window {
873                                        let frame = Frame {
874                                            mode: frame_mode,
875                                            export: view_id_clone.clone(),
876                                            op: "upsert",
877                                            key: key.clone(),
878                                            data: data.clone(),
879                                            append: vec![],
880                                        };
881                                        if let Ok(json) = serde_json::to_vec(&frame) {
882                                            let payload = Arc::new(Bytes::from(json));
883                                            if client_mgr.send_to_client(client_id, payload).is_err() {
884                                                return;
885                                            }
886                                            if let Some(ref m) = metrics_clone {
887                                                m.record_ws_message_sent();
888                                            }
889                                        }
890                                    }
891                                }
892
893                                current_window_keys = new_keys;
894                            }
895                            Err(_) => break,
896                        }
897                    }
898                }
899            }
900        }
901        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
902    );
903
904    info!(
905        "Client {} subscribed to derived view {} (take={}, skip={})",
906        ctx.client_id, view_id, take, skip
907    );
908}
909
910#[cfg(not(feature = "otel"))]
911async fn attach_client_to_bus(
912    ctx: &SubscriptionContext<'_>,
913    subscription: Subscription,
914    cancel_token: CancellationToken,
915) {
916    let view_id = &subscription.view;
917
918    let view_spec = match ctx.view_index.get_view(view_id) {
919        Some(spec) => spec.clone(),
920        None => {
921            warn!("Unknown view ID: {}", view_id);
922            return;
923        }
924    };
925
926    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
927        warn!("Failed to send subscribed frame: {}", e);
928        return;
929    }
930
931    let is_derived_with_sort = view_spec.is_derived()
932        && view_spec
933            .pipeline
934            .as_ref()
935            .map(|p| p.sort.is_some())
936            .unwrap_or(false);
937
938    if is_derived_with_sort {
939        attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
940        return;
941    }
942
943    match view_spec.mode {
944        Mode::State => {
945            let key = subscription.key.as_deref().unwrap_or("");
946
947            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
948
949            if let Some(cached_entity) = ctx.entity_cache.get(view_id, key).await {
950                let snapshot_entities = vec![SnapshotEntity {
951                    key: key.to_string(),
952                    data: cached_entity,
953                }];
954                let batch_config = ctx.entity_cache.snapshot_config();
955                let _ = send_snapshot_batches(
956                    ctx.client_id,
957                    &snapshot_entities,
958                    view_spec.mode,
959                    view_id,
960                    ctx.client_manager,
961                    &batch_config,
962                )
963                .await;
964                rx.borrow_and_update();
965            } else if !rx.borrow().is_empty() {
966                let data = rx.borrow_and_update().clone();
967                let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
968            }
969
970            let client_id = ctx.client_id;
971            let client_mgr = ctx.client_manager.clone();
972            let view_id_clone = view_id.clone();
973            let key_clone = key.to_string();
974            tokio::spawn(
975                async move {
976                    loop {
977                        tokio::select! {
978                            _ = cancel_token.cancelled() => {
979                                debug!("State subscription cancelled for client {}", client_id);
980                                break;
981                            }
982                            result = rx.changed() => {
983                                if result.is_err() {
984                                    break;
985                                }
986                                let data = rx.borrow().clone();
987                                if client_mgr.send_to_client(client_id, data).is_err() {
988                                    break;
989                                }
990                            }
991                        }
992                    }
993                }
994                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
995            );
996        }
997        Mode::List | Mode::Append => {
998            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
999
1000            let snapshots = ctx.entity_cache.get_all(view_id).await;
1001            let snapshot_entities: Vec<SnapshotEntity> = snapshots
1002                .into_iter()
1003                .filter(|(key, _)| subscription.matches_key(key))
1004                .map(|(key, data)| SnapshotEntity { key, data })
1005                .collect();
1006
1007            if !snapshot_entities.is_empty() {
1008                let batch_config = ctx.entity_cache.snapshot_config();
1009                if send_snapshot_batches(
1010                    ctx.client_id,
1011                    &snapshot_entities,
1012                    view_spec.mode,
1013                    view_id,
1014                    ctx.client_manager,
1015                    &batch_config,
1016                )
1017                .await
1018                .is_err()
1019                {
1020                    return;
1021                }
1022            }
1023
1024            let client_id = ctx.client_id;
1025            let client_mgr = ctx.client_manager.clone();
1026            let sub = subscription.clone();
1027            let view_id_clone = view_id.clone();
1028            let mode = view_spec.mode;
1029            tokio::spawn(
1030                async move {
1031                    loop {
1032                        tokio::select! {
1033                            _ = cancel_token.cancelled() => {
1034                                debug!("List subscription cancelled for client {}", client_id);
1035                                break;
1036                            }
1037                            result = rx.recv() => {
1038                                match result {
1039                                    Ok(envelope) => {
1040                                        if sub.matches(&envelope.entity, &envelope.key)
1041                                            && client_mgr
1042                                                .send_to_client(client_id, envelope.payload.clone())
1043                                                .is_err()
1044                                        {
1045                                            break;
1046                                        }
1047                                    }
1048                                    Err(_) => break,
1049                                }
1050                            }
1051                        }
1052                    }
1053                }
1054                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1055            );
1056        }
1057    }
1058
1059    info!(
1060        "Client {} subscribed to {} (mode: {:?})",
1061        ctx.client_id, view_id, view_spec.mode
1062    );
1063}
1064
1065#[cfg(not(feature = "otel"))]
1066async fn attach_derived_view_subscription(
1067    ctx: &SubscriptionContext<'_>,
1068    subscription: Subscription,
1069    view_spec: ViewSpec,
1070    cancel_token: CancellationToken,
1071) {
1072    let view_id = &subscription.view;
1073    let pipeline_limit = view_spec
1074        .pipeline
1075        .as_ref()
1076        .and_then(|p| p.limit)
1077        .unwrap_or(100);
1078    let take = subscription.take.unwrap_or(pipeline_limit);
1079    let skip = subscription.skip.unwrap_or(0);
1080    let is_single = take == 1;
1081
1082    let source_view_id = match &view_spec.source_view {
1083        Some(s) => s.clone(),
1084        None => {
1085            warn!("Derived view {} has no source_view", view_id);
1086            return;
1087        }
1088    };
1089
1090    let sorted_caches = ctx.view_index.sorted_caches();
1091    let initial_window: Vec<(String, serde_json::Value)> = {
1092        let mut caches = sorted_caches.write().await;
1093        if let Some(cache) = caches.get_mut(view_id) {
1094            cache.get_window(skip, take)
1095        } else {
1096            warn!("No sorted cache for derived view {}", view_id);
1097            vec![]
1098        }
1099    };
1100
1101    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1102
1103    if !initial_window.is_empty() {
1104        let snapshot_entities: Vec<SnapshotEntity> = initial_window
1105            .into_iter()
1106            .map(|(key, data)| SnapshotEntity { key, data })
1107            .collect();
1108
1109        let batch_config = ctx.entity_cache.snapshot_config();
1110        if send_snapshot_batches(
1111            ctx.client_id,
1112            &snapshot_entities,
1113            view_spec.mode,
1114            view_id,
1115            ctx.client_manager,
1116            &batch_config,
1117        )
1118        .await
1119        .is_err()
1120        {
1121            return;
1122        }
1123    }
1124
1125    let mut rx = ctx
1126        .bus_manager
1127        .get_or_create_list_bus(&source_view_id)
1128        .await;
1129
1130    let client_id = ctx.client_id;
1131    let client_mgr = ctx.client_manager.clone();
1132    let view_id_clone = view_id.clone();
1133    let view_id_span = view_id.clone();
1134    let sorted_caches_clone = sorted_caches;
1135    let frame_mode = view_spec.mode;
1136
1137    tokio::spawn(
1138        async move {
1139            let mut current_window_keys = initial_keys;
1140
1141            loop {
1142                tokio::select! {
1143                    _ = cancel_token.cancelled() => {
1144                        debug!("Derived view subscription cancelled for client {}", client_id);
1145                        break;
1146                    }
1147                    result = rx.recv() => {
1148                        match result {
1149                            Ok(_envelope) => {
1150                                let new_window: Vec<(String, serde_json::Value)> = {
1151                                    let mut caches = sorted_caches_clone.write().await;
1152                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
1153                                        cache.get_window(skip, take)
1154                                    } else {
1155                                        continue;
1156                                    }
1157                                };
1158
1159                                let new_keys: HashSet<String> =
1160                                    new_window.iter().map(|(k, _)| k.clone()).collect();
1161
1162                                if is_single {
1163                                    if let Some((new_key, data)) = new_window.first() {
1164                                        for old_key in current_window_keys.difference(&new_keys) {
1165                                            let delete_frame = Frame {
1166                                                mode: frame_mode,
1167                                                export: view_id_clone.clone(),
1168                                                op: "delete",
1169                                                key: old_key.clone(),
1170                                                data: serde_json::Value::Null,
1171                                                append: vec![],
1172                                            };
1173                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
1174                                                let payload = Arc::new(Bytes::from(json));
1175                                                if client_mgr.send_to_client(client_id, payload).is_err() {
1176                                                    return;
1177                                                }
1178                                            }
1179                                        }
1180
1181                                        let frame = Frame {
1182                                            mode: frame_mode,
1183                                            export: view_id_clone.clone(),
1184                                            op: "upsert",
1185                                            key: new_key.clone(),
1186                                            data: data.clone(),
1187                                            append: vec![],
1188                                        };
1189                                        if let Ok(json) = serde_json::to_vec(&frame) {
1190                                            let payload = Arc::new(Bytes::from(json));
1191                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1192                                                return;
1193                                            }
1194                                        }
1195                                    }
1196                                } else {
1197                                    for key in current_window_keys.difference(&new_keys) {
1198                                        let delete_frame = Frame {
1199                                            mode: frame_mode,
1200                                            export: view_id_clone.clone(),
1201                                            op: "delete",
1202                                            key: key.clone(),
1203                                            data: serde_json::Value::Null,
1204                                            append: vec![],
1205                                        };
1206                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
1207                                            let payload = Arc::new(Bytes::from(json));
1208                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1209                                                return;
1210                                            }
1211                                        }
1212                                    }
1213
1214                                    for (key, data) in &new_window {
1215                                        let frame = Frame {
1216                                            mode: frame_mode,
1217                                            export: view_id_clone.clone(),
1218                                            op: "upsert",
1219                                            key: key.clone(),
1220                                            data: data.clone(),
1221                                            append: vec![],
1222                                        };
1223                                        if let Ok(json) = serde_json::to_vec(&frame) {
1224                                            let payload = Arc::new(Bytes::from(json));
1225                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1226                                                return;
1227                                            }
1228                                        }
1229                                    }
1230                                }
1231
1232                                current_window_keys = new_keys;
1233                            }
1234                            Err(_) => break,
1235                        }
1236                    }
1237                }
1238            }
1239        }
1240        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1241    );
1242
1243    info!(
1244        "Client {} subscribed to derived view {} (take={}, skip={})",
1245        ctx.client_id, view_id, take, skip
1246    );
1247}