1use crate::bus::BusManager;
2use crate::cache::{cmp_seq, EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::auth::{
6 AuthContext, AuthDecision, AuthDeny, ConnectionAuthRequest, WebSocketAuthPlugin,
7};
8use crate::websocket::client_manager::{ClientManager, RateLimitConfig};
9use crate::websocket::frame::{
10 transform_large_u64_to_strings, Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig,
11 SortOrder, SubscribedFrame,
12};
13use crate::websocket::subscription::{
14 ClientMessage, RefreshAuthRequest, RefreshAuthResponse, SocketIssueMessage, Subscription,
15};
16use crate::websocket::usage::{WebSocketUsageEmitter, WebSocketUsageEvent};
17use anyhow::Result;
18use bytes::Bytes;
19use futures_util::StreamExt;
20use std::collections::{HashMap, HashSet};
21use std::net::SocketAddr;
22use std::sync::Arc;
23#[cfg(feature = "otel")]
24use std::time::Instant;
25
26use tokio::net::{TcpListener, TcpStream};
27use tokio_tungstenite::{
28 accept_hdr_async,
29 tungstenite::{
30 handshake::server::{ErrorResponse as HandshakeErrorResponse, Request, Response},
31 http::{header::CONTENT_TYPE, StatusCode},
32 Error as WsError,
33 },
34};
35
36use tokio_util::sync::CancellationToken;
37use tracing::{debug, error, info, info_span, warn, Instrument};
38use uuid::Uuid;
39
40#[cfg(feature = "otel")]
41use crate::metrics::Metrics;
42
43async fn handle_refresh_auth(
45 client_id: Uuid,
46 refresh_req: &RefreshAuthRequest,
47 client_manager: &ClientManager,
48 auth_plugin: &Arc<dyn WebSocketAuthPlugin>,
49) {
50 let refresh_result: Result<AuthContext, String> = {
53 if let Some(signed_plugin) = auth_plugin
55 .as_any()
56 .downcast_ref::<crate::websocket::auth::SignedSessionAuthPlugin>(
57 ) {
58 signed_plugin
59 .verify_refresh_token(&refresh_req.token)
60 .await
61 .map_err(|e| e.reason)
62 } else {
63 Err("In-band auth refresh not supported with current auth plugin".to_string())
64 }
65 };
66
67 match refresh_result {
68 Ok(new_context) => {
69 let expires_at = new_context.expires_at;
70 if client_manager.update_client_auth(client_id, new_context) {
71 info!(
72 "Client {} refreshed auth successfully, expires at {}",
73 client_id, expires_at
74 );
75
76 let response = RefreshAuthResponse {
78 success: true,
79 error: None,
80 expires_at: Some(expires_at),
81 };
82 if let Ok(json) = serde_json::to_string(&response) {
83 let _ = client_manager.send_text_to_client(client_id, json).await;
84 }
85 } else {
86 warn!("Client {} not found when refreshing auth", client_id);
87
88 let response = RefreshAuthResponse {
90 success: false,
91 error: Some("client-not-found".to_string()),
92 expires_at: None,
93 };
94 if let Ok(json) = serde_json::to_string(&response) {
95 let _ = client_manager.send_text_to_client(client_id, json).await;
96 }
97 }
98 }
99 Err(err) => {
100 warn!("Client {} auth refresh failed: {}", client_id, err);
101
102 let error_code = if err.contains("expired") {
104 "token-expired"
105 } else if err.contains("signature") {
106 "token-invalid-signature"
107 } else if err.contains("issuer") {
108 "token-invalid-issuer"
109 } else if err.contains("audience") {
110 "token-invalid-audience"
111 } else {
112 "token-invalid"
113 };
114
115 let response = RefreshAuthResponse {
116 success: false,
117 error: Some(error_code.to_string()),
118 expires_at: None,
119 };
120 if let Ok(json) = serde_json::to_string(&response) {
121 let _ = client_manager.send_text_to_client(client_id, json).await;
122 }
123 }
124 }
125}
126
127async fn send_socket_issue(
128 client_id: Uuid,
129 client_manager: &ClientManager,
130 deny: &AuthDeny,
131 fatal: bool,
132) {
133 let message = SocketIssueMessage::from_auth_deny(deny, fatal);
134 match serde_json::to_string(&message) {
135 Ok(json) => {
136 let _ = client_manager.send_text_to_client(client_id, json).await;
137 }
138 Err(error) => {
139 warn!(error = %error, client_id = %client_id, "failed to serialize socket issue message");
140 }
141 }
142}
143
144fn auth_deny_from_subscription_error(reason: &str) -> Option<AuthDeny> {
145 if reason.starts_with("Snapshot limit exceeded:") {
146 Some(AuthDeny::new(
147 crate::websocket::auth::AuthErrorCode::SnapshotLimitExceeded,
148 reason,
149 ))
150 } else {
151 None
152 }
153}
154
155fn key_class_label(key_class: hyperstack_auth::KeyClass) -> &'static str {
156 match key_class {
157 hyperstack_auth::KeyClass::Secret => "secret",
158 hyperstack_auth::KeyClass::Publishable => "publishable",
159 }
160}
161
162fn emit_usage_event(
163 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
164 event: WebSocketUsageEvent,
165) {
166 if let Some(emitter) = usage_emitter.clone() {
167 tokio::spawn(async move {
168 emitter.emit(event).await;
169 });
170 }
171}
172
173fn usage_identity(
174 auth_context: Option<&AuthContext>,
175) -> (
176 Option<String>,
177 Option<String>,
178 Option<String>,
179 Option<String>,
180) {
181 match auth_context {
182 Some(ctx) => (
183 Some(ctx.metering_key.clone()),
184 Some(ctx.subject.clone()),
185 Some(key_class_label(ctx.key_class).to_string()),
186 ctx.deployment_id.clone(),
187 ),
188 None => (None, None, None, None),
189 }
190}
191
192fn emit_update_sent_for_client(
193 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
194 client_manager: &ClientManager,
195 client_id: Uuid,
196 view_id: &str,
197 bytes: usize,
198) {
199 let auth_context = client_manager.get_auth_context(client_id);
200 let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
201 emit_usage_event(
202 usage_emitter,
203 WebSocketUsageEvent::UpdateSent {
204 client_id: client_id.to_string(),
205 deployment_id,
206 metering_key,
207 subject,
208 view_id: view_id.to_string(),
209 messages: 1,
210 bytes: bytes as u64,
211 },
212 );
213}
214
/// Borrowed per-connection handles bundled into one value.
///
/// `handle_connection` builds one of these per client and passes it by
/// reference to `attach_client_to_bus`.
struct SubscriptionContext<'a> {
    /// Identifier of the client that owns this connection.
    client_id: Uuid,
    /// Client registry for the connection.
    client_manager: &'a ClientManager,
    /// Bus manager the server was constructed with.
    bus_manager: &'a BusManager,
    /// Entity cache the server was constructed with.
    entity_cache: &'a EntityCache,
    /// View index the server was constructed with.
    view_index: &'a ViewIndex,
    /// Optional usage emitter; `None` means no usage events are emitted.
    usage_emitter: &'a Option<Arc<dyn WebSocketUsageEmitter>>,
    /// Optional metrics handle, present only in `otel` builds.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
225
/// WebSocket server configuration and shared state.
///
/// Built with `new` and the `with_*` methods, then consumed by `start`.
pub struct WebSocketServer {
    /// Address the TCP listener binds to.
    bind_addr: SocketAddr,
    /// Default client registry; `start` replaces it when a rate-limit
    /// configuration is set.
    client_manager: ClientManager,
    /// Bus manager cloned into every connection task.
    bus_manager: BusManager,
    /// Entity cache cloned into every connection task.
    entity_cache: EntityCache,
    /// View index shared with every connection task.
    view_index: Arc<ViewIndex>,
    /// Connections are rejected once the registered client count reaches this value.
    max_clients: usize,
    /// Plugin that authorizes connections during the handshake.
    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
    /// Optional sink for usage events.
    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
    /// Optional rate-limit configuration used to build the client registry in `start`.
    rate_limit_config: Option<RateLimitConfig>,
    /// Optional metrics handle, present only in `otel` builds.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
239
240impl WebSocketServer {
241 #[cfg(feature = "otel")]
242 pub fn new(
243 bind_addr: SocketAddr,
244 bus_manager: BusManager,
245 entity_cache: EntityCache,
246 view_index: Arc<ViewIndex>,
247 metrics: Option<Arc<Metrics>>,
248 ) -> Self {
249 Self {
250 bind_addr,
251 client_manager: ClientManager::new(),
252 bus_manager,
253 entity_cache,
254 view_index,
255 max_clients: 10000,
256 auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
257 usage_emitter: None,
258 rate_limit_config: None,
259 metrics,
260 }
261 }
262
263 #[cfg(not(feature = "otel"))]
264 pub fn new(
265 bind_addr: SocketAddr,
266 bus_manager: BusManager,
267 entity_cache: EntityCache,
268 view_index: Arc<ViewIndex>,
269 ) -> Self {
270 Self {
271 bind_addr,
272 client_manager: ClientManager::new(),
273 bus_manager,
274 entity_cache,
275 view_index,
276 max_clients: 10000,
277 auth_plugin: Arc::new(crate::websocket::auth::AllowAllAuthPlugin),
278 usage_emitter: None,
279 rate_limit_config: None,
280 }
281 }
282
283 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
284 self.max_clients = max_clients;
285 self
286 }
287
288 pub fn with_auth_plugin(mut self, auth_plugin: Arc<dyn WebSocketAuthPlugin>) -> Self {
289 self.auth_plugin = auth_plugin;
290 self
291 }
292
293 pub fn with_usage_emitter(mut self, usage_emitter: Arc<dyn WebSocketUsageEmitter>) -> Self {
294 self.usage_emitter = Some(usage_emitter);
295 self
296 }
297
298 pub fn with_rate_limit_config(mut self, config: RateLimitConfig) -> Self {
304 self.rate_limit_config = Some(config);
305 self
306 }
307
308 pub async fn start(self) -> Result<()> {
309 info!(
310 "Starting WebSocket server on {} (max_clients: {})",
311 self.bind_addr, self.max_clients
312 );
313
314 let listener = TcpListener::bind(&self.bind_addr).await?;
315 info!("WebSocket server listening on {}", self.bind_addr);
316
317 let client_manager = if let Some(config) = self.rate_limit_config {
319 ClientManager::with_config(config)
320 } else {
321 self.client_manager
322 };
323
324 client_manager.start_cleanup_task();
325
326 loop {
327 match listener.accept().await {
328 Ok((stream, addr)) => {
329 let client_count = client_manager.client_count();
330 if client_count >= self.max_clients {
331 warn!(
332 "Rejecting connection from {} - max clients ({}) reached",
333 addr, self.max_clients
334 );
335 drop(stream);
336 continue;
337 }
338
339 info!(
340 "New WebSocket connection from {} ({}/{} clients)",
341 addr,
342 client_count + 1,
343 self.max_clients
344 );
345 let client_manager = client_manager.clone();
346 let bus_manager = self.bus_manager.clone();
347 let entity_cache = self.entity_cache.clone();
348 let view_index = self.view_index.clone();
349 #[cfg(feature = "otel")]
350 let metrics = self.metrics.clone();
351
352 let auth_plugin = self.auth_plugin.clone();
353 let usage_emitter = self.usage_emitter.clone();
354
355 tokio::spawn(
356 async move {
357 #[cfg(feature = "otel")]
358 let result = handle_connection(
359 stream,
360 client_manager,
361 bus_manager,
362 entity_cache,
363 view_index,
364 addr,
365 auth_plugin,
366 usage_emitter,
367 metrics,
368 )
369 .await;
370 #[cfg(not(feature = "otel"))]
371 let result = handle_connection(
372 stream,
373 client_manager,
374 bus_manager,
375 entity_cache,
376 view_index,
377 addr,
378 auth_plugin,
379 usage_emitter,
380 )
381 .await;
382
383 if let Err(e) = result {
384 error!("WebSocket connection error: {}", e);
385 }
386 }
387 .instrument(info_span!("ws.connection", %addr)),
388 );
389 }
390 Err(e) => {
391 error!("Failed to accept connection: {}", e);
392 }
393 }
394 }
395 }
396}
397
/// Everything needed to answer a rejected WebSocket handshake with an HTTP
/// error response.
#[derive(Debug, Clone)]
struct HandshakeReject {
    /// HTTP status of the rejection response.
    status: StatusCode,
    /// Error payload serialized as the JSON response body.
    body: crate::websocket::auth::ErrorResponse,
    /// Value of the `X-Error-Code` response header.
    error_code: String,
    /// Value of the `Retry-After` header in seconds; the header is omitted when `None`.
    retry_after_secs: Option<u64>,
}
405
406impl HandshakeReject {
407 fn from_deny(deny: &AuthDeny) -> Self {
408 let retry_after_secs = match deny.retry_policy {
409 crate::websocket::auth::RetryPolicy::RetryAfter(duration) => Some(duration.as_secs()),
410 _ => None,
411 };
412
413 Self {
414 status: StatusCode::from_u16(deny.http_status).unwrap_or(StatusCode::UNAUTHORIZED),
415 body: deny.to_error_response(),
416 error_code: deny.code.to_string(),
417 retry_after_secs,
418 }
419 }
420}
421
422fn build_handshake_error_response(
423 response: &Response,
424 reject: &HandshakeReject,
425) -> HandshakeErrorResponse {
426 let mut builder = Response::builder()
427 .status(reject.status)
428 .version(response.version())
429 .header(CONTENT_TYPE, "application/json; charset=utf-8")
430 .header("X-Error-Code", &reject.error_code)
431 .header("Cache-Control", "no-store");
432
433 if let Some(retry_after_secs) = reject.retry_after_secs {
434 builder = builder.header("Retry-After", retry_after_secs.to_string());
435 }
436
437 let body = serde_json::to_string(&reject.body).unwrap_or_else(|_| {
438 format!(
439 r#"{{"error":"{}","message":"{}","code":"{}","retryable":false}}"#,
440 reject.body.error, reject.body.message, reject.body.code
441 )
442 });
443
444 builder
445 .body(Some(body))
446 .expect("handshake rejection response should build")
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use crate::websocket::auth::{AuthDeny, AuthErrorCode};
    use std::time::Duration;

    /// The `101 Switching Protocols` response a successful handshake would return.
    fn switching_protocols_response() -> Response {
        Response::builder()
            .status(StatusCode::SWITCHING_PROTOCOLS)
            .body(())
            .unwrap()
    }

    /// Runs a denial through `HandshakeReject` and the response builder.
    fn rejection_response_for(deny: &AuthDeny) -> HandshakeErrorResponse {
        let response = switching_protocols_response();
        let reject = HandshakeReject::from_deny(deny);
        build_handshake_error_response(&response, &reject)
    }

    #[test]
    fn handshake_error_response_serializes_json_and_retry_after() {
        let deny = AuthDeny::rate_limited(Duration::from_secs(7), "websocket handshakes");

        let handshake_response = rejection_response_for(&deny);

        assert_eq!(handshake_response.status(), StatusCode::TOO_MANY_REQUESTS);
        assert_eq!(
            handshake_response.headers().get("X-Error-Code").unwrap(),
            "rate-limit-exceeded"
        );
        assert_eq!(
            handshake_response.headers().get("Retry-After").unwrap(),
            "7"
        );

        let body = handshake_response.into_body().unwrap();
        assert!(body.contains("rate-limit-exceeded"));
        assert!(body.contains("retryable"));
    }

    #[test]
    fn handshake_error_response_preserves_non_retryable_auth_denies() {
        let deny = AuthDeny::new(AuthErrorCode::OriginMismatch, "origin mismatch");

        let handshake_response = rejection_response_for(&deny);

        assert_eq!(handshake_response.status(), StatusCode::FORBIDDEN);
        assert!(handshake_response.headers().get("Retry-After").is_none());
    }
}
494
495#[allow(clippy::result_large_err)]
496async fn accept_authorized_connection(
497 stream: TcpStream,
498 remote_addr: SocketAddr,
499 auth_plugin: Arc<dyn WebSocketAuthPlugin>,
500 client_manager: ClientManager,
501) -> Result<Option<(tokio_tungstenite::WebSocketStream<TcpStream>, AuthContext)>> {
502 use std::sync::Mutex;
503
504 let auth_result_capture: Arc<Mutex<Option<Result<AuthContext, HandshakeReject>>>> =
505 Arc::new(Mutex::new(None));
506 let auth_result_ref = auth_result_capture.clone();
507 let auth_plugin_ref = auth_plugin.clone();
508 let client_manager_for_auth = client_manager.clone();
509
510 let handshake_result = accept_hdr_async(stream, move |request: &Request, response| {
511 let connection_request = ConnectionAuthRequest::from_http_request(remote_addr, request);
512
513 let auth_result = tokio::task::block_in_place(|| {
514 tokio::runtime::Handle::current().block_on(async {
515 match auth_plugin_ref.authorize(&connection_request).await {
516 AuthDecision::Allow(ctx) => {
517 match client_manager_for_auth
518 .check_connection_allowed(remote_addr, &Some(ctx.clone()))
519 .await
520 {
521 Ok(()) => Ok(ctx),
522 Err(deny) => Err(HandshakeReject::from_deny(&deny)),
523 }
524 }
525 AuthDecision::Deny(deny) => Err(HandshakeReject::from_deny(&deny)),
526 }
527 })
528 });
529
530 let mut capture_lock = auth_result_ref.lock().expect("capture lock poisoned");
531 *capture_lock = Some(auth_result.clone());
532
533 match auth_result {
534 Ok(_) => Ok(response),
535 Err(reject) => Err(build_handshake_error_response(&response, &reject)),
536 }
537 })
538 .await;
539
540 let auth_result = {
541 let mut guard = auth_result_capture.lock().expect("capture lock poisoned");
542 guard.take()
543 };
544
545 match handshake_result {
546 Ok(ws_stream) => match auth_result {
547 Some(Ok(ctx)) => {
548 info!("WebSocket connection authorized for {}", remote_addr);
549 Ok(Some((ws_stream, ctx)))
550 }
551 Some(Err(reject)) => Err(anyhow::anyhow!(
552 "handshake unexpectedly succeeded after rejection: {}",
553 reject.body.message
554 )),
555 None => Err(anyhow::anyhow!(
556 "no auth result captured for authorized connection {}",
557 remote_addr
558 )),
559 },
560 Err(WsError::Http(_)) => {
561 match auth_result {
562 Some(Err(reject)) => {
563 warn!(
564 "WebSocket connection rejected during handshake for {}: {}",
565 remote_addr, reject.body.message
566 );
567 }
568 Some(Ok(_)) => {
569 warn!(
570 "WebSocket handshake failed after auth success for {}",
571 remote_addr
572 );
573 }
574 None => {
575 warn!(
576 "WebSocket handshake rejected for {} without captured auth result",
577 remote_addr
578 );
579 }
580 }
581 Ok(None)
582 }
583 Err(err) => Err(err.into()),
584 }
585}
586
/// Drives one client connection from handshake to teardown (`otel` build).
///
/// Steps:
/// 1. Handshake and authorization; a rejected handshake returns `Ok(())`.
/// 2. Register the client and emit `ConnectionEstablished`.
/// 3. Process inbound text frames (subscribe, unsubscribe, ping,
///    refresh_auth, and the legacy bare subscription payload) until the
///    client closes, the stream ends or errors, or an inbound message is
///    denied.
/// 4. Cancel subscriptions, deregister the client, release its rate-limit
///    buckets, and emit the closing metrics and usage events.
///
/// Metrics are recorded per metering key when one is available.
///
/// # Errors
///
/// Returns an error only when the handshake fails for a reason other than an
/// HTTP-level rejection.
#[cfg(feature = "otel")]
// Same allowance as the non-`otel` variant; the arguments mirror the server's fields.
#[allow(clippy::too_many_arguments)]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    remote_addr: std::net::SocketAddr,
    auth_plugin: Arc<dyn WebSocketAuthPlugin>,
    usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    /// Runs the admission check, registration and bus attachment for one
    /// subscribe request. Shared by the `ClientMessage::Subscribe` and legacy
    /// bare-`Subscription` paths.
    ///
    /// Returns `Some((sub_key, view_id))` only when a new subscription was
    /// attached. Denials, duplicates and attach failures are handled here and
    /// yield `None`.
    async fn try_subscribe(
        ctx: &SubscriptionContext<'_>,
        subscription: Subscription,
    ) -> Option<(String, String)> {
        let client_id = ctx.client_id;
        let client_manager = ctx.client_manager;
        let view_id = subscription.view.clone();
        let sub_key = subscription.sub_key();

        if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
            warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
            send_socket_issue(client_id, client_manager, &deny, false).await;
            return None;
        }

        client_manager.update_subscription(client_id, subscription.clone());

        let cancel_token = CancellationToken::new();
        let is_new = client_manager
            .add_client_subscription(client_id, sub_key.clone(), cancel_token.clone())
            .await;

        if !is_new {
            debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
            return None;
        }

        if let Err(err) = attach_client_to_bus(ctx, subscription, cancel_token).await {
            warn!(
                "Subscription rejected for client {} on {}: {}",
                client_id, view_id, err
            );
            if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
                send_socket_issue(client_id, client_manager, &deny, false).await;
            }
            // Roll back the registration made above.
            let _ = client_manager
                .remove_client_subscription(client_id, &sub_key)
                .await;
            return None;
        }

        Some((sub_key, view_id))
    }

    let Some((ws_stream, auth_context)) = accept_authorized_connection(
        stream,
        remote_addr,
        auth_plugin.clone(),
        client_manager.clone(),
    )
    .await?
    else {
        return Ok(());
    };

    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    let auth_context = Some(auth_context);
    // `usage_metering_key` also serves as the metrics label.
    let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
        usage_identity(auth_context.as_ref());

    if let Some(ref m) = metrics {
        if let Some(ref mk) = usage_metering_key {
            m.record_ws_connection_with_metering(mk);
        } else {
            m.record_ws_connection();
        }
    }

    info!("WebSocket connection established for client {}", client_id);

    emit_usage_event(
        &usage_emitter,
        WebSocketUsageEvent::ConnectionEstablished {
            client_id: client_id.to_string(),
            remote_addr: remote_addr.to_string(),
            deployment_id: usage_deployment_id.clone(),
            metering_key: usage_metering_key.clone(),
            subject: usage_subject.clone(),
            key_class: usage_key_class,
        },
    );

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        usage_emitter: &usage_emitter,
        metrics: metrics.clone(),
    };

    // sub_key -> view_id for every subscription currently attached.
    let mut active_subscriptions: HashMap<String, String> = HashMap::new();

    loop {
        tokio::select! {
            ws_msg = ws_receiver.next() => {
                let msg = match ws_msg {
                    Some(Ok(msg)) => msg,
                    Some(Err(e)) => {
                        warn!("WebSocket error for client {}: {}", client_id, e);
                        break;
                    }
                    None => {
                        debug!("WebSocket stream ended for client {}", client_id);
                        break;
                    }
                };

                if msg.is_close() {
                    info!("Client {} requested close", client_id);
                    break;
                }

                client_manager.update_client_last_seen(client_id);

                // Only text frames carry protocol messages.
                if !msg.is_text() {
                    continue;
                }

                if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
                    warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
                    send_socket_issue(client_id, &client_manager, &deny, true).await;
                    break;
                }

                if let Some(ref m) = metrics {
                    if let Some(ref mk) = usage_metering_key {
                        m.record_ws_message_received_with_metering(mk);
                    } else {
                        m.record_ws_message_received();
                    }
                }

                let Ok(text) = msg.to_text() else {
                    continue;
                };
                // NOTE(review): this logs the raw payload, which for refresh_auth
                // messages includes the token. Consider redacting.
                debug!("Received text message from client {}: {}", client_id, text);

                // Try the tagged envelope first, then the legacy bare subscription.
                let subscription = match serde_json::from_str::<ClientMessage>(text) {
                    Ok(ClientMessage::Subscribe(subscription)) => subscription,
                    Ok(ClientMessage::Unsubscribe(unsub)) => {
                        let sub_key = unsub.sub_key();
                        let removed = client_manager
                            .remove_client_subscription(client_id, &sub_key)
                            .await;

                        if removed {
                            info!("Client {} unsubscribed from {}", client_id, sub_key);
                            active_subscriptions.remove(&sub_key);
                            if let Some(ref m) = metrics {
                                if let Some(ref mk) = usage_metering_key {
                                    m.record_subscription_removed_with_metering(&unsub.view, mk);
                                } else {
                                    m.record_subscription_removed(&unsub.view);
                                }
                            }
                            emit_usage_event(
                                &usage_emitter,
                                WebSocketUsageEvent::SubscriptionRemoved {
                                    client_id: client_id.to_string(),
                                    deployment_id: usage_deployment_id.clone(),
                                    metering_key: usage_metering_key.clone(),
                                    subject: usage_subject.clone(),
                                    view_id: unsub.view.clone(),
                                },
                            );
                        }
                        continue;
                    }
                    Ok(ClientMessage::Ping) => {
                        debug!("Received ping from client {}", client_id);
                        continue;
                    }
                    Ok(ClientMessage::RefreshAuth(refresh_req)) => {
                        debug!("Received refresh_auth from client {}", client_id);
                        handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin).await;
                        continue;
                    }
                    Err(_) => match serde_json::from_str::<Subscription>(text) {
                        Ok(subscription) => subscription,
                        Err(_) => {
                            debug!("Received non-subscription message from client {}: {}", client_id, text);
                            continue;
                        }
                    },
                };

                if let Some((sub_key, view_id)) = try_subscribe(&ctx, subscription).await {
                    if let Some(ref m) = metrics {
                        if let Some(ref mk) = usage_metering_key {
                            m.record_subscription_created_with_metering(&view_id, mk);
                        } else {
                            m.record_subscription_created(&view_id);
                        }
                    }
                    active_subscriptions.insert(sub_key, view_id.clone());
                    emit_usage_event(
                        &usage_emitter,
                        WebSocketUsageEvent::SubscriptionCreated {
                            client_id: client_id.to_string(),
                            deployment_id: usage_deployment_id.clone(),
                            metering_key: usage_metering_key.clone(),
                            subject: usage_subject.clone(),
                            view_id,
                        },
                    );
                }
            }
        }
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);
    if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
        rate_limiter.remove_client_buckets(client_id).await;
    }

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        if let Some(ref mk) = usage_metering_key {
            m.record_ws_disconnection_with_metering(duration_secs, mk);
            for view_id in active_subscriptions.values() {
                m.record_subscription_removed_with_metering(view_id, mk);
            }
        } else {
            m.record_ws_disconnection(duration_secs);
            for view_id in active_subscriptions.values() {
                m.record_subscription_removed(view_id);
            }
        }
    }

    // Subscriptions still active at disconnect are reported as removed.
    for view_id in active_subscriptions.values() {
        emit_usage_event(
            &usage_emitter,
            WebSocketUsageEvent::SubscriptionRemoved {
                client_id: client_id.to_string(),
                deployment_id: usage_deployment_id.clone(),
                metering_key: usage_metering_key.clone(),
                subject: usage_subject.clone(),
                view_id: view_id.clone(),
            },
        );
    }

    emit_usage_event(
        &usage_emitter,
        WebSocketUsageEvent::ConnectionClosed {
            client_id: client_id.to_string(),
            deployment_id: usage_deployment_id,
            metering_key: usage_metering_key,
            subject: usage_subject,
            duration_secs: Some(connection_start.elapsed().as_secs_f64()),
            subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
        },
    );

    info!("Client {} disconnected", client_id);

    Ok(())
}
912
913#[cfg(not(feature = "otel"))]
914#[allow(clippy::too_many_arguments)]
915async fn handle_connection(
916 stream: TcpStream,
917 client_manager: ClientManager,
918 bus_manager: BusManager,
919 entity_cache: EntityCache,
920 view_index: Arc<ViewIndex>,
921 remote_addr: std::net::SocketAddr,
922 auth_plugin: Arc<dyn WebSocketAuthPlugin>,
923 usage_emitter: Option<Arc<dyn WebSocketUsageEmitter>>,
924) -> Result<()> {
925 let Some((ws_stream, auth_context)) = accept_authorized_connection(
926 stream,
927 remote_addr,
928 auth_plugin.clone(),
929 client_manager.clone(),
930 )
931 .await?
932 else {
933 return Ok(());
934 };
935
936 let client_id = Uuid::new_v4();
937 let auth_context_ref = Some(&auth_context);
938 let (usage_metering_key, usage_subject, usage_key_class, usage_deployment_id) =
939 usage_identity(auth_context_ref);
940
941 let auth_context = Some(auth_context);
942
943 info!("WebSocket connection established for client {}", client_id);
944
945 emit_usage_event(
946 &usage_emitter,
947 WebSocketUsageEvent::ConnectionEstablished {
948 client_id: client_id.to_string(),
949 remote_addr: remote_addr.to_string(),
950 deployment_id: usage_deployment_id.clone(),
951 metering_key: usage_metering_key.clone(),
952 subject: usage_subject.clone(),
953 key_class: usage_key_class,
954 },
955 );
956
957 let (ws_sender, mut ws_receiver) = ws_stream.split();
958
959 client_manager.add_client(client_id, ws_sender, auth_context, remote_addr);
961
962 let ctx = SubscriptionContext {
963 client_id,
964 client_manager: &client_manager,
965 bus_manager: &bus_manager,
966 entity_cache: &entity_cache,
967 view_index: &view_index,
968 usage_emitter: &usage_emitter,
969 };
970
971 let mut active_subscriptions: HashMap<String, String> = HashMap::new();
972
973 loop {
974 tokio::select! {
975 ws_msg = ws_receiver.next() => {
976 match ws_msg {
977 Some(Ok(msg)) => {
978 if msg.is_close() {
979 info!("Client {} requested close", client_id);
980 break;
981 }
982
983 client_manager.update_client_last_seen(client_id);
984
985 if msg.is_text() {
986 if let Err(deny) = client_manager.check_inbound_message_allowed(client_id) {
987 warn!("Inbound message rejected for client {}: {}", client_id, deny.reason);
988 send_socket_issue(client_id, &client_manager, &deny, true).await;
989 break;
990 }
991
992 if let Ok(text) = msg.to_text() {
993 debug!("Received text message from client {}: {}", client_id, text);
994
995 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
996 match client_msg {
997 ClientMessage::Subscribe(subscription) => {
998 let view_id = subscription.view.clone();
999 if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1000 warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1001 send_socket_issue(client_id, &client_manager, &deny, false).await;
1002 continue;
1003 }
1004
1005 let sub_key = subscription.sub_key();
1006 client_manager.update_subscription(client_id, subscription.clone());
1007
1008 let cancel_token = CancellationToken::new();
1009 let is_new = client_manager.add_client_subscription(
1010 client_id,
1011 sub_key.clone(),
1012 cancel_token.clone(),
1013 ).await;
1014
1015 if !is_new {
1016 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1017 continue;
1018 }
1019
1020 if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1021 warn!(
1022 "Subscription rejected for client {} on {}: {}",
1023 client_id,
1024 sub_key,
1025 err
1026 );
1027 if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1028 send_socket_issue(client_id, &client_manager, &deny, false).await;
1029 }
1030 let _ = client_manager
1031 .remove_client_subscription(client_id, &sub_key)
1032 .await;
1033 } else {
1034 active_subscriptions.insert(sub_key, view_id.clone());
1035 emit_usage_event(
1036 &usage_emitter,
1037 WebSocketUsageEvent::SubscriptionCreated {
1038 client_id: client_id.to_string(),
1039 deployment_id: usage_deployment_id.clone(),
1040 metering_key: usage_metering_key.clone(),
1041 subject: usage_subject.clone(),
1042 view_id,
1043 },
1044 );
1045 }
1046 }
1047 ClientMessage::Unsubscribe(unsub) => {
1048 let sub_key = unsub.sub_key();
1049 let removed = client_manager
1050 .remove_client_subscription(client_id, &sub_key)
1051 .await;
1052
1053 if removed {
1054 info!("Client {} unsubscribed from {}", client_id, sub_key);
1055 active_subscriptions.remove(&sub_key);
1056 emit_usage_event(
1057 &usage_emitter,
1058 WebSocketUsageEvent::SubscriptionRemoved {
1059 client_id: client_id.to_string(),
1060 deployment_id: usage_deployment_id.clone(),
1061 metering_key: usage_metering_key.clone(),
1062 subject: usage_subject.clone(),
1063 view_id: unsub.view.clone(),
1064 },
1065 );
1066 }
1067 }
1068 ClientMessage::Ping => {
1069 debug!("Received ping from client {}", client_id);
1070 }
1071 ClientMessage::RefreshAuth(refresh_req) => {
1072 debug!("Received refresh_auth from client {}", client_id);
1073 handle_refresh_auth(client_id, &refresh_req, &client_manager, &auth_plugin).await;
1074 }
1075 }
1076 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
1077 let view_id = subscription.view.clone();
1078 if let Err(deny) = client_manager.check_subscription_allowed(client_id).await {
1079 warn!("Subscription rejected for client {}: {}", client_id, deny.reason);
1080 send_socket_issue(client_id, &client_manager, &deny, false).await;
1081 continue;
1082 }
1083
1084 let sub_key = subscription.sub_key();
1085 client_manager.update_subscription(client_id, subscription.clone());
1086
1087 let cancel_token = CancellationToken::new();
1088 let is_new = client_manager.add_client_subscription(
1089 client_id,
1090 sub_key.clone(),
1091 cancel_token.clone(),
1092 ).await;
1093
1094 if !is_new {
1095 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
1096 continue;
1097 }
1098
1099 if let Err(err) = attach_client_to_bus(&ctx, subscription, cancel_token).await {
1100 warn!(
1101 "Subscription rejected for client {} on {}: {}",
1102 client_id,
1103 sub_key,
1104 err
1105 );
1106 if let Some(deny) = auth_deny_from_subscription_error(&err.to_string()) {
1107 send_socket_issue(client_id, &client_manager, &deny, false).await;
1108 }
1109 let _ = client_manager
1110 .remove_client_subscription(client_id, &sub_key)
1111 .await;
1112 } else {
1113 active_subscriptions.insert(sub_key, view_id.clone());
1114 emit_usage_event(
1115 &usage_emitter,
1116 WebSocketUsageEvent::SubscriptionCreated {
1117 client_id: client_id.to_string(),
1118 deployment_id: usage_deployment_id.clone(),
1119 metering_key: usage_metering_key.clone(),
1120 subject: usage_subject.clone(),
1121 view_id,
1122 },
1123 );
1124 }
1125 } else {
1126 debug!("Received non-subscription message from client {}: {}", client_id, text);
1127 }
1128 }
1129 }
1130 }
1131 Some(Err(e)) => {
1132 warn!("WebSocket error for client {}: {}", client_id, e);
1133 break;
1134 }
1135 None => {
1136 debug!("WebSocket stream ended for client {}", client_id);
1137 break;
1138 }
1139 }
1140 }
1141 }
1142 }
1143
1144 client_manager
1145 .cancel_all_client_subscriptions(client_id)
1146 .await;
1147 client_manager.remove_client(client_id);
1148 if let Some(rate_limiter) = client_manager.rate_limiter().cloned() {
1149 rate_limiter.remove_client_buckets(client_id).await;
1150 }
1151
1152 for view_id in active_subscriptions.values() {
1153 emit_usage_event(
1154 &usage_emitter,
1155 WebSocketUsageEvent::SubscriptionRemoved {
1156 client_id: client_id.to_string(),
1157 deployment_id: usage_deployment_id.clone(),
1158 metering_key: usage_metering_key.clone(),
1159 subject: usage_subject.clone(),
1160 view_id: view_id.clone(),
1161 },
1162 );
1163 }
1164
1165 emit_usage_event(
1166 &usage_emitter,
1167 WebSocketUsageEvent::ConnectionClosed {
1168 client_id: client_id.to_string(),
1169 deployment_id: usage_deployment_id,
1170 metering_key: usage_metering_key,
1171 subject: usage_subject,
1172 duration_secs: None,
1173 subscription_count: u32::try_from(active_subscriptions.len()).unwrap_or(u32::MAX),
1174 },
1175 );
1176
1177 info!("Client {} disconnected", client_id);
1178
1179 Ok(())
1180}
1181
1182async fn send_snapshot_batches(
1183 client_id: Uuid,
1184 entities: &[SnapshotEntity],
1185 mode: Mode,
1186 view_id: &str,
1187 client_manager: &ClientManager,
1188 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1189 batch_config: &SnapshotBatchConfig,
1190 #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
1191) -> Result<()> {
1192 let total = entities.len();
1193 if total == 0 {
1194 return Ok(());
1195 }
1196
1197 let mut offset = 0;
1198 let mut batch_num = 0;
1199
1200 while offset < total {
1201 let batch_size = if batch_num == 0 {
1202 batch_config.initial_batch_size
1203 } else {
1204 batch_config.subsequent_batch_size
1205 };
1206
1207 let end = (offset + batch_size).min(total);
1208 let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
1209 let rows_in_batch = batch_data.len() as u32;
1210 let is_complete = end >= total;
1211
1212 let snapshot_frame = SnapshotFrame {
1213 mode,
1214 export: view_id.to_string(),
1215 op: "snapshot",
1216 data: batch_data,
1217 complete: is_complete,
1218 };
1219
1220 if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
1221 let payload = maybe_compress(&json_payload);
1222 let payload_bytes = payload.as_bytes().len() as u64;
1223 if client_manager
1224 .send_compressed_async(client_id, payload)
1225 .await
1226 .is_err()
1227 {
1228 return Err(anyhow::anyhow!("Failed to send snapshot batch"));
1229 }
1230 #[cfg(feature = "otel")]
1231 if let Some(m) = metrics {
1232 m.record_ws_message_sent();
1233 }
1234
1235 let auth_context = client_manager.get_auth_context(client_id);
1236 let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1237 emit_usage_event(
1238 usage_emitter,
1239 WebSocketUsageEvent::SnapshotSent {
1240 client_id: client_id.to_string(),
1241 deployment_id,
1242 metering_key,
1243 subject,
1244 view_id: view_id.to_string(),
1245 rows: rows_in_batch,
1246 messages: 1,
1247 bytes: payload_bytes,
1248 },
1249 );
1250 }
1251
1252 offset = end;
1253 batch_num += 1;
1254 }
1255
1256 debug!(
1257 "Sent {} snapshot batches ({} entities) for {} to client {}",
1258 batch_num, total, view_id, client_id
1259 );
1260
1261 Ok(())
1262}
1263
1264fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
1265 if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
1266 return Some(SortConfig {
1267 field: sort.field_path.clone(),
1268 order: match sort.order {
1269 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
1270 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
1271 },
1272 });
1273 }
1274
1275 if view_spec.mode == Mode::List {
1276 return Some(SortConfig {
1277 field: vec!["_seq".to_string()],
1278 order: SortOrder::Desc,
1279 });
1280 }
1281
1282 None
1283}
1284
1285fn send_subscribed_frame(
1286 client_id: Uuid,
1287 view_id: &str,
1288 view_spec: &ViewSpec,
1289 client_manager: &ClientManager,
1290 usage_emitter: &Option<Arc<dyn WebSocketUsageEmitter>>,
1291) -> Result<()> {
1292 let sort_config = extract_sort_config(view_spec);
1293 let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
1294
1295 let json_payload = serde_json::to_vec(&subscribed_frame)?;
1296 let payload_bytes = json_payload.len() as u64;
1297 let payload = Arc::new(Bytes::from(json_payload));
1298 client_manager
1299 .send_to_client(client_id, payload)
1300 .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))?;
1301
1302 let auth_context = client_manager.get_auth_context(client_id);
1303 let (metering_key, subject, _, deployment_id) = usage_identity(auth_context.as_ref());
1304 emit_usage_event(
1305 usage_emitter,
1306 WebSocketUsageEvent::UpdateSent {
1307 client_id: client_id.to_string(),
1308 deployment_id,
1309 metering_key,
1310 subject,
1311 view_id: view_id.to_string(),
1312 messages: 1,
1313 bytes: payload_bytes,
1314 },
1315 );
1316
1317 Ok(())
1318}
1319
1320fn enforce_snapshot_limit(ctx: &SubscriptionContext<'_>, rows: usize) -> Result<()> {
1321 let requested_rows = u32::try_from(rows).unwrap_or(u32::MAX);
1322 ctx.client_manager
1323 .check_snapshot_allowed(ctx.client_id, requested_rows)
1324 .map_err(|deny| anyhow::anyhow!(deny.reason))
1325}
1326
/// Subscribes one client to a view and starts forwarding live updates
/// (`otel` build: every forwarded message is also counted in the metrics).
///
/// Steps, in order:
/// 1. Look up `subscription.view` in the view index; unknown views are rejected.
/// 2. Send the `subscribed` acknowledgement frame.
/// 3. Derived views whose pipeline defines a sort are delegated to
///    `attach_derived_view_subscription_otel`.
/// 4. Otherwise, depending on the view mode, optionally send a snapshot
///    (`with_snapshot` defaults to `true`) and spawn a task that forwards bus
///    updates until `cancel_token` is cancelled, the bus receiver reports an error,
///    or a send to the client fails.
///
/// # Errors
///
/// Fails when the view is unknown, the acknowledgement frame cannot be sent, the
/// snapshot limit check denies the snapshot, or a snapshot batch cannot be sent.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) -> Result<()> {
    let view_id = &subscription.view;

    let view_spec = match ctx.view_index.get_view(view_id) {
        Some(spec) => spec.clone(),
        None => {
            return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
        }
    };

    send_subscribed_frame(
        ctx.client_id,
        view_id,
        &view_spec,
        ctx.client_manager,
        ctx.usage_emitter,
    )?;

    let is_derived_with_sort = view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map(|p| p.sort.is_some())
            .unwrap_or(false);

    if is_derived_with_sort {
        return attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token)
            .await;
    }

    match view_spec.mode {
        Mode::State => {
            // A missing key is treated as the empty key.
            let key = subscription.key.as_deref().unwrap_or("");

            // Obtain the receiver before reading the cache so the forwarding task
            // below starts from a known point relative to the snapshot.
            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);

            if should_send_snapshot {
                if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
                    transform_large_u64_to_strings(&mut cached_entity);
                    let snapshot_entities = vec![SnapshotEntity {
                        key: key.to_string(),
                        data: cached_entity,
                    }];
                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
                    let batch_config = ctx.entity_cache.snapshot_config();
                    send_snapshot_batches(
                        ctx.client_id,
                        &snapshot_entities,
                        view_spec.mode,
                        view_id,
                        ctx.client_manager,
                        ctx.usage_emitter,
                        &batch_config,
                        ctx.metrics.as_ref(),
                    )
                    .await?;
                    // Mark the current bus value as seen; only later changes are forwarded.
                    rx.borrow_and_update();
                } else if !rx.borrow().is_empty() {
                    // Nothing cached: fall back to the bus's current non-empty value.
                    let data = rx.borrow_and_update().clone();
                    let data_len = data.len();
                    if ctx
                        .client_manager
                        .send_to_client(ctx.client_id, data)
                        .is_ok()
                    {
                        // Count this message like every other forwarded message.
                        if let Some(m) = ctx.metrics.as_ref() {
                            m.record_ws_message_sent();
                        }
                        emit_update_sent_for_client(
                            ctx.usage_emitter,
                            ctx.client_manager,
                            ctx.client_id,
                            view_id,
                            data_len,
                        );
                    }
                }
            } else {
                info!(
                    "Client {} subscribed to {} without snapshot",
                    ctx.client_id, view_id
                );
                // Skip the current value; only later changes are forwarded.
                rx.borrow_and_update();
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let usage_emitter = ctx.usage_emitter.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let view_id_span = view_id.clone();
            let key_clone = key.to_string();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("State subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.changed() => {
                                if result.is_err() {
                                    break;
                                }
                                let data = rx.borrow().clone();
                                let data_len = data.len();
                                if client_mgr.send_to_client(client_id, data).is_err() {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                                emit_update_sent_for_client(
                                    &usage_emitter,
                                    &client_mgr,
                                    client_id,
                                    &view_id_clone,
                                    data_len,
                                );
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
            );
        }
        Mode::List | Mode::Append => {
            // Obtain the receiver before building the snapshot.
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);

            if should_send_snapshot {
                let mut snapshots = if let Some(ref cursor) = subscription.after {
                    ctx.entity_cache
                        .get_after(view_id, cursor, subscription.snapshot_limit)
                        .await
                } else {
                    ctx.entity_cache.get_all(view_id).await
                };

                if let Some(limit) = subscription.snapshot_limit {
                    // With a cursor the limit was already passed to `get_after`.
                    if subscription.after.is_none() {
                        // Order by `_seq` descending, then keep the first `limit` rows.
                        // Entries whose `_seq` is absent or not a string compare as "".
                        snapshots.sort_by(|a, b| {
                            let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
                            let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
                            cmp_seq(sb, sa)
                        });
                        snapshots.truncate(limit);
                    }
                }

                // NOTE(review): the key filter runs after the limit was applied, so a
                // filtered subscription can receive fewer than `limit` rows - confirm intended.
                let snapshot_entities: Vec<SnapshotEntity> = snapshots
                    .into_iter()
                    .filter(|(key, _)| subscription.matches_key(key))
                    .map(|(key, mut data)| {
                        transform_large_u64_to_strings(&mut data);
                        SnapshotEntity { key, data }
                    })
                    .collect();

                if !snapshot_entities.is_empty() {
                    enforce_snapshot_limit(ctx, snapshot_entities.len())?;
                    let batch_config = ctx.entity_cache.snapshot_config();
                    send_snapshot_batches(
                        ctx.client_id,
                        &snapshot_entities,
                        view_spec.mode,
                        view_id,
                        ctx.client_manager,
                        ctx.usage_emitter,
                        &batch_config,
                        ctx.metrics.as_ref(),
                    )
                    .await?;
                }
            } else {
                info!(
                    "Client {} subscribed to {} without snapshot",
                    ctx.client_id, view_id
                );
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let usage_emitter = ctx.usage_emitter.clone();
            let sub = subscription.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let view_id_span = view_id.clone();
            let mode = view_spec.mode;
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("List subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.recv() => {
                                match result {
                                    Ok(envelope) => {
                                        if sub.matches(&envelope.entity, &envelope.key) {
                                            if client_mgr
                                                .send_to_client(client_id, envelope.payload.clone())
                                                .is_err()
                                            {
                                                break;
                                            }
                                            if let Some(ref m) = metrics_clone {
                                                m.record_ws_message_sent();
                                            }
                                            emit_update_sent_for_client(
                                                &usage_emitter,
                                                &client_mgr,
                                                client_id,
                                                &view_id_clone,
                                                envelope.payload.len(),
                                            );
                                        }
                                    }
                                    // NOTE(review): every receive error ends the task. If this
                                    // receiver can report a recoverable "lagged" condition, a slow
                                    // client is silently unsubscribed - confirm against the bus type.
                                    Err(_) => break,
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode)),
            );
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );

    Ok(())
}
1575
/// Subscribes one client to a sorted derived view (`otel` build: every forwarded
/// frame is also counted in the metrics).
///
/// The client sees a window of the view's sorted cache, selected by
/// `subscription.skip` (default 0) and `subscription.take` (default: the pipeline
/// limit, or 100 when the pipeline has none). The initial window is sent as a
/// snapshot. A spawned task then re-reads the window each time the source view's
/// list bus delivers a message and sends:
/// - a `delete` frame for every key that left the window, then
/// - an `upsert` frame for the entries now in the window (only the first entry
///   when `take == 1`).
///
/// Keys that left the window are deleted even when the new window is empty, so the
/// client's view never keeps an entry the server has stopped tracking.
///
/// The task ends when `cancel_token` is cancelled, the bus receiver reports an
/// error, or a send to the client fails.
///
/// # Errors
///
/// Fails when the view has no `source_view`, the snapshot limit check denies the
/// initial window, or a snapshot batch cannot be sent.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) -> Result<()> {
    let view_id = &subscription.view;
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);
    let is_single = take == 1;

    let source_view_id = match &view_spec.source_view {
        Some(s) => s.clone(),
        None => {
            return Err(anyhow::anyhow!(
                "Derived view {} has no source_view",
                view_id
            ));
        }
    };

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        // `get_window` is called through `get_mut`, hence the write guard.
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, mut data)| {
                transform_large_u64_to_strings(&mut data);
                SnapshotEntity { key, data }
            })
            .collect();

        enforce_snapshot_limit(ctx, snapshot_entities.len())?;
        let batch_config = ctx.entity_cache.snapshot_config();
        send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            ctx.usage_emitter,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await?;
    }

    // NOTE(review): the receiver is obtained after the initial window was read, so a
    // change landing in between is only reflected on the next bus message - confirm
    // this ordering is acceptable.
    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let usage_emitter = ctx.usage_emitter.clone();
    let view_id_clone = view_id.clone();
    let view_id_span = view_id.clone();
    let sorted_caches_clone = sorted_caches;
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;

    tokio::spawn(
        async move {
            let mut current_window_keys = initial_keys;

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        debug!("Derived view subscription cancelled for client {}", client_id);
                        break;
                    }
                    result = rx.recv() => {
                        match result {
                            // The message content is ignored; it only triggers a re-read.
                            Ok(_envelope) => {
                                let new_window: Vec<(String, serde_json::Value)> = {
                                    let mut caches = sorted_caches_clone.write().await;
                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
                                        cache.get_window(skip, take)
                                    } else {
                                        continue;
                                    }
                                };

                                let new_keys: HashSet<String> =
                                    new_window.iter().map(|(k, _)| k.clone()).collect();

                                // Builds, serializes and sends one frame, then records the
                                // metric and usage event. Returns `false` only when the send
                                // to the client failed; a frame that fails to serialize is
                                // skipped. Defined after the lock await so no borrow is held
                                // across an await point.
                                let send_frame = |op: &'static str,
                                                  key: &String,
                                                  data: serde_json::Value|
                                 -> bool {
                                    let frame = Frame {
                                        seq: None,
                                        mode: frame_mode,
                                        export: view_id_clone.clone(),
                                        op,
                                        key: key.clone(),
                                        data,
                                        append: vec![],
                                    };
                                    let json = match serde_json::to_vec(&frame) {
                                        Ok(json) => json,
                                        Err(_) => return true,
                                    };
                                    let payload = Arc::new(Bytes::from(json));
                                    let payload_len = payload.len();
                                    if client_mgr.send_to_client(client_id, payload).is_err() {
                                        return false;
                                    }
                                    if let Some(ref m) = metrics_clone {
                                        m.record_ws_message_sent();
                                    }
                                    emit_update_sent_for_client(
                                        &usage_emitter,
                                        &client_mgr,
                                        client_id,
                                        &view_id_clone,
                                        payload_len,
                                    );
                                    true
                                };

                                // Deletes go out unconditionally: `current_window_keys` is
                                // replaced below, so a key that is not deleted now would
                                // never be deleted.
                                for old_key in current_window_keys.difference(&new_keys) {
                                    if !send_frame("delete", old_key, serde_json::Value::Null) {
                                        return;
                                    }
                                }

                                // A single-entry subscription only re-sends the first entry.
                                let upsert_limit = if is_single { 1 } else { new_window.len() };
                                for (key, data) in new_window.iter().take(upsert_limit) {
                                    let mut transformed_data = data.clone();
                                    transform_large_u64_to_strings(&mut transformed_data);
                                    if !send_frame("upsert", key, transformed_data) {
                                        return;
                                    }
                                }

                                current_window_keys = new_keys;
                            }
                            // NOTE(review): every receive error ends the task. If this
                            // receiver can report a recoverable "lagged" condition, a slow
                            // client is silently unsubscribed - confirm against the bus type.
                            Err(_) => break,
                        }
                    }
                }
            }
        }
        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
    );

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );

    Ok(())
}
1820
1821#[cfg(not(feature = "otel"))]
1822async fn attach_client_to_bus(
1823 ctx: &SubscriptionContext<'_>,
1824 subscription: Subscription,
1825 cancel_token: CancellationToken,
1826) -> Result<()> {
1827 let view_id = &subscription.view;
1828
1829 let view_spec = match ctx.view_index.get_view(view_id) {
1830 Some(spec) => spec.clone(),
1831 None => {
1832 return Err(anyhow::anyhow!("Unknown view ID: {}", view_id));
1833 }
1834 };
1835
1836 send_subscribed_frame(
1837 ctx.client_id,
1838 view_id,
1839 &view_spec,
1840 ctx.client_manager,
1841 ctx.usage_emitter,
1842 )?;
1843
1844 let is_derived_with_sort = view_spec.is_derived()
1845 && view_spec
1846 .pipeline
1847 .as_ref()
1848 .map(|p| p.sort.is_some())
1849 .unwrap_or(false);
1850
1851 if is_derived_with_sort {
1852 return attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
1853 }
1854
1855 match view_spec.mode {
1856 Mode::State => {
1857 let key = subscription.key.as_deref().unwrap_or("");
1858
1859 let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
1860
1861 let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1863
1864 if should_send_snapshot {
1865 if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
1866 transform_large_u64_to_strings(&mut cached_entity);
1867 let snapshot_entities = vec![SnapshotEntity {
1868 key: key.to_string(),
1869 data: cached_entity,
1870 }];
1871 enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1872 let batch_config = ctx.entity_cache.snapshot_config();
1873 send_snapshot_batches(
1874 ctx.client_id,
1875 &snapshot_entities,
1876 view_spec.mode,
1877 view_id,
1878 ctx.client_manager,
1879 ctx.usage_emitter,
1880 &batch_config,
1881 )
1882 .await?;
1883 rx.borrow_and_update();
1884 } else if !rx.borrow().is_empty() {
1885 let data = rx.borrow_and_update().clone();
1886 let data_len = data.len();
1887 if ctx
1888 .client_manager
1889 .send_to_client(ctx.client_id, data)
1890 .is_ok()
1891 {
1892 emit_update_sent_for_client(
1893 ctx.usage_emitter,
1894 ctx.client_manager,
1895 ctx.client_id,
1896 view_id,
1897 data_len,
1898 );
1899 }
1900 }
1901 } else {
1902 info!(
1903 "Client {} subscribed to {} without snapshot",
1904 ctx.client_id, view_id
1905 );
1906 rx.borrow_and_update();
1907 }
1908
1909 let client_id = ctx.client_id;
1910 let client_mgr = ctx.client_manager.clone();
1911 let usage_emitter = ctx.usage_emitter.clone();
1912 let view_id_clone = view_id.clone();
1913 let view_id_span = view_id.clone();
1914 let key_clone = key.to_string();
1915 tokio::spawn(
1916 async move {
1917 loop {
1918 tokio::select! {
1919 _ = cancel_token.cancelled() => {
1920 debug!("State subscription cancelled for client {}", client_id);
1921 break;
1922 }
1923 result = rx.changed() => {
1924 if result.is_err() {
1925 break;
1926 }
1927 let data = rx.borrow().clone();
1928 let data_len = data.len();
1929 if client_mgr.send_to_client(client_id, data).is_err() {
1930 break;
1931 }
1932 emit_update_sent_for_client(
1933 &usage_emitter,
1934 &client_mgr,
1935 client_id,
1936 &view_id_clone,
1937 data_len,
1938 );
1939 }
1940 }
1941 }
1942 }
1943 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_span, key = %key_clone)),
1944 );
1945 }
1946 Mode::List | Mode::Append => {
1947 let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1948
1949 let should_send_snapshot = subscription.with_snapshot.unwrap_or(true);
1951
1952 if should_send_snapshot {
1953 let mut snapshots = if let Some(ref cursor) = subscription.after {
1955 ctx.entity_cache
1956 .get_after(view_id, cursor, subscription.snapshot_limit)
1957 .await
1958 } else {
1959 ctx.entity_cache.get_all(view_id).await
1960 };
1961
1962 if let Some(limit) = subscription.snapshot_limit {
1964 if subscription.after.is_none() {
1965 snapshots.sort_by(|a, b| {
1966 let sa = a.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1967 let sb = b.1.get("_seq").and_then(|s| s.as_str()).unwrap_or("");
1968 cmp_seq(sb, sa) });
1970 snapshots.truncate(limit);
1971 }
1972 }
1973
1974 let snapshot_entities: Vec<SnapshotEntity> = snapshots
1975 .into_iter()
1976 .filter(|(key, _)| subscription.matches_key(key))
1977 .map(|(key, mut data)| {
1978 transform_large_u64_to_strings(&mut data);
1979 SnapshotEntity { key, data }
1980 })
1981 .collect();
1982
1983 if !snapshot_entities.is_empty() {
1984 enforce_snapshot_limit(ctx, snapshot_entities.len())?;
1985 let batch_config = ctx.entity_cache.snapshot_config();
1986 send_snapshot_batches(
1987 ctx.client_id,
1988 &snapshot_entities,
1989 view_spec.mode,
1990 view_id,
1991 ctx.client_manager,
1992 ctx.usage_emitter,
1993 &batch_config,
1994 )
1995 .await?;
1996 }
1997 } else {
1998 info!(
1999 "Client {} subscribed to {} without snapshot",
2000 ctx.client_id, view_id
2001 );
2002 }
2003
2004 let client_id = ctx.client_id;
2005 let client_mgr = ctx.client_manager.clone();
2006 let usage_emitter = ctx.usage_emitter.clone();
2007 let sub = subscription.clone();
2008 let view_id_clone = view_id.clone();
2009 let view_id_span = view_id.clone();
2010 let mode = view_spec.mode;
2011 tokio::spawn(
2012 async move {
2013 loop {
2014 tokio::select! {
2015 _ = cancel_token.cancelled() => {
2016 debug!("List subscription cancelled for client {}", client_id);
2017 break;
2018 }
2019 result = rx.recv() => {
2020 match result {
2021 Ok(envelope) => {
2022 if sub.matches(&envelope.entity, &envelope.key)
2023 && client_mgr
2024 .send_to_client(client_id, envelope.payload.clone())
2025 .is_err()
2026 {
2027 break;
2028 } else if sub.matches(&envelope.entity, &envelope.key) {
2029 emit_update_sent_for_client(
2030 &usage_emitter,
2031 &client_mgr,
2032 client_id,
2033 &view_id_clone,
2034 envelope.payload.len(),
2035 );
2036 }
2037 }
2038 Err(_) => break,
2039 }
2040 }
2041 }
2042 }
2043 }
2044 .instrument(
2045 info_span!("ws.subscribe.list", %client_id, view = %view_id_span, mode = ?mode),
2046 ),
2047 );
2048 }
2049 }
2050
2051 info!(
2052 "Client {} subscribed to {} (mode: {:?})",
2053 ctx.client_id, view_id, view_spec.mode
2054 );
2055
2056 Ok(())
2057}
2058
2059#[cfg(not(feature = "otel"))]
2060async fn attach_derived_view_subscription(
2061 ctx: &SubscriptionContext<'_>,
2062 subscription: Subscription,
2063 view_spec: ViewSpec,
2064 cancel_token: CancellationToken,
2065) -> Result<()> {
2066 let view_id = &subscription.view;
2067 let pipeline_limit = view_spec
2068 .pipeline
2069 .as_ref()
2070 .and_then(|p| p.limit)
2071 .unwrap_or(100);
2072 let take = subscription.take.unwrap_or(pipeline_limit);
2073 let skip = subscription.skip.unwrap_or(0);
2074 let is_single = take == 1;
2075
2076 let source_view_id = match &view_spec.source_view {
2077 Some(s) => s.clone(),
2078 None => {
2079 return Err(anyhow::anyhow!(
2080 "Derived view {} has no source_view",
2081 view_id
2082 ));
2083 }
2084 };
2085
2086 let sorted_caches = ctx.view_index.sorted_caches();
2087 let initial_window: Vec<(String, serde_json::Value)> = {
2088 let mut caches = sorted_caches.write().await;
2089 if let Some(cache) = caches.get_mut(view_id) {
2090 cache.get_window(skip, take)
2091 } else {
2092 warn!("No sorted cache for derived view {}", view_id);
2093 vec![]
2094 }
2095 };
2096
2097 let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
2098
2099 if !initial_window.is_empty() {
2100 let snapshot_entities: Vec<SnapshotEntity> = initial_window
2101 .into_iter()
2102 .map(|(key, mut data)| {
2103 transform_large_u64_to_strings(&mut data);
2104 SnapshotEntity { key, data }
2105 })
2106 .collect();
2107
2108 enforce_snapshot_limit(ctx, snapshot_entities.len())?;
2109 let batch_config = ctx.entity_cache.snapshot_config();
2110 send_snapshot_batches(
2111 ctx.client_id,
2112 &snapshot_entities,
2113 view_spec.mode,
2114 view_id,
2115 ctx.client_manager,
2116 ctx.usage_emitter,
2117 &batch_config,
2118 )
2119 .await?;
2120 }
2121
2122 let mut rx = ctx
2123 .bus_manager
2124 .get_or_create_list_bus(&source_view_id)
2125 .await;
2126
2127 let client_id = ctx.client_id;
2128 let client_mgr = ctx.client_manager.clone();
2129 let usage_emitter = ctx.usage_emitter.clone();
2130 let view_id_clone = view_id.clone();
2131 let view_id_span = view_id.clone();
2132 let sorted_caches_clone = sorted_caches;
2133 let frame_mode = view_spec.mode;
2134
2135 tokio::spawn(
2136 async move {
2137 let mut current_window_keys = initial_keys;
2138
2139 loop {
2140 tokio::select! {
2141 _ = cancel_token.cancelled() => {
2142 debug!("Derived view subscription cancelled for client {}", client_id);
2143 break;
2144 }
2145 result = rx.recv() => {
2146 match result {
2147 Ok(_envelope) => {
2148 let new_window: Vec<(String, serde_json::Value)> = {
2149 let mut caches = sorted_caches_clone.write().await;
2150 if let Some(cache) = caches.get_mut(&view_id_clone) {
2151 cache.get_window(skip, take)
2152 } else {
2153 continue;
2154 }
2155 };
2156
2157 let new_keys: HashSet<String> =
2158 new_window.iter().map(|(k, _)| k.clone()).collect();
2159
2160 if is_single {
2161 if let Some((new_key, data)) = new_window.first() {
2162 for old_key in current_window_keys.difference(&new_keys) {
2163 let delete_frame = Frame {
2164 seq: None,
2165 mode: frame_mode,
2166 export: view_id_clone.clone(),
2167 op: "delete",
2168 key: old_key.clone(),
2169 data: serde_json::Value::Null,
2170 append: vec![],
2171 };
2172 if let Ok(json) = serde_json::to_vec(&delete_frame) {
2173 let payload = Arc::new(Bytes::from(json));
2174 let payload_len = payload.len();
2175 if client_mgr.send_to_client(client_id, payload).is_err() {
2176 return;
2177 }
2178 emit_update_sent_for_client(
2179 &usage_emitter,
2180 &client_mgr,
2181 client_id,
2182 &view_id_clone,
2183 payload_len,
2184 );
2185 }
2186 }
2187
2188 let mut transformed_data = data.clone();
2189 transform_large_u64_to_strings(&mut transformed_data);
2190 let frame = Frame {
2191 seq: None,
2192 mode: frame_mode,
2193 export: view_id_clone.clone(),
2194 op: "upsert",
2195 key: new_key.clone(),
2196 data: transformed_data,
2197 append: vec![],
2198 };
2199 if let Ok(json) = serde_json::to_vec(&frame) {
2200 let payload = Arc::new(Bytes::from(json));
2201 let payload_len = payload.len();
2202 if client_mgr.send_to_client(client_id, payload).is_err() {
2203 return;
2204 }
2205 emit_update_sent_for_client(
2206 &usage_emitter,
2207 &client_mgr,
2208 client_id,
2209 &view_id_clone,
2210 payload_len,
2211 );
2212 }
2213 }
2214 } else {
2215 for key in current_window_keys.difference(&new_keys) {
2216 let delete_frame = Frame {
2217 seq: None,
2218 mode: frame_mode,
2219 export: view_id_clone.clone(),
2220 op: "delete",
2221 key: key.clone(),
2222 data: serde_json::Value::Null,
2223 append: vec![],
2224 };
2225 if let Ok(json) = serde_json::to_vec(&delete_frame) {
2226 let payload = Arc::new(Bytes::from(json));
2227 let payload_len = payload.len();
2228 if client_mgr.send_to_client(client_id, payload).is_err() {
2229 return;
2230 }
2231 emit_update_sent_for_client(
2232 &usage_emitter,
2233 &client_mgr,
2234 client_id,
2235 &view_id_clone,
2236 payload_len,
2237 );
2238 }
2239 }
2240
2241 for (key, data) in &new_window {
2242 let mut transformed_data = data.clone();
2243 transform_large_u64_to_strings(&mut transformed_data);
2244 let frame = Frame {
2245 seq: None,
2246 mode: frame_mode,
2247 export: view_id_clone.clone(),
2248 op: "upsert",
2249 key: key.clone(),
2250 data: transformed_data,
2251 append: vec![],
2252 };
2253 if let Ok(json) = serde_json::to_vec(&frame) {
2254 let payload = Arc::new(Bytes::from(json));
2255 let payload_len = payload.len();
2256 if client_mgr.send_to_client(client_id, payload).is_err() {
2257 return;
2258 }
2259 emit_update_sent_for_client(
2260 &usage_emitter,
2261 &client_mgr,
2262 client_id,
2263 &view_id_clone,
2264 payload_len,
2265 );
2266 }
2267 }
2268 }
2269
2270 current_window_keys = new_keys;
2271 }
2272 Err(_) => break,
2273 }
2274 }
2275 }
2276 }
2277 }
2278 .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
2279 );
2280
2281 info!(
2282 "Client {} subscribed to derived view {} (take={}, skip={})",
2283 ctx.client_id, view_id, take, skip
2284 );
2285
2286 Ok(())
2287}