Skip to main content

hyperstack_server/websocket/
server.rs

1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7    Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig, SortOrder, SubscribedFrame,
8};
9use crate::websocket::subscription::{ClientMessage, Subscription};
10use anyhow::Result;
11use bytes::Bytes;
12use futures_util::StreamExt;
13use std::collections::HashSet;
14use std::net::SocketAddr;
15use std::sync::Arc;
16#[cfg(feature = "otel")]
17use std::time::Instant;
18
19use tokio::net::{TcpListener, TcpStream};
20use tokio_tungstenite::accept_async;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, info_span, warn, Instrument};
23use uuid::Uuid;
24
25#[cfg(feature = "otel")]
26use crate::metrics::Metrics;
27
/// Borrowed handles and identifiers that the subscription-attachment helpers
/// need for one client connection.
///
/// Built once per connection in `handle_connection` and passed by reference to
/// `attach_client_to_bus`.
struct SubscriptionContext<'a> {
    /// Identifier of the connected client the subscriptions belong to.
    client_id: Uuid,
    /// Used to deliver frames to the client.
    client_manager: &'a ClientManager,
    /// Source of the state and list buses that carry live updates.
    bus_manager: &'a BusManager,
    /// Cache queried for the entities sent in initial snapshots.
    entity_cache: &'a EntityCache,
    /// Index used to resolve a view ID to its `ViewSpec`.
    view_index: &'a ViewIndex,
    /// Optional metrics recorder; the field only exists with the `otel` feature.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
37
/// WebSocket server that accepts TCP connections and serves view subscriptions
/// to the connected clients.
pub struct WebSocketServer {
    /// Address the TCP listener binds to.
    bind_addr: SocketAddr,
    /// Tracks connected clients; also consulted for the current client count.
    client_manager: ClientManager,
    /// Cloned into every connection task to obtain update buses.
    bus_manager: BusManager,
    /// Cloned into every connection task to serve initial snapshots.
    entity_cache: EntityCache,
    /// Shared index of the views clients may subscribe to.
    view_index: Arc<ViewIndex>,
    /// New connections are rejected once this many clients are connected.
    max_clients: usize,
    /// Optional metrics recorder; the field only exists with the `otel` feature.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
48
49impl WebSocketServer {
50    #[cfg(feature = "otel")]
51    pub fn new(
52        bind_addr: SocketAddr,
53        bus_manager: BusManager,
54        entity_cache: EntityCache,
55        view_index: Arc<ViewIndex>,
56        metrics: Option<Arc<Metrics>>,
57    ) -> Self {
58        Self {
59            bind_addr,
60            client_manager: ClientManager::new(),
61            bus_manager,
62            entity_cache,
63            view_index,
64            max_clients: 10000,
65            metrics,
66        }
67    }
68
69    #[cfg(not(feature = "otel"))]
70    pub fn new(
71        bind_addr: SocketAddr,
72        bus_manager: BusManager,
73        entity_cache: EntityCache,
74        view_index: Arc<ViewIndex>,
75    ) -> Self {
76        Self {
77            bind_addr,
78            client_manager: ClientManager::new(),
79            bus_manager,
80            entity_cache,
81            view_index,
82            max_clients: 10000,
83        }
84    }
85
86    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
87        self.max_clients = max_clients;
88        self
89    }
90
91    pub async fn start(self) -> Result<()> {
92        info!(
93            "Starting WebSocket server on {} (max_clients: {})",
94            self.bind_addr, self.max_clients
95        );
96
97        let listener = TcpListener::bind(&self.bind_addr).await?;
98        info!("WebSocket server listening on {}", self.bind_addr);
99
100        self.client_manager.start_cleanup_task();
101
102        loop {
103            match listener.accept().await {
104                Ok((stream, addr)) => {
105                    let client_count = self.client_manager.client_count();
106                    if client_count >= self.max_clients {
107                        warn!(
108                            "Rejecting connection from {} - max clients ({}) reached",
109                            addr, self.max_clients
110                        );
111                        drop(stream);
112                        continue;
113                    }
114
115                    #[cfg(feature = "otel")]
116                    if let Some(ref metrics) = self.metrics {
117                        metrics.record_ws_connection();
118                    }
119
120                    info!(
121                        "New WebSocket connection from {} ({}/{} clients)",
122                        addr,
123                        client_count + 1,
124                        self.max_clients
125                    );
126                    let client_manager = self.client_manager.clone();
127                    let bus_manager = self.bus_manager.clone();
128                    let entity_cache = self.entity_cache.clone();
129                    let view_index = self.view_index.clone();
130                    #[cfg(feature = "otel")]
131                    let metrics = self.metrics.clone();
132
133                    tokio::spawn(
134                        async move {
135                            #[cfg(feature = "otel")]
136                            let result = handle_connection(
137                                stream,
138                                client_manager,
139                                bus_manager,
140                                entity_cache,
141                                view_index,
142                                metrics,
143                            )
144                            .await;
145                            #[cfg(not(feature = "otel"))]
146                            let result = handle_connection(
147                                stream,
148                                client_manager,
149                                bus_manager,
150                                entity_cache,
151                                view_index,
152                            )
153                            .await;
154
155                            if let Err(e) = result {
156                                error!("WebSocket connection error: {}", e);
157                            }
158                        }
159                        .instrument(info_span!("ws.connection", %addr)),
160                    );
161                }
162                Err(e) => {
163                    error!("Failed to accept connection: {}", e);
164                }
165            }
166        }
167    }
168}
169
/// Serves one client connection (build with the `otel` feature).
///
/// Performs the WebSocket handshake, registers the client, and processes
/// incoming messages until the client sends a close frame, the stream reports
/// an error, or the stream ends. Text messages are parsed as `ClientMessage`
/// first and, failing that, as a bare `Subscription`. On exit every
/// subscription of the client is cancelled, the client is removed, and the
/// disconnection is recorded in the metrics.
///
/// # Errors
///
/// Returns an error if the WebSocket handshake fails.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        metrics: metrics.clone(),
    };

    // One entry per live subscription, so the metrics can be balanced when the
    // client disconnects without unsubscribing first.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        // Both the tagged `Subscribe` message and a bare `Subscription` end up
        // on the same subscribe path below; every other message is handled
        // here and moves on to the next frame.
        let subscription = match serde_json::from_str::<ClientMessage>(text) {
            Ok(ClientMessage::Subscribe(subscription)) => subscription,
            Ok(ClientMessage::Unsubscribe(unsub)) => {
                let sub_key = unsub.sub_key();
                let removed = client_manager
                    .remove_client_subscription(client_id, &sub_key)
                    .await;

                if removed {
                    info!("Client {} unsubscribed from {}", client_id, sub_key);
                    if let Some(ref m) = metrics {
                        m.record_subscription_removed(&unsub.view);
                    }
                    // The removal has just been recorded; forget the entry so
                    // the disconnect cleanup does not record it a second time.
                    if let Some(pos) = active_subscriptions
                        .iter()
                        .position(|view| *view == unsub.view)
                    {
                        active_subscriptions.swap_remove(pos);
                    }
                }
                continue;
            }
            Ok(ClientMessage::Ping) => {
                debug!("Received ping from client {}", client_id);
                continue;
            }
            Err(_) => match serde_json::from_str::<Subscription>(text) {
                Ok(subscription) => subscription,
                Err(_) => {
                    debug!(
                        "Received non-subscription message from client {}: {}",
                        client_id, text
                    );
                    continue;
                }
            },
        };

        let view_id = subscription.view.clone();
        let sub_key = subscription.sub_key();
        client_manager.update_subscription(client_id, subscription.clone());

        let cancel_token = CancellationToken::new();
        let is_new = client_manager
            .add_client_subscription(client_id, sub_key.clone(), cancel_token.clone())
            .await;

        if !is_new {
            debug!(
                "Client {} already subscribed to {}, ignoring duplicate",
                client_id, sub_key
            );
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_subscription_created(&view_id);
        }
        active_subscriptions.push(view_id);

        attach_client_to_bus(&ctx, subscription, cancel_token).await;
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        // Only subscriptions still live at disconnect remain in the list.
        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}
323
324#[cfg(not(feature = "otel"))]
325async fn handle_connection(
326    stream: TcpStream,
327    client_manager: ClientManager,
328    bus_manager: BusManager,
329    entity_cache: EntityCache,
330    view_index: Arc<ViewIndex>,
331) -> Result<()> {
332    let ws_stream = accept_async(stream).await?;
333    let client_id = Uuid::new_v4();
334
335    info!("WebSocket connection established for client {}", client_id);
336
337    let (ws_sender, mut ws_receiver) = ws_stream.split();
338
339    client_manager.add_client(client_id, ws_sender);
340
341    let ctx = SubscriptionContext {
342        client_id,
343        client_manager: &client_manager,
344        bus_manager: &bus_manager,
345        entity_cache: &entity_cache,
346        view_index: &view_index,
347    };
348
349    loop {
350        tokio::select! {
351            ws_msg = ws_receiver.next() => {
352                match ws_msg {
353                    Some(Ok(msg)) => {
354                        if msg.is_close() {
355                            info!("Client {} requested close", client_id);
356                            break;
357                        }
358
359                        client_manager.update_client_last_seen(client_id);
360
361                        if msg.is_text() {
362                            if let Ok(text) = msg.to_text() {
363                                debug!("Received text message from client {}: {}", client_id, text);
364
365                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
366                                    match client_msg {
367                                        ClientMessage::Subscribe(subscription) => {
368                                            let sub_key = subscription.sub_key();
369                                            client_manager.update_subscription(client_id, subscription.clone());
370
371                                            let cancel_token = CancellationToken::new();
372                                            let is_new = client_manager.add_client_subscription(
373                                                client_id,
374                                                sub_key.clone(),
375                                                cancel_token.clone(),
376                                            ).await;
377
378                                            if !is_new {
379                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
380                                                continue;
381                                            }
382
383                                            attach_client_to_bus(&ctx, subscription, cancel_token).await;
384                                        }
385                                        ClientMessage::Unsubscribe(unsub) => {
386                                            let sub_key = unsub.sub_key();
387                                            let removed = client_manager
388                                                .remove_client_subscription(client_id, &sub_key)
389                                                .await;
390
391                                            if removed {
392                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
393                                            }
394                                        }
395                                        ClientMessage::Ping => {
396                                            debug!("Received ping from client {}", client_id);
397                                        }
398                                    }
399                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
400                                    let sub_key = subscription.sub_key();
401                                    client_manager.update_subscription(client_id, subscription.clone());
402
403                                    let cancel_token = CancellationToken::new();
404                                    let is_new = client_manager.add_client_subscription(
405                                        client_id,
406                                        sub_key.clone(),
407                                        cancel_token.clone(),
408                                    ).await;
409
410                                    if !is_new {
411                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
412                                        continue;
413                                    }
414
415                                    attach_client_to_bus(&ctx, subscription, cancel_token).await;
416                                } else {
417                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
418                                }
419                            }
420                        }
421                    }
422                    Some(Err(e)) => {
423                        warn!("WebSocket error for client {}: {}", client_id, e);
424                        break;
425                    }
426                    None => {
427                        debug!("WebSocket stream ended for client {}", client_id);
428                        break;
429                    }
430                }
431            }
432        }
433    }
434
435    client_manager
436        .cancel_all_client_subscriptions(client_id)
437        .await;
438    client_manager.remove_client(client_id);
439    info!("Client {} disconnected", client_id);
440
441    Ok(())
442}
443
444async fn send_snapshot_batches(
445    client_id: Uuid,
446    entities: &[SnapshotEntity],
447    mode: Mode,
448    view_id: &str,
449    client_manager: &ClientManager,
450    batch_config: &SnapshotBatchConfig,
451    #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
452) -> Result<()> {
453    let total = entities.len();
454    if total == 0 {
455        return Ok(());
456    }
457
458    let mut offset = 0;
459    let mut batch_num = 0;
460
461    while offset < total {
462        let batch_size = if batch_num == 0 {
463            batch_config.initial_batch_size
464        } else {
465            batch_config.subsequent_batch_size
466        };
467
468        let end = (offset + batch_size).min(total);
469        let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
470        let is_complete = end >= total;
471
472        let snapshot_frame = SnapshotFrame {
473            mode,
474            export: view_id.to_string(),
475            op: "snapshot",
476            data: batch_data,
477            complete: is_complete,
478        };
479
480        if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
481            let payload = maybe_compress(&json_payload);
482            if client_manager
483                .send_compressed_async(client_id, payload)
484                .await
485                .is_err()
486            {
487                return Err(anyhow::anyhow!("Failed to send snapshot batch"));
488            }
489            #[cfg(feature = "otel")]
490            if let Some(m) = metrics {
491                m.record_ws_message_sent();
492            }
493        }
494
495        offset = end;
496        batch_num += 1;
497    }
498
499    debug!(
500        "Sent {} snapshot batches ({} entities) for {} to client {}",
501        batch_num, total, view_id, client_id
502    );
503
504    Ok(())
505}
506
507fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
508    if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
509        return Some(SortConfig {
510            field: sort.field_path.clone(),
511            order: match sort.order {
512                crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
513                crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
514            },
515        });
516    }
517
518    if view_spec.mode == Mode::List {
519        return Some(SortConfig {
520            field: vec!["_seq".to_string()],
521            order: SortOrder::Desc,
522        });
523    }
524
525    None
526}
527
528fn send_subscribed_frame(
529    client_id: Uuid,
530    view_id: &str,
531    view_spec: &ViewSpec,
532    client_manager: &ClientManager,
533) -> Result<()> {
534    let sort_config = extract_sort_config(view_spec);
535    let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
536
537    let json_payload = serde_json::to_vec(&subscribed_frame)?;
538    let payload = Arc::new(Bytes::from(json_payload));
539    client_manager
540        .send_to_client(client_id, payload)
541        .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
542}
543
/// Wires a new subscription up to its data source (build with the `otel` feature).
///
/// Resolves the view, acknowledges the subscription with a subscribed frame,
/// sends the initial snapshot, and spawns a task that forwards live updates to
/// the client until `cancel_token` is cancelled, the bus closes, or a send to
/// the client fails. Derived views that declare a sort are handed over to
/// `attach_derived_view_subscription_otel`.
///
/// Returns early without spawning a forwarder when the view is unknown, when
/// the subscribed frame cannot be sent, or when a list snapshot cannot be sent.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;

    let view_spec = match ctx.view_index.get_view(view_id) {
        None => {
            warn!("Unknown view ID: {}", view_id);
            return;
        }
        Some(spec) => spec.clone(),
    };

    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
        warn!("Failed to send subscribed frame: {}", e);
        return;
    }

    let sorted_derived_view = view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map_or(false, |pipeline| pipeline.sort.is_some());

    if sorted_derived_view {
        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
        return;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");

            // Obtain the receiver before reading the cache, as the original
            // ordering does.
            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            match ctx.entity_cache.get(view_id, key).await {
                Some(cached_entity) => {
                    let initial = [SnapshotEntity {
                        key: key.to_string(),
                        data: cached_entity,
                    }];
                    let batch_config = ctx.entity_cache.snapshot_config();
                    // A failed snapshot send is deliberately not treated as fatal here.
                    let _ = send_snapshot_batches(
                        ctx.client_id,
                        &initial,
                        view_spec.mode,
                        view_id,
                        ctx.client_manager,
                        &batch_config,
                        ctx.metrics.as_ref(),
                    )
                    .await;
                    // Mark the bus's current value as seen.
                    rx.borrow_and_update();
                }
                None => {
                    // No cached entity: fall back to whatever the bus holds, if anything.
                    if !rx.borrow().is_empty() {
                        let current = rx.borrow_and_update().clone();
                        let _ = ctx.client_manager.send_to_client(ctx.client_id, current);
                    }
                }
            }

            let client_id = ctx.client_id;
            let sender = ctx.client_manager.clone();
            let metrics = ctx.metrics.clone();
            let span_view = view_id.clone();
            let span_key = key.to_string();
            let forward_state_updates = async move {
                loop {
                    tokio::select! {
                        _ = cancel_token.cancelled() => {
                            debug!("State subscription cancelled for client {}", client_id);
                            break;
                        }
                        changed = rx.changed() => {
                            if changed.is_err() {
                                break;
                            }
                            // Bind the value first so the borrow is released
                            // before the send.
                            let latest = rx.borrow().clone();
                            if sender.send_to_client(client_id, latest).is_err() {
                                break;
                            }
                            if let Some(ref m) = metrics {
                                m.record_ws_message_sent();
                            }
                        }
                    }
                }
            };
            tokio::spawn(forward_state_updates.instrument(
                info_span!("ws.subscribe.state", %client_id, view = %span_view, key = %span_key),
            ));
        }
        Mode::List | Mode::Append => {
            // Subscribe to the bus before reading the cache, as the original
            // ordering does.
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let mut snapshot_entities: Vec<SnapshotEntity> = Vec::new();
            for (key, data) in ctx.entity_cache.get_all(view_id).await {
                if subscription.matches_key(&key) {
                    snapshot_entities.push(SnapshotEntity { key, data });
                }
            }

            if !snapshot_entities.is_empty() {
                let batch_config = ctx.entity_cache.snapshot_config();
                let snapshot_result = send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await;
                if snapshot_result.is_err() {
                    return;
                }
            }

            let client_id = ctx.client_id;
            let sender = ctx.client_manager.clone();
            let filter = subscription.clone();
            let metrics = ctx.metrics.clone();
            let span_view = view_id.clone();
            let span_mode = view_spec.mode;
            let forward_list_updates = async move {
                loop {
                    tokio::select! {
                        _ = cancel_token.cancelled() => {
                            debug!("List subscription cancelled for client {}", client_id);
                            break;
                        }
                        received = rx.recv() => {
                            let envelope = match received {
                                Ok(envelope) => envelope,
                                Err(_) => break,
                            };
                            if !filter.matches(&envelope.entity, &envelope.key) {
                                continue;
                            }
                            if sender
                                .send_to_client(client_id, envelope.payload.clone())
                                .is_err()
                            {
                                break;
                            }
                            if let Some(ref m) = metrics {
                                m.record_ws_message_sent();
                            }
                        }
                    }
                }
            };
            tokio::spawn(forward_list_updates.instrument(
                info_span!("ws.subscribe.list", %client_id, view = %span_view, mode = ?span_mode),
            ));
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );
}
711
712#[cfg(feature = "otel")]
713async fn attach_derived_view_subscription_otel(
714    ctx: &SubscriptionContext<'_>,
715    subscription: Subscription,
716    view_spec: ViewSpec,
717    cancel_token: CancellationToken,
718) {
719    let view_id = &subscription.view;
720    let pipeline_limit = view_spec
721        .pipeline
722        .as_ref()
723        .and_then(|p| p.limit)
724        .unwrap_or(100);
725    let take = subscription.take.unwrap_or(pipeline_limit);
726    let skip = subscription.skip.unwrap_or(0);
727    let is_single = take == 1;
728
729    let source_view_id = match &view_spec.source_view {
730        Some(s) => s.clone(),
731        None => {
732            warn!("Derived view {} has no source_view", view_id);
733            return;
734        }
735    };
736
737    let sorted_caches = ctx.view_index.sorted_caches();
738    let initial_window: Vec<(String, serde_json::Value)> = {
739        let mut caches = sorted_caches.write().await;
740        if let Some(cache) = caches.get_mut(view_id) {
741            cache.get_window(skip, take)
742        } else {
743            warn!("No sorted cache for derived view {}", view_id);
744            vec![]
745        }
746    };
747
748    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
749
750    if !initial_window.is_empty() {
751        let snapshot_entities: Vec<SnapshotEntity> = initial_window
752            .into_iter()
753            .map(|(key, data)| SnapshotEntity { key, data })
754            .collect();
755
756        let batch_config = ctx.entity_cache.snapshot_config();
757        if send_snapshot_batches(
758            ctx.client_id,
759            &snapshot_entities,
760            view_spec.mode,
761            view_id,
762            ctx.client_manager,
763            &batch_config,
764            ctx.metrics.as_ref(),
765        )
766        .await
767        .is_err()
768        {
769            return;
770        }
771    }
772
773    let mut rx = ctx
774        .bus_manager
775        .get_or_create_list_bus(&source_view_id)
776        .await;
777
778    let client_id = ctx.client_id;
779    let client_mgr = ctx.client_manager.clone();
780    let view_id_clone = view_id.clone();
781    let view_id_span = view_id.clone();
782    let sorted_caches_clone = sorted_caches;
783    let metrics_clone = ctx.metrics.clone();
784    let frame_mode = view_spec.mode;
785
786    tokio::spawn(
787        async move {
788            let mut current_window_keys = initial_keys;
789
790            loop {
791                tokio::select! {
792                    _ = cancel_token.cancelled() => {
793                        debug!("Derived view subscription cancelled for client {}", client_id);
794                        break;
795                    }
796                    result = rx.recv() => {
797                        match result {
798                            Ok(_envelope) => {
799                                let new_window: Vec<(String, serde_json::Value)> = {
800                                    let mut caches = sorted_caches_clone.write().await;
801                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
802                                        cache.get_window(skip, take)
803                                    } else {
804                                        continue;
805                                    }
806                                };
807
808                                let new_keys: HashSet<String> =
809                                    new_window.iter().map(|(k, _)| k.clone()).collect();
810
811                                if is_single {
812                                    if let Some((new_key, data)) = new_window.first() {
813                                        for old_key in current_window_keys.difference(&new_keys) {
814                                            let delete_frame = Frame {
815                                                mode: frame_mode,
816                                                export: view_id_clone.clone(),
817                                                op: "delete",
818                                                key: old_key.clone(),
819                                                data: serde_json::Value::Null,
820                                                append: vec![],
821                                            };
822                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
823                                                let payload = Arc::new(Bytes::from(json));
824                                                if client_mgr.send_to_client(client_id, payload).is_err() {
825                                                    return;
826                                                }
827                                                if let Some(ref m) = metrics_clone {
828                                                    m.record_ws_message_sent();
829                                                }
830                                            }
831                                        }
832
833                                        let frame = Frame {
834                                            mode: frame_mode,
835                                            export: view_id_clone.clone(),
836                                            op: "upsert",
837                                            key: new_key.clone(),
838                                            data: data.clone(),
839                                            append: vec![],
840                                        };
841                                        
842                                        if let Ok(json) = serde_json::to_vec(&frame) {
843                                            let payload = Arc::new(Bytes::from(json));
844                                            if client_mgr.send_to_client(client_id, payload).is_err() {
845                                                return;
846                                            }
847                                            if let Some(ref m) = metrics_clone {
848                                                m.record_ws_message_sent();
849                                            }
850                                        }
851                                    }
852                                } else {
853                                    for key in current_window_keys.difference(&new_keys) {
854                                        let delete_frame = Frame {
855                                            mode: frame_mode,
856                                            export: view_id_clone.clone(),
857                                            op: "delete",
858                                            key: key.clone(),
859                                            data: serde_json::Value::Null,
860                                            append: vec![],
861                                        };
862                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
863                                            let payload = Arc::new(Bytes::from(json));
864                                            if client_mgr.send_to_client(client_id, payload).is_err() {
865                                                return;
866                                            }
867                                            if let Some(ref m) = metrics_clone {
868                                                m.record_ws_message_sent();
869                                            }
870                                        }
871                                    }
872
873                                    for (key, data) in &new_window {
874                                        let frame = Frame {
875                                            mode: frame_mode,
876                                            export: view_id_clone.clone(),
877                                            op: "upsert",
878                                            key: key.clone(),
879                                            data: data.clone(),
880                                            append: vec![],
881                                        };
882                                        if let Ok(json) = serde_json::to_vec(&frame) {
883                                            let payload = Arc::new(Bytes::from(json));
884                                            if client_mgr.send_to_client(client_id, payload).is_err() {
885                                                return;
886                                            }
887                                            if let Some(ref m) = metrics_clone {
888                                                m.record_ws_message_sent();
889                                            }
890                                        }
891                                    }
892                                }
893
894                                current_window_keys = new_keys;
895                            }
896                            Err(_) => break,
897                        }
898                    }
899                }
900            }
901        }
902        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
903    );
904
905    info!(
906        "Client {} subscribed to derived view {} (take={}, skip={})",
907        ctx.client_id, view_id, take, skip
908    );
909}
910
911#[cfg(not(feature = "otel"))]
912async fn attach_client_to_bus(
913    ctx: &SubscriptionContext<'_>,
914    subscription: Subscription,
915    cancel_token: CancellationToken,
916) {
917    let view_id = &subscription.view;
918
919    let view_spec = match ctx.view_index.get_view(view_id) {
920        Some(spec) => spec.clone(),
921        None => {
922            warn!("Unknown view ID: {}", view_id);
923            return;
924        }
925    };
926
927    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
928        warn!("Failed to send subscribed frame: {}", e);
929        return;
930    }
931
932    let is_derived_with_sort = view_spec.is_derived()
933        && view_spec
934            .pipeline
935            .as_ref()
936            .map(|p| p.sort.is_some())
937            .unwrap_or(false);
938
939    if is_derived_with_sort {
940        attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
941        return;
942    }
943
944    match view_spec.mode {
945        Mode::State => {
946            let key = subscription.key.as_deref().unwrap_or("");
947
948            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
949
950            if let Some(cached_entity) = ctx.entity_cache.get(view_id, key).await {
951                let snapshot_entities = vec![SnapshotEntity {
952                    key: key.to_string(),
953                    data: cached_entity,
954                }];
955                let batch_config = ctx.entity_cache.snapshot_config();
956                let _ = send_snapshot_batches(
957                    ctx.client_id,
958                    &snapshot_entities,
959                    view_spec.mode,
960                    view_id,
961                    ctx.client_manager,
962                    &batch_config,
963                )
964                .await;
965                rx.borrow_and_update();
966            } else if !rx.borrow().is_empty() {
967                let data = rx.borrow_and_update().clone();
968                let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
969            }
970
971            let client_id = ctx.client_id;
972            let client_mgr = ctx.client_manager.clone();
973            let view_id_clone = view_id.clone();
974            let key_clone = key.to_string();
975            tokio::spawn(
976                async move {
977                    loop {
978                        tokio::select! {
979                            _ = cancel_token.cancelled() => {
980                                debug!("State subscription cancelled for client {}", client_id);
981                                break;
982                            }
983                            result = rx.changed() => {
984                                if result.is_err() {
985                                    break;
986                                }
987                                let data = rx.borrow().clone();
988                                if client_mgr.send_to_client(client_id, data).is_err() {
989                                    break;
990                                }
991                            }
992                        }
993                    }
994                }
995                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
996            );
997        }
998        Mode::List | Mode::Append => {
999            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1000
1001            let snapshots = ctx.entity_cache.get_all(view_id).await;
1002            let snapshot_entities: Vec<SnapshotEntity> = snapshots
1003                .into_iter()
1004                .filter(|(key, _)| subscription.matches_key(key))
1005                .map(|(key, data)| SnapshotEntity { key, data })
1006                .collect();
1007
1008            if !snapshot_entities.is_empty() {
1009                let batch_config = ctx.entity_cache.snapshot_config();
1010                if send_snapshot_batches(
1011                    ctx.client_id,
1012                    &snapshot_entities,
1013                    view_spec.mode,
1014                    view_id,
1015                    ctx.client_manager,
1016                    &batch_config,
1017                )
1018                .await
1019                .is_err()
1020                {
1021                    return;
1022                }
1023            }
1024
1025            let client_id = ctx.client_id;
1026            let client_mgr = ctx.client_manager.clone();
1027            let sub = subscription.clone();
1028            let view_id_clone = view_id.clone();
1029            let mode = view_spec.mode;
1030            tokio::spawn(
1031                async move {
1032                    loop {
1033                        tokio::select! {
1034                            _ = cancel_token.cancelled() => {
1035                                debug!("List subscription cancelled for client {}", client_id);
1036                                break;
1037                            }
1038                            result = rx.recv() => {
1039                                match result {
1040                                    Ok(envelope) => {
1041                                        if sub.matches(&envelope.entity, &envelope.key)
1042                                            && client_mgr
1043                                                .send_to_client(client_id, envelope.payload.clone())
1044                                                .is_err()
1045                                        {
1046                                            break;
1047                                        }
1048                                    }
1049                                    Err(_) => break,
1050                                }
1051                            }
1052                        }
1053                    }
1054                }
1055                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1056            );
1057        }
1058    }
1059
1060    info!(
1061        "Client {} subscribed to {} (mode: {:?})",
1062        ctx.client_id, view_id, view_spec.mode
1063    );
1064}
1065
1066#[cfg(not(feature = "otel"))]
1067async fn attach_derived_view_subscription(
1068    ctx: &SubscriptionContext<'_>,
1069    subscription: Subscription,
1070    view_spec: ViewSpec,
1071    cancel_token: CancellationToken,
1072) {
1073    let view_id = &subscription.view;
1074    let pipeline_limit = view_spec
1075        .pipeline
1076        .as_ref()
1077        .and_then(|p| p.limit)
1078        .unwrap_or(100);
1079    let take = subscription.take.unwrap_or(pipeline_limit);
1080    let skip = subscription.skip.unwrap_or(0);
1081    let is_single = take == 1;
1082
1083    let source_view_id = match &view_spec.source_view {
1084        Some(s) => s.clone(),
1085        None => {
1086            warn!("Derived view {} has no source_view", view_id);
1087            return;
1088        }
1089    };
1090
1091    let sorted_caches = ctx.view_index.sorted_caches();
1092    let initial_window: Vec<(String, serde_json::Value)> = {
1093        let mut caches = sorted_caches.write().await;
1094        if let Some(cache) = caches.get_mut(view_id) {
1095            cache.get_window(skip, take)
1096        } else {
1097            warn!("No sorted cache for derived view {}", view_id);
1098            vec![]
1099        }
1100    };
1101
1102    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1103
1104    if !initial_window.is_empty() {
1105        let snapshot_entities: Vec<SnapshotEntity> = initial_window
1106            .into_iter()
1107            .map(|(key, data)| SnapshotEntity { key, data })
1108            .collect();
1109
1110        let batch_config = ctx.entity_cache.snapshot_config();
1111        if send_snapshot_batches(
1112            ctx.client_id,
1113            &snapshot_entities,
1114            view_spec.mode,
1115            view_id,
1116            ctx.client_manager,
1117            &batch_config,
1118        )
1119        .await
1120        .is_err()
1121        {
1122            return;
1123        }
1124    }
1125
1126    let mut rx = ctx
1127        .bus_manager
1128        .get_or_create_list_bus(&source_view_id)
1129        .await;
1130
1131    let client_id = ctx.client_id;
1132    let client_mgr = ctx.client_manager.clone();
1133    let view_id_clone = view_id.clone();
1134    let view_id_span = view_id.clone();
1135    let sorted_caches_clone = sorted_caches;
1136    let frame_mode = view_spec.mode;
1137
1138    tokio::spawn(
1139        async move {
1140            let mut current_window_keys = initial_keys;
1141
1142            loop {
1143                tokio::select! {
1144                    _ = cancel_token.cancelled() => {
1145                        debug!("Derived view subscription cancelled for client {}", client_id);
1146                        break;
1147                    }
1148                    result = rx.recv() => {
1149                        match result {
1150                            Ok(_envelope) => {
1151                                let new_window: Vec<(String, serde_json::Value)> = {
1152                                    let mut caches = sorted_caches_clone.write().await;
1153                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
1154                                        cache.get_window(skip, take)
1155                                    } else {
1156                                        continue;
1157                                    }
1158                                };
1159
1160                                let new_keys: HashSet<String> =
1161                                    new_window.iter().map(|(k, _)| k.clone()).collect();
1162
1163                                if is_single {
1164                                    if let Some((new_key, data)) = new_window.first() {
1165                                        for old_key in current_window_keys.difference(&new_keys) {
1166                                            let delete_frame = Frame {
1167                                                mode: frame_mode,
1168                                                export: view_id_clone.clone(),
1169                                                op: "delete",
1170                                                key: old_key.clone(),
1171                                                data: serde_json::Value::Null,
1172                                                append: vec![],
1173                                            };
1174                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
1175                                                let payload = Arc::new(Bytes::from(json));
1176                                                if client_mgr.send_to_client(client_id, payload).is_err() {
1177                                                    return;
1178                                                }
1179                                            }
1180                                        }
1181
1182                                        let frame = Frame {
1183                                            mode: frame_mode,
1184                                            export: view_id_clone.clone(),
1185                                            op: "upsert",
1186                                            key: new_key.clone(),
1187                                            data: data.clone(),
1188                                            append: vec![],
1189                                        };
1190                                        if let Ok(json) = serde_json::to_vec(&frame) {
1191                                            let payload = Arc::new(Bytes::from(json));
1192                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1193                                                return;
1194                                            }
1195                                        }
1196                                    }
1197                                } else {
1198                                    for key in current_window_keys.difference(&new_keys) {
1199                                        let delete_frame = Frame {
1200                                            mode: frame_mode,
1201                                            export: view_id_clone.clone(),
1202                                            op: "delete",
1203                                            key: key.clone(),
1204                                            data: serde_json::Value::Null,
1205                                            append: vec![],
1206                                        };
1207                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
1208                                            let payload = Arc::new(Bytes::from(json));
1209                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1210                                                return;
1211                                            }
1212                                        }
1213                                    }
1214
1215                                    for (key, data) in &new_window {
1216                                        let frame = Frame {
1217                                            mode: frame_mode,
1218                                            export: view_id_clone.clone(),
1219                                            op: "upsert",
1220                                            key: key.clone(),
1221                                            data: data.clone(),
1222                                            append: vec![],
1223                                        };
1224                                        if let Ok(json) = serde_json::to_vec(&frame) {
1225                                            let payload = Arc::new(Bytes::from(json));
1226                                            if client_mgr.send_to_client(client_id, payload).is_err() {
1227                                                return;
1228                                            }
1229                                        }
1230                                    }
1231                                }
1232
1233                                current_window_keys = new_keys;
1234                            }
1235                            Err(_) => break,
1236                        }
1237                    }
1238                }
1239            }
1240        }
1241        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1242    );
1243
1244    info!(
1245        "Client {} subscribed to derived view {} (take={}, skip={})",
1246        ctx.client_id, view_id, take, skip
1247    );
1248}