Skip to main content

hyperstack_server/websocket/
server.rs

1use crate::bus::BusManager;
2use crate::cache::{cmp_seq, EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::auth::{
6    AuthContext, AuthDecision, AuthDeny, ConnectionAuthRequest, WebSocketAuthPlugin,
7};
8use crate::websocket::client_manager::{ClientManager, RateLimitConfig};
9use crate::websocket::frame::{
10    transform_large_u64_to_strings, Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig,
11    SortOrder, SubscribedFrame,
12};
13use crate::websocket::subscription::{
14    ClientMessage, RefreshAuthRequest, RefreshAuthResponse, SocketIssueMessage, Subscription,
15};
16use crate::websocket::usage::{WebSocketUsageEmitter, WebSocketUsageEvent};
17use anyhow::Result;
18use bytes::Bytes;
19use futures_util::StreamExt;
20use std::collections::{HashMap, HashSet};
21use std::net::SocketAddr;
22use std::sync::Arc;
23#[cfg(feature = "otel")]
24use std::time::Instant;
25
26use tokio::net::{TcpListener, TcpStream};
27use tokio_tungstenite::{
28    accept_hdr_async,
29    tungstenite::{
30        handshake::server::{ErrorResponse as HandshakeErrorResponse, Request, Response},
31        http::{header::CONTENT_TYPE, StatusCode},
32        Error as WsError,
33    },
34};
35
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, error, info, info_span, warn, Instrument};
38use uuid::Uuid;
39
40#[cfg(feature = "otel")]
41use crate::metrics::Metrics;
42
43/// Helper function to handle refresh_auth messages
44async fn handle_refresh_auth(
45    client_id: Uuid,
46    refresh_req: &RefreshAuthRequest,
47    client_manager: &ClientManager,
48    auth_plugin: &Arc<dyn WebSocketAuthPlugin>,
49) {
50    // Try to verify the new token using the auth plugin
51    // We need to downcast to SignedSessionAuthPlugin to use verify_refresh_token
52    let refresh_result: Result<AuthContext, String> = {
53        // Try to downcast to SignedSessionAuthPlugin
54        if let Some(signed_plugin) = auth_plugin
55            .as_any()
56            .downcast_ref::<crate::websocket::auth::SignedSessionAuthPlugin>(
57        ) {
58            signed_plugin
59                .verify_refresh_token(&refresh_req.token)
60                .await
61                .map_err(|e| e.reason)
62        } else {
63            Err("In-band auth refresh not supported with current auth plugin".to_string())
64        }
65    };
66
67    match refresh_result {
68        Ok(new_context) => {
69            let expires_at = new_context.expires_at;
70            if client_manager.update_client_auth(client_id, new_context) {
71                info!(
72                    "Client {} refreshed auth successfully, expires at {}",
73                    client_id, expires_at
74                );
75
76                // Send success response
77                let response = RefreshAuthResponse {
78                    success: true,
79                    error: None,
80                    expires_at: Some(expires_at),
81                };
82                if let Ok(json) = serde_json::to_string(&response) {
83                    let _ = client_manager.send_text_to_client(client_id, json).await;
84                }
85            } else {
86                warn!("Client {} not found when refreshing auth", client_id);
87
88                // Send failure response - client not found
89                let response = RefreshAuthResponse {
90                    success: false,
91                    error: Some("client-not-found".to_string()),
92                    expires_at: None,
93                };
94                if let Ok(json) = serde_json::to_string(&response) {
95                    let _ = client_manager.send_text_to_client(client_id, json).await;
96                }
97            }
98        }
99        Err(err) => {
100            warn!("Client {} auth refresh failed: {}", client_id, err);
101
102            // Send failure response with machine-readable error code
103            let error_code = if err.contains("expired") {
104                "token-expired"
105            } else if err.contains("signature") {
106                "token-invalid-signature"
107            } else if err.contains("issuer") {
108                "token-invalid-issuer"
109            } else if err.contains("audience") {
110                "token-invalid-audience"
111            } else {
112                "token-invalid"
113            };
114
115            let response = RefreshAuthResponse {
116                success: false,
117                error: Some(error_code.to_string()),
118                expires_at: None,
119            };
120            if let Ok(json) = serde_json::to_string(&response) {
121                let _ = client_manager.send_text_to_client(client_id, json).await;
122            }
123        }
124    }
125}
126
127async fn send_socket_issue(
128    client_id: Uuid,
129    client_manager: &ClientManager,
130    deny: &AuthDeny,
131    fatal: bool,
132) {
133    let message = SocketIssueMessage::from_auth_deny(deny, fatal);
134    match serde_json::to_string(&message) {
135        Ok(json) => {
136            let _ = client_manager.send_text_to_client(client_id, json).await;
137        }
138        Err(error) => {
139            warn!(error = %error, client_id = %client_id, "failed to serialize socket issue message");
140        }
141    }
142}
143
144fn auth_deny_from_subscription_error(reason: &str) -> Option<AuthDeny> {
145    if reason.starts_with("Snapshot limit exceeded:") {
146        Some(AuthDeny::new(
147            crate::websocket::auth::AuthErrorCode::SnapshotLimitExceeded,
148            reason,
149        ))
150    } else {
151        None
152    }
153}
154
155fn key_class_label(key_class: hyperstack_auth::KeyClass) -> &'static str {
156    match key_class {
157        hyperstack_auth::KeyClass::Secret => "secret",
158        hyperstack_auth::KeyClass::Publishable => "publishable",
159    }
160}
161
162fn emit_usage_event(
163    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
164    event: WebSocketUsageEvent,
165) {
166    if let Some(emitter) = usage_emitter.clone() {
167        tokio::spawn(async move {
168            emitter.emit(event).await;
169        });
170    }
171}
172
173fn usage_identity(
174    auth_context: Option<&AuthContext>,
175) -> (
176    Option<String>,
177    Option<String>,
178    Option<String>,
179    Option<String>,
180) {
181    match auth_context {
182        Some(ctx) => (
183            Some(ctx.metering_key.clone()),
184            Some(ctx.subject.clone()),
185            Some(key_class_label(ctx.key_class).to_string()),
186            ctx.deployment_id.clone(),
187        ),
188        None => (None, None, None, None),
189    }
190}
191
192fn emit_update_sent_for_client(
193    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
194    client_manager: &ClientManager,
195    client_id: Uuid,
196    view_id: &str,
197    bytes: usize,
198) {
199    let auth_context = client_manager.get_auth_context(client_id);
200    let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
201    emit_usage_event(
202        usage_emitter,
203        WebSocketUsageEvent::UpdateSent {
204            client_id: client_id.to_string(),
205            deployment_id,
206            metering_key,
207            subject,
208            view_id: view_id.to_string(),
209            messages: 1,
210            bytes: bytes as u64,
211        },
212    );
213}
214
/// Borrowed per-connection state bundled together for subscription handling.
///
/// Built once per connection in `handle_connection` and passed by reference to
/// `attach_client_to_bus` when a client subscribes.
struct SubscriptionContext<'a> {
    /// Id of the client this connection belongs to.
    client_id: Uuid,
    /// Client registry for this server.
    client_manager: &'a ClientManager,
    bus_manager: &'a BusManager,
    entity_cache: &'a EntityCache,
    view_index: &'a ViewIndex,
    /// Optional usage-event sink; `None` when none is configured.
    usage_emitter: &'a Option<Arc<dyn WebSocketUsageEmitter>>,
    /// Metrics handle, only present when built with the `otel` feature.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
225
/// WebSocket server that accepts TCP connections, authorizes the handshake and
/// spawns one task per connection.
///
/// Construct with [`WebSocketServer::new`], adjust with the `with_*` builder
/// methods, then run with [`WebSocketServer::start`].
pub struct WebSocketServer {
    /// Address the TCP listener binds to.
    bind_addr: SocketAddr,
    /// Default client registry; replaced in `start` when `rate_limit_config` is set.
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    /// New connections are dropped once the client count reaches this value.
    max_clients: usize,
    /// Plugin consulted during the handshake and for in-band auth refresh.
    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
    /// Optional sink for usage events; `None` disables emission.
    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
    /// When set, `start` builds the client manager from this configuration.
    rate_limit_config: Option<RateLimitConfig>,
    /// Metrics handle, only present when built with the `otel` feature.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
239
240impl WebSocketServer {
241    #[cfg(feature = "otel")]
242    pub fn new(
243        bind_addr: SocketAddr,
244        bus_manager: BusManager,
245        entity_cache: EntityCache,
246        view_index: Arc<ViewIndex>,
247        metrics: Option<Arc<Metrics>>,
248    ) -> Self {
249        Self {
250            bind_addr,
251            client_manager: ClientManager::new(),
252            bus_manager,
253            entity_cache,
254            view_index,
255            max_clients: 10000,
256            auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
257            usage_emitter: None,
258            rate_limit_config: None,
259            metrics,
260        }
261    }
262
263    #[cfg(not(feature = "otel"))]
264    pub fn new(
265        bind_addr: SocketAddr,
266        bus_manager: BusManager,
267        entity_cache: EntityCache,
268        view_index: Arc<ViewIndex>,
269    ) -> Self {
270        Self {
271            bind_addr,
272            client_manager: ClientManager::new(),
273            bus_manager,
274            entity_cache,
275            view_index,
276            max_clients: 10000,
277            auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
278            usage_emitter: None,
279            rate_limit_config: None,
280        }
281    }
282
283    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
284        self.max_clients = max_clients;
285        self
286    }
287
288    pub fn with_auth_plugin(mut self, auth_plugin: Arc<dyn WebSocketAuthPlugin>) -> Self {
289        self.auth_plugin = auth_plugin;
290        self
291    }
292
293    pub fn with_usage_emitter(mut self, usage_emitter: Arc<dyn WebSocketUsageEmitter>) -> Self {
294        self.usage_emitter = Some(usage_emitter);
295        self
296    }
297
298    /// Configure rate limiting for the WebSocket server.
299    ///
300    /// This allows setting global rate limits that apply to all connections,
301    /// such as maximum connections per IP, timeouts, and rate windows.
302    /// Per-subject limits are controlled via AuthContext.Limits from the auth token.
303    pub fn with_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
304        self.rate_limit_config = Some(config);
305        self
306    }
307
308    pub async fn start(self) -> Result<()> {
309        info!(
310            "Starting WebSocket server on {} (max_clients: {})",
311            self.bind_addr, self.max_clients
312        );
313
314        let listener = TcpListener::bind(&self.bind_addr).await?;
315        info!("WebSocket server listening on {}", self.bind_addr);
316
317        // Apply rate limit configuration if provided
318        let client_manager = if let Some(config) = self.rate_limit_config {
319            ClientManager::with_config(config)
320        } else {
321            self.client_manager
322        };
323
324        client_manager.start_cleanup_task();
325
326        loop {
327            match listener.accept().await {
328                Ok((stream, addr)) => {
329                    let client_count = client_manager.client_count();
330                    if client_count >= self.max_clients {
331                        warn!(
332                            "Rejecting connection from {} - max clients ({}) reached",
333                            addr, self.max_clients
334                        );
335                        drop(stream);
336                        continue;
337                    }
338
339                    info!(
340                        "New WebSocket connection from {} ({}/{} clients)",
341                        addr,
342                        client_count + 1,
343                        self.max_clients
344                    );
345                    let client_manager = client_manager.clone();
346                    let bus_manager = self.bus_manager.clone();
347                    let entity_cache = self.entity_cache.clone();
348                    let view_index = self.view_index.clone();
349                    #[cfg(feature = "otel")]
350                    let metrics = self.metrics.clone();
351
352                    let auth_plugin = self.auth_plugin.clone();
353                    let usage_emitter = self.usage_emitter.clone();
354
355                    tokio::spawn(
356                        async move {
357                            #[cfg(feature = "otel")]
358                            let result = handle_connection(
359                                stream,
360                                client_manager,
361                                bus_manager,
362                                entity_cache,
363                                view_index,
364                                addr,
365                                auth_plugin,
366                                usage_emitter,
367                                metrics,
368                            )
369                            .await;
370                            #[cfg(not(feature = "otel"))]
371                            let result = handle_connection(
372                                stream,
373                                client_manager,
374                                bus_manager,
375                                entity_cache,
376                                view_index,
377                                addr,
378                                auth_plugin,
379                                usage_emitter,
380                            )
381                            .await;
382
383                            if let Err(e) = result {
384                                error!("WebSocket connection error: {}", e);
385                            }
386                        }
387                        .instrument(info_span!("ws.connection", %addr)),
388                    );
389                }
390                Err(e) => {
391                    error!("Failed to accept connection: {}", e);
392                }
393            }
394        }
395    }
396}
397
/// Everything needed to answer a rejected WebSocket handshake with an HTTP
/// error response.
#[derive(Debug, Clone)]
struct HandshakeReject {
    /// HTTP status of the rejection response.
    status: StatusCode,
    /// Payload serialized as the JSON response body.
    body: crate::websocket::auth::ErrorResponse,
    /// Value of the `X-Error-Code` response header.
    error_code: String,
    /// Value of the `Retry-After` response header, when one should be sent.
    retry_after_secs: Option<u64>,
}
405
406impl HandshakeReject {
407    fn from_deny(deny: &AuthDeny) -> Self {
408        let retry_after_secs = match deny.retry_policy {
409            crate::websocket::auth::RetryPolicy::RetryAfter(duration) => Some(duration.as_secs()),
410            _ => None,
411        };
412
413        Self {
414            status: StatusCode::from_u16(deny.http_status).unwrap_or(StatusCode::UNAUTHORIZED),
415            body: deny.to_error_response(),
416            error_code: deny.code.to_string(),
417            retry_after_secs,
418        }
419    }
420}
421
422fn build_handshake_error_response(
423    response: &Response,
424    reject: &HandshakeReject,
425) -> HandshakeErrorResponse {
426    let mut builder = Response::builder()
427        .status(reject.status)
428        .version(response.version())
429        .header(CONTENT_TYPE, "application/json; charset=utf-8")
430        .header("X-Error-Code", &reject.error_code)
431        .header("Cache-Control", "no-store");
432
433    if let Some(retry_after_secs) = reject.retry_after_secs {
434        builder = builder.header("Retry-After", retry_after_secs.to_string());
435    }
436
437    let body = serde_json::to_string(&reject.body).unwrap_or_else(|_| {
438        format!(
439            r#"{{"error":"{}","message":"{}","code":"{}","retryable":false}}"#,
440            reject.body.error, reject.body.message, reject.body.code
441        )
442    });
443
444    builder
445        .body(Some(body))
446        .expect("handshake rejection response should build")
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::websocket::auth::{AuthDeny, AuthErrorCode};
    use std::time::Duration;

    /// The pending `101 Switching Protocols` response a handshake callback receives.
    fn pending_upgrade_response() -> Response {
        Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .body(())
            .unwrap()
    }

    /// Runs a denial through the same conversion path the handshake uses.
    fn rejection_for(deny: &AuthDeny) -> HandshakeErrorResponse {
        let reject = HandshakeReject::from_deny(deny);
        build_handshake_error_response(&pending_upgrade_response(), &reject)
    }

    #[test]
    fn handshake_error_response_serializes_json_and_retry_after() {
        let deny = AuthDeny::rate_limited(Duration::from_secs(7), "websocket handshakes");

        let handshake_response = rejection_for(&deny);

        assert_eq!(handshake_response.status(), StatusCode::TOO_MANY_REQUESTS);
        assert_eq!(
            handshake_response.headers().get("X-Error-Code").unwrap(),
            "rate-limit-exceeded"
        );
        assert_eq!(
            handshake_response.headers().get("Retry-After").unwrap(),
            "7"
        );

        let body = handshake_response.into_body().unwrap();
        assert!(body.contains("rate-limit-exceeded"));
        assert!(body.contains("retryable"));
    }

    #[test]
    fn handshake_error_response_preserves_non_retryable_auth_denies() {
        let deny = AuthDeny::new(AuthErrorCode::OriginMismatch, "origin mismatch");

        let handshake_response = rejection_for(&deny);

        assert_eq!(handshake_response.status(), StatusCode::FORBIDDEN);
        assert!(handshake_response.headers().get("Retry-After").is_none());
    }
}
494
495#[allow(clippy::result_large_err)]
496async fn accept_authorized_connection(
497    stream: TcpStream,
498    remote_addr: SocketAddr,
499    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
500    client_manager: ClientManager,
501) -> Result<Option<(tokio_tungstenite::WebSocketStream<TcpStream>, AuthContext)>> {
502    use std::sync::Mutex;
503
504    let auth_result_capture: Arc<Mutex<Option<Result<AuthContext, HandshakeReject>>>> =
505        Arc::new(Mutex::new(None));
506    let auth_result_ref = auth_result_capture.clone();
507    let auth_plugin_ref = auth_plugin.clone();
508    let client_manager_for_auth = client_manager.clone();
509
510    let handshake_result = accept_hdr_async(stream, move |request: &Request, response| {
511        let connection_request = ConnectionAuthRequest::from_http_request(remote_addr, request);
512
513        let auth_result = tokio::task::block_in_place(|| {
514            tokio::runtime::Handle::current().block_on(async {
515                match auth_plugin_ref.authorize(&connection_request).await {
516                    AuthDecision::Allow(ctx) => {
517                        match client_manager_for_auth
518                            .check_connection_allowed(remote_addr, &Some(ctx.clone()))
519                            .await
520                        {
521                            Ok(()) => Ok(ctx),
522                            Err(deny) => Err(HandshakeReject::from_deny(&deny)),
523                        }
524                    }
525                    AuthDecision::Deny(deny) => Err(HandshakeReject::from_deny(&deny)),
526                }
527            })
528        });
529
530        let mut capture_lock = auth_result_ref.lock().expect("capture lock poisoned");
531        *capture_lock = Some(auth_result.clone());
532
533        match auth_result {
534            Ok(_) => Ok(response),
535            Err(reject) => Err(build_handshake_error_response(&response, &reject)),
536        }
537    })
538    .await;
539
540    let auth_result = {
541        let mut guard = auth_result_capture.lock().expect("capture lock poisoned");
542        guard.take()
543    };
544
545    match handshake_result {
546        Ok(ws_stream) => match auth_result {
547            Some(Ok(ctx)) => {
548                info!("WebSocket connection authorized for {}", remote_addr);
549                Ok(Some((ws_stream, ctx)))
550            }
551            Some(Err(reject)) => Err(anyhow::anyhow!(
552                "handshake unexpectedly succeeded after rejection: {}",
553                reject.body.message
554            )),
555            None => Err(anyhow::anyhow!(
556                "no auth result captured for authorized connection {}",
557                remote_addr
558            )),
559        },
560        Err(WsError::Http(_)) => {
561            match auth_result {
562                Some(Err(reject)) => {
563                    warn!(
564                        "WebSocket connection rejected during handshake for {}: {}",
565                        remote_addr, reject.body.message
566                    );
567                }
568                Some(Ok(_)) => {
569                    warn!(
570                        "WebSocket handshake failed after auth success for {}",
571                        remote_addr
572                    );
573                }
574                None => {
575                    warn!(
576                        "WebSocket handshake rejected for {} without captured auth result",
577                        remote_addr
578                    );
579                }
580            }
581            Ok(None)
582        }
583        Err(err) => Err(err.into()),
584    }
585}
586
587#[cfg(feature = "otel")]
588async fn handle_connection(
589    stream: TcpStream,
590    client_manager: ClientManager,
591    bus_manager: BusManager,
592    entity_cache: EntityCache,
593    view_index: Arc<ViewIndex>,
594    remote_addr: std::net::SocketAddr,
595    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
596    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
597    metrics: Option<Arc<Metrics>>,
598) -> Result<()> {
599    let Some((ws_stream, auth_context)) = accept_authorized_connection(
600        stream,
601        remote_addr,
602        auth_plugin.clone(),
603        client_manager.clone(),
604    )
605    .await?
606    else {
607        return Ok(());
608    };
609
610    let client_id = Uuid::new_v4();
611    let connection_start = Instant::now();
612
613    let auth_context = Some(auth_context);
614    let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
615        usage_identity(auth_context.as_ref());
616
617    // Extract metering key from auth context for metrics attribution
618    let metering_key = auth_context.as_ref().map(|ctx| ctx.metering_key.clone());
619
620    if let Some(ref m) = metrics {
621        if let Some(ref mk) = metering_key {
622            m.record_ws_connection_with_metering(mk);
623        } else {
624            m.record_ws_connection();
625        }
626    }
627
628    info!("WebSocket connection established for client {}", client_id);
629
630    emit_usage_event(
631        &usage_emitter,
632        WebSocketUsageEvent::ConnectionEstablished {
633            client_id: client_id.to_string(),
634            remote_addr: remote_addr.to_string(),
635            deployment_id: usage_deployment_id.clone(),
636            metering_key: usage_metering_key.clone(),
637            subject: usage_subject.clone(),
638            key_class: usage_key_class,
639        },
640    );
641
642    let (ws_sender, mut ws_receiver) = ws_stream.split();
643
644    // Add client with auth context and IP tracking
645    client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);
646
647    let ctx = SubscriptionContext {
648        client_id,
649        client_manager: &client_manager,
650        bus_manager: &bus_manager,
651        entity_cache: &entity_cache,
652        view_index: &view_index,
653        usage_emitter: &usage_emitter,
654        metrics: metrics.clone(),
655    };
656
657    let mut active_subscriptions: HashMap<String, String> = HashMap::new();
658
659    loop {
660        tokio::select! {
661            ws_msg = ws_receiver.next() => {
662                match ws_msg {
663                    Some(Ok(msg)) => {
664                        if msg.is_close() {
665                            info!("Client {} requested close", client_id);
666                            break;
667                        }
668
669                        client_manager.update_client_last_seen(client_id);
670
671                        if msg.is_text() {
672                            if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
673                                warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
674                                send_socket_issue(client_id, &client_manager, &deny, true).await;
675                                break;
676                            }
677
678                            if let Some(ref m) = metrics {
679                                if let Some(ref mk) = metering_key {
680                                    m.record_ws_message_received_with_metering(mk);
681                                } else {
682                                    m.record_ws_message_received();
683                                }
684                            }
685
686                            if let Ok(text) = msg.to_text() {
687                                debug!("Received text message from client {}: {}", client_id, text);
688
689                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
690                                    match client_msg {
691                                        ClientMessage::Subscribe(subscription) => {
692                                            let view_id = subscription.view.clone();
693                                            let sub_key = subscription.sub_key();
694
695                                            // Check subscription limits
696                                            if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
697                                                warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
698                                                send_socket_issue(client_id, &client_manager, &deny, false).await;
699                                                continue;
700                                            }
701
702                                            client_manager.update_subscription(client_id, subscription.clone());
703
704                                            let cancel_token = CancellationToken::new();
705                                            let is_new = client_manager.add_client_subscription(
706                                                client_id,
707                                                sub_key.clone(),
708                                                cancel_token.clone(),
709                                            ).await;
710
711                                            if !is_new {
712                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
713                                                continue;
714                                            }
715
716                                            if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
717                                                warn!(
718                                                    "Subscription rejected for client {} on {}: {}",
719                                                    client_id, view_id, err
720                                                );
721                                                if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
722                                                    send_socket_issue(client_id, &client_manager, &deny, false).await;
723                                                }
724                                                let _ = client_manager
725                                                    .remove_client_subscription(client_id, &sub_key)
726                                                    .await;
727                                                continue;
728                                            }
729
730                                            if let Some(ref m) = metrics {
731                                                if let Some(ref mk) = metering_key {
732                                                    m.record_subscription_created_with_metering(&view_id, mk);
733                                                } else {
734                                                    m.record_subscription_created(&view_id);
735                                                }
736                                            }
737                                            active_subscriptions.insert(sub_key, view_id.clone());
738                                            emit_usage_event(
739                                                &usage_emitter,
740                                                WebSocketUsageEvent::SubscriptionCreated {
741                                                    client_id: client_id.to_string(),
742                                                    deployment_id: usage_deployment_id.clone(),
743                                                    metering_key: usage_metering_key.clone(),
744                                                    subject: usage_subject.clone(),
745                                                    view_id,
746                                                },
747                                            );
748                                        }
749                                        ClientMessage::Unsubscribe(unsub) => {
750                                            let sub_key = unsub.sub_key();
751                                            let removed = client_manager
752                                                .remove_client_subscription(client_id, &sub_key)
753                                                .await;
754
755                                            if removed {
756                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
757                                                active_subscriptions.remove(&sub_key);
758                                                if let Some(ref m) = metrics {
759                                                    if let Some(ref mk) = metering_key {
760                                                        m.record_subscription_removed_with_metering(&unsub.view, mk);
761                                                    } else {
762                                                        m.record_subscription_removed(&unsub.view);
763                                                    }
764                                                }
765                                                emit_usage_event(
766                                                    &usage_emitter,
767                                                    WebSocketUsageEvent::SubscriptionRemoved {
768                                                        client_id: client_id.to_string(),
769                                                        deployment_id: usage_deployment_id.clone(),
770                                                        metering_key: usage_metering_key.clone(),
771                                                        subject: usage_subject.clone(),
772                                                        view_id: unsub.view.clone(),
773                                                    },
774                                                );
775                                            }
776                                        }
777                                        ClientMessage::Ping => {
778                                            debug!("Received ping from client {}", client_id);
779                                        }
780                                        ClientMessage::RefreshAuth(refresh_req) => {
781                                            debug!("Received refresh_auth from client {}", client_id);
782                                            handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin).await;
783                                        }
784                                    }
785                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
786                                    let view_id = subscription.view.clone();
787                                    let sub_key = subscription.sub_key();
788
789                                    if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
790                                        warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
791                                        send_socket_issue(client_id, &client_manager, &deny, false).await;
792                                        continue;
793                                    }
794
795                                    client_manager.update_subscription(client_id, subscription.clone());
796
797                                    let cancel_token = CancellationToken::new();
798                                    let is_new = client_manager.add_client_subscription(
799                                        client_id,
800                                        sub_key.clone(),
801                                        cancel_token.clone(),
802                                    ).await;
803
804                                    if !is_new {
805                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
806                                        continue;
807                                    }
808
809                                    if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
810                                        warn!(
811                                            "Subscription rejected for client {} on {}: {}",
812                                            client_id, view_id, err
813                                        );
814                                        if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
815                                            send_socket_issue(client_id, &client_manager, &deny, false).await;
816                                        }
817                                        let _ = client_manager
818                                            .remove_client_subscription(client_id, &sub_key)
819                                            .await;
820                                        continue;
821                                    }
822
823                                    if let Some(ref m) = metrics {
824                                        if let Some(ref mk) = metering_key {
825                                            m.record_subscription_created_with_metering(&view_id, mk);
826                                        } else {
827                                            m.record_subscription_created(&view_id);
828                                        }
829                                    }
830                                    active_subscriptions.insert(sub_key, view_id.clone());
831                                    emit_usage_event(
832                                        &usage_emitter,
833                                        WebSocketUsageEvent::SubscriptionCreated {
834                                            client_id: client_id.to_string(),
835                                            deployment_id: usage_deployment_id.clone(),
836                                            metering_key: usage_metering_key.clone(),
837                                            subject: usage_subject.clone(),
838                                            view_id,
839                                        },
840                                    );
841                                } else {
842                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
843                                }
844                            }
845                        }
846                    }
847                    Some(Err(e)) => {
848                        warn!("WebSocket error for client {}: {}", client_id, e);
849                        break;
850                    }
851                    None => {
852                        debug!("WebSocket stream ended for client {}", client_id);
853                        break;
854                    }
855                }
856            }
857        }
858    }
859
860    client_manager
861        .cancel_all_client_subscriptions(client_id)
862        .await;
863    client_manager.remove_client(client_id);
864    if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
865        rate_limiter.remove_client_buckets(client_id).await;
866    }
867
868    if let Some(ref m) = metrics {
869        let duration_secs = connection_start.elapsed().as_secs_f64();
870        if let Some(ref mk) = metering_key {
871            m.record_ws_disconnection_with_metering(duration_secs, mk);
872            for view_id in active_subscriptions.values() {
873                m.record_subscription_removed_with_metering(view_id, mk);
874            }
875        } else {
876            m.record_ws_disconnection(duration_secs);
877            for view_id in active_subscriptions.values() {
878                m.record_subscription_removed(view_id);
879            }
880        }
881    }
882
883    for view_id in active_subscriptions.values() {
884        emit_usage_event(
885            &usage_emitter,
886            WebSocketUsageEvent::SubscriptionRemoved {
887                client_id: client_id.to_string(),
888                deployment_id: usage_deployment_id.clone(),
889                metering_key: usage_metering_key.clone(),
890                subject: usage_subject.clone(),
891                view_id: view_id.clone(),
892            },
893        );
894    }
895
896    emit_usage_event(
897        &usage_emitter,
898        WebSocketUsageEvent::ConnectionClosed {
899            client_id: client_id.to_string(),
900            deployment_id: usage_deployment_id,
901            metering_key: usage_metering_key,
902            subject: usage_subject,
903            duration_secs: Some(connection_start.elapsed().as_secs_f64()),
904            subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
905        },
906    );
907
908    info!("Client {} disconnected", client_id);
909
910    Ok(())
911}
912
913#[cfg(not(feature = "otel"))]
914#[allow(clippy::too_many_arguments)]
915async fn handle_connection(
916    stream: TcpStream,
917    client_manager: ClientManager,
918    bus_manager: BusManager,
919    entity_cache: EntityCache,
920    view_index: Arc<ViewIndex>,
921    remote_addr: std::net::SocketAddr,
922    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
923    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
924) -> Result<()> {
925    let Some((ws_stream, auth_context)) = accept_authorized_connection(
926        stream,
927        remote_addr,
928        auth_plugin.clone(),
929        client_manager.clone(),
930    )
931    .await?
932    else {
933        return Ok(());
934    };
935
936    let client_id = Uuid::new_v4();
937    let auth_context_ref = Some(&auth_context);
938    let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
939        usage_identity(auth_context_ref);
940
941    let auth_context = Some(auth_context);
942
943    info!("WebSocket connection established for client {}", client_id);
944
945    emit_usage_event(
946        &usage_emitter,
947        WebSocketUsageEvent::ConnectionEstablished {
948            client_id: client_id.to_string(),
949            remote_addr: remote_addr.to_string(),
950            deployment_id: usage_deployment_id.clone(),
951            metering_key: usage_metering_key.clone(),
952            subject: usage_subject.clone(),
953            key_class: usage_key_class,
954        },
955    );
956
957    let (ws_sender, mut ws_receiver) = ws_stream.split();
958
959    // Add client with auth context and IP tracking
960    client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);
961
962    let ctx = SubscriptionContext {
963        client_id,
964        client_manager: &client_manager,
965        bus_manager: &bus_manager,
966        entity_cache: &entity_cache,
967        view_index: &view_index,
968        usage_emitter: &usage_emitter,
969    };
970
971    let mut active_subscriptions: HashMap<String, String> = HashMap::new();
972
973    loop {
974        tokio::select! {
975            ws_msg = ws_receiver.next() => {
976                match ws_msg {
977                    Some(Ok(msg)) => {
978                        if msg.is_close() {
979                            info!("Client {} requested close", client_id);
980                            break;
981                        }
982
983                        client_manager.update_client_last_seen(client_id);
984
985                        if msg.is_text() {
986                            if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
987                                warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
988                                send_socket_issue(client_id, &client_manager, &deny, true).await;
989                                break;
990                            }
991
992                            if let Ok(text) = msg.to_text() {
993                                debug!("Received text message from client {}: {}", client_id, text);
994
995                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
996                                    match client_msg {
997                                        ClientMessage::Subscribe(subscription) => {
998                                            let view_id = subscription.view.clone();
999                                            if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1000                                                warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1001                                                send_socket_issue(client_id, &client_manager, &deny, false).await;
1002                                                continue;
1003                                            }
1004
1005                                            let sub_key = subscription.sub_key();
1006                                            client_manager.update_subscription(client_id, subscription.clone());
1007
1008                                            let cancel_token = CancellationToken::new();
1009                                            let is_new = client_manager.add_client_subscription(
1010                                                client_id,
1011                                                sub_key.clone(),
1012                                                cancel_token.clone(),
1013                                            ).await;
1014
1015                                            if !is_new {
1016                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1017                                                continue;
1018                                            }
1019
1020                                            if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1021                                                warn!(
1022                                                    "Subscription rejected for client {} on {}: {}",
1023                                                    client_id,
1024                                                    sub_key,
1025                                                    err
1026                                                );
1027                                                if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1028                                                    send_socket_issue(client_id, &client_manager, &deny, false).await;
1029                                                }
1030                                                let _ = client_manager
1031                                                    .remove_client_subscription(client_id, &sub_key)
1032                                                    .await;
1033                                            } else {
1034                                                active_subscriptions.insert(sub_key, view_id.clone());
1035                                                emit_usage_event(
1036                                                    &usage_emitter,
1037                                                    WebSocketUsageEvent::SubscriptionCreated {
1038                                                        client_id: client_id.to_string(),
1039                                                        deployment_id: usage_deployment_id.clone(),
1040                                                        metering_key: usage_metering_key.clone(),
1041                                                        subject: usage_subject.clone(),
1042                                                        view_id,
1043                                                    },
1044                                                );
1045                                            }
1046                                        }
1047                                        ClientMessage::Unsubscribe(unsub) => {
1048                                            let sub_key = unsub.sub_key();
1049                                            let removed = client_manager
1050                                                .remove_client_subscription(client_id, &sub_key)
1051                                                .await;
1052
1053                                            if removed {
1054                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
1055                                                active_subscriptions.remove(&sub_key);
1056                                                emit_usage_event(
1057                                                    &usage_emitter,
1058                                                    WebSocketUsageEvent::SubscriptionRemoved {
1059                                                        client_id: client_id.to_string(),
1060                                                        deployment_id: usage_deployment_id.clone(),
1061                                                        metering_key: usage_metering_key.clone(),
1062                                                        subject: usage_subject.clone(),
1063                                                        view_id: unsub.view.clone(),
1064                                                    },
1065                                                );
1066                                            }
1067                                        }
1068                                        ClientMessage::Ping => {
1069                                            debug!("Received ping from client {}", client_id);
1070                                        }
1071                                        ClientMessage::RefreshAuth(refresh_req) => {
1072                                            debug!("Received refresh_auth from client {}", client_id);
1073                                            handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin).await;
1074                                        }
1075                                    }
1076                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
1077                                    let view_id = subscription.view.clone();
1078                                    if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1079                                        warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1080                                        send_socket_issue(client_id, &client_manager, &deny, false).await;
1081                                        continue;
1082                                    }
1083
1084                                    let sub_key = subscription.sub_key();
1085                                    client_manager.update_subscription(client_id, subscription.clone());
1086
1087                                    let cancel_token = CancellationToken::new();
1088                                    let is_new = client_manager.add_client_subscription(
1089                                        client_id,
1090                                        sub_key.clone(),
1091                                        cancel_token.clone(),
1092                                    ).await;
1093
1094                                    if !is_new {
1095                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1096                                        continue;
1097                                    }
1098
1099                                    if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1100                                        warn!(
1101                                            "Subscription rejected for client {} on {}: {}",
1102                                            client_id,
1103                                            sub_key,
1104                                            err
1105                                        );
1106                                        if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1107                                            send_socket_issue(client_id, &client_manager, &deny, false).await;
1108                                        }
1109                                        let _ = client_manager
1110                                            .remove_client_subscription(client_id, &sub_key)
1111                                            .await;
1112                                    } else {
1113                                        active_subscriptions.insert(sub_key, view_id.clone());
1114                                        emit_usage_event(
1115                                            &usage_emitter,
1116                                            WebSocketUsageEvent::SubscriptionCreated {
1117                                                client_id: client_id.to_string(),
1118                                                deployment_id: usage_deployment_id.clone(),
1119                                                metering_key: usage_metering_key.clone(),
1120                                                subject: usage_subject.clone(),
1121                                                view_id,
1122                                            },
1123                                        );
1124                                    }
1125                                } else {
1126                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
1127                                }
1128                            }
1129                        }
1130                    }
1131                    Some(Err(e)) => {
1132                        warn!("WebSocket error for client {}: {}", client_id, e);
1133                        break;
1134                    }
1135                    None => {
1136                        debug!("WebSocket stream ended for client {}", client_id);
1137                        break;
1138                    }
1139                }
1140            }
1141        }
1142    }
1143
1144    client_manager
1145        .cancel_all_client_subscriptions(client_id)
1146        .await;
1147    client_manager.remove_client(client_id);
1148    if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
1149        rate_limiter.remove_client_buckets(client_id).await;
1150    }
1151
1152    for view_id in active_subscriptions.values() {
1153        emit_usage_event(
1154            &usage_emitter,
1155            WebSocketUsageEvent::SubscriptionRemoved {
1156                client_id: client_id.to_string(),
1157                deployment_id: usage_deployment_id.clone(),
1158                metering_key: usage_metering_key.clone(),
1159                subject: usage_subject.clone(),
1160                view_id: view_id.clone(),
1161            },
1162        );
1163    }
1164
1165    emit_usage_event(
1166        &usage_emitter,
1167        WebSocketUsageEvent::ConnectionClosed {
1168            client_id: client_id.to_string(),
1169            deployment_id: usage_deployment_id,
1170            metering_key: usage_metering_key,
1171            subject: usage_subject,
1172            duration_secs: None,
1173            subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
1174        },
1175    );
1176
1177    info!("Client {} disconnected", client_id);
1178
1179    Ok(())
1180}
1181
1182async fn send_snapshot_batches(
1183    client_id: Uuid,
1184    entities: &[SnapshotEntity],
1185    mode: Mode,
1186    view_id: &str,
1187    client_manager: &ClientManager,
1188    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1189    batch_config: &SnapshotBatchConfig,
1190    #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
1191) -> Result<()> {
1192    let total = entities.len();
1193    if total == 0 {
1194        return Ok(());
1195    }
1196
1197    let mut offset = 0;
1198    let mut batch_num = 0;
1199
1200    while offset < total {
1201        let batch_size = if batch_num == 0 {
1202            batch_config.initial_batch_size
1203        } else {
1204            batch_config.subsequent_batch_size
1205        };
1206
1207        let end = (offset + batch_size).min(total);
1208        let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
1209        let rows_in_batch = batch_data.len() as u32;
1210        let is_complete = end >= total;
1211
1212        let snapshot_frame = SnapshotFrame {
1213            mode,
1214            export: view_id.to_string(),
1215            op: "snapshot",
1216            data: batch_data,
1217            complete: is_complete,
1218        };
1219
1220        if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
1221            let payload = maybe_compress(&json_payload);
1222            let payload_bytes = payload.as_bytes().len() as u64;
1223            if client_manager
1224                .send_compressed_async(client_id, payload)
1225                .await
1226                .is_err()
1227            {
1228                return Err(anyhow::anyhow!("Failed to send snapshot batch"));
1229            }
1230            #[cfg(feature = "otel")]
1231            if let Some(m) = metrics {
1232                m.record_ws_message_sent();
1233            }
1234
1235            let auth_context = client_manager.get_auth_context(client_id);
1236            let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1237            emit_usage_event(
1238                usage_emitter,
1239                WebSocketUsageEvent::SnapshotSent {
1240                    client_id: client_id.to_string(),
1241                    deployment_id,
1242                    metering_key,
1243                    subject,
1244                    view_id: view_id.to_string(),
1245                    rows: rows_in_batch,
1246                    messages: 1,
1247                    bytes: payload_bytes,
1248                },
1249            );
1250        }
1251
1252        offset = end;
1253        batch_num += 1;
1254    }
1255
1256    debug!(
1257        "Sent {} snapshot batches ({} entities) for {} to client {}",
1258        batch_num, total, view_id, client_id
1259    );
1260
1261    Ok(())
1262}
1263
1264fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
1265    if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
1266        return Some(SortConfig {
1267            field: sort.field_path.clone(),
1268            order: match sort.order {
1269                crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
1270                crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
1271            },
1272        });
1273    }
1274
1275    if view_spec.mode == Mode::List {
1276        return Some(SortConfig {
1277            field: vec!["_seq".to_string()],
1278            order: SortOrder::Desc,
1279        });
1280    }
1281
1282    None
1283}
1284
1285fn send_subscribed_frame(
1286    client_id: Uuid,
1287    view_id: &str,
1288    view_spec: &ViewSpec,
1289    client_manager: &ClientManager,
1290    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1291) -> Result<()> {
1292    let sort_config = extract_sort_config(view_spec);
1293    let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
1294
1295    let json_payload = serde_json::to_vec(&subscribed_frame)?;
1296    let payload_bytes = json_payload.len() as u64;
1297    let payload = Arc::new(Bytes::from(json_payload));
1298    client_manager
1299        .send_to_client(client_id, payload)
1300        .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))?;
1301
1302    let auth_context = client_manager.get_auth_context(client_id);
1303    let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1304    emit_usage_event(
1305        usage_emitter,
1306        WebSocketUsageEvent::UpdateSent {
1307            client_id: client_id.to_string(),
1308            deployment_id,
1309            metering_key,
1310            subject,
1311            view_id: view_id.to_string(),
1312            messages: 1,
1313            bytes: payload_bytes,
1314        },
1315    );
1316
1317    Ok(())
1318}
1319
1320fn enforce_snapshot_limit(ctx: &SubscriptionContext<'_>, rows: usize) -> Result<()> {
1321    let requested_rows = u32::try_from(rows).unwrap_or(u32::MAX);
1322    ctx.client_manager
1323        .check_snapshot_allowed(ctx.client_id, requested_rows)
1324        .map_err(|deny| anyhow::anyhow!(deny.reason))
1325}
1326
1327#[cfg(feature = "otel")]
1328async fn attach_client_to_bus(
1329    ctx: &SubscriptionContext<'_>,
1330    subscription: Subscription,
1331    cancel_token: CancellationToken,
1332) -> Result<()> {
1333    let view_id = &subscription.view;
1334
1335    let view_spec = match ctx.view_index.get_view(view_id) {
1336        Some(spec) => spec.clone(),
1337        None => {
1338            return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
1339        }
1340    };
1341
1342    send_subscribed_frame(
1343        ctx.client_id,
1344        view_id,
1345        &view_spec,
1346        ctx.client_manager,
1347        ctx.usage_emitter,
1348    )?;
1349
1350    let is_derived_with_sort = view_spec.is_derived()
1351        && view_spec
1352            .pipeline
1353            .as_ref()
1354            .map(|p| p.sort.is_some())
1355            .unwrap_or(false);
1356
1357    if is_derived_with_sort {
1358        return attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token)
1359            .await;
1360    }
1361
1362    match view_spec.mode {
1363        Mode::State => {
1364            let key = subscription.key.as_deref().unwrap_or("");
1365
1366            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
1367
1368            // Check if we should send snapshot (defaults to true for backward compatibility)
1369            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1370
1371            if should_send_snapshot {
1372                if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
1373                    transform_large_u64_to_strings(&mut cached_entity);
1374                    let snapshot_entities = vec![SnapshotEntity {
1375                        key: key.to_string(),
1376                        data: cached_entity,
1377                    }];
1378                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1379                    let batch_config = ctx.entity_cache.snapshot_config();
1380                    send_snapshot_batches(
1381                        ctx.client_id,
1382                        &snapshot_entities,
1383                        view_spec.mode,
1384                        view_id,
1385                        ctx.client_manager,
1386                        ctx.usage_emitter,
1387                        &batch_config,
1388                        #[cfg(feature = "otel")]
1389                        ctx.metrics.as_ref(),
1390                    )
1391                    .await?;
1392                    rx.borrow_and_update();
1393                } else if !rx.borrow().is_empty() {
1394                    let data = rx.borrow_and_update().clone();
1395                    let data_len = data.len();
1396                    if ctx
1397                        .client_manager
1398                        .send_to_client(ctx.client_id, data)
1399                        .is_ok()
1400                    {
1401                        emit_update_sent_for_client(
1402                            ctx.usage_emitter,
1403                            ctx.client_manager,
1404                            ctx.client_id,
1405                            view_id,
1406                            data_len,
1407                        );
1408                    }
1409                }
1410            } else {
1411                info!(
1412                    "Client {} subscribed to {} without snapshot",
1413                    ctx.client_id, view_id
1414                );
1415                rx.borrow_and_update();
1416            }
1417
1418            let client_id = ctx.client_id;
1419            let client_mgr = ctx.client_manager.clone();
1420            let usage_emitter = ctx.usage_emitter.clone();
1421            let metrics_clone = ctx.metrics.clone();
1422            let view_id_clone = view_id.clone();
1423            let view_id_span = view_id.clone();
1424            let key_clone = key.to_string();
1425            tokio::spawn(
1426                async move {
1427                    loop {
1428                        tokio::select! {
1429                            _ = cancel_token.cancelled() => {
1430                                debug!("State subscription cancelled for client {}", client_id);
1431                                break;
1432                            }
1433                            result = rx.changed() => {
1434                                if result.is_err() {
1435                                    break;
1436                                }
1437                                let data = rx.borrow().clone();
1438                                let data_len = data.len();
1439                                if client_mgr.send_to_client(client_id, data).is_err() {
1440                                    break;
1441                                }
1442                                if let Some(ref m) = metrics_clone {
1443                                    m.record_ws_message_sent();
1444                                }
1445                                emit_update_sent_for_client(
1446                                    &usage_emitter,
1447                                    &client_mgr,
1448                                    client_id,
1449                                    &view_id_clone,
1450                                    data_len,
1451                                );
1452                            }
1453                        }
1454                    }
1455                }
1456                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
1457            );
1458        }
1459        Mode::List | Mode::Append => {
1460            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1461
1462            // Check if we should send snapshot (defaults to true for backward compatibility)
1463            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1464
1465            if should_send_snapshot {
1466                // Determine which entities to send based on cursor
1467                let mut snapshots = if let Some(ref cursor) = subscription.after {
1468                    ctx.entity_cache
1469                        .get_after(view_id, cursor, subscription.snapshot_limit)
1470                        .await
1471                } else {
1472                    ctx.entity_cache.get_all(view_id).await
1473                };
1474
1475                // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache)
1476                if let Some(limit) = subscription.snapshot_limit {
1477                    if subscription.after.is_none() {
1478                        snapshots.sort_by(|a, b| {
1479                            let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1480                            let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1481                            cmp_seq(sb, sa) // descending: most-recent N
1482                        });
1483                        snapshots.truncate(limit);
1484                    }
1485                }
1486
1487                let snapshot_entities: Vec<SnapshotEntity> = snapshots
1488                    .into_iter()
1489                    .filter(|(key, _)| subscription.matches_key(key))
1490                    .map(|(key, mut data)| {
1491                        transform_large_u64_to_strings(&mut data);
1492                        SnapshotEntity { key, data }
1493                    })
1494                    .collect();
1495
1496                if !snapshot_entities.is_empty() {
1497                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1498                    let batch_config = ctx.entity_cache.snapshot_config();
1499                    send_snapshot_batches(
1500                        ctx.client_id,
1501                        &snapshot_entities,
1502                        view_spec.mode,
1503                        view_id,
1504                        ctx.client_manager,
1505                        ctx.usage_emitter,
1506                        &batch_config,
1507                        #[cfg(feature = "otel")]
1508                        ctx.metrics.as_ref(),
1509                    )
1510                    .await?;
1511                }
1512            } else {
1513                info!(
1514                    "Client {} subscribed to {} without snapshot",
1515                    ctx.client_id, view_id
1516                );
1517            }
1518
1519            let client_id = ctx.client_id;
1520            let client_mgr = ctx.client_manager.clone();
1521            let usage_emitter = ctx.usage_emitter.clone();
1522            let sub = subscription.clone();
1523            let metrics_clone = ctx.metrics.clone();
1524            let view_id_clone = view_id.clone();
1525            let view_id_span = view_id.clone();
1526            let mode = view_spec.mode;
1527            tokio::spawn(
1528                async move {
1529                    loop {
1530                        tokio::select! {
1531                            _ = cancel_token.cancelled() => {
1532                                debug!("List subscription cancelled for client {}", client_id);
1533                                break;
1534                            }
1535                            result = rx.recv() => {
1536                                match result {
1537                                    Ok(envelope) => {
1538                                        if sub.matches(&envelope.entity, &envelope.key) {
1539                                            if client_mgr
1540                                                .send_to_client(client_id, envelope.payload.clone())
1541                                                .is_err()
1542                                            {
1543                                                break;
1544                                            }
1545                                            if let Some(ref m) = metrics_clone {
1546                                                m.record_ws_message_sent();
1547                                            }
1548                                            emit_update_sent_for_client(
1549                                                &usage_emitter,
1550                                                &client_mgr,
1551                                                client_id,
1552                                                &view_id_clone,
1553                                                envelope.payload.len(),
1554                                            );
1555                                        }
1556                                    }
1557                                    Err(_) => break,
1558                                }
1559                            }
1560                        }
1561                    }
1562                }
1563                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode)),
1564            );
1565        }
1566    }
1567
1568    info!(
1569        "Client {} subscribed to {} (mode: {:?})",
1570        ctx.client_id, view_id, view_spec.mode
1571    );
1572
1573    Ok(())
1574}
1575
/// Subscribes a client to a sorted derived view (build with the `otel` feature).
///
/// The client sees a window of the view's sorted cache: `take` entries starting at offset
/// `skip`. `take` falls back to the pipeline's `limit` and then to 100; `skip` falls back to 0.
///
/// Flow:
/// 1. Subscribe to the list bus of the view's `source_view`.
/// 2. Read the current window and, when it is non-empty, send it as a snapshot.
/// 3. Spawn a task that re-reads the window on every bus message, sends a `delete` frame for
///    each key that left the window and then `upsert` frames for the entries now inside it
///    (only the first entry when `take == 1`).
///
/// The spawned task ends when `cancel_token` is cancelled, when the bus receiver returns an
/// error, or when a send to the client fails.
///
/// # Errors
///
/// Returns an error when the view spec has no `source_view`, or when `enforce_snapshot_limit`
/// or `send_snapshot_batches` fails.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) -> Result<()> {
    let view_id = &subscription.view;
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);
    // A `take` of 1 is a single-entity subscription: at most one upsert per refresh.
    let is_single = take == 1;
    let upsert_limit = if is_single { 1 } else { usize::MAX };

    let source_view_id = match &view_spec.source_view {
        Some(source) => source.clone(),
        None => {
            return Err(anyhow::anyhow!("Derived view {} has no source_view", view_id));
        }
    };

    // Subscribe BEFORE reading the window, as `attach_client_to_bus` does for its snapshots.
    // Assuming the bus only delivers messages published after the receiver exists, subscribing
    // afterwards would lose any change published while the snapshot is being read and sent,
    // leaving the client stale until the next unrelated update.
    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        // The guard is scoped to this block so it is released before any frame is sent.
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    // Keys the client is about to know; the update task diffs later windows against this set.
    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, mut data)| {
                transform_large_u64_to_strings(&mut data);
                SnapshotEntity { key, data }
            })
            .collect();

        enforce_snapshot_limit(ctx, snapshot_entities.len())?;
        let batch_config = ctx.entity_cache.snapshot_config();
        send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            ctx.usage_emitter,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await?;
    }

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let usage_emitter = ctx.usage_emitter.clone();
    let view_id_clone = view_id.clone();
    let view_id_span = view_id.clone();
    let sorted_caches_clone = sorted_caches;
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;

    tokio::spawn(
        async move {
            let mut current_window_keys = initial_keys;

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        debug!("Derived view subscription cancelled for client {}", client_id);
                        break;
                    }
                    result = rx.recv() => {
                        match result {
                            // The message content is ignored: any source-view message only
                            // triggers a fresh read of the window.
                            Ok(_envelope) => {
                                let new_window: Vec<(String, serde_json::Value)> = {
                                    let mut caches = sorted_caches_clone.write().await;
                                    match caches.get_mut(&view_id_clone) {
                                        Some(cache) => cache.get_window(skip, take),
                                        // No sorted cache for this view: skip this message.
                                        None => continue,
                                    }
                                };

                                let new_keys: HashSet<String> =
                                    new_window.iter().map(|(k, _)| k.clone()).collect();

                                // Builds, serialises and sends one frame. Returns `false` only
                                // when the client can no longer be reached; a frame that fails
                                // to serialise is skipped and reported as `true`. Defined after
                                // the last `.await` of this iteration so its borrows are never
                                // held across a suspension point.
                                let send_frame = |op: &'static str,
                                                  key: &str,
                                                  data: serde_json::Value|
                                 -> bool {
                                    let frame = Frame {
                                        seq: None,
                                        mode: frame_mode,
                                        export: view_id_clone.clone(),
                                        op,
                                        key: key.to_owned(),
                                        data,
                                        append: vec![],
                                    };
                                    let json = match serde_json::to_vec(&frame) {
                                        Ok(json) => json,
                                        Err(_) => return true,
                                    };
                                    let payload = Arc::new(Bytes::from(json));
                                    let payload_len = payload.len();
                                    if client_mgr.send_to_client(client_id, payload).is_err() {
                                        return false;
                                    }
                                    if let Some(ref m) = metrics_clone {
                                        m.record_ws_message_sent();
                                    }
                                    emit_update_sent_for_client(
                                        &usage_emitter,
                                        &client_mgr,
                                        client_id,
                                        &view_id_clone,
                                        payload_len,
                                    );
                                    true
                                };

                                // Deletes go out even when the new window is empty; otherwise
                                // a single-entity subscriber would keep an entity that no
                                // longer belongs to the view.
                                for old_key in current_window_keys.difference(&new_keys) {
                                    if !send_frame("delete", old_key, serde_json::Value::Null) {
                                        return;
                                    }
                                }

                                for (key, data) in new_window.iter().take(upsert_limit) {
                                    let mut transformed_data = data.clone();
                                    transform_large_u64_to_strings(&mut transformed_data);
                                    if !send_frame("upsert", key, transformed_data) {
                                        return;
                                    }
                                }

                                current_window_keys = new_keys;
                            }
                            Err(_) => break,
                        }
                    }
                }
            }
        }
        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
    );

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );

    Ok(())
}
1820
1821#[cfg(not(feature = "otel"))]
1822async fn attach_client_to_bus(
1823    ctx: &SubscriptionContext<'_>,
1824    subscription: Subscription,
1825    cancel_token: CancellationToken,
1826) -> Result<()> {
1827    let view_id = &subscription.view;
1828
1829    let view_spec = match ctx.view_index.get_view(view_id) {
1830        Some(spec) => spec.clone(),
1831        None => {
1832            return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
1833        }
1834    };
1835
1836    send_subscribed_frame(
1837        ctx.client_id,
1838        view_id,
1839        &view_spec,
1840        ctx.client_manager,
1841        ctx.usage_emitter,
1842    )?;
1843
1844    let is_derived_with_sort = view_spec.is_derived()
1845        && view_spec
1846            .pipeline
1847            .as_ref()
1848            .map(|p| p.sort.is_some())
1849            .unwrap_or(false);
1850
1851    if is_derived_with_sort {
1852        return attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
1853    }
1854
1855    match view_spec.mode {
1856        Mode::State => {
1857            let key = subscription.key.as_deref().unwrap_or("");
1858
1859            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
1860
1861            // Check if we should send snapshot (defaults to true for backward compatibility)
1862            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1863
1864            if should_send_snapshot {
1865                if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
1866                    transform_large_u64_to_strings(&mut cached_entity);
1867                    let snapshot_entities = vec![SnapshotEntity {
1868                        key: key.to_string(),
1869                        data: cached_entity,
1870                    }];
1871                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1872                    let batch_config = ctx.entity_cache.snapshot_config();
1873                    send_snapshot_batches(
1874                        ctx.client_id,
1875                        &snapshot_entities,
1876                        view_spec.mode,
1877                        view_id,
1878                        ctx.client_manager,
1879                        ctx.usage_emitter,
1880                        &batch_config,
1881                    )
1882                    .await?;
1883                    rx.borrow_and_update();
1884                } else if !rx.borrow().is_empty() {
1885                    let data = rx.borrow_and_update().clone();
1886                    let data_len = data.len();
1887                    if ctx
1888                        .client_manager
1889                        .send_to_client(ctx.client_id, data)
1890                        .is_ok()
1891                    {
1892                        emit_update_sent_for_client(
1893                            ctx.usage_emitter,
1894                            ctx.client_manager,
1895                            ctx.client_id,
1896                            view_id,
1897                            data_len,
1898                        );
1899                    }
1900                }
1901            } else {
1902                info!(
1903                    "Client {} subscribed to {} without snapshot",
1904                    ctx.client_id, view_id
1905                );
1906                rx.borrow_and_update();
1907            }
1908
1909            let client_id = ctx.client_id;
1910            let client_mgr = ctx.client_manager.clone();
1911            let usage_emitter = ctx.usage_emitter.clone();
1912            let view_id_clone = view_id.clone();
1913            let view_id_span = view_id.clone();
1914            let key_clone = key.to_string();
1915            tokio::spawn(
1916                async move {
1917                    loop {
1918                        tokio::select! {
1919                            _ = cancel_token.cancelled() => {
1920                                debug!("State subscription cancelled for client {}", client_id);
1921                                break;
1922                            }
1923                            result = rx.changed() => {
1924                                if result.is_err() {
1925                                    break;
1926                                }
1927                                let data = rx.borrow().clone();
1928                                let data_len = data.len();
1929                                if client_mgr.send_to_client(client_id, data).is_err() {
1930                                    break;
1931                                }
1932                                emit_update_sent_for_client(
1933                                    &usage_emitter,
1934                                    &client_mgr,
1935                                    client_id,
1936                                    &view_id_clone,
1937                                    data_len,
1938                                );
1939                            }
1940                        }
1941                    }
1942                }
1943                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
1944            );
1945        }
1946        Mode::List | Mode::Append => {
1947            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1948
1949            // Check if we should send snapshot (defaults to true for backward compatibility)
1950            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1951
1952            if should_send_snapshot {
1953                // Determine which entities to send based on cursor
1954                let mut snapshots = if let Some(ref cursor) = subscription.after {
1955                    ctx.entity_cache
1956                        .get_after(view_id, cursor, subscription.snapshot_limit)
1957                        .await
1958                } else {
1959                    ctx.entity_cache.get_all(view_id).await
1960                };
1961
1962                // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache)
1963                if let Some(limit) = subscription.snapshot_limit {
1964                    if subscription.after.is_none() {
1965                        snapshots.sort_by(|a, b| {
1966                            let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1967                            let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1968                            cmp_seq(sb, sa) // descending: most-recent N
1969                        });
1970                        snapshots.truncate(limit);
1971                    }
1972                }
1973
1974                let snapshot_entities: Vec<SnapshotEntity> = snapshots
1975                    .into_iter()
1976                    .filter(|(key, _)| subscription.matches_key(key))
1977                    .map(|(key, mut data)| {
1978                        transform_large_u64_to_strings(&mut data);
1979                        SnapshotEntity { key, data }
1980                    })
1981                    .collect();
1982
1983                if !snapshot_entities.is_empty() {
1984                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1985                    let batch_config = ctx.entity_cache.snapshot_config();
1986                    send_snapshot_batches(
1987                        ctx.client_id,
1988                        &snapshot_entities,
1989                        view_spec.mode,
1990                        view_id,
1991                        ctx.client_manager,
1992                        ctx.usage_emitter,
1993                        &batch_config,
1994                    )
1995                    .await?;
1996                }
1997            } else {
1998                info!(
1999                    "Client {} subscribed to {} without snapshot",
2000                    ctx.client_id, view_id
2001                );
2002            }
2003
2004            let client_id = ctx.client_id;
2005            let client_mgr = ctx.client_manager.clone();
2006            let usage_emitter = ctx.usage_emitter.clone();
2007            let sub = subscription.clone();
2008            let view_id_clone = view_id.clone();
2009            let view_id_span = view_id.clone();
2010            let mode = view_spec.mode;
2011            tokio::spawn(
2012                async move {
2013                    loop {
2014                        tokio::select! {
2015                            _ = cancel_token.cancelled() => {
2016                                debug!("List subscription cancelled for client {}", client_id);
2017                                break;
2018                            }
2019                            result = rx.recv() => {
2020                                match result {
2021                                    Ok(envelope) => {
2022                                        if sub.matches(&envelope.entity, &envelope.key)
2023                                            && client_mgr
2024                                                .send_to_client(client_id, envelope.payload.clone())
2025                                                .is_err()
2026                                        {
2027                                            break;
2028                                        } else if sub.matches(&envelope.entity, &envelope.key) {
2029                                            emit_update_sent_for_client(
2030                                                &usage_emitter,
2031                                                &client_mgr,
2032                                                client_id,
2033                                                &view_id_clone,
2034                                                envelope.payload.len(),
2035                                            );
2036                                        }
2037                                    }
2038                                    Err(_) => break,
2039                                }
2040                            }
2041                        }
2042                    }
2043                }
2044                .instrument(
2045                    info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode),
2046                ),
2047            );
2048        }
2049    }
2050
2051    info!(
2052        "Client {} subscribed to {} (mode: {:?})",
2053        ctx.client_id, view_id, view_spec.mode
2054    );
2055
2056    Ok(())
2057}
2058
2059#[cfg(not(feature = "otel"))]
2060async fn attach_derived_view_subscription(
2061    ctx: &SubscriptionContext<'_>,
2062    subscription: Subscription,
2063    view_spec: ViewSpec,
2064    cancel_token: CancellationToken,
2065) -> Result<()> {
2066    let view_id = &subscription.view;
2067    let pipeline_limit = view_spec
2068        .pipeline
2069        .as_ref()
2070        .and_then(|p| p.limit)
2071        .unwrap_or(100);
2072    let take = subscription.take.unwrap_or(pipeline_limit);
2073    let skip = subscription.skip.unwrap_or(0);
2074    let is_single = take == 1;
2075
2076    let source_view_id = match &view_spec.source_view {
2077        Some(s) => s.clone(),
2078        None => {
2079            return Err(anyhow::anyhow!(
2080                "Derived view {} has no source_view",
2081                view_id
2082            ));
2083        }
2084    };
2085
2086    let sorted_caches = ctx.view_index.sorted_caches();
2087    let initial_window: Vec<(String, serde_json::Value)> = {
2088        let mut caches = sorted_caches.write().await;
2089        if let Some(cache) = caches.get_mut(view_id) {
2090            cache.get_window(skip, take)
2091        } else {
2092            warn!("No sorted cache for derived view {}", view_id);
2093            vec![]
2094        }
2095    };
2096
2097    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
2098
2099    if !initial_window.is_empty() {
2100        let snapshot_entities: Vec<SnapshotEntity> = initial_window
2101            .into_iter()
2102            .map(|(key, mut data)| {
2103                transform_large_u64_to_strings(&mut data);
2104                SnapshotEntity { key, data }
2105            })
2106            .collect();
2107
2108        enforce_snapshot_limit(ctx, snapshot_entities.len())?;
2109        let batch_config = ctx.entity_cache.snapshot_config();
2110        send_snapshot_batches(
2111            ctx.client_id,
2112            &snapshot_entities,
2113            view_spec.mode,
2114            view_id,
2115            ctx.client_manager,
2116            ctx.usage_emitter,
2117            &batch_config,
2118        )
2119        .await?;
2120    }
2121
2122    let mut rx = ctx
2123        .bus_manager
2124        .get_or_create_list_bus(&source_view_id)
2125        .await;
2126
2127    let client_id = ctx.client_id;
2128    let client_mgr = ctx.client_manager.clone();
2129    let usage_emitter = ctx.usage_emitter.clone();
2130    let view_id_clone = view_id.clone();
2131    let view_id_span = view_id.clone();
2132    let sorted_caches_clone = sorted_caches;
2133    let frame_mode = view_spec.mode;
2134
2135    tokio::spawn(
2136        async move {
2137            let mut current_window_keys = initial_keys;
2138
2139            loop {
2140                tokio::select! {
2141                    _ = cancel_token.cancelled() => {
2142                        debug!("Derived view subscription cancelled for client {}", client_id);
2143                        break;
2144                    }
2145                    result = rx.recv() => {
2146                        match result {
2147                            Ok(_envelope) => {
2148                                let new_window: Vec<(String, serde_json::Value)> = {
2149                                    let mut caches = sorted_caches_clone.write().await;
2150                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
2151                                        cache.get_window(skip, take)
2152                                    } else {
2153                                        continue;
2154                                    }
2155                                };
2156
2157                                let new_keys: HashSet<String> =
2158                                    new_window.iter().map(|(k, _)| k.clone()).collect();
2159
2160                                if is_single {
2161                                    if let Some((new_key, data)) = new_window.first() {
2162                                        for old_key in current_window_keys.difference(&new_keys) {
2163                                            let delete_frame = Frame {
2164                                            seq: None,
2165                                                mode: frame_mode,
2166                                                export: view_id_clone.clone(),
2167                                                op: "delete",
2168                                                key: old_key.clone(),
2169                                                data: serde_json::Value::Null,
2170                                                append: vec![],
2171                                            };
2172                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
2173                                                let payload = Arc::new(Bytes::from(json));
2174                                                let payload_len = payload.len();
2175                                                if client_mgr.send_to_client(client_id, payload).is_err() {
2176                                                    return;
2177                                                }
2178                                                emit_update_sent_for_client(
2179                                                    &usage_emitter,
2180                                                    &client_mgr,
2181                                                    client_id,
2182                                                    &view_id_clone,
2183                                                    payload_len,
2184                                                );
2185                                            }
2186                                        }
2187
2188                                        let mut transformed_data = data.clone();
2189                                        transform_large_u64_to_strings(&mut transformed_data);
2190                                        let frame = Frame {
2191                                            seq: None,
2192                                            mode: frame_mode,
2193                                            export: view_id_clone.clone(),
2194                                            op: "upsert",
2195                                            key: new_key.clone(),
2196                                            data: transformed_data,
2197                                            append: vec![],
2198                                        };
2199                                        if let Ok(json) = serde_json::to_vec(&frame) {
2200                                            let payload = Arc::new(Bytes::from(json));
2201                                            let payload_len = payload.len();
2202                                            if client_mgr.send_to_client(client_id, payload).is_err() {
2203                                                return;
2204                                            }
2205                                            emit_update_sent_for_client(
2206                                                &usage_emitter,
2207                                                &client_mgr,
2208                                                client_id,
2209                                                &view_id_clone,
2210                                                payload_len,
2211                                            );
2212                                        }
2213                                    }
2214                                } else {
2215                                    for key in current_window_keys.difference(&new_keys) {
2216                                        let delete_frame = Frame {
2217                                            seq: None,
2218                                            mode: frame_mode,
2219                                            export: view_id_clone.clone(),
2220                                            op: "delete",
2221                                            key: key.clone(),
2222                                            data: serde_json::Value::Null,
2223                                            append: vec![],
2224                                        };
2225                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
2226                                            let payload = Arc::new(Bytes::from(json));
2227                                            let payload_len = payload.len();
2228                                            if client_mgr.send_to_client(client_id, payload).is_err() {
2229                                                return;
2230                                            }
2231                                            emit_update_sent_for_client(
2232                                                &usage_emitter,
2233                                                &client_mgr,
2234                                                client_id,
2235                                                &view_id_clone,
2236                                                payload_len,
2237                                            );
2238                                        }
2239                                    }
2240
2241                                    for (key, data) in &new_window {
2242                                        let mut transformed_data = data.clone();
2243                                        transform_large_u64_to_strings(&mut transformed_data);
2244                                        let frame = Frame {
2245                                            seq: None,
2246                                            mode: frame_mode,
2247                                            export: view_id_clone.clone(),
2248                                            op: "upsert",
2249                                            key: key.clone(),
2250                                            data: transformed_data,
2251                                            append: vec![],
2252                                        };
2253                                        if let Ok(json) = serde_json::to_vec(&frame) {
2254                                            let payload = Arc::new(Bytes::from(json));
2255                                            let payload_len = payload.len();
2256                                            if client_mgr.send_to_client(client_id, payload).is_err() {
2257                                                return;
2258                                            }
2259                                            emit_update_sent_for_client(
2260                                                &usage_emitter,
2261                                                &client_mgr,
2262                                                client_id,
2263                                                &view_id_clone,
2264                                                payload_len,
2265                                            );
2266                                        }
2267                                    }
2268                                }
2269
2270                                current_window_keys = new_keys;
2271                            }
2272                            Err(_) => break,
2273                        }
2274                    }
2275                }
2276            }
2277        }
2278        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
2279    );
2280
2281    info!(
2282        "Client {} subscribed to derived view {} (take={}, skip={})",
2283        ctx.client_id, view_id, take, skip
2284    );
2285
2286    Ok(())
2287}