Skip to main content

arete_server/websocket/
server.rs

1use crate::bus::BusManager;
2use crate::cache::{cmp_seq, EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::auth::{
6    AuthContext, AuthDecision, AuthDeny, ConnectionAuthRequest, WebSocketAuthPlugin,
7};
8use crate::websocket::client_manager::{ClientManager, RateLimitConfig};
9use crate::websocket::frame::{
10    transform_large_u64_to_strings, Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig,
11    SortOrder, SubscribedFrame,
12};
13use crate::websocket::subscription::{
14    ClientMessage, RefreshAuthRequest, RefreshAuthResponse, SocketIssueMessage, Subscription,
15};
16use crate::websocket::usage::{WebSocketUsageEmitter, WebSocketUsageEvent};
17use anyhow::Result;
18use bytes::Bytes;
19use futures_util::StreamExt;
20use std::collections::{HashMap, HashSet};
21use std::net::SocketAddr;
22use std::sync::Arc;
23#[cfg(feature = "otel")]
24use std::time::Instant;
25
26use tokio::net::{TcpListener, TcpStream};
27use tokio_tungstenite::{
28    accept_hdr_async,
29    tungstenite::{
30        handshake::server::{ErrorResponse as HandshakeErrorResponse, Request, Response},
31        http::{header::CONTENT_TYPE, StatusCode},
32        Error as WsError,
33    },
34};
35
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, error, info, info_span, warn, Instrument};
38use uuid::Uuid;
39
40#[cfg(feature = "otel")]
41use crate::metrics::Metrics;
42
43/// Helper function to handle refresh_auth messages
44async fn handle_refresh_auth(
45    client_id: Uuid,
46    refresh_req: &RefreshAuthRequest,
47    client_manager: &ClientManager,
48    auth_plugin: &Arc<dyn WebSocketAuthPlugin>,
49) {
50    // Try to verify the new token using the auth plugin
51    // We need to downcast to SignedSessionAuthPlugin to use verify_refresh_token
52    let refresh_result: Result<AuthContext, String> = {
53        // Try to downcast to SignedSessionAuthPlugin
54        if let Some(signed_plugin) = auth_plugin
55            .as_any()
56            .downcast_ref::<crate::websocket::auth::SignedSessionAuthPlugin>(
57        ) {
58            signed_plugin
59                .verify_refresh_token(&refresh_req.token)
60                .await
61                .map_err(|e| e.reason)
62        } else {
63            Err("In-band auth refresh not supported with current auth plugin".to_string())
64        }
65    };
66
67    match refresh_result {
68        Ok(new_context) => {
69            let expires_at = new_context.expires_at;
70            if client_manager.update_client_auth(client_id, new_context) {
71                info!(
72                    "Client {} refreshed auth successfully, expires at {}",
73                    client_id, expires_at
74                );
75
76                // Send success response
77                let response = RefreshAuthResponse {
78                    success: true,
79                    error: None,
80                    expires_at: Some(expires_at),
81                };
82                if let Ok(json) = serde_json::to_string(&response) {
83                    let _ = client_manager.send_text_to_client(client_id, json).await;
84                }
85            } else {
86                warn!("Client {} not found when refreshing auth", client_id);
87
88                // Send failure response - client not found
89                let response = RefreshAuthResponse {
90                    success: false,
91                    error: Some("client-not-found".to_string()),
92                    expires_at: None,
93                };
94                if let Ok(json) = serde_json::to_string(&response) {
95                    let _ = client_manager.send_text_to_client(client_id, json).await;
96                }
97            }
98        }
99        Err(err) => {
100            warn!("Client {} auth refresh failed: {}", client_id, err);
101
102            // Send failure response with machine-readable error code
103            let error_code = if err.contains("expired") {
104                "token-expired"
105            } else if err.contains("signature") {
106                "token-invalid-signature"
107            } else if err.contains("issuer") {
108                "token-invalid-issuer"
109            } else if err.contains("audience") {
110                "token-invalid-audience"
111            } else {
112                "token-invalid"
113            };
114
115            let response = RefreshAuthResponse {
116                success: false,
117                error: Some(error_code.to_string()),
118                expires_at: None,
119            };
120            if let Ok(json) = serde_json::to_string(&response) {
121                let _ = client_manager.send_text_to_client(client_id, json).await;
122            }
123        }
124    }
125}
126
127async fn send_socket_issue(
128    client_id: Uuid,
129    client_manager: &ClientManager,
130    deny: &AuthDeny,
131    fatal: bool,
132) {
133    let message = SocketIssueMessage::from_auth_deny(deny, fatal);
134    match serde_json::to_string(&message) {
135        Ok(json) => {
136            let _ = client_manager.send_text_to_client(client_id, json).await;
137        }
138        Err(error) => {
139            warn!(error = %error, client_id = %client_id, "failed to serialize socket issue message");
140        }
141    }
142}
143
144fn auth_deny_from_subscription_error(reason: &str) -> Option<AuthDeny> {
145    if reason.starts_with("Snapshot limit exceeded:") {
146        Some(AuthDeny::new(
147            crate::websocket::auth::AuthErrorCode::SnapshotLimitExceeded,
148            reason,
149        ))
150    } else {
151        None
152    }
153}
154
155fn key_class_label(key_class: arete_auth::KeyClass) -> &'static str {
156    match key_class {
157        arete_auth::KeyClass::Secret => "secret",
158        arete_auth::KeyClass::Publishable => "publishable",
159    }
160}
161
162fn emit_usage_event(
163    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
164    event: WebSocketUsageEvent,
165) {
166    if let Some(emitter) = usage_emitter.clone() {
167        tokio::spawn(async move {
168            emitter.emit(event).await;
169        });
170    }
171}
172
173fn usage_identity(
174    auth_context: Option<&AuthContext>,
175) -> (
176    Option<String>,
177    Option<String>,
178    Option<String>,
179    Option<String>,
180) {
181    match auth_context {
182        Some(ctx) => (
183            Some(ctx.metering_key.clone()),
184            Some(ctx.subject.clone()),
185            Some(key_class_label(ctx.key_class).to_string()),
186            ctx.deployment_id.clone(),
187        ),
188        None => (None, None, None, None),
189    }
190}
191
192fn emit_update_sent_for_client(
193    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
194    client_manager: &ClientManager,
195    client_id: Uuid,
196    view_id: &str,
197    bytes: usize,
198) {
199    let auth_context = client_manager.get_auth_context(client_id);
200    let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
201    emit_usage_event(
202        usage_emitter,
203        WebSocketUsageEvent::UpdateSent {
204            client_id: client_id.to_string(),
205            deployment_id,
206            metering_key,
207            subject,
208            view_id: view_id.to_string(),
209            messages: 1,
210            bytes: bytes as u64,
211        },
212    );
213}
214
215struct SubscriptionContext<'a> {
216    client_id: Uuid,
217    client_manager: &'a ClientManager,
218    bus_manager: &'a BusManager,
219    entity_cache: &'a EntityCache,
220    view_index: &'a ViewIndex,
221    usage_emitter: &'a Option<Arc<dyn WebSocketUsageEmitter>>,
222    #[cfg(feature = "otel")]
223    metrics: Option<Arc<Metrics>>,
224}
225
226pub struct WebSocketServer {
227    bind_addr: SocketAddr,
228    client_manager: ClientManager,
229    bus_manager: BusManager,
230    entity_cache: EntityCache,
231    view_index: Arc<ViewIndex>,
232    max_clients: usize,
233    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
234    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
235    rate_limit_config: Option<RateLimitConfig>,
236    #[cfg(feature = "otel")]
237    metrics: Option<Arc<Metrics>>,
238}
239
240impl WebSocketServer {
241    #[cfg(feature = "otel")]
242    pub fn new(
243        bind_addr: SocketAddr,
244        bus_manager: BusManager,
245        entity_cache: EntityCache,
246        view_index: Arc<ViewIndex>,
247        metrics: Option<Arc<Metrics>>,
248    ) -> Self {
249        Self {
250            bind_addr,
251            client_manager: ClientManager::new(),
252            bus_manager,
253            entity_cache,
254            view_index,
255            max_clients: 10000,
256            auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
257            usage_emitter: None,
258            rate_limit_config: None,
259            metrics,
260        }
261    }
262
263    #[cfg(not(feature = "otel"))]
264    pub fn new(
265        bind_addr: SocketAddr,
266        bus_manager: BusManager,
267        entity_cache: EntityCache,
268        view_index: Arc<ViewIndex>,
269    ) -> Self {
270        Self {
271            bind_addr,
272            client_manager: ClientManager::new(),
273            bus_manager,
274            entity_cache,
275            view_index,
276            max_clients: 10000,
277            auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
278            usage_emitter: None,
279            rate_limit_config: None,
280        }
281    }
282
283    pub fn with_max_clients(mut self, max_clients: usize) -> Self {
284        self.max_clients = max_clients;
285        self
286    }
287
288    pub fn with_auth_plugin(mut self, auth_plugin: Arc<dyn WebSocketAuthPlugin>) -> Self {
289        self.auth_plugin = auth_plugin;
290        self
291    }
292
293    pub fn with_usage_emitter(mut self, usage_emitter: Arc<dyn WebSocketUsageEmitter>) -> Self {
294        self.usage_emitter = Some(usage_emitter);
295        self
296    }
297
298    /// Configure rate limiting for the WebSocket server.
299    ///
300    /// This allows setting global rate limits that apply to all connections,
301    /// such as maximum connections per IP, timeouts, and rate windows.
302    /// Per-subject limits are controlled via AuthContext.Limits from the auth token.
303    pub fn with_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
304        self.rate_limit_config = Some(config);
305        self
306    }
307
308    pub async fn start(self) -> Result<()> {
309        info!(
310            "Starting WebSocket server on {} (max_clients: {})",
311            self.bind_addr, self.max_clients
312        );
313
314        let listener = TcpListener::bind(&self.bind_addr).await?;
315        info!("WebSocket server listening on {}", self.bind_addr);
316
317        // Apply rate limit configuration if provided
318        let client_manager = if let Some(config) = self.rate_limit_config {
319            ClientManager::with_config(config)
320        } else {
321            self.client_manager
322        };
323
324        client_manager.start_cleanup_task();
325
326        loop {
327            match listener.accept().await {
328                Ok((stream, addr)) => {
329                    let client_count = client_manager.client_count();
330                    if client_count >= self.max_clients {
331                        warn!(
332                            "Rejecting connection from {} - max clients ({}) reached",
333                            addr, self.max_clients
334                        );
335                        drop(stream);
336                        continue;
337                    }
338
339                    info!(
340                        "New WebSocket connection from {} ({}/{} clients)",
341                        addr,
342                        client_count + 1,
343                        self.max_clients
344                    );
345                    let client_manager = client_manager.clone();
346                    let bus_manager = self.bus_manager.clone();
347                    let entity_cache = self.entity_cache.clone();
348                    let view_index = self.view_index.clone();
349                    #[cfg(feature = "otel")]
350                    let metrics = self.metrics.clone();
351
352                    let auth_plugin = self.auth_plugin.clone();
353                    let usage_emitter = self.usage_emitter.clone();
354
355                    tokio::spawn(
356                        async move {
357                            #[cfg(feature = "otel")]
358                            let result = handle_connection(
359                                stream,
360                                client_manager,
361                                bus_manager,
362                                entity_cache,
363                                view_index,
364                                addr,
365                                auth_plugin,
366                                usage_emitter,
367                                metrics,
368                            )
369                            .await;
370                            #[cfg(not(feature = "otel"))]
371                            let result = handle_connection(
372                                stream,
373                                client_manager,
374                                bus_manager,
375                                entity_cache,
376                                view_index,
377                                addr,
378                                auth_plugin,
379                                usage_emitter,
380                            )
381                            .await;
382
383                            if let Err(e) = result {
384                                error!("WebSocket connection error: {}", e);
385                            }
386                        }
387                        .instrument(info_span!("ws.connection", %addr)),
388                    );
389                }
390                Err(e) => {
391                    error!("Failed to accept connection: {}", e);
392                }
393            }
394        }
395    }
396}
397
398#[derive(Debug, Clone)]
399struct HandshakeReject {
400    status: StatusCode,
401    body: crate::websocket::auth::ErrorResponse,
402    error_code: String,
403    retry_after_secs: Option<u64>,
404}
405
406impl HandshakeReject {
407    fn from_deny(deny: &AuthDeny) -> Self {
408        let retry_after_secs = match deny.retry_policy {
409            crate::websocket::auth::RetryPolicy::RetryAfter(duration) => Some(duration.as_secs()),
410            _ => None,
411        };
412
413        Self {
414            status: StatusCode::from_u16(deny.http_status).unwrap_or(StatusCode::UNAUTHORIZED),
415            body: deny.to_error_response(),
416            error_code: deny.code.to_string(),
417            retry_after_secs,
418        }
419    }
420}
421
422fn build_handshake_error_response(
423    response: &Response,
424    reject: &HandshakeReject,
425) -> HandshakeErrorResponse {
426    let mut builder = Response::builder()
427        .status(reject.status)
428        .version(response.version())
429        .header(CONTENT_TYPE, "application/json; charset=utf-8")
430        .header("X-Error-Code", &reject.error_code)
431        .header("Cache-Control", "no-store");
432
433    if let Some(retry_after_secs) = reject.retry_after_secs {
434        builder = builder.header("Retry-After", retry_after_secs.to_string());
435    }
436
437    let body = serde_json::to_string(&reject.body).unwrap_or_else(|_| {
438        format!(
439            r#"{{"error":"{}","message":"{}","code":"{}","retryable":false}}"#,
440            reject.body.error, reject.body.message, reject.body.code
441        )
442    });
443
444    builder
445        .body(Some(body))
446        .expect("handshake rejection response should build")
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::websocket::auth::{AuthDeny, AuthErrorCode};
    use std::time::Duration;

    /// The `101 Switching Protocols` response a successful handshake would send.
    fn upgrade_response() -> Response {
        Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .body(())
            .unwrap()
    }

    /// Runs a denial through the same conversion the handshake callback uses.
    fn rejection_for(deny: &AuthDeny) -> HandshakeErrorResponse {
        let reject = HandshakeReject::from_deny(deny);
        build_handshake_error_response(&upgrade_response(), &reject)
    }

    #[test]
    fn handshake_error_response_serializes_json_and_retry_after() {
        let deny = AuthDeny::rate_limited(Duration::from_secs(7), "websocket handshakes");

        let handshake_response = rejection_for(&deny);
        assert_eq!(handshake_response.status(), StatusCode::TOO_MANY_REQUESTS);

        let headers = handshake_response.headers();
        assert_eq!(headers.get("X-Error-Code").unwrap(), "rate-limit-exceeded");
        assert_eq!(headers.get("Retry-After").unwrap(), "7");

        let body = handshake_response.into_body().unwrap();
        assert!(body.contains("rate-limit-exceeded"));
        assert!(body.contains("retryable"));
    }

    #[test]
    fn handshake_error_response_preserves_non_retryable_auth_denies() {
        let deny = AuthDeny::new(AuthErrorCode::OriginMismatch, "origin mismatch");

        let handshake_response = rejection_for(&deny);
        assert_eq!(handshake_response.status(), StatusCode::FORBIDDEN);
        assert!(handshake_response.headers().get("Retry-After").is_none());
    }
}
494
495#[allow(clippy::result_large_err)]
496async fn accept_authorized_connection(
497    stream: TcpStream,
498    remote_addr: SocketAddr,
499    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
500    client_manager: ClientManager,
501) -> Result<Option<(tokio_tungstenite::WebSocketStream<TcpStream>, AuthContext)>> {
502    use std::sync::Mutex;
503
504    let auth_result_capture: Arc<Mutex<Option<Result<AuthContext, HandshakeReject>>>> =
505        Arc::new(Mutex::new(None));
506    let auth_result_ref = auth_result_capture.clone();
507    let auth_plugin_ref = auth_plugin.clone();
508    let client_manager_for_auth = client_manager.clone();
509
510    let handshake_result = accept_hdr_async(stream, move |request: &Request, response| {
511        let connection_request = ConnectionAuthRequest::from_http_request(remote_addr, request);
512
513        let auth_result = tokio::task::block_in_place(|| {
514            tokio::runtime::Handle::current().block_on(async {
515                match auth_plugin_ref.authorize(&connection_request).await {
516                    AuthDecision::Allow(ctx) => {
517                        match client_manager_for_auth
518                            .check_connection_allowed(remote_addr, &Some(ctx.clone()))
519                            .await
520                        {
521                            Ok(()) => Ok(ctx),
522                            Err(deny) => Err(HandshakeReject::from_deny(&deny)),
523                        }
524                    }
525                    AuthDecision::Deny(deny) => Err(HandshakeReject::from_deny(&deny)),
526                }
527            })
528        });
529
530        let mut capture_lock = auth_result_ref.lock().expect("capture lock poisoned");
531        *capture_lock = Some(auth_result.clone());
532
533        match auth_result {
534            Ok(_) => Ok(response),
535            Err(reject) => Err(build_handshake_error_response(&response, &reject)),
536        }
537    })
538    .await;
539
540    let auth_result = {
541        let mut guard = auth_result_capture.lock().expect("capture lock poisoned");
542        guard.take()
543    };
544
545    match handshake_result {
546        Ok(ws_stream) => match auth_result {
547            Some(Ok(ctx)) => {
548                info!("WebSocket connection authorized for {}", remote_addr);
549                Ok(Some((ws_stream, ctx)))
550            }
551            Some(Err(reject)) => Err(anyhow::anyhow!(
552                "handshake unexpectedly succeeded after rejection: {}",
553                reject.body.message
554            )),
555            None => Err(anyhow::anyhow!(
556                "no auth result captured for authorized connection {}",
557                remote_addr
558            )),
559        },
560        Err(WsError::Http(_)) => {
561            match auth_result {
562                Some(Err(reject)) => {
563                    warn!(
564                        "WebSocket connection rejected during handshake for {}: {}",
565                        remote_addr, reject.body.message
566                    );
567                }
568                Some(Ok(_)) => {
569                    warn!(
570                        "WebSocket handshake failed after auth success for {}",
571                        remote_addr
572                    );
573                }
574                None => {
575                    warn!(
576                        "WebSocket handshake rejected for {} without captured auth result",
577                        remote_addr
578                    );
579                }
580            }
581            Ok(None)
582        }
583        Err(err) => Err(err.into()),
584    }
585}
586
587#[cfg(feature = "otel")]
588async fn handle_connection(
589    stream: TcpStream,
590    client_manager: ClientManager,
591    bus_manager: BusManager,
592    entity_cache: EntityCache,
593    view_index: Arc<ViewIndex>,
594    remote_addr: std::net::SocketAddr,
595    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
596    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
597    metrics: Option<Arc<Metrics>>,
598) -> Result<()> {
599    let Some((ws_stream, auth_context)) = accept_authorized_connection(
600        stream,
601        remote_addr,
602        auth_plugin.clone(),
603        client_manager.clone(),
604    )
605    .await?
606    else {
607        return Ok(());
608    };
609
610    let client_id = Uuid::new_v4();
611    let connection_start = Instant::now();
612
613    let auth_context = Some(auth_context);
614    let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
615        usage_identity(auth_context.as_ref());
616
617    // Extract metering key from auth context for metrics attribution
618    let metering_key = auth_context.as_ref().map(|ctx| ctx.metering_key.clone());
619
620    if let Some(ref m) = metrics {
621        if let Some(ref mk) = metering_key {
622            m.record_ws_connection_with_metering(mk);
623        } else {
624            m.record_ws_connection();
625        }
626    }
627
628    info!("WebSocket connection established for client {}", client_id);
629
630    emit_usage_event(
631        &usage_emitter,
632        WebSocketUsageEvent::ConnectionEstablished {
633            client_id: client_id.to_string(),
634            remote_addr: remote_addr.to_string(),
635            deployment_id: usage_deployment_id.clone(),
636            metering_key: usage_metering_key.clone(),
637            subject: usage_subject.clone(),
638            key_class: usage_key_class,
639        },
640    );
641
642    let (ws_sender, mut ws_receiver) = ws_stream.split();
643
644    // Add client with auth context and IP tracking
645    client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);
646
647    let ctx = SubscriptionContext {
648        client_id,
649        client_manager: &client_manager,
650        bus_manager: &bus_manager,
651        entity_cache: &entity_cache,
652        view_index: &view_index,
653        usage_emitter: &usage_emitter,
654        metrics: metrics.clone(),
655    };
656
657    let mut active_subscriptions: HashMap<String, String> = HashMap::new();
658
659    loop {
660        tokio::select! {
661            ws_msg = ws_receiver.next() => {
662                match ws_msg {
663                    Some(Ok(msg)) => {
664                        if msg.is_close() {
665                            info!("Client {} requested close", client_id);
666                            break;
667                        }
668
669                        client_manager.update_client_last_seen(client_id);
670
671                        if msg.is_text() {
672                            if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
673                                warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
674                                send_socket_issue(client_id, &client_manager, &deny, true).await;
675                                break;
676                            }
677
678                            if let Some(ref m) = metrics {
679                                if let Some(ref mk) = metering_key {
680                                    m.record_ws_message_received_with_metering(mk);
681                                } else {
682                                    m.record_ws_message_received();
683                                }
684                            }
685
686                            if let Ok(text) = msg.to_text() {
687                                debug!("Received text message from client {}: {}", client_id, text);
688
689                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
690                                    match client_msg {
691                                        ClientMessage::Subscribe(subscription) => {
692                                            let view_id = subscription.view.clone();
693                                            let sub_key = subscription.sub_key();
694
695                                            // Check subscription limits
696                                            if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
697                                                warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
698                                                send_socket_issue(client_id, &client_manager, &deny, false).await;
699                                                continue;
700                                            }
701
702                                            client_manager.update_subscription(client_id, subscription.clone());
703
704                                            let cancel_token = CancellationToken::new();
705                                            let is_new = client_manager.add_client_subscription(
706                                                client_id,
707                                                sub_key.clone(),
708                                                cancel_token.clone(),
709                                            ).await;
710
711                                            if !is_new {
712                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
713                                                continue;
714                                            }
715
716                                            if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
717                                                warn!(
718                                                    "Subscription rejected for client {} on {}: {}",
719                                                    client_id, view_id, err
720                                                );
721                                                if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
722                                                    send_socket_issue(client_id, &client_manager, &deny, false).await;
723                                                }
724                                                let _ = client_manager
725                                                    .remove_client_subscription(client_id, &sub_key)
726                                                    .await;
727                                                continue;
728                                            }
729
730                                            if let Some(ref m) = metrics {
731                                                if let Some(ref mk) = metering_key {
732                                                    m.record_subscription_created_with_metering(&view_id, mk);
733                                                } else {
734                                                    m.record_subscription_created(&view_id);
735                                                }
736                                            }
737                                            active_subscriptions.insert(sub_key, view_id.clone());
738                                            emit_usage_event(
739                                                &usage_emitter,
740                                                WebSocketUsageEvent::SubscriptionCreated {
741                                                    client_id: client_id.to_string(),
742                                                    deployment_id: usage_deployment_id.clone(),
743                                                    metering_key: usage_metering_key.clone(),
744                                                    subject: usage_subject.clone(),
745                                                    view_id,
746                                                },
747                                            );
748                                        }
749                                        ClientMessage::Unsubscribe(unsub) => {
750                                            let sub_key = unsub.sub_key();
751                                            let removed = client_manager
752                                                .remove_client_subscription(client_id, &sub_key)
753                                                .await;
754
755                                            if removed {
756                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
757                                                active_subscriptions.remove(&sub_key);
758                                                if let Some(ref m) = metrics {
759                                                    if let Some(ref mk) = metering_key {
760                                                        m.record_subscription_removed_with_metering(&unsub.view, mk);
761                                                    } else {
762                                                        m.record_subscription_removed(&unsub.view);
763                                                    }
764                                                }
765                                                emit_usage_event(
766                                                    &usage_emitter,
767                                                    WebSocketUsageEvent::SubscriptionRemoved {
768                                                        client_id: client_id.to_string(),
769                                                        deployment_id: usage_deployment_id.clone(),
770                                                        metering_key: usage_metering_key.clone(),
771                                                        subject: usage_subject.clone(),
772                                                        view_id: unsub.view.clone(),
773                                                    },
774                                                );
775                                            }
776                                        }
777                                        ClientMessage::Ping => {
778                                            debug!("Received ping from client {}", client_id);
779                                        }
780                                        ClientMessage::RefreshAuth(refresh_req) => {
781                                            debug!("Received refresh_auth from client {}", client_id);
782                                            handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin).await;
783                                        }
784                                    }
785                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
786                                    let view_id = subscription.view.clone();
787                                    let sub_key = subscription.sub_key();
788
789                                    if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
790                                        warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
791                                        send_socket_issue(client_id, &client_manager, &deny, false).await;
792                                        continue;
793                                    }
794
795                                    client_manager.update_subscription(client_id, subscription.clone());
796
797                                    let cancel_token = CancellationToken::new();
798                                    let is_new = client_manager.add_client_subscription(
799                                        client_id,
800                                        sub_key.clone(),
801                                        cancel_token.clone(),
802                                    ).await;
803
804                                    if !is_new {
805                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
806                                        continue;
807                                    }
808
809                                    if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
810                                        warn!(
811                                            "Subscription rejected for client {} on {}: {}",
812                                            client_id, view_id, err
813                                        );
814                                        if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
815                                            send_socket_issue(client_id, &client_manager, &deny, false).await;
816                                        }
817                                        let _ = client_manager
818                                            .remove_client_subscription(client_id, &sub_key)
819                                            .await;
820                                        continue;
821                                    }
822
823                                    if let Some(ref m) = metrics {
824                                        if let Some(ref mk) = metering_key {
825                                            m.record_subscription_created_with_metering(&view_id, mk);
826                                        } else {
827                                            m.record_subscription_created(&view_id);
828                                        }
829                                    }
830                                    active_subscriptions.insert(sub_key, view_id.clone());
831                                    emit_usage_event(
832                                        &usage_emitter,
833                                        WebSocketUsageEvent::SubscriptionCreated {
834                                            client_id: client_id.to_string(),
835                                            deployment_id: usage_deployment_id.clone(),
836                                            metering_key: usage_metering_key.clone(),
837                                            subject: usage_subject.clone(),
838                                            view_id,
839                                        },
840                                    );
841                                } else {
842                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
843                                }
844                            }
845                        }
846                    }
847                    Some(Err(e)) => {
848                        warn!("WebSocket error for client {}: {}", client_id, e);
849                        break;
850                    }
851                    None => {
852                        debug!("WebSocket stream ended for client {}", client_id);
853                        break;
854                    }
855                }
856            }
857        }
858    }
859
860    client_manager
861        .cancel_all_client_subscriptions(client_id)
862        .await;
863    client_manager.remove_client(client_id);
864    if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
865        rate_limiter.remove_client_buckets(client_id).await;
866    }
867
868    if let Some(ref m) = metrics {
869        let duration_secs = connection_start.elapsed().as_secs_f64();
870        if let Some(ref mk) = metering_key {
871            m.record_ws_disconnection_with_metering(duration_secs, mk);
872            for view_id in active_subscriptions.values() {
873                m.record_subscription_removed_with_metering(view_id, mk);
874            }
875        } else {
876            m.record_ws_disconnection(duration_secs);
877            for view_id in active_subscriptions.values() {
878                m.record_subscription_removed(view_id);
879            }
880        }
881    }
882
883    for view_id in active_subscriptions.values() {
884        emit_usage_event(
885            &usage_emitter,
886            WebSocketUsageEvent::SubscriptionRemoved {
887                client_id: client_id.to_string(),
888                deployment_id: usage_deployment_id.clone(),
889                metering_key: usage_metering_key.clone(),
890                subject: usage_subject.clone(),
891                view_id: view_id.clone(),
892            },
893        );
894    }
895
896    emit_usage_event(
897        &usage_emitter,
898        WebSocketUsageEvent::ConnectionClosed {
899            client_id: client_id.to_string(),
900            deployment_id: usage_deployment_id,
901            metering_key: usage_metering_key,
902            subject: usage_subject,
903            duration_secs: Some(connection_start.elapsed().as_secs_f64()),
904            subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
905        },
906    );
907
908    info!("Client {} disconnected", client_id);
909
910    Ok(())
911}
912
913#[cfg(not(feature = "otel"))]
914#[allow(clippy::too_many_arguments)]
915async fn handle_connection(
916    stream: TcpStream,
917    client_manager: ClientManager,
918    bus_manager: BusManager,
919    entity_cache: EntityCache,
920    view_index: Arc<ViewIndex>,
921    remote_addr: std::net::SocketAddr,
922    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
923    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
924) -> Result<()> {
925    let Some((ws_stream, auth_context)) = accept_authorized_connection(
926        stream,
927        remote_addr,
928        auth_plugin.clone(),
929        client_manager.clone(),
930    )
931    .await?
932    else {
933        return Ok(());
934    };
935
936    let client_id = Uuid::new_v4();
937    let auth_context_ref = Some(&auth_context);
938    let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
939        usage_identity(auth_context_ref);
940
941    let auth_context = Some(auth_context);
942
943    info!("WebSocket connection established for client {}", client_id);
944
945    emit_usage_event(
946        &usage_emitter,
947        WebSocketUsageEvent::ConnectionEstablished {
948            client_id: client_id.to_string(),
949            remote_addr: remote_addr.to_string(),
950            deployment_id: usage_deployment_id.clone(),
951            metering_key: usage_metering_key.clone(),
952            subject: usage_subject.clone(),
953            key_class: usage_key_class,
954        },
955    );
956
957    let (ws_sender, mut ws_receiver) = ws_stream.split();
958
959    // Add client with auth context and IP tracking
960    client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);
961
962    let ctx = SubscriptionContext {
963        client_id,
964        client_manager: &client_manager,
965        bus_manager: &bus_manager,
966        entity_cache: &entity_cache,
967        view_index: &view_index,
968        usage_emitter: &usage_emitter,
969    };
970
971    let mut active_subscriptions: HashMap<String, String> = HashMap::new();
972
973    loop {
974        tokio::select! {
975            ws_msg = ws_receiver.next() => {
976                match ws_msg {
977                    Some(Ok(msg)) => {
978                        if msg.is_close() {
979                            info!("Client {} requested close", client_id);
980                            break;
981                        }
982
983                        client_manager.update_client_last_seen(client_id);
984
985                        if msg.is_text() {
986                            if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
987                                warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
988                                send_socket_issue(client_id, &client_manager, &deny, true).await;
989                                break;
990                            }
991
992                            if let Ok(text) = msg.to_text() {
993                                debug!("Received text message from client {}: {}", client_id, text);
994
995                                if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
996                                    match client_msg {
997                                        ClientMessage::Subscribe(subscription) => {
998                                            let view_id = subscription.view.clone();
999                                            if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1000                                                warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1001                                                send_socket_issue(client_id, &client_manager, &deny, false).await;
1002                                                continue;
1003                                            }
1004
1005                                            let sub_key = subscription.sub_key();
1006                                            client_manager.update_subscription(client_id, subscription.clone());
1007
1008                                            let cancel_token = CancellationToken::new();
1009                                            let is_new = client_manager.add_client_subscription(
1010                                                client_id,
1011                                                sub_key.clone(),
1012                                                cancel_token.clone(),
1013                                            ).await;
1014
1015                                            if !is_new {
1016                                                debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1017                                                continue;
1018                                            }
1019
1020                                            if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1021                                                warn!(
1022                                                    "Subscription rejected for client {} on {}: {}",
1023                                                    client_id,
1024                                                    sub_key,
1025                                                    err
1026                                                );
1027                                                if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1028                                                    send_socket_issue(client_id, &client_manager, &deny, false).await;
1029                                                }
1030                                                let _ = client_manager
1031                                                    .remove_client_subscription(client_id, &sub_key)
1032                                                    .await;
1033                                            } else {
1034                                                active_subscriptions.insert(sub_key, view_id.clone());
1035                                                emit_usage_event(
1036                                                    &usage_emitter,
1037                                                    WebSocketUsageEvent::SubscriptionCreated {
1038                                                        client_id: client_id.to_string(),
1039                                                        deployment_id: usage_deployment_id.clone(),
1040                                                        metering_key: usage_metering_key.clone(),
1041                                                        subject: usage_subject.clone(),
1042                                                        view_id,
1043                                                    },
1044                                                );
1045                                            }
1046                                        }
1047                                        ClientMessage::Unsubscribe(unsub) => {
1048                                            let sub_key = unsub.sub_key();
1049                                            let removed = client_manager
1050                                                .remove_client_subscription(client_id, &sub_key)
1051                                                .await;
1052
1053                                            if removed {
1054                                                info!("Client {} unsubscribed from {}", client_id, sub_key);
1055                                                active_subscriptions.remove(&sub_key);
1056                                                emit_usage_event(
1057                                                    &usage_emitter,
1058                                                    WebSocketUsageEvent::SubscriptionRemoved {
1059                                                        client_id: client_id.to_string(),
1060                                                        deployment_id: usage_deployment_id.clone(),
1061                                                        metering_key: usage_metering_key.clone(),
1062                                                        subject: usage_subject.clone(),
1063                                                        view_id: unsub.view.clone(),
1064                                                    },
1065                                                );
1066                                            }
1067                                        }
1068                                        ClientMessage::Ping => {
1069                                            debug!("Received ping from client {}", client_id);
1070                                        }
1071                                        ClientMessage::RefreshAuth(refresh_req) => {
1072                                            debug!("Received refresh_auth from client {}", client_id);
1073                                            handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin).await;
1074                                        }
1075                                    }
1076                                } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
1077                                    let view_id = subscription.view.clone();
1078                                    if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1079                                        warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1080                                        send_socket_issue(client_id, &client_manager, &deny, false).await;
1081                                        continue;
1082                                    }
1083
1084                                    let sub_key = subscription.sub_key();
1085                                    client_manager.update_subscription(client_id, subscription.clone());
1086
1087                                    let cancel_token = CancellationToken::new();
1088                                    let is_new = client_manager.add_client_subscription(
1089                                        client_id,
1090                                        sub_key.clone(),
1091                                        cancel_token.clone(),
1092                                    ).await;
1093
1094                                    if !is_new {
1095                                        debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1096                                        continue;
1097                                    }
1098
1099                                    if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1100                                        warn!(
1101                                            "Subscription rejected for client {} on {}: {}",
1102                                            client_id,
1103                                            sub_key,
1104                                            err
1105                                        );
1106                                        if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1107                                            send_socket_issue(client_id, &client_manager, &deny, false).await;
1108                                        }
1109                                        let _ = client_manager
1110                                            .remove_client_subscription(client_id, &sub_key)
1111                                            .await;
1112                                    } else {
1113                                        active_subscriptions.insert(sub_key, view_id.clone());
1114                                        emit_usage_event(
1115                                            &usage_emitter,
1116                                            WebSocketUsageEvent::SubscriptionCreated {
1117                                                client_id: client_id.to_string(),
1118                                                deployment_id: usage_deployment_id.clone(),
1119                                                metering_key: usage_metering_key.clone(),
1120                                                subject: usage_subject.clone(),
1121                                                view_id,
1122                                            },
1123                                        );
1124                                    }
1125                                } else {
1126                                    debug!("Received non-subscription message from client {}: {}", client_id, text);
1127                                }
1128                            }
1129                        }
1130                    }
1131                    Some(Err(e)) => {
1132                        warn!("WebSocket error for client {}: {}", client_id, e);
1133                        break;
1134                    }
1135                    None => {
1136                        debug!("WebSocket stream ended for client {}", client_id);
1137                        break;
1138                    }
1139                }
1140            }
1141        }
1142    }
1143
1144    client_manager
1145        .cancel_all_client_subscriptions(client_id)
1146        .await;
1147    client_manager.remove_client(client_id);
1148    if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
1149        rate_limiter.remove_client_buckets(client_id).await;
1150    }
1151
1152    for view_id in active_subscriptions.values() {
1153        emit_usage_event(
1154            &usage_emitter,
1155            WebSocketUsageEvent::SubscriptionRemoved {
1156                client_id: client_id.to_string(),
1157                deployment_id: usage_deployment_id.clone(),
1158                metering_key: usage_metering_key.clone(),
1159                subject: usage_subject.clone(),
1160                view_id: view_id.clone(),
1161            },
1162        );
1163    }
1164
1165    emit_usage_event(
1166        &usage_emitter,
1167        WebSocketUsageEvent::ConnectionClosed {
1168            client_id: client_id.to_string(),
1169            deployment_id: usage_deployment_id,
1170            metering_key: usage_metering_key,
1171            subject: usage_subject,
1172            duration_secs: None,
1173            subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
1174        },
1175    );
1176
1177    info!("Client {} disconnected", client_id);
1178
1179    Ok(())
1180}
1181
1182async fn send_snapshot_batches(
1183    client_id: Uuid,
1184    entities: &[SnapshotEntity],
1185    mode: Mode,
1186    view_id: &str,
1187    client_manager: &ClientManager,
1188    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1189    batch_config: &SnapshotBatchConfig,
1190    #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
1191) -> Result<()> {
1192    let total = entities.len();
1193    if total == 0 {
1194        return Ok(());
1195    }
1196
1197    let mut offset = 0;
1198    let mut batch_num = 0;
1199
1200    while offset < total {
1201        let batch_size = if batch_num == 0 {
1202            batch_config.initial_batch_size
1203        } else {
1204            batch_config.subsequent_batch_size
1205        };
1206
1207        let end = (offset + batch_size).min(total);
1208        let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
1209        let rows_in_batch = batch_data.len() as u32;
1210        let is_complete = end >= total;
1211
1212        let snapshot_frame = SnapshotFrame {
1213            mode,
1214            export: view_id.to_string(),
1215            op: "snapshot",
1216            data: batch_data,
1217            complete: is_complete,
1218        };
1219
1220        if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
1221            let payload = maybe_compress(&json_payload);
1222            let payload_bytes = payload.as_bytes().len() as u64;
1223            if client_manager
1224                .send_compressed_async(client_id, payload)
1225                .await
1226                .is_err()
1227            {
1228                return Err(anyhow::anyhow!("Failed to send snapshot batch"));
1229            }
1230            #[cfg(feature = "otel")]
1231            if let Some(m) = metrics {
1232                m.record_ws_message_sent();
1233            }
1234
1235            let auth_context = client_manager.get_auth_context(client_id);
1236            let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1237            emit_usage_event(
1238                usage_emitter,
1239                WebSocketUsageEvent::SnapshotSent {
1240                    client_id: client_id.to_string(),
1241                    deployment_id,
1242                    metering_key,
1243                    subject,
1244                    view_id: view_id.to_string(),
1245                    rows: rows_in_batch,
1246                    messages: 1,
1247                    bytes: payload_bytes,
1248                },
1249            );
1250        }
1251
1252        offset = end;
1253        batch_num += 1;
1254    }
1255
1256    debug!(
1257        "Sent {} snapshot batches ({} entities) for {} to client {}",
1258        batch_num, total, view_id, client_id
1259    );
1260
1261    Ok(())
1262}
1263
1264fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
1265    if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
1266        return Some(SortConfig {
1267            field: sort.field_path.clone(),
1268            order: match sort.order {
1269                crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
1270                crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
1271            },
1272        });
1273    }
1274
1275    if view_spec.mode == Mode::List {
1276        return Some(SortConfig {
1277            field: vec!["_seq".to_string()],
1278            order: SortOrder::Desc,
1279        });
1280    }
1281
1282    None
1283}
1284
1285fn send_subscribed_frame(
1286    client_id: Uuid,
1287    view_id: &str,
1288    view_spec: &ViewSpec,
1289    client_manager: &ClientManager,
1290    usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1291) -> Result<()> {
1292    let sort_config = extract_sort_config(view_spec);
1293    let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
1294
1295    let json_payload = serde_json::to_vec(&subscribed_frame)?;
1296    let payload_bytes = json_payload.len() as u64;
1297    let payload = Arc::new(Bytes::from(json_payload));
1298    client_manager
1299        .send_to_client(client_id, payload)
1300        .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))?;
1301
1302    let auth_context = client_manager.get_auth_context(client_id);
1303    let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1304    emit_usage_event(
1305        usage_emitter,
1306        WebSocketUsageEvent::UpdateSent {
1307            client_id: client_id.to_string(),
1308            deployment_id,
1309            metering_key,
1310            subject,
1311            view_id: view_id.to_string(),
1312            messages: 1,
1313            bytes: payload_bytes,
1314        },
1315    );
1316
1317    Ok(())
1318}
1319
1320fn enforce_snapshot_limit(ctx: &SubscriptionContext<'_>, rows: usize) -> Result<()> {
1321    let requested_rows = u32::try_from(rows).unwrap_or(u32::MAX);
1322    ctx.client_manager
1323        .check_snapshot_allowed(ctx.client_id, requested_rows)
1324        .map_err(|deny| anyhow::anyhow!(deny.reason))
1325}
1326
1327#[cfg(feature = "otel")]
1328async fn attach_client_to_bus(
1329    ctx: &SubscriptionContext<'_>,
1330    subscription: Subscription,
1331    cancel_token: CancellationToken,
1332) -> Result<()> {
1333    let view_id = &subscription.view;
1334
1335    let view_spec = match ctx.view_index.get_view(view_id) {
1336        Some(spec) => spec.clone(),
1337        None => {
1338            return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
1339        }
1340    };
1341
1342    send_subscribed_frame(
1343        ctx.client_id,
1344        view_id,
1345        &view_spec,
1346        ctx.client_manager,
1347        ctx.usage_emitter,
1348    )?;
1349
1350    let is_derived_with_sort = view_spec.is_derived()
1351        && view_spec
1352            .pipeline
1353            .as_ref()
1354            .map(|p| p.sort.is_some())
1355            .unwrap_or(false);
1356
1357    if is_derived_with_sort {
1358        return attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token)
1359            .await;
1360    }
1361
1362    match view_spec.mode {
1363        Mode::State => {
1364            let key = subscription.key.as_deref().unwrap_or("");
1365
1366            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
1367
1368            // Check if we should send snapshot (defaults to true for backward compatibility)
1369            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1370
1371            if should_send_snapshot {
1372                if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
1373                    transform_large_u64_to_strings(&mut cached_entity);
1374                    let snapshot_entities = vec![SnapshotEntity {
1375                        key: key.to_string(),
1376                        data: cached_entity,
1377                    }];
1378                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1379                    let batch_config = ctx.entity_cache.snapshot_config();
1380                    send_snapshot_batches(
1381                        ctx.client_id,
1382                        &snapshot_entities,
1383                        view_spec.mode,
1384                        view_id,
1385                        ctx.client_manager,
1386                        ctx.usage_emitter,
1387                        &batch_config,
1388                        #[cfg(feature = "otel")]
1389                        ctx.metrics.as_ref(),
1390                    )
1391                    .await?;
1392                    rx.borrow_and_update();
1393                } else if !rx.borrow().is_empty() {
1394                    let data = rx.borrow_and_update().clone();
1395                    let data_len = data.len();
1396                    if ctx
1397                        .client_manager
1398                        .send_to_client(ctx.client_id, data)
1399                        .is_ok()
1400                    {
1401                        emit_update_sent_for_client(
1402                            ctx.usage_emitter,
1403                            ctx.client_manager,
1404                            ctx.client_id,
1405                            view_id,
1406                            data_len,
1407                        );
1408                    }
1409                }
1410            } else {
1411                info!(
1412                    "Client {} subscribed to {} without snapshot",
1413                    ctx.client_id, view_id
1414                );
1415                rx.borrow_and_update();
1416            }
1417
1418            let client_id = ctx.client_id;
1419            let client_mgr = ctx.client_manager.clone();
1420            let usage_emitter = ctx.usage_emitter.clone();
1421            let metrics_clone = ctx.metrics.clone();
1422            let view_id_clone = view_id.clone();
1423            let view_id_span = view_id.clone();
1424            let key_clone = key.to_string();
1425            tokio::spawn(
1426                async move {
1427                    loop {
1428                        tokio::select! {
1429                            _ = cancel_token.cancelled() => {
1430                                debug!("State subscription cancelled for client {}", client_id);
1431                                break;
1432                            }
1433                            result = rx.changed() => {
1434                                if result.is_err() {
1435                                    break;
1436                                }
1437                                let data = rx.borrow().clone();
1438                                let data_len = data.len();
1439                                if client_mgr.send_to_client(client_id, data).is_err() {
1440                                    break;
1441                                }
1442                                if let Some(ref m) = metrics_clone {
1443                                    m.record_ws_message_sent();
1444                                }
1445                                emit_update_sent_for_client(
1446                                    &usage_emitter,
1447                                    &client_mgr,
1448                                    client_id,
1449                                    &view_id_clone,
1450                                    data_len,
1451                                );
1452                            }
1453                        }
1454                    }
1455                }
1456                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
1457            );
1458        }
1459        Mode::List | Mode::Append => {
1460            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1461
1462            // Check if we should send snapshot (defaults to true for backward compatibility)
1463            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1464
1465            if should_send_snapshot {
1466                // Determine which entities to send based on cursor
1467                let mut snapshots = if let Some(ref cursor) = subscription.after {
1468                    ctx.entity_cache
1469                        .get_after(view_id, cursor, subscription.snapshot_limit)
1470                        .await
1471                } else {
1472                    ctx.entity_cache.get_all(view_id).await
1473                };
1474
1475                // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache)
1476                if let Some(limit) = subscription.snapshot_limit {
1477                    if subscription.after.is_none() {
1478                        snapshots.sort_by(|a, b| {
1479                            let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1480                            let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1481                            cmp_seq(sb, sa) // descending: most-recent N
1482                        });
1483                        snapshots.truncate(limit);
1484                    }
1485                }
1486
1487                let snapshot_entities: Vec<SnapshotEntity> = snapshots
1488                    .into_iter()
1489                    .filter(|(key, _)| subscription.matches_key(key))
1490                    .map(|(key, mut data)| {
1491                        transform_large_u64_to_strings(&mut data);
1492                        SnapshotEntity { key, data }
1493                    })
1494                    .collect();
1495
1496                if !snapshot_entities.is_empty() {
1497                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1498                    let batch_config = ctx.entity_cache.snapshot_config();
1499                    send_snapshot_batches(
1500                        ctx.client_id,
1501                        &snapshot_entities,
1502                        view_spec.mode,
1503                        view_id,
1504                        ctx.client_manager,
1505                        ctx.usage_emitter,
1506                        &batch_config,
1507                        #[cfg(feature = "otel")]
1508                        ctx.metrics.as_ref(),
1509                    )
1510                    .await?;
1511                }
1512            } else {
1513                info!(
1514                    "Client {} subscribed to {} without snapshot",
1515                    ctx.client_id, view_id
1516                );
1517            }
1518
1519            let client_id = ctx.client_id;
1520            let client_mgr = ctx.client_manager.clone();
1521            let usage_emitter = ctx.usage_emitter.clone();
1522            let sub = subscription.clone();
1523            let metrics_clone = ctx.metrics.clone();
1524            let view_id_clone = view_id.clone();
1525            let view_id_span = view_id.clone();
1526            let mode = view_spec.mode;
1527            tokio::spawn(
1528                async move {
1529                    loop {
1530                        tokio::select! {
1531                            _ = cancel_token.cancelled() => {
1532                                debug!("List subscription cancelled for client {}", client_id);
1533                                break;
1534                            }
1535                            result = rx.recv() => {
1536                                match result {
1537                                    Ok(envelope) => {
1538                                        if sub.matches(&envelope.entity, &envelope.key) {
1539                                            if client_mgr
1540                                                .send_to_client(client_id, envelope.payload.clone())
1541                                                .is_err()
1542                                            {
1543                                                break;
1544                                            }
1545                                            if let Some(ref m) = metrics_clone {
1546                                                m.record_ws_message_sent();
1547                                            }
1548                                            emit_update_sent_for_client(
1549                                                &usage_emitter,
1550                                                &client_mgr,
1551                                                client_id,
1552                                                &view_id_clone,
1553                                                envelope.payload.len(),
1554                                            );
1555                                        }
1556                                    }
1557                                    Err(_) => break,
1558                                }
1559                            }
1560                        }
1561                    }
1562                }
1563                .instrument(
1564                    info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode),
1565                ),
1566            );
1567        }
1568    }
1569
1570    info!(
1571        "Client {} subscribed to {} (mode: {:?})",
1572        ctx.client_id, view_id, view_spec.mode
1573    );
1574
1575    Ok(())
1576}
1577
/// Attaches a client to a derived view and streams window changes to it
/// (variant compiled when the `otel` feature is enabled).
///
/// The window is `take` entries starting at `skip`. `take` defaults to the
/// pipeline's `limit`, or 100 when the pipeline has none; `skip` defaults to 0.
///
/// What this function does:
/// 1. Subscribes to the list bus of the view's `source_view`.
/// 2. Reads the current window from the sorted cache and, when it is
///    non-empty, sends it as snapshot batches.
/// 3. Spawns a task that, for every envelope received on the source bus,
///    re-reads the window, sends `delete` frames for keys that left it and
///    `upsert` frames for the entries now inside it.
///
/// The spawned task ends when `cancel_token` is cancelled, when the bus
/// receiver returns an error, or when sending to the client fails.
///
/// # Errors
///
/// Returns an error when the view spec has no `source_view`, or when
/// `enforce_snapshot_limit` / `send_snapshot_batches` fail for the initial
/// snapshot.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) -> Result<()> {
    let view_id = &subscription.view;
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);
    let is_single = take == 1;

    let source_view_id = match &view_spec.source_view {
        Some(s) => s.clone(),
        None => {
            return Err(anyhow::anyhow!(
                "Derived view {} has no source_view",
                view_id
            ));
        }
    };

    // Subscribe to the source bus BEFORE reading the window. Updates published
    // while the snapshot is being built and sent are then queued on `rx` and
    // trigger a window refresh in the forwarding task, instead of being missed
    // until some later update happens to arrive.
    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    // Keys the client knows about after the snapshot; used to compute deletes.
    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, mut data)| {
                transform_large_u64_to_strings(&mut data);
                SnapshotEntity { key, data }
            })
            .collect();

        enforce_snapshot_limit(ctx, snapshot_entities.len())?;
        let batch_config = ctx.entity_cache.snapshot_config();
        send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            ctx.usage_emitter,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await?;
    }

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let usage_emitter = ctx.usage_emitter.clone();
    let view_id_clone = view_id.clone();
    let view_id_span = view_id.clone();
    let sorted_caches_clone = sorted_caches;
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;

    tokio::spawn(
        async move {
            let mut current_window_keys = initial_keys;

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        debug!("Derived view subscription cancelled for client {}", client_id);
                        break;
                    }
                    result = rx.recv() => {
                        match result {
                            // The envelope content is not used: any message on the
                            // source bus just triggers a fresh read of the window.
                            Ok(_envelope) => {
                                let new_window: Vec<(String, serde_json::Value)> = {
                                    let mut caches = sorted_caches_clone.write().await;
                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
                                        cache.get_window(skip, take)
                                    } else {
                                        continue;
                                    }
                                };

                                let new_keys: HashSet<String> =
                                    new_window.iter().map(|(k, _)| k.clone()).collect();

                                // Builds one frame, sends it, and records the metric and
                                // usage event. Returns `false` only when the client can no
                                // longer be reached; a frame that fails to serialize is
                                // skipped and reported as `true` so the task keeps running.
                                // Defined after the last `.await` of this arm so its borrows
                                // are not held across a suspension point.
                                let send_frame = |op: &'static str,
                                                  key: &str,
                                                  data: serde_json::Value|
                                 -> bool {
                                    let frame = Frame {
                                        seq: None,
                                        mode: frame_mode,
                                        export: view_id_clone.clone(),
                                        op,
                                        key: key.to_string(),
                                        data,
                                        append: vec![],
                                    };
                                    if let Ok(json) = serde_json::to_vec(&frame) {
                                        let payload = Arc::new(Bytes::from(json));
                                        let payload_len = payload.len();
                                        if client_mgr.send_to_client(client_id, payload).is_err() {
                                            return false;
                                        }
                                        if let Some(ref m) = metrics_clone {
                                            m.record_ws_message_sent();
                                        }
                                        emit_update_sent_for_client(
                                            &usage_emitter,
                                            &client_mgr,
                                            client_id,
                                            &view_id_clone,
                                            payload_len,
                                        );
                                    }
                                    true
                                };

                                if is_single {
                                    // NOTE(review): when the single-item window becomes empty,
                                    // nothing is sent (no delete for the previous key), and the
                                    // tracked key set is still replaced below. Confirm this is
                                    // the intended contract for `take == 1` subscriptions.
                                    if let Some((new_key, data)) = new_window.first() {
                                        for old_key in current_window_keys.difference(&new_keys) {
                                            if !send_frame(
                                                "delete",
                                                old_key.as_str(),
                                                serde_json::Value::Null,
                                            ) {
                                                return;
                                            }
                                        }

                                        let mut transformed_data = data.clone();
                                        transform_large_u64_to_strings(&mut transformed_data);
                                        if !send_frame("upsert", new_key.as_str(), transformed_data) {
                                            return;
                                        }
                                    }
                                } else {
                                    // Keys that dropped out of the window are deleted first.
                                    for key in current_window_keys.difference(&new_keys) {
                                        if !send_frame(
                                            "delete",
                                            key.as_str(),
                                            serde_json::Value::Null,
                                        ) {
                                            return;
                                        }
                                    }

                                    // Every entry currently in the window is re-sent as an
                                    // upsert, whether or not it changed.
                                    for (key, data) in &new_window {
                                        let mut transformed_data = data.clone();
                                        transform_large_u64_to_strings(&mut transformed_data);
                                        if !send_frame("upsert", key.as_str(), transformed_data) {
                                            return;
                                        }
                                    }
                                }

                                current_window_keys = new_keys;
                            }
                            // NOTE(review): every receive error ends the task. If the bus is
                            // a tokio broadcast channel this includes lag errors, not only a
                            // closed channel - confirm that is intended.
                            Err(_) => break,
                        }
                    }
                }
            }
        }
        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
    );

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );

    Ok(())
}
1822
1823#[cfg(not(feature = "otel"))]
1824async fn attach_client_to_bus(
1825    ctx: &SubscriptionContext<'_>,
1826    subscription: Subscription,
1827    cancel_token: CancellationToken,
1828) -> Result<()> {
1829    let view_id = &subscription.view;
1830
1831    let view_spec = match ctx.view_index.get_view(view_id) {
1832        Some(spec) => spec.clone(),
1833        None => {
1834            return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
1835        }
1836    };
1837
1838    send_subscribed_frame(
1839        ctx.client_id,
1840        view_id,
1841        &view_spec,
1842        ctx.client_manager,
1843        ctx.usage_emitter,
1844    )?;
1845
1846    let is_derived_with_sort = view_spec.is_derived()
1847        && view_spec
1848            .pipeline
1849            .as_ref()
1850            .map(|p| p.sort.is_some())
1851            .unwrap_or(false);
1852
1853    if is_derived_with_sort {
1854        return attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
1855    }
1856
1857    match view_spec.mode {
1858        Mode::State => {
1859            let key = subscription.key.as_deref().unwrap_or("");
1860
1861            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
1862
1863            // Check if we should send snapshot (defaults to true for backward compatibility)
1864            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1865
1866            if should_send_snapshot {
1867                if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
1868                    transform_large_u64_to_strings(&mut cached_entity);
1869                    let snapshot_entities = vec![SnapshotEntity {
1870                        key: key.to_string(),
1871                        data: cached_entity,
1872                    }];
1873                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1874                    let batch_config = ctx.entity_cache.snapshot_config();
1875                    send_snapshot_batches(
1876                        ctx.client_id,
1877                        &snapshot_entities,
1878                        view_spec.mode,
1879                        view_id,
1880                        ctx.client_manager,
1881                        ctx.usage_emitter,
1882                        &batch_config,
1883                    )
1884                    .await?;
1885                    rx.borrow_and_update();
1886                } else if !rx.borrow().is_empty() {
1887                    let data = rx.borrow_and_update().clone();
1888                    let data_len = data.len();
1889                    if ctx
1890                        .client_manager
1891                        .send_to_client(ctx.client_id, data)
1892                        .is_ok()
1893                    {
1894                        emit_update_sent_for_client(
1895                            ctx.usage_emitter,
1896                            ctx.client_manager,
1897                            ctx.client_id,
1898                            view_id,
1899                            data_len,
1900                        );
1901                    }
1902                }
1903            } else {
1904                info!(
1905                    "Client {} subscribed to {} without snapshot",
1906                    ctx.client_id, view_id
1907                );
1908                rx.borrow_and_update();
1909            }
1910
1911            let client_id = ctx.client_id;
1912            let client_mgr = ctx.client_manager.clone();
1913            let usage_emitter = ctx.usage_emitter.clone();
1914            let view_id_clone = view_id.clone();
1915            let view_id_span = view_id.clone();
1916            let key_clone = key.to_string();
1917            tokio::spawn(
1918                async move {
1919                    loop {
1920                        tokio::select! {
1921                            _ = cancel_token.cancelled() => {
1922                                debug!("State subscription cancelled for client {}", client_id);
1923                                break;
1924                            }
1925                            result = rx.changed() => {
1926                                if result.is_err() {
1927                                    break;
1928                                }
1929                                let data = rx.borrow().clone();
1930                                let data_len = data.len();
1931                                if client_mgr.send_to_client(client_id, data).is_err() {
1932                                    break;
1933                                }
1934                                emit_update_sent_for_client(
1935                                    &usage_emitter,
1936                                    &client_mgr,
1937                                    client_id,
1938                                    &view_id_clone,
1939                                    data_len,
1940                                );
1941                            }
1942                        }
1943                    }
1944                }
1945                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
1946            );
1947        }
1948        Mode::List | Mode::Append => {
1949            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1950
1951            // Check if we should send snapshot (defaults to true for backward compatibility)
1952            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1953
1954            if should_send_snapshot {
1955                // Determine which entities to send based on cursor
1956                let mut snapshots = if let Some(ref cursor) = subscription.after {
1957                    ctx.entity_cache
1958                        .get_after(view_id, cursor, subscription.snapshot_limit)
1959                        .await
1960                } else {
1961                    ctx.entity_cache.get_all(view_id).await
1962                };
1963
1964                // Sort by _seq descending only when there is no cursor (to get most-recent N from full cache)
1965                if let Some(limit) = subscription.snapshot_limit {
1966                    if subscription.after.is_none() {
1967                        snapshots.sort_by(|a, b| {
1968                            let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1969                            let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1970                            cmp_seq(sb, sa) // descending: most-recent N
1971                        });
1972                        snapshots.truncate(limit);
1973                    }
1974                }
1975
1976                let snapshot_entities: Vec<SnapshotEntity> = snapshots
1977                    .into_iter()
1978                    .filter(|(key, _)| subscription.matches_key(key))
1979                    .map(|(key, mut data)| {
1980                        transform_large_u64_to_strings(&mut data);
1981                        SnapshotEntity { key, data }
1982                    })
1983                    .collect();
1984
1985                if !snapshot_entities.is_empty() {
1986                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1987                    let batch_config = ctx.entity_cache.snapshot_config();
1988                    send_snapshot_batches(
1989                        ctx.client_id,
1990                        &snapshot_entities,
1991                        view_spec.mode,
1992                        view_id,
1993                        ctx.client_manager,
1994                        ctx.usage_emitter,
1995                        &batch_config,
1996                    )
1997                    .await?;
1998                }
1999            } else {
2000                info!(
2001                    "Client {} subscribed to {} without snapshot",
2002                    ctx.client_id, view_id
2003                );
2004            }
2005
2006            let client_id = ctx.client_id;
2007            let client_mgr = ctx.client_manager.clone();
2008            let usage_emitter = ctx.usage_emitter.clone();
2009            let sub = subscription.clone();
2010            let view_id_clone = view_id.clone();
2011            let view_id_span = view_id.clone();
2012            let mode = view_spec.mode;
2013            tokio::spawn(
2014                async move {
2015                    loop {
2016                        tokio::select! {
2017                            _ = cancel_token.cancelled() => {
2018                                debug!("List subscription cancelled for client {}", client_id);
2019                                break;
2020                            }
2021                            result = rx.recv() => {
2022                                match result {
2023                                    Ok(envelope) => {
2024                                        if sub.matches(&envelope.entity, &envelope.key)
2025                                            && client_mgr
2026                                                .send_to_client(client_id, envelope.payload.clone())
2027                                                .is_err()
2028                                        {
2029                                            break;
2030                                        } else if sub.matches(&envelope.entity, &envelope.key) {
2031                                            emit_update_sent_for_client(
2032                                                &usage_emitter,
2033                                                &client_mgr,
2034                                                client_id,
2035                                                &view_id_clone,
2036                                                envelope.payload.len(),
2037                                            );
2038                                        }
2039                                    }
2040                                    Err(_) => break,
2041                                }
2042                            }
2043                        }
2044                    }
2045                }
2046                .instrument(
2047                    info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode),
2048                ),
2049            );
2050        }
2051    }
2052
2053    info!(
2054        "Client {} subscribed to {} (mode: {:?})",
2055        ctx.client_id, view_id, view_spec.mode
2056    );
2057
2058    Ok(())
2059}
2060
2061#[cfg(not(feature = "otel"))]
2062async fn attach_derived_view_subscription(
2063    ctx: &SubscriptionContext<'_>,
2064    subscription: Subscription,
2065    view_spec: ViewSpec,
2066    cancel_token: CancellationToken,
2067) -> Result<()> {
2068    let view_id = &subscription.view;
2069    let pipeline_limit = view_spec
2070        .pipeline
2071        .as_ref()
2072        .and_then(|p| p.limit)
2073        .unwrap_or(100);
2074    let take = subscription.take.unwrap_or(pipeline_limit);
2075    let skip = subscription.skip.unwrap_or(0);
2076    let is_single = take == 1;
2077
2078    let source_view_id = match &view_spec.source_view {
2079        Some(s) => s.clone(),
2080        None => {
2081            return Err(anyhow::anyhow!(
2082                "Derived view {} has no source_view",
2083                view_id
2084            ));
2085        }
2086    };
2087
2088    let sorted_caches = ctx.view_index.sorted_caches();
2089    let initial_window: Vec<(String, serde_json::Value)> = {
2090        let mut caches = sorted_caches.write().await;
2091        if let Some(cache) = caches.get_mut(view_id) {
2092            cache.get_window(skip, take)
2093        } else {
2094            warn!("No sorted cache for derived view {}", view_id);
2095            vec![]
2096        }
2097    };
2098
2099    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
2100
2101    if !initial_window.is_empty() {
2102        let snapshot_entities: Vec<SnapshotEntity> = initial_window
2103            .into_iter()
2104            .map(|(key, mut data)| {
2105                transform_large_u64_to_strings(&mut data);
2106                SnapshotEntity { key, data }
2107            })
2108            .collect();
2109
2110        enforce_snapshot_limit(ctx, snapshot_entities.len())?;
2111        let batch_config = ctx.entity_cache.snapshot_config();
2112        send_snapshot_batches(
2113            ctx.client_id,
2114            &snapshot_entities,
2115            view_spec.mode,
2116            view_id,
2117            ctx.client_manager,
2118            ctx.usage_emitter,
2119            &batch_config,
2120        )
2121        .await?;
2122    }
2123
2124    let mut rx = ctx
2125        .bus_manager
2126        .get_or_create_list_bus(&source_view_id)
2127        .await;
2128
2129    let client_id = ctx.client_id;
2130    let client_mgr = ctx.client_manager.clone();
2131    let usage_emitter = ctx.usage_emitter.clone();
2132    let view_id_clone = view_id.clone();
2133    let view_id_span = view_id.clone();
2134    let sorted_caches_clone = sorted_caches;
2135    let frame_mode = view_spec.mode;
2136
2137    tokio::spawn(
2138        async move {
2139            let mut current_window_keys = initial_keys;
2140
2141            loop {
2142                tokio::select! {
2143                    _ = cancel_token.cancelled() => {
2144                        debug!("Derived view subscription cancelled for client {}", client_id);
2145                        break;
2146                    }
2147                    result = rx.recv() => {
2148                        match result {
2149                            Ok(_envelope) => {
2150                                let new_window: Vec<(String, serde_json::Value)> = {
2151                                    let mut caches = sorted_caches_clone.write().await;
2152                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
2153                                        cache.get_window(skip, take)
2154                                    } else {
2155                                        continue;
2156                                    }
2157                                };
2158
2159                                let new_keys: HashSet<String> =
2160                                    new_window.iter().map(|(k, _)| k.clone()).collect();
2161
2162                                if is_single {
2163                                    if let Some((new_key, data)) = new_window.first() {
2164                                        for old_key in current_window_keys.difference(&new_keys) {
2165                                            let delete_frame = Frame {
2166                                            seq: None,
2167                                                mode: frame_mode,
2168                                                export: view_id_clone.clone(),
2169                                                op: "delete",
2170                                                key: old_key.clone(),
2171                                                data: serde_json::Value::Null,
2172                                                append: vec![],
2173                                            };
2174                                            if let Ok(json) = serde_json::to_vec(&delete_frame) {
2175                                                let payload = Arc::new(Bytes::from(json));
2176                                                let payload_len = payload.len();
2177                                                if client_mgr.send_to_client(client_id, payload).is_err() {
2178                                                    return;
2179                                                }
2180                                                emit_update_sent_for_client(
2181                                                    &usage_emitter,
2182                                                    &client_mgr,
2183                                                    client_id,
2184                                                    &view_id_clone,
2185                                                    payload_len,
2186                                                );
2187                                            }
2188                                        }
2189
2190                                        let mut transformed_data = data.clone();
2191                                        transform_large_u64_to_strings(&mut transformed_data);
2192                                        let frame = Frame {
2193                                            seq: None,
2194                                            mode: frame_mode,
2195                                            export: view_id_clone.clone(),
2196                                            op: "upsert",
2197                                            key: new_key.clone(),
2198                                            data: transformed_data,
2199                                            append: vec![],
2200                                        };
2201                                        if let Ok(json) = serde_json::to_vec(&frame) {
2202                                            let payload = Arc::new(Bytes::from(json));
2203                                            let payload_len = payload.len();
2204                                            if client_mgr.send_to_client(client_id, payload).is_err() {
2205                                                return;
2206                                            }
2207                                            emit_update_sent_for_client(
2208                                                &usage_emitter,
2209                                                &client_mgr,
2210                                                client_id,
2211                                                &view_id_clone,
2212                                                payload_len,
2213                                            );
2214                                        }
2215                                    }
2216                                } else {
2217                                    for key in current_window_keys.difference(&new_keys) {
2218                                        let delete_frame = Frame {
2219                                            seq: None,
2220                                            mode: frame_mode,
2221                                            export: view_id_clone.clone(),
2222                                            op: "delete",
2223                                            key: key.clone(),
2224                                            data: serde_json::Value::Null,
2225                                            append: vec![],
2226                                        };
2227                                        if let Ok(json) = serde_json::to_vec(&delete_frame) {
2228                                            let payload = Arc::new(Bytes::from(json));
2229                                            let payload_len = payload.len();
2230                                            if client_mgr.send_to_client(client_id, payload).is_err() {
2231                                                return;
2232                                            }
2233                                            emit_update_sent_for_client(
2234                                                &usage_emitter,
2235                                                &client_mgr,
2236                                                client_id,
2237                                                &view_id_clone,
2238                                                payload_len,
2239                                            );
2240                                        }
2241                                    }
2242
2243                                    for (key, data) in &new_window {
2244                                        let mut transformed_data = data.clone();
2245                                        transform_large_u64_to_strings(&mut transformed_data);
2246                                        let frame = Frame {
2247                                            seq: None,
2248                                            mode: frame_mode,
2249                                            export: view_id_clone.clone(),
2250                                            op: "upsert",
2251                                            key: key.clone(),
2252                                            data: transformed_data,
2253                                            append: vec![],
2254                                        };
2255                                        if let Ok(json) = serde_json::to_vec(&frame) {
2256                                            let payload = Arc::new(Bytes::from(json));
2257                                            let payload_len = payload.len();
2258                                            if client_mgr.send_to_client(client_id, payload).is_err() {
2259                                                return;
2260                                            }
2261                                            emit_update_sent_for_client(
2262                                                &usage_emitter,
2263                                                &client_mgr,
2264                                                client_id,
2265                                                &view_id_clone,
2266                                                payload_len,
2267                                            );
2268                                        }
2269                                    }
2270                                }
2271
2272                                current_window_keys = new_keys;
2273                            }
2274                            Err(_) => break,
2275                        }
2276                    }
2277                }
2278            }
2279        }
2280        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
2281    );
2282
2283    info!(
2284        "Client {} subscribed to derived view {} (take={}, skip={})",
2285        ctx.client_id, view_id, take, skip
2286    );
2287
2288    Ok(())
2289}