1use crate::bus::BusManager;
2use crate::cache::{cmp_seq, EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::auth::{
6 AuthContext, AuthDecision, AuthDeny, ConnectionAuthRequest, WebSocketAuthPlugin,
7};
8use crate::websocket::client_manager::{ClientManager, RateLimitConfig};
9use crate::websocket::frame::{
10 transform_large_u64_to_strings, Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig,
11 SortOrder, SubscribedFrame,
12};
13use crate::websocket::subscription::{
14 ClientMessage, RefreshAuthRequest, RefreshAuthResponse, SocketIssueMessage, Subscription,
15};
16use crate::websocket::usage::{WebSocketUsageEmitter, WebSocketUsageEvent};
17use anyhow::Result;
18use bytes::Bytes;
19use futures_util::StreamExt;
20use std::collections::{HashMap, HashSet};
21use std::net::SocketAddr;
22use std::sync::Arc;
23#[cfg(feature = "otel")]
24use std::time::Instant;
25
26use tokio::net::{TcpListener, TcpStream};
27use tokio_tungstenite::{
28 accept_hdr_async,
29 tungstenite::{
30 handshake::server::{ErrorResponse as HandshakeErrorResponse, Request, Response},
31 http::{header::CONTENT_TYPE, StatusCode},
32 Error as WsError,
33 },
34};
35
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, error, info, info_span, warn, Instrument};
38use uuid::Uuid;
39
40#[cfg(feature = "otel")]
41use crate::metrics::Metrics;
42
43async fn handle_refresh_auth(
45 client_id: Uuid,
46 refresh_req: &RefreshAuthRequest,
47 client_manager: &ClientManager,
48 auth_plugin: &Arc<dyn WebSocketAuthPlugin>,
49) {
50 let refresh_result: Result<AuthContext, String> = {
53 if let Some(signed_plugin) = auth_plugin
55 .as_any()
56 .downcast_ref::<crate::websocket::auth::SignedSessionAuthPlugin>(
57 ) {
58 signed_plugin
59 .verify_refresh_token(&refresh_req.token)
60 .await
61 .map_err(|e| e.reason)
62 } else {
63 Err("In-band auth refresh not supported with current auth plugin".to_string())
64 }
65 };
66
67 match refresh_result {
68 Ok(new_context) => {
69 let expires_at = new_context.expires_at;
70 if client_manager.update_client_auth(client_id, new_context) {
71 info!(
72 "Client {} refreshed auth successfully, expires at {}",
73 client_id, expires_at
74 );
75
76 let response = RefreshAuthResponse {
78 success: true,
79 error: None,
80 expires_at: Some(expires_at),
81 };
82 if let Ok(json) = serde_json::to_string(&response) {
83 let _ = client_manager.send_text_to_client(client_id, json).await;
84 }
85 } else {
86 warn!("Client {} not found when refreshing auth", client_id);
87
88 let response = RefreshAuthResponse {
90 success: false,
91 error: Some("client-not-found".to_string()),
92 expires_at: None,
93 };
94 if let Ok(json) = serde_json::to_string(&response) {
95 let _ = client_manager.send_text_to_client(client_id, json).await;
96 }
97 }
98 }
99 Err(err) => {
100 warn!("Client {} auth refresh failed: {}", client_id, err);
101
102 let error_code = if err.contains("expired") {
104 "token-expired"
105 } else if err.contains("signature") {
106 "token-invalid-signature"
107 } else if err.contains("issuer") {
108 "token-invalid-issuer"
109 } else if err.contains("audience") {
110 "token-invalid-audience"
111 } else {
112 "token-invalid"
113 };
114
115 let response = RefreshAuthResponse {
116 success: false,
117 error: Some(error_code.to_string()),
118 expires_at: None,
119 };
120 if let Ok(json) = serde_json::to_string(&response) {
121 let _ = client_manager.send_text_to_client(client_id, json).await;
122 }
123 }
124 }
125}
126
127async fn send_socket_issue(
128 client_id: Uuid,
129 client_manager: &ClientManager,
130 deny: &AuthDeny,
131 fatal: bool,
132) {
133 let message = SocketIssueMessage::from_auth_deny(deny, fatal);
134 match serde_json::to_string(&message) {
135 Ok(json) => {
136 let _ = client_manager.send_text_to_client(client_id, json).await;
137 }
138 Err(error) => {
139 warn!(error = %error, client_id = %client_id, "failed to serialize socket issue message");
140 }
141 }
142}
143
144fn auth_deny_from_subscription_error(reason: &str) -> Option<AuthDeny> {
145 if reason.starts_with("Snapshot limit exceeded:") {
146 Some(AuthDeny::new(
147 crate::websocket::auth::AuthErrorCode::SnapshotLimitExceeded,
148 reason,
149 ))
150 } else {
151 None
152 }
153}
154
155fn key_class_label(key_class: arete_auth::KeyClass) -> &'static str {
156 match key_class {
157 arete_auth::KeyClass::Secret => "secret",
158 arete_auth::KeyClass::Publishable => "publishable",
159 }
160}
161
162fn emit_usage_event(
163 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
164 event: WebSocketUsageEvent,
165) {
166 if let Some(emitter) = usage_emitter.clone() {
167 tokio::spawn(async move {
168 emitter.emit(event).await;
169 });
170 }
171}
172
173fn usage_identity(
174 auth_context: Option<&AuthContext>,
175) -> (
176 Option<String>,
177 Option<String>,
178 Option<String>,
179 Option<String>,
180) {
181 match auth_context {
182 Some(ctx) => (
183 Some(ctx.metering_key.clone()),
184 Some(ctx.subject.clone()),
185 Some(key_class_label(ctx.key_class).to_string()),
186 ctx.deployment_id.clone(),
187 ),
188 None => (None, None, None, None),
189 }
190}
191
192fn emit_update_sent_for_client(
193 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
194 client_manager: &ClientManager,
195 client_id: Uuid,
196 view_id: &str,
197 bytes: usize,
198) {
199 let auth_context = client_manager.get_auth_context(client_id);
200 let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
201 emit_usage_event(
202 usage_emitter,
203 WebSocketUsageEvent::UpdateSent {
204 client_id: client_id.to_string(),
205 deployment_id,
206 metering_key,
207 subject,
208 view_id: view_id.to_string(),
209 messages: 1,
210 bytes: bytes as u64,
211 },
212 );
213}
214
/// Borrowed per-connection state passed to subscription handling (`attach_client_to_bus`).
///
/// `handle_connection` builds one per connection and reuses it for every subscribe request
/// on that connection.
struct SubscriptionContext<'a> {
    /// Id assigned to this connection when it was established.
    client_id: Uuid,
    client_manager: &'a ClientManager,
    bus_manager: &'a BusManager,
    entity_cache: &'a EntityCache,
    view_index: &'a ViewIndex,
    /// Usage-event sink; `None` means usage events are dropped (see `emit_usage_event`).
    usage_emitter: &'a Option<Arc<dyn WebSocketUsageEmitter>>,
    /// Metrics handle, present only in `otel` builds.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
225
/// WebSocket server that authorizes handshakes, tracks connected clients and serves view
/// subscriptions.
///
/// Configure it with `new` plus the `with_*` builder methods, then call `start` to run the
/// accept loop.
pub struct WebSocketServer {
    /// Address the TCP listener binds to.
    bind_addr: SocketAddr,
    /// Default client manager; `start` replaces it when `rate_limit_config` is set.
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    /// New connections are dropped once `client_count()` reaches this value.
    max_clients: usize,
    /// Authorizes each WebSocket handshake (allow-all unless overridden).
    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
    /// Optional sink for usage events.
    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
    /// When set, `start` builds its client manager with `ClientManager::with_config`.
    rate_limit_config: Option<RateLimitConfig>,
    /// Metrics handle forwarded to each connection (only in `otel` builds).
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
239
240impl WebSocketServer {
241 #[cfg(feature = "otel")]
242 pub fn new(
243 bind_addr: SocketAddr,
244 bus_manager: BusManager,
245 entity_cache: EntityCache,
246 view_index: Arc<ViewIndex>,
247 metrics: Option<Arc<Metrics>>,
248 ) -> Self {
249 Self {
250 bind_addr,
251 client_manager: ClientManager::new(),
252 bus_manager,
253 entity_cache,
254 view_index,
255 max_clients: 10000,
256 auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
257 usage_emitter: None,
258 rate_limit_config: None,
259 metrics,
260 }
261 }
262
263 #[cfg(not(feature = "otel"))]
264 pub fn new(
265 bind_addr: SocketAddr,
266 bus_manager: BusManager,
267 entity_cache: EntityCache,
268 view_index: Arc<ViewIndex>,
269 ) -> Self {
270 Self {
271 bind_addr,
272 client_manager: ClientManager::new(),
273 bus_manager,
274 entity_cache,
275 view_index,
276 max_clients: 10000,
277 auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
278 usage_emitter: None,
279 rate_limit_config: None,
280 }
281 }
282
283 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
284 self.max_clients = max_clients;
285 self
286 }
287
288 pub fn with_auth_plugin(mut self, auth_plugin: Arc<dyn WebSocketAuthPlugin>) -> Self {
289 self.auth_plugin = auth_plugin;
290 self
291 }
292
293 pub fn with_usage_emitter(mut self, usage_emitter: Arc<dyn WebSocketUsageEmitter>) -> Self {
294 self.usage_emitter = Some(usage_emitter);
295 self
296 }
297
298 pub fn with_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
304 self.rate_limit_config = Some(config);
305 self
306 }
307
308 pub async fn start(self) -> Result<()> {
309 info!(
310 "Starting WebSocket server on {} (max_clients: {})",
311 self.bind_addr, self.max_clients
312 );
313
314 let listener = TcpListener::bind(&self.bind_addr).await?;
315 info!("WebSocket server listening on {}", self.bind_addr);
316
317 let client_manager = if let Some(config) = self.rate_limit_config {
319 ClientManager::with_config(config)
320 } else {
321 self.client_manager
322 };
323
324 client_manager.start_cleanup_task();
325
326 loop {
327 match listener.accept().await {
328 Ok((stream, addr)) => {
329 let client_count = client_manager.client_count();
330 if client_count >= self.max_clients {
331 warn!(
332 "Rejecting connection from {} - max clients ({}) reached",
333 addr, self.max_clients
334 );
335 drop(stream);
336 continue;
337 }
338
339 info!(
340 "New WebSocket connection from {} ({}/{} clients)",
341 addr,
342 client_count + 1,
343 self.max_clients
344 );
345 let client_manager = client_manager.clone();
346 let bus_manager = self.bus_manager.clone();
347 let entity_cache = self.entity_cache.clone();
348 let view_index = self.view_index.clone();
349 #[cfg(feature = "otel")]
350 let metrics = self.metrics.clone();
351
352 let auth_plugin = self.auth_plugin.clone();
353 let usage_emitter = self.usage_emitter.clone();
354
355 tokio::spawn(
356 async move {
357 #[cfg(feature = "otel")]
358 let result = handle_connection(
359 stream,
360 client_manager,
361 bus_manager,
362 entity_cache,
363 view_index,
364 addr,
365 auth_plugin,
366 usage_emitter,
367 metrics,
368 )
369 .await;
370 #[cfg(not(feature = "otel"))]
371 let result = handle_connection(
372 stream,
373 client_manager,
374 bus_manager,
375 entity_cache,
376 view_index,
377 addr,
378 auth_plugin,
379 usage_emitter,
380 )
381 .await;
382
383 if let Err(e) = result {
384 error!("WebSocket connection error: {}", e);
385 }
386 }
387 .instrument(info_span!("ws.connection", %addr)),
388 );
389 }
390 Err(e) => {
391 error!("Failed to accept connection: {}", e);
392 }
393 }
394 }
395 }
396}
397
/// HTTP rejection details recorded when a WebSocket handshake is denied.
///
/// Produced from an `AuthDeny` by `HandshakeReject::from_deny` and rendered into the HTTP
/// error response by `build_handshake_error_response`. `Clone` lets the handshake callback
/// both store the result and use it to build the response.
#[derive(Debug, Clone)]
struct HandshakeReject {
    /// HTTP status returned to the client.
    status: StatusCode,
    /// Serialized as the JSON response body.
    body: crate::websocket::auth::ErrorResponse,
    /// Value of the `X-Error-Code` response header.
    error_code: String,
    /// Value of the `Retry-After` header in seconds; the header is omitted when `None`.
    retry_after_secs: Option<u64>,
}
405
406impl HandshakeReject {
407 fn from_deny(deny: &AuthDeny) -> Self {
408 let retry_after_secs = match deny.retry_policy {
409 crate::websocket::auth::RetryPolicy::RetryAfter(duration) => Some(duration.as_secs()),
410 _ => None,
411 };
412
413 Self {
414 status: StatusCode::from_u16(deny.http_status).unwrap_or(StatusCode::UNAUTHORIZED),
415 body: deny.to_error_response(),
416 error_code: deny.code.to_string(),
417 retry_after_secs,
418 }
419 }
420}
421
422fn build_handshake_error_response(
423 response: &Response,
424 reject: &HandshakeReject,
425) -> HandshakeErrorResponse {
426 let mut builder = Response::builder()
427 .status(reject.status)
428 .version(response.version())
429 .header(CONTENT_TYPE, "application/json; charset=utf-8")
430 .header("X-Error-Code", &reject.error_code)
431 .header("Cache-Control", "no-store");
432
433 if let Some(retry_after_secs) = reject.retry_after_secs {
434 builder = builder.header("Retry-After", retry_after_secs.to_string());
435 }
436
437 let body = serde_json::to_string(&reject.body).unwrap_or_else(|_| {
438 format!(
439 r#"{{"error":"{}","message":"{}","code":"{}","retryable":false}}"#,
440 reject.body.error, reject.body.message, reject.body.code
441 )
442 });
443
444 builder
445 .body(Some(body))
446 .expect("handshake rejection response should build")
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::websocket::auth::{AuthDeny, AuthErrorCode};
    use std::time::Duration;

    /// Stand-in for the pending upgrade response whose HTTP version the rejection copies.
    fn upgrade_response() -> Response {
        Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .body(())
            .unwrap()
    }

    /// Renders the handshake rejection produced for `deny`.
    fn render_rejection(deny: &AuthDeny) -> HandshakeErrorResponse {
        build_handshake_error_response(&upgrade_response(), &HandshakeReject::from_deny(deny))
    }

    #[test]
    fn handshake_error_response_serializes_json_and_retry_after() {
        let deny = AuthDeny::rate_limited(Duration::from_secs(7), "websocket handshakes");
        let rejection = render_rejection(&deny);

        assert_eq!(rejection.status(), StatusCode::TOO_MANY_REQUESTS);
        let headers = rejection.headers();
        assert_eq!(headers.get("X-Error-Code").unwrap(), "rate-limit-exceeded");
        assert_eq!(headers.get("Retry-After").unwrap(), "7");

        let body = rejection.into_body().unwrap();
        for expected in ["rate-limit-exceeded", "retryable"] {
            assert!(body.contains(expected));
        }
    }

    #[test]
    fn handshake_error_response_preserves_non_retryable_auth_denies() {
        let deny = AuthDeny::new(AuthErrorCode::OriginMismatch, "origin mismatch");
        let rejection = render_rejection(&deny);

        assert_eq!(rejection.status(), StatusCode::FORBIDDEN);
        assert!(rejection.headers().get("Retry-After").is_none());
    }
}
494
495#[allow(clippy::result_large_err)]
496async fn accept_authorized_connection(
497 stream: TcpStream,
498 remote_addr: SocketAddr,
499 auth_plugin: Arc<dyn WebSocketAuthPlugin>,
500 client_manager: ClientManager,
501) -> Result<Option<(tokio_tungstenite::WebSocketStream<TcpStream>, AuthContext)>> {
502 use std::sync::Mutex;
503
504 let auth_result_capture: Arc<Mutex<Option<Result<AuthContext, HandshakeReject>>>> =
505 Arc::new(Mutex::new(None));
506 let auth_result_ref = auth_result_capture.clone();
507 let auth_plugin_ref = auth_plugin.clone();
508 let client_manager_for_auth = client_manager.clone();
509
510 let handshake_result = accept_hdr_async(stream, move |request: &Request, response| {
511 let connection_request = ConnectionAuthRequest::from_http_request(remote_addr, request);
512
513 let auth_result = tokio::task::block_in_place(|| {
514 tokio::runtime::Handle::current().block_on(async {
515 match auth_plugin_ref.authorize(&connection_request).await {
516 AuthDecision::Allow(ctx) => {
517 match client_manager_for_auth
518 .check_connection_allowed(remote_addr, &Some(ctx.clone()))
519 .await
520 {
521 Ok(()) => Ok(ctx),
522 Err(deny) => Err(HandshakeReject::from_deny(&deny)),
523 }
524 }
525 AuthDecision::Deny(deny) => Err(HandshakeReject::from_deny(&deny)),
526 }
527 })
528 });
529
530 let mut capture_lock = auth_result_ref.lock().expect("capture lock poisoned");
531 *capture_lock = Some(auth_result.clone());
532
533 match auth_result {
534 Ok(_) => Ok(response),
535 Err(reject) => Err(build_handshake_error_response(&response, &reject)),
536 }
537 })
538 .await;
539
540 let auth_result = {
541 let mut guard = auth_result_capture.lock().expect("capture lock poisoned");
542 guard.take()
543 };
544
545 match handshake_result {
546 Ok(ws_stream) => match auth_result {
547 Some(Ok(ctx)) => {
548 info!("WebSocket connection authorized for {}", remote_addr);
549 Ok(Some((ws_stream, ctx)))
550 }
551 Some(Err(reject)) => Err(anyhow::anyhow!(
552 "handshake unexpectedly succeeded after rejection: {}",
553 reject.body.message
554 )),
555 None => Err(anyhow::anyhow!(
556 "no auth result captured for authorized connection {}",
557 remote_addr
558 )),
559 },
560 Err(WsError::Http(_)) => {
561 match auth_result {
562 Some(Err(reject)) => {
563 warn!(
564 "WebSocket connection rejected during handshake for {}: {}",
565 remote_addr, reject.body.message
566 );
567 }
568 Some(Ok(_)) => {
569 warn!(
570 "WebSocket handshake failed after auth success for {}",
571 remote_addr
572 );
573 }
574 None => {
575 warn!(
576 "WebSocket handshake rejected for {} without captured auth result",
577 remote_addr
578 );
579 }
580 }
581 Ok(None)
582 }
583 Err(err) => Err(err.into()),
584 }
585}
586
/// Serves one accepted TCP connection for its whole lifetime (`otel` build).
///
/// 1. Runs the authenticated handshake via `accept_authorized_connection`. If the
///    handshake was rejected, returns `Ok(())` without further work.
/// 2. Records connection metrics, emits `ConnectionEstablished`, and registers the client
///    with `client_manager`.
/// 3. Processes inbound text frames until one of these happens:
///    - the client sends a close frame,
///    - the stream errors or ends,
///    - the inbound-message rate check fails (a fatal socket issue is sent first).
///
///    Handled messages are subscribe, unsubscribe, ping, refresh_auth, and the legacy
///    bare-`Subscription` payload.
/// 4. Tears down:
///    - cancels all subscriptions,
///    - unregisters the client and its rate-limit buckets,
///    - records disconnection metrics,
///    - emits `SubscriptionRemoved` for each still-active subscription, then
///      `ConnectionClosed`.
///
/// # Errors
///
/// Propagates handshake failures other than HTTP rejections.
#[cfg(feature = "otel")]
// All of these are per-connection handles cloned in `WebSocketServer::start`.
#[allow(clippy::too_many_arguments)]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    remote_addr: std::net::SocketAddr,
    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let Some((ws_stream, auth_context)) = accept_authorized_connection(
        stream,
        remote_addr,
        auth_plugin.clone(),
        client_manager.clone(),
    )
    .await?
    else {
        return Ok(());
    };

    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    let auth_context = Some(auth_context);
    // `usage_metering_key` is the auth context's `metering_key`, which is also the
    // metering label used by the metrics calls below.
    let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
        usage_identity(auth_context.as_ref());

    if let Some(ref m) = metrics {
        if let Some(ref mk) = usage_metering_key {
            m.record_ws_connection_with_metering(mk);
        } else {
            m.record_ws_connection();
        }
    }

    info!("WebSocket connection established for client {}", client_id);

    emit_usage_event(
        &usage_emitter,
        WebSocketUsageEvent::ConnectionEstablished {
            client_id: client_id.to_string(),
            remote_addr: remote_addr.to_string(),
            deployment_id: usage_deployment_id.clone(),
            metering_key: usage_metering_key.clone(),
            subject: usage_subject.clone(),
            key_class: usage_key_class,
        },
    );

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        usage_emitter: &usage_emitter,
        metrics: metrics.clone(),
    };

    // sub_key -> view_id for every subscription attached and not yet unsubscribed.
    let mut active_subscriptions: HashMap<String, String> = HashMap::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        // Only text frames carry protocol messages; other frames just refresh `last_seen`.
        if !msg.is_text() {
            continue;
        }

        if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
            warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
            send_socket_issue(client_id, &client_manager, &deny, true).await;
            break;
        }

        if let Some(ref m) = metrics {
            if let Some(ref mk) = usage_metering_key {
                m.record_ws_message_received_with_metering(mk);
            } else {
                m.record_ws_message_received();
            }
        }

        let Ok(text) = msg.to_text() else {
            continue;
        };
        // Log only the size: `refresh_auth` payloads carry bearer tokens that must not
        // end up in logs.
        debug!(
            "Received text message from client {} ({} bytes)",
            client_id,
            text.len()
        );

        if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
            match client_msg {
                ClientMessage::Subscribe(subscription) => {
                    handle_client_subscribe(
                        &ctx,
                        subscription,
                        &mut active_subscriptions,
                        &usage_metering_key,
                        &usage_subject,
                        &usage_deployment_id,
                    )
                    .await;
                }
                ClientMessage::Unsubscribe(unsub) => {
                    let sub_key = unsub.sub_key();
                    let removed = client_manager
                        .remove_client_subscription(client_id, &sub_key)
                        .await;

                    if removed {
                        info!("Client {} unsubscribed from {}", client_id, sub_key);
                        active_subscriptions.remove(&sub_key);
                        if let Some(ref m) = metrics {
                            if let Some(ref mk) = usage_metering_key {
                                m.record_subscription_removed_with_metering(&unsub.view, mk);
                            } else {
                                m.record_subscription_removed(&unsub.view);
                            }
                        }
                        emit_usage_event(
                            &usage_emitter,
                            WebSocketUsageEvent::SubscriptionRemoved {
                                client_id: client_id.to_string(),
                                deployment_id: usage_deployment_id.clone(),
                                metering_key: usage_metering_key.clone(),
                                subject: usage_subject.clone(),
                                view_id: unsub.view.clone(),
                            },
                        );
                    }
                }
                ClientMessage::Ping => {
                    debug!("Received ping from client {}", client_id);
                }
                ClientMessage::RefreshAuth(refresh_req) => {
                    debug!("Received refresh_auth from client {}", client_id);
                    handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin)
                        .await;
                }
            }
        } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
            // Legacy protocol: a bare `Subscription` object is a subscribe request.
            handle_client_subscribe(
                &ctx,
                subscription,
                &mut active_subscriptions,
                &usage_metering_key,
                &usage_subject,
                &usage_deployment_id,
            )
            .await;
        } else {
            debug!(
                "Received non-subscription message from client {} ({} bytes)",
                client_id,
                text.len()
            );
        }
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);
    if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
        rate_limiter.remove_client_buckets(client_id).await;
    }

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        if let Some(ref mk) = usage_metering_key {
            m.record_ws_disconnection_with_metering(duration_secs, mk);
            for view_id in active_subscriptions.values() {
                m.record_subscription_removed_with_metering(view_id, mk);
            }
        } else {
            m.record_ws_disconnection(duration_secs);
            for view_id in active_subscriptions.values() {
                m.record_subscription_removed(view_id);
            }
        }
    }

    for view_id in active_subscriptions.values() {
        emit_usage_event(
            &usage_emitter,
            WebSocketUsageEvent::SubscriptionRemoved {
                client_id: client_id.to_string(),
                deployment_id: usage_deployment_id.clone(),
                metering_key: usage_metering_key.clone(),
                subject: usage_subject.clone(),
                view_id: view_id.clone(),
            },
        );
    }

    emit_usage_event(
        &usage_emitter,
        WebSocketUsageEvent::ConnectionClosed {
            client_id: client_id.to_string(),
            deployment_id: usage_deployment_id,
            metering_key: usage_metering_key,
            subject: usage_subject,
            duration_secs: Some(connection_start.elapsed().as_secs_f64()),
            subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
        },
    );

    info!("Client {} disconnected", client_id);

    Ok(())
}

/// Handles one subscribe request for the connection in `ctx` (`otel` build).
///
/// Shared by `ClientMessage::Subscribe` and the legacy bare-`Subscription` payload.
///
/// The request is dropped, after logging, when:
/// - the subscription quota check fails (a non-fatal socket issue is sent),
/// - `sub_key` is already subscribed,
/// - attaching to the bus fails. The registration is then removed again, and a socket
///   issue is sent when the error maps to one via `auth_deny_from_subscription_error`.
///
/// On success the subscription is added to `active_subscriptions`, counted in metrics, and
/// reported as a `SubscriptionCreated` usage event.
#[cfg(feature = "otel")]
async fn handle_client_subscribe(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    active_subscriptions: &mut HashMap<String, String>,
    usage_metering_key: &Option<String>,
    usage_subject: &Option<String>,
    usage_deployment_id: &Option<String>,
) {
    let client_id = ctx.client_id;
    let client_manager = ctx.client_manager;
    let view_id = subscription.view.clone();
    let sub_key = subscription.sub_key();

    if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
        warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
        send_socket_issue(client_id, client_manager, &deny, false).await;
        return;
    }

    client_manager.update_subscription(client_id, subscription.clone());

    let cancel_token = CancellationToken::new();
    let is_new = client_manager
        .add_client_subscription(client_id, sub_key.clone(), cancel_token.clone())
        .await;

    if !is_new {
        debug!(
            "Client {} already subscribed to {}, ignoring duplicate",
            client_id, sub_key
        );
        return;
    }

    if let Err(err) = attach_client_to_bus(ctx, subscription, cancel_token).await {
        warn!(
            "Subscription rejected for client {} on {}: {}",
            client_id, view_id, err
        );
        if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
            send_socket_issue(client_id, client_manager, &deny, false).await;
        }
        let _ = client_manager
            .remove_client_subscription(client_id, &sub_key)
            .await;
        return;
    }

    if let Some(ref m) = ctx.metrics {
        if let Some(mk) = usage_metering_key {
            m.record_subscription_created_with_metering(&view_id, mk);
        } else {
            m.record_subscription_created(&view_id);
        }
    }
    active_subscriptions.insert(sub_key, view_id.clone());
    emit_usage_event(
        ctx.usage_emitter,
        WebSocketUsageEvent::SubscriptionCreated {
            client_id: client_id.to_string(),
            deployment_id: usage_deployment_id.clone(),
            metering_key: usage_metering_key.clone(),
            subject: usage_subject.clone(),
            view_id,
        },
    );
}
912
913#[cfg(not(feature = "otel"))]
914#[allow(clippy::too_many_arguments)]
915async fn handle_connection(
916 stream: TcpStream,
917 client_manager: ClientManager,
918 bus_manager: BusManager,
919 entity_cache: EntityCache,
920 view_index: Arc<ViewIndex>,
921 remote_addr: std::net::SocketAddr,
922 auth_plugin: Arc<dyn WebSocketAuthPlugin>,
923 usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
924) -> Result<()> {
925 let Some((ws_stream, auth_context)) = accept_authorized_connection(
926 stream,
927 remote_addr,
928 auth_plugin.clone(),
929 client_manager.clone(),
930 )
931 .await?
932 else {
933 return Ok(());
934 };
935
936 let client_id = Uuid::new_v4();
937 let auth_context_ref = Some(&auth_context);
938 let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
939 usage_identity(auth_context_ref);
940
941 let auth_context = Some(auth_context);
942
943 info!("WebSocket connection established for client {}", client_id);
944
945 emit_usage_event(
946 &usage_emitter,
947 WebSocketUsageEvent::ConnectionEstablished {
948 client_id: client_id.to_string(),
949 remote_addr: remote_addr.to_string(),
950 deployment_id: usage_deployment_id.clone(),
951 metering_key: usage_metering_key.clone(),
952 subject: usage_subject.clone(),
953 key_class: usage_key_class,
954 },
955 );
956
957 let (ws_sender, mut ws_receiver) = ws_stream.split();
958
959 client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);
961
962 let ctx = SubscriptionContext {
963 client_id,
964 client_manager: &client_manager,
965 bus_manager: &bus_manager,
966 entity_cache: &entity_cache,
967 view_index: &view_index,
968 usage_emitter: &usage_emitter,
969 };
970
971 let mut active_subscriptions: HashMap<String, String> = HashMap::new();
972
973 loop {
974 tokio::select! {
975 ws_msg = ws_receiver.next() => {
976 match ws_msg {
977 Some(Ok(msg)) => {
978 if msg.is_close() {
979 info!("Client {} requested close", client_id);
980 break;
981 }
982
983 client_manager.update_client_last_seen(client_id);
984
985 if msg.is_text() {
986 if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
987 warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
988 send_socket_issue(client_id, &client_manager, &deny, true).await;
989 break;
990 }
991
992 if let Ok(text) = msg.to_text() {
993 debug!("Received text message from client {}: {}", client_id, text);
994
995 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
996 match client_msg {
997 ClientMessage::Subscribe(subscription) => {
998 let view_id = subscription.view.clone();
999 if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1000 warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1001 send_socket_issue(client_id, &client_manager, &deny, false).await;
1002 continue;
1003 }
1004
1005 let sub_key = subscription.sub_key();
1006 client_manager.update_subscription(client_id, subscription.clone());
1007
1008 let cancel_token = CancellationToken::new();
1009 let is_new = client_manager.add_client_subscription(
1010 client_id,
1011 sub_key.clone(),
1012 cancel_token.clone(),
1013 ).await;
1014
1015 if !is_new {
1016 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1017 continue;
1018 }
1019
1020 if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1021 warn!(
1022 "Subscription rejected for client {} on {}: {}",
1023 client_id,
1024 sub_key,
1025 err
1026 );
1027 if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1028 send_socket_issue(client_id, &client_manager, &deny, false).await;
1029 }
1030 let _ = client_manager
1031 .remove_client_subscription(client_id, &sub_key)
1032 .await;
1033 } else {
1034 active_subscriptions.insert(sub_key, view_id.clone());
1035 emit_usage_event(
1036 &usage_emitter,
1037 WebSocketUsageEvent::SubscriptionCreated {
1038 client_id: client_id.to_string(),
1039 deployment_id: usage_deployment_id.clone(),
1040 metering_key: usage_metering_key.clone(),
1041 subject: usage_subject.clone(),
1042 view_id,
1043 },
1044 );
1045 }
1046 }
1047 ClientMessage::Unsubscribe(unsub) => {
1048 let sub_key = unsub.sub_key();
1049 let removed = client_manager
1050 .remove_client_subscription(client_id, &sub_key)
1051 .await;
1052
1053 if removed {
1054 info!("Client {} unsubscribed from {}", client_id, sub_key);
1055 active_subscriptions.remove(&sub_key);
1056 emit_usage_event(
1057 &usage_emitter,
1058 WebSocketUsageEvent::SubscriptionRemoved {
1059 client_id: client_id.to_string(),
1060 deployment_id: usage_deployment_id.clone(),
1061 metering_key: usage_metering_key.clone(),
1062 subject: usage_subject.clone(),
1063 view_id: unsub.view.clone(),
1064 },
1065 );
1066 }
1067 }
1068 ClientMessage::Ping => {
1069 debug!("Received ping from client {}", client_id);
1070 }
1071 ClientMessage::RefreshAuth(refresh_req) => {
1072 debug!("Received refresh_auth from client {}", client_id);
1073 handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin).await;
1074 }
1075 }
1076 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
1077 let view_id = subscription.view.clone();
1078 if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1079 warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1080 send_socket_issue(client_id, &client_manager, &deny, false).await;
1081 continue;
1082 }
1083
1084 let sub_key = subscription.sub_key();
1085 client_manager.update_subscription(client_id, subscription.clone());
1086
1087 let cancel_token = CancellationToken::new();
1088 let is_new = client_manager.add_client_subscription(
1089 client_id,
1090 sub_key.clone(),
1091 cancel_token.clone(),
1092 ).await;
1093
1094 if !is_new {
1095 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1096 continue;
1097 }
1098
1099 if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1100 warn!(
1101 "Subscription rejected for client {} on {}: {}",
1102 client_id,
1103 sub_key,
1104 err
1105 );
1106 if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1107 send_socket_issue(client_id, &client_manager, &deny, false).await;
1108 }
1109 let _ = client_manager
1110 .remove_client_subscription(client_id, &sub_key)
1111 .await;
1112 } else {
1113 active_subscriptions.insert(sub_key, view_id.clone());
1114 emit_usage_event(
1115 &usage_emitter,
1116 WebSocketUsageEvent::SubscriptionCreated {
1117 client_id: client_id.to_string(),
1118 deployment_id: usage_deployment_id.clone(),
1119 metering_key: usage_metering_key.clone(),
1120 subject: usage_subject.clone(),
1121 view_id,
1122 },
1123 );
1124 }
1125 } else {
1126 debug!("Received non-subscription message from client {}: {}", client_id, text);
1127 }
1128 }
1129 }
1130 }
1131 Some(Err(e)) => {
1132 warn!("WebSocket error for client {}: {}", client_id, e);
1133 break;
1134 }
1135 None => {
1136 debug!("WebSocket stream ended for client {}", client_id);
1137 break;
1138 }
1139 }
1140 }
1141 }
1142 }
1143
1144 client_manager
1145 .cancel_all_client_subscriptions(client_id)
1146 .await;
1147 client_manager.remove_client(client_id);
1148 if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
1149 rate_limiter.remove_client_buckets(client_id).await;
1150 }
1151
1152 for view_id in active_subscriptions.values() {
1153 emit_usage_event(
1154 &usage_emitter,
1155 WebSocketUsageEvent::SubscriptionRemoved {
1156 client_id: client_id.to_string(),
1157 deployment_id: usage_deployment_id.clone(),
1158 metering_key: usage_metering_key.clone(),
1159 subject: usage_subject.clone(),
1160 view_id: view_id.clone(),
1161 },
1162 );
1163 }
1164
1165 emit_usage_event(
1166 &usage_emitter,
1167 WebSocketUsageEvent::ConnectionClosed {
1168 client_id: client_id.to_string(),
1169 deployment_id: usage_deployment_id,
1170 metering_key: usage_metering_key,
1171 subject: usage_subject,
1172 duration_secs: None,
1173 subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
1174 },
1175 );
1176
1177 info!("Client {} disconnected", client_id);
1178
1179 Ok(())
1180}
1181
1182async fn send_snapshot_batches(
1183 client_id: Uuid,
1184 entities: &[SnapshotEntity],
1185 mode: Mode,
1186 view_id: &str,
1187 client_manager: &ClientManager,
1188 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1189 batch_config: &SnapshotBatchConfig,
1190 #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
1191) -> Result<()> {
1192 let total = entities.len();
1193 if total == 0 {
1194 return Ok(());
1195 }
1196
1197 let mut offset = 0;
1198 let mut batch_num = 0;
1199
1200 while offset < total {
1201 let batch_size = if batch_num == 0 {
1202 batch_config.initial_batch_size
1203 } else {
1204 batch_config.subsequent_batch_size
1205 };
1206
1207 let end = (offset + batch_size).min(total);
1208 let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
1209 let rows_in_batch = batch_data.len() as u32;
1210 let is_complete = end >= total;
1211
1212 let snapshot_frame = SnapshotFrame {
1213 mode,
1214 export: view_id.to_string(),
1215 op: "snapshot",
1216 data: batch_data,
1217 complete: is_complete,
1218 };
1219
1220 if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
1221 let payload = maybe_compress(&json_payload);
1222 let payload_bytes = payload.as_bytes().len() as u64;
1223 if client_manager
1224 .send_compressed_async(client_id, payload)
1225 .await
1226 .is_err()
1227 {
1228 return Err(anyhow::anyhow!("Failed to send snapshot batch"));
1229 }
1230 #[cfg(feature = "otel")]
1231 if let Some(m) = metrics {
1232 m.record_ws_message_sent();
1233 }
1234
1235 let auth_context = client_manager.get_auth_context(client_id);
1236 let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1237 emit_usage_event(
1238 usage_emitter,
1239 WebSocketUsageEvent::SnapshotSent {
1240 client_id: client_id.to_string(),
1241 deployment_id,
1242 metering_key,
1243 subject,
1244 view_id: view_id.to_string(),
1245 rows: rows_in_batch,
1246 messages: 1,
1247 bytes: payload_bytes,
1248 },
1249 );
1250 }
1251
1252 offset = end;
1253 batch_num += 1;
1254 }
1255
1256 debug!(
1257 "Sent {} snapshot batches ({} entities) for {} to client {}",
1258 batch_num, total, view_id, client_id
1259 );
1260
1261 Ok(())
1262}
1263
1264fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
1265 if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
1266 return Some(SortConfig {
1267 field: sort.field_path.clone(),
1268 order: match sort.order {
1269 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
1270 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
1271 },
1272 });
1273 }
1274
1275 if view_spec.mode == Mode::List {
1276 return Some(SortConfig {
1277 field: vec!["_seq".to_string()],
1278 order: SortOrder::Desc,
1279 });
1280 }
1281
1282 None
1283}
1284
1285fn send_subscribed_frame(
1286 client_id: Uuid,
1287 view_id: &str,
1288 view_spec: &ViewSpec,
1289 client_manager: &ClientManager,
1290 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1291) -> Result<()> {
1292 let sort_config = extract_sort_config(view_spec);
1293 let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
1294
1295 let json_payload = serde_json::to_vec(&subscribed_frame)?;
1296 let payload_bytes = json_payload.len() as u64;
1297 let payload = Arc::new(Bytes::from(json_payload));
1298 client_manager
1299 .send_to_client(client_id, payload)
1300 .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))?;
1301
1302 let auth_context = client_manager.get_auth_context(client_id);
1303 let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1304 emit_usage_event(
1305 usage_emitter,
1306 WebSocketUsageEvent::UpdateSent {
1307 client_id: client_id.to_string(),
1308 deployment_id,
1309 metering_key,
1310 subject,
1311 view_id: view_id.to_string(),
1312 messages: 1,
1313 bytes: payload_bytes,
1314 },
1315 );
1316
1317 Ok(())
1318}
1319
1320fn enforce_snapshot_limit(ctx: &SubscriptionContext<'_>, rows: usize) -> Result<()> {
1321 let requested_rows = u32::try_from(rows).unwrap_or(u32::MAX);
1322 ctx.client_manager
1323 .check_snapshot_allowed(ctx.client_id, requested_rows)
1324 .map_err(|deny| anyhow::anyhow!(deny.reason))
1325}
1326
/// Attaches a client subscription to the matching bus (OpenTelemetry build).
///
/// Looks up `subscription.view`, sends the `subscribed` frame, and then:
/// - delegates derived views whose pipeline defines a sort to
///   `attach_derived_view_subscription_otel`;
/// - for `Mode::State`, subscribes to the per-key state bus, optionally sends the cached
///   entity (or, if the cache has none, the bus's current payload), and spawns a task that
///   forwards every subsequent change;
/// - for `Mode::List` / `Mode::Append`, subscribes to the list bus, optionally sends a
///   snapshot from the entity cache (honouring `after`, `snapshot_limit` and the key
///   filter), and spawns a task that forwards envelopes matching the subscription.
///
/// A forwarding task stops when `cancel_token` is cancelled, the bus receive fails, or a
/// send to the client fails.
///
/// # Errors
///
/// Unknown view id, failure to send the subscribed frame or snapshot, or a snapshot that
/// the client's snapshot limit denies.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) -> Result<()> {
    let view_id = &subscription.view;

    let view_spec = match ctx.view_index.get_view(view_id) {
        Some(spec) => spec.clone(),
        None => {
            return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
        }
    };

    send_subscribed_frame(
        ctx.client_id,
        view_id,
        &view_spec,
        ctx.client_manager,
        ctx.usage_emitter,
    )?;

    let is_derived_with_sort = view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map(|p| p.sort.is_some())
            .unwrap_or(false);

    if is_derived_with_sort {
        return attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token)
            .await;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");

            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);

            if should_send_snapshot {
                if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
                    transform_large_u64_to_strings(&mut cached_entity);
                    let snapshot_entities = vec![SnapshotEntity {
                        key: key.to_string(),
                        data: cached_entity,
                    }];
                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
                    let batch_config = ctx.entity_cache.snapshot_config();
                    send_snapshot_batches(
                        ctx.client_id,
                        &snapshot_entities,
                        view_spec.mode,
                        view_id,
                        ctx.client_manager,
                        ctx.usage_emitter,
                        &batch_config,
                        ctx.metrics.as_ref(),
                    )
                    .await?;
                    // Mark the bus's current value as seen so `changed()` below only
                    // fires for newer values.
                    rx.borrow_and_update();
                } else if !rx.borrow().is_empty() {
                    // No cached entity: fall back to the bus's latest payload.
                    let data = rx.borrow_and_update().clone();
                    let data_len = data.len();
                    if ctx
                        .client_manager
                        .send_to_client(ctx.client_id, data)
                        .is_ok()
                    {
                        // Count this send like every other message sent in this path.
                        if let Some(m) = ctx.metrics.as_ref() {
                            m.record_ws_message_sent();
                        }
                        emit_update_sent_for_client(
                            ctx.usage_emitter,
                            ctx.client_manager,
                            ctx.client_id,
                            view_id,
                            data_len,
                        );
                    }
                }
            } else {
                info!(
                    "Client {} subscribed to {} without snapshot",
                    ctx.client_id, view_id
                );
                rx.borrow_and_update();
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let usage_emitter = ctx.usage_emitter.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let view_id_span = view_id.clone();
            let key_clone = key.to_string();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("State subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.changed() => {
                                if result.is_err() {
                                    break;
                                }
                                let data = rx.borrow().clone();
                                let data_len = data.len();
                                if client_mgr.send_to_client(client_id, data).is_err() {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                                emit_update_sent_for_client(
                                    &usage_emitter,
                                    &client_mgr,
                                    client_id,
                                    &view_id_clone,
                                    data_len,
                                );
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
            );
        }
        Mode::List | Mode::Append => {
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);

            if should_send_snapshot {
                let mut snapshots = if let Some(ref cursor) = subscription.after {
                    ctx.entity_cache
                        .get_after(view_id, cursor, subscription.snapshot_limit)
                        .await
                } else {
                    ctx.entity_cache.get_all(view_id).await
                };

                // Filter by key before applying the limit so `snapshot_limit` counts rows
                // the client can actually see. (With a cursor, the cache applies the limit
                // itself before this filter.)
                snapshots.retain(|(key, _)| subscription.matches_key(key));

                if let Some(limit) = subscription.snapshot_limit {
                    if subscription.after.is_none() {
                        // Keep the `limit` newest rows: sort by `_seq` descending.
                        snapshots.sort_by(|a, b| {
                            let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
                            let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
                            cmp_seq(sb, sa)
                        });
                        snapshots.truncate(limit);
                    }
                }

                let snapshot_entities: Vec<SnapshotEntity> = snapshots
                    .into_iter()
                    .map(|(key, mut data)| {
                        transform_large_u64_to_strings(&mut data);
                        SnapshotEntity { key, data }
                    })
                    .collect();

                if !snapshot_entities.is_empty() {
                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
                    let batch_config = ctx.entity_cache.snapshot_config();
                    send_snapshot_batches(
                        ctx.client_id,
                        &snapshot_entities,
                        view_spec.mode,
                        view_id,
                        ctx.client_manager,
                        ctx.usage_emitter,
                        &batch_config,
                        ctx.metrics.as_ref(),
                    )
                    .await?;
                }
            } else {
                info!(
                    "Client {} subscribed to {} without snapshot",
                    ctx.client_id, view_id
                );
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let usage_emitter = ctx.usage_emitter.clone();
            let sub = subscription.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let view_id_span = view_id.clone();
            let mode = view_spec.mode;
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("List subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.recv() => {
                                match result {
                                    Ok(envelope) => {
                                        if sub.matches(&envelope.entity, &envelope.key) {
                                            if client_mgr
                                                .send_to_client(client_id, envelope.payload.clone())
                                                .is_err()
                                            {
                                                break;
                                            }
                                            if let Some(ref m) = metrics_clone {
                                                m.record_ws_message_sent();
                                            }
                                            emit_update_sent_for_client(
                                                &usage_emitter,
                                                &client_mgr,
                                                client_id,
                                                &view_id_clone,
                                                envelope.payload.len(),
                                            );
                                        }
                                    }
                                    // NOTE(review): any receive error ends the subscription. If
                                    // the list bus is a tokio broadcast channel this includes
                                    // `Lagged`, which silently stops updates for a slow
                                    // client. Confirm this is intended.
                                    Err(_) => break,
                                }
                            }
                        }
                    }
                }
                .instrument(
                    info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode),
                ),
            );
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );

    Ok(())
}
1577
/// Subscribes a client to a derived view whose pipeline defines a sort (OpenTelemetry build).
///
/// The client sees the rows returned by `get_window(skip, take)` on the view's sorted
/// cache. `take` defaults to the pipeline limit, or 100 if there is none; `skip` defaults
/// to 0. The initial window is sent as a snapshot.
///
/// Afterwards, every message on the *source* view's list bus triggers a recomputation of
/// the window. The client then receives a `delete` frame for each key that left the
/// window, followed by an `upsert` frame for every key in the window, including unchanged
/// ones.
///
/// # Errors
///
/// Fails if the view has no `source_view`, the snapshot is denied by the snapshot limit,
/// or the snapshot cannot be sent.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) -> Result<()> {
    let view_id = &subscription.view;
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);

    let source_view_id = match &view_spec.source_view {
        Some(s) => s.clone(),
        None => {
            return Err(anyhow::anyhow!(
                "Derived view {} has no source_view",
                view_id
            ));
        }
    };

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, mut data)| {
                transform_large_u64_to_strings(&mut data);
                SnapshotEntity { key, data }
            })
            .collect();

        enforce_snapshot_limit(ctx, snapshot_entities.len())?;
        let batch_config = ctx.entity_cache.snapshot_config();
        send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            ctx.usage_emitter,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await?;
    }

    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let usage_emitter = ctx.usage_emitter.clone();
    let view_id_clone = view_id.clone();
    let view_id_span = view_id.clone();
    let sorted_caches_clone = sorted_caches;
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;

    tokio::spawn(
        async move {
            let mut current_window_keys = initial_keys;

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        debug!("Derived view subscription cancelled for client {}", client_id);
                        break;
                    }
                    result = rx.recv() => {
                        match result {
                            // The envelope is not inspected: any change to the source view may
                            // move rows into or out of this window, so the window is recomputed.
                            Ok(_envelope) => {
                                let new_window: Vec<(String, serde_json::Value)> = {
                                    let mut caches = sorted_caches_clone.write().await;
                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
                                        cache.get_window(skip, take)
                                    } else {
                                        continue;
                                    }
                                };

                                let new_keys: HashSet<String> =
                                    new_window.iter().map(|(k, _)| k.clone()).collect();

                                // Deletes for keys that left the window are always sent, even
                                // when the window became empty. Otherwise the client would keep
                                // rows that are no longer tracked in `current_window_keys`.
                                let deletes = current_window_keys.difference(&new_keys).map(|key| {
                                    Frame {
                                        seq: None,
                                        mode: frame_mode,
                                        export: view_id_clone.clone(),
                                        op: "delete",
                                        key: key.clone(),
                                        data: serde_json::Value::Null,
                                        append: vec![],
                                    }
                                });
                                let upserts = new_window.iter().map(|(key, data)| {
                                    let mut transformed_data = data.clone();
                                    transform_large_u64_to_strings(&mut transformed_data);
                                    Frame {
                                        seq: None,
                                        mode: frame_mode,
                                        export: view_id_clone.clone(),
                                        op: "upsert",
                                        key: key.clone(),
                                        data: transformed_data,
                                        append: vec![],
                                    }
                                });

                                for frame in deletes.chain(upserts) {
                                    if let Ok(json) = serde_json::to_vec(&frame) {
                                        let payload = Arc::new(Bytes::from(json));
                                        let payload_len = payload.len();
                                        if client_mgr.send_to_client(client_id, payload).is_err() {
                                            return;
                                        }
                                        if let Some(ref m) = metrics_clone {
                                            m.record_ws_message_sent();
                                        }
                                        emit_update_sent_for_client(
                                            &usage_emitter,
                                            &client_mgr,
                                            client_id,
                                            &view_id_clone,
                                            payload_len,
                                        );
                                    }
                                }

                                current_window_keys = new_keys;
                            }
                            // NOTE(review): any receive error ends the subscription. If the
                            // bus is a tokio broadcast channel this includes `Lagged`, which
                            // silently stops updates for a slow client. Confirm this is intended.
                            Err(_) => break,
                        }
                    }
                }
            }
        }
        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
    );

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );

    Ok(())
}
1822
1823#[cfg(not(feature = "otel"))]
1824async fn attach_client_to_bus(
1825 ctx: &SubscriptionContext<'_>,
1826 subscription: Subscription,
1827 cancel_token: CancellationToken,
1828) -> Result<()> {
1829 let view_id = &subscription.view;
1830
1831 let view_spec = match ctx.view_index.get_view(view_id) {
1832 Some(spec) => spec.clone(),
1833 None => {
1834 return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
1835 }
1836 };
1837
1838 send_subscribed_frame(
1839 ctx.client_id,
1840 view_id,
1841 &view_spec,
1842 ctx.client_manager,
1843 ctx.usage_emitter,
1844 )?;
1845
1846 let is_derived_with_sort = view_spec.is_derived()
1847 && view_spec
1848 .pipeline
1849 .as_ref()
1850 .map(|p| p.sort.is_some())
1851 .unwrap_or(false);
1852
1853 if is_derived_with_sort {
1854 return attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
1855 }
1856
1857 match view_spec.mode {
1858 Mode::State => {
1859 let key = subscription.key.as_deref().unwrap_or("");
1860
1861 let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
1862
1863 let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1865
1866 if should_send_snapshot {
1867 if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
1868 transform_large_u64_to_strings(&mut cached_entity);
1869 let snapshot_entities = vec![SnapshotEntity {
1870 key: key.to_string(),
1871 data: cached_entity,
1872 }];
1873 enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1874 let batch_config = ctx.entity_cache.snapshot_config();
1875 send_snapshot_batches(
1876 ctx.client_id,
1877 &snapshot_entities,
1878 view_spec.mode,
1879 view_id,
1880 ctx.client_manager,
1881 ctx.usage_emitter,
1882 &batch_config,
1883 )
1884 .await?;
1885 rx.borrow_and_update();
1886 } else if !rx.borrow().is_empty() {
1887 let data = rx.borrow_and_update().clone();
1888 let data_len = data.len();
1889 if ctx
1890 .client_manager
1891 .send_to_client(ctx.client_id, data)
1892 .is_ok()
1893 {
1894 emit_update_sent_for_client(
1895 ctx.usage_emitter,
1896 ctx.client_manager,
1897 ctx.client_id,
1898 view_id,
1899 data_len,
1900 );
1901 }
1902 }
1903 } else {
1904 info!(
1905 "Client {} subscribed to {} without snapshot",
1906 ctx.client_id, view_id
1907 );
1908 rx.borrow_and_update();
1909 }
1910
1911 let client_id = ctx.client_id;
1912 let client_mgr = ctx.client_manager.clone();
1913 let usage_emitter = ctx.usage_emitter.clone();
1914 let view_id_clone = view_id.clone();
1915 let view_id_span = view_id.clone();
1916 let key_clone = key.to_string();
1917 tokio::spawn(
1918 async move {
1919 loop {
1920 tokio::select! {
1921 _ = cancel_token.cancelled() => {
1922 debug!("State subscription cancelled for client {}", client_id);
1923 break;
1924 }
1925 result = rx.changed() => {
1926 if result.is_err() {
1927 break;
1928 }
1929 let data = rx.borrow().clone();
1930 let data_len = data.len();
1931 if client_mgr.send_to_client(client_id, data).is_err() {
1932 break;
1933 }
1934 emit_update_sent_for_client(
1935 &usage_emitter,
1936 &client_mgr,
1937 client_id,
1938 &view_id_clone,
1939 data_len,
1940 );
1941 }
1942 }
1943 }
1944 }
1945 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
1946 );
1947 }
1948 Mode::List | Mode::Append => {
1949 let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1950
1951 let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1953
1954 if should_send_snapshot {
1955 let mut snapshots = if let Some(ref cursor) = subscription.after {
1957 ctx.entity_cache
1958 .get_after(view_id, cursor, subscription.snapshot_limit)
1959 .await
1960 } else {
1961 ctx.entity_cache.get_all(view_id).await
1962 };
1963
1964 if let Some(limit) = subscription.snapshot_limit {
1966 if subscription.after.is_none() {
1967 snapshots.sort_by(|a, b| {
1968 let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1969 let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1970 cmp_seq(sb, sa) });
1972 snapshots.truncate(limit);
1973 }
1974 }
1975
1976 let snapshot_entities: Vec<SnapshotEntity> = snapshots
1977 .into_iter()
1978 .filter(|(key, _)| subscription.matches_key(key))
1979 .map(|(key, mut data)| {
1980 transform_large_u64_to_strings(&mut data);
1981 SnapshotEntity { key, data }
1982 })
1983 .collect();
1984
1985 if !snapshot_entities.is_empty() {
1986 enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1987 let batch_config = ctx.entity_cache.snapshot_config();
1988 send_snapshot_batches(
1989 ctx.client_id,
1990 &snapshot_entities,
1991 view_spec.mode,
1992 view_id,
1993 ctx.client_manager,
1994 ctx.usage_emitter,
1995 &batch_config,
1996 )
1997 .await?;
1998 }
1999 } else {
2000 info!(
2001 "Client {} subscribed to {} without snapshot",
2002 ctx.client_id, view_id
2003 );
2004 }
2005
2006 let client_id = ctx.client_id;
2007 let client_mgr = ctx.client_manager.clone();
2008 let usage_emitter = ctx.usage_emitter.clone();
2009 let sub = subscription.clone();
2010 let view_id_clone = view_id.clone();
2011 let view_id_span = view_id.clone();
2012 let mode = view_spec.mode;
2013 tokio::spawn(
2014 async move {
2015 loop {
2016 tokio::select! {
2017 _ = cancel_token.cancelled() => {
2018 debug!("List subscription cancelled for client {}", client_id);
2019 break;
2020 }
2021 result = rx.recv() => {
2022 match result {
2023 Ok(envelope) => {
2024 if sub.matches(&envelope.entity, &envelope.key)
2025 && client_mgr
2026 .send_to_client(client_id, envelope.payload.clone())
2027 .is_err()
2028 {
2029 break;
2030 } else if sub.matches(&envelope.entity, &envelope.key) {
2031 emit_update_sent_for_client(
2032 &usage_emitter,
2033 &client_mgr,
2034 client_id,
2035 &view_id_clone,
2036 envelope.payload.len(),
2037 );
2038 }
2039 }
2040 Err(_) => break,
2041 }
2042 }
2043 }
2044 }
2045 }
2046 .instrument(
2047 info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode),
2048 ),
2049 );
2050 }
2051 }
2052
2053 info!(
2054 "Client {} subscribed to {} (mode: {:?})",
2055 ctx.client_id, view_id, view_spec.mode
2056 );
2057
2058 Ok(())
2059}
2060
2061#[cfg(not(feature = "otel"))]
2062async fn attach_derived_view_subscription(
2063 ctx: &SubscriptionContext<'_>,
2064 subscription: Subscription,
2065 view_spec: ViewSpec,
2066 cancel_token: CancellationToken,
2067) -> Result<()> {
2068 let view_id = &subscription.view;
2069 let pipeline_limit = view_spec
2070 .pipeline
2071 .as_ref()
2072 .and_then(|p| p.limit)
2073 .unwrap_or(100);
2074 let take = subscription.take.unwrap_or(pipeline_limit);
2075 let skip = subscription.skip.unwrap_or(0);
2076 let is_single = take == 1;
2077
2078 let source_view_id = match &view_spec.source_view {
2079 Some(s) => s.clone(),
2080 None => {
2081 return Err(anyhow::anyhow!(
2082 "Derived view {} has no source_view",
2083 view_id
2084 ));
2085 }
2086 };
2087
2088 let sorted_caches = ctx.view_index.sorted_caches();
2089 let initial_window: Vec<(String, serde_json::Value)> = {
2090 let mut caches = sorted_caches.write().await;
2091 if let Some(cache) = caches.get_mut(view_id) {
2092 cache.get_window(skip, take)
2093 } else {
2094 warn!("No sorted cache for derived view {}", view_id);
2095 vec![]
2096 }
2097 };
2098
2099 let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
2100
2101 if !initial_window.is_empty() {
2102 let snapshot_entities: Vec<SnapshotEntity> = initial_window
2103 .into_iter()
2104 .map(|(key, mut data)| {
2105 transform_large_u64_to_strings(&mut data);
2106 SnapshotEntity { key, data }
2107 })
2108 .collect();
2109
2110 enforce_snapshot_limit(ctx, snapshot_entities.len())?;
2111 let batch_config = ctx.entity_cache.snapshot_config();
2112 send_snapshot_batches(
2113 ctx.client_id,
2114 &snapshot_entities,
2115 view_spec.mode,
2116 view_id,
2117 ctx.client_manager,
2118 ctx.usage_emitter,
2119 &batch_config,
2120 )
2121 .await?;
2122 }
2123
2124 let mut rx = ctx
2125 .bus_manager
2126 .get_or_create_list_bus(&source_view_id)
2127 .await;
2128
2129 let client_id = ctx.client_id;
2130 let client_mgr = ctx.client_manager.clone();
2131 let usage_emitter = ctx.usage_emitter.clone();
2132 let view_id_clone = view_id.clone();
2133 let view_id_span = view_id.clone();
2134 let sorted_caches_clone = sorted_caches;
2135 let frame_mode = view_spec.mode;
2136
2137 tokio::spawn(
2138 async move {
2139 let mut current_window_keys = initial_keys;
2140
2141 loop {
2142 tokio::select! {
2143 _ = cancel_token.cancelled() => {
2144 debug!("Derived view subscription cancelled for client {}", client_id);
2145 break;
2146 }
2147 result = rx.recv() => {
2148 match result {
2149 Ok(_envelope) => {
2150 let new_window: Vec<(String, serde_json::Value)> = {
2151 let mut caches = sorted_caches_clone.write().await;
2152 if let Some(cache) = caches.get_mut(&view_id_clone) {
2153 cache.get_window(skip, take)
2154 } else {
2155 continue;
2156 }
2157 };
2158
2159 let new_keys: HashSet<String> =
2160 new_window.iter().map(|(k, _)| k.clone()).collect();
2161
2162 if is_single {
2163 if let Some((new_key, data)) = new_window.first() {
2164 for old_key in current_window_keys.difference(&new_keys) {
2165 let delete_frame = Frame {
2166 seq: None,
2167 mode: frame_mode,
2168 export: view_id_clone.clone(),
2169 op: "delete",
2170 key: old_key.clone(),
2171 data: serde_json::Value::Null,
2172 append: vec![],
2173 };
2174 if let Ok(json) = serde_json::to_vec(&delete_frame) {
2175 let payload = Arc::new(Bytes::from(json));
2176 let payload_len = payload.len();
2177 if client_mgr.send_to_client(client_id, payload).is_err() {
2178 return;
2179 }
2180 emit_update_sent_for_client(
2181 &usage_emitter,
2182 &client_mgr,
2183 client_id,
2184 &view_id_clone,
2185 payload_len,
2186 );
2187 }
2188 }
2189
2190 let mut transformed_data = data.clone();
2191 transform_large_u64_to_strings(&mut transformed_data);
2192 let frame = Frame {
2193 seq: None,
2194 mode: frame_mode,
2195 export: view_id_clone.clone(),
2196 op: "upsert",
2197 key: new_key.clone(),
2198 data: transformed_data,
2199 append: vec![],
2200 };
2201 if let Ok(json) = serde_json::to_vec(&frame) {
2202 let payload = Arc::new(Bytes::from(json));
2203 let payload_len = payload.len();
2204 if client_mgr.send_to_client(client_id, payload).is_err() {
2205 return;
2206 }
2207 emit_update_sent_for_client(
2208 &usage_emitter,
2209 &client_mgr,
2210 client_id,
2211 &view_id_clone,
2212 payload_len,
2213 );
2214 }
2215 }
2216 } else {
2217 for key in current_window_keys.difference(&new_keys) {
2218 let delete_frame = Frame {
2219 seq: None,
2220 mode: frame_mode,
2221 export: view_id_clone.clone(),
2222 op: "delete",
2223 key: key.clone(),
2224 data: serde_json::Value::Null,
2225 append: vec![],
2226 };
2227 if let Ok(json) = serde_json::to_vec(&delete_frame) {
2228 let payload = Arc::new(Bytes::from(json));
2229 let payload_len = payload.len();
2230 if client_mgr.send_to_client(client_id, payload).is_err() {
2231 return;
2232 }
2233 emit_update_sent_for_client(
2234 &usage_emitter,
2235 &client_mgr,
2236 client_id,
2237 &view_id_clone,
2238 payload_len,
2239 );
2240 }
2241 }
2242
2243 for (key, data) in &new_window {
2244 let mut transformed_data = data.clone();
2245 transform_large_u64_to_strings(&mut transformed_data);
2246 let frame = Frame {
2247 seq: None,
2248 mode: frame_mode,
2249 export: view_id_clone.clone(),
2250 op: "upsert",
2251 key: key.clone(),
2252 data: transformed_data,
2253 append: vec![],
2254 };
2255 if let Ok(json) = serde_json::to_vec(&frame) {
2256 let payload = Arc::new(Bytes::from(json));
2257 let payload_len = payload.len();
2258 if client_mgr.send_to_client(client_id, payload).is_err() {
2259 return;
2260 }
2261 emit_update_sent_for_client(
2262 &usage_emitter,
2263 &client_mgr,
2264 client_id,
2265 &view_id_clone,
2266 payload_len,
2267 );
2268 }
2269 }
2270 }
2271
2272 current_window_keys = new_keys;
2273 }
2274 Err(_) => break,
2275 }
2276 }
2277 }
2278 }
2279 }
2280 .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
2281 );
2282
2283 info!(
2284 "Client {} subscribed to derived view {} (take={}, skip={})",
2285 ctx.client_id, view_id, take, skip
2286 );
2287
2288 Ok(())
2289}