1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7 transform_large_u64_to_strings, Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig,
8 SortOrder, SubscribedFrame,
9};
10use crate::websocket::subscription::{ClientMessage, Subscription};
11use anyhow::Result;
12use bytes::Bytes;
13use futures_util::StreamExt;
14use std::collections::HashSet;
15use std::net::SocketAddr;
16use std::sync::Arc;
17#[cfg(feature = "otel")]
18use std::time::Instant;
19
20use tokio::net::{TcpListener, TcpStream};
21use tokio_tungstenite::accept_async;
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, error, info, info_span, warn, Instrument};
24use uuid::Uuid;
25
26#[cfg(feature = "otel")]
27use crate::metrics::Metrics;
28
/// Borrowed per-connection state handed to the subscription-attach helpers.
///
/// Bundles the client's identity with references to the shared services so
/// they can be passed around as a single argument.
struct SubscriptionContext<'a> {
    /// Identifier of the connected client (generated in `handle_connection`).
    client_id: Uuid,
    /// Registry used to send frames to the client and track its subscriptions.
    client_manager: &'a ClientManager,
    /// Source of the per-view state/list buses that subscriptions listen on.
    bus_manager: &'a BusManager,
    /// Cache queried for the initial snapshot sent when a subscription starts.
    entity_cache: &'a EntityCache,
    /// Lookup of view specifications and the sorted caches of derived views.
    view_index: &'a ViewIndex,
    /// Optional metrics sink; only present when the `otel` feature is enabled.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
38
/// WebSocket server that accepts client connections and streams view data to them.
///
/// Built with `new`, optionally tuned with `with_max_clients`, and run with `start`.
pub struct WebSocketServer {
    /// Address the TCP listener binds to.
    bind_addr: SocketAddr,
    /// Registry of connected clients; cloned into every connection task.
    client_manager: ClientManager,
    /// Provider of the buses that subscriptions receive updates from.
    bus_manager: BusManager,
    /// Cache used to build the snapshot sent at subscription time.
    entity_cache: EntityCache,
    /// Shared index of view specifications.
    view_index: Arc<ViewIndex>,
    /// New connections are rejected once this many clients are connected.
    max_clients: usize,
    /// Optional metrics sink; only present when the `otel` feature is enabled.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
49
50impl WebSocketServer {
51 #[cfg(feature = "otel")]
52 pub fn new(
53 bind_addr: SocketAddr,
54 bus_manager: BusManager,
55 entity_cache: EntityCache,
56 view_index: Arc<ViewIndex>,
57 metrics: Option<Arc<Metrics>>,
58 ) -> Self {
59 Self {
60 bind_addr,
61 client_manager: ClientManager::new(),
62 bus_manager,
63 entity_cache,
64 view_index,
65 max_clients: 10000,
66 metrics,
67 }
68 }
69
70 #[cfg(not(feature = "otel"))]
71 pub fn new(
72 bind_addr: SocketAddr,
73 bus_manager: BusManager,
74 entity_cache: EntityCache,
75 view_index: Arc<ViewIndex>,
76 ) -> Self {
77 Self {
78 bind_addr,
79 client_manager: ClientManager::new(),
80 bus_manager,
81 entity_cache,
82 view_index,
83 max_clients: 10000,
84 }
85 }
86
87 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
88 self.max_clients = max_clients;
89 self
90 }
91
92 pub async fn start(self) -> Result<()> {
93 info!(
94 "Starting WebSocket server on {} (max_clients: {})",
95 self.bind_addr, self.max_clients
96 );
97
98 let listener = TcpListener::bind(&self.bind_addr).await?;
99 info!("WebSocket server listening on {}", self.bind_addr);
100
101 self.client_manager.start_cleanup_task();
102
103 loop {
104 match listener.accept().await {
105 Ok((stream, addr)) => {
106 let client_count = self.client_manager.client_count();
107 if client_count >= self.max_clients {
108 warn!(
109 "Rejecting connection from {} - max clients ({}) reached",
110 addr, self.max_clients
111 );
112 drop(stream);
113 continue;
114 }
115
116 #[cfg(feature = "otel")]
117 if let Some(ref metrics) = self.metrics {
118 metrics.record_ws_connection();
119 }
120
121 info!(
122 "New WebSocket connection from {} ({}/{} clients)",
123 addr,
124 client_count + 1,
125 self.max_clients
126 );
127 let client_manager = self.client_manager.clone();
128 let bus_manager = self.bus_manager.clone();
129 let entity_cache = self.entity_cache.clone();
130 let view_index = self.view_index.clone();
131 #[cfg(feature = "otel")]
132 let metrics = self.metrics.clone();
133
134 tokio::spawn(
135 async move {
136 #[cfg(feature = "otel")]
137 let result = handle_connection(
138 stream,
139 client_manager,
140 bus_manager,
141 entity_cache,
142 view_index,
143 metrics,
144 )
145 .await;
146 #[cfg(not(feature = "otel"))]
147 let result = handle_connection(
148 stream,
149 client_manager,
150 bus_manager,
151 entity_cache,
152 view_index,
153 )
154 .await;
155
156 if let Err(e) = result {
157 error!("WebSocket connection error: {}", e);
158 }
159 }
160 .instrument(info_span!("ws.connection", %addr)),
161 );
162 }
163 Err(e) => {
164 error!("Failed to accept connection: {}", e);
165 }
166 }
167 }
168 }
169}
170
/// Serves a single client connection (build with the `otel` feature).
///
/// Performs the WebSocket handshake, registers the client, then processes
/// incoming messages until the client closes, the stream errors, or the stream
/// ends. Text messages are parsed as a `ClientMessage`; if that fails they are
/// parsed as a bare `Subscription`. On exit all of the client's subscriptions
/// are cancelled, the client is removed, and disconnect metrics are recorded.
///
/// # Errors
///
/// Returns an error if the WebSocket handshake fails.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        metrics: metrics.clone(),
    };

    // Views with a subscription still counted as "created" in the metrics;
    // whatever remains here at disconnect is reported as removed.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        // Resolve the message to a subscription request; every other message
        // kind is fully handled inside its arm.
        let subscription = match serde_json::from_str::<ClientMessage>(text) {
            Ok(ClientMessage::Subscribe(subscription)) => subscription,
            Ok(ClientMessage::Unsubscribe(unsub)) => {
                let sub_key = unsub.sub_key();
                let removed = client_manager
                    .remove_client_subscription(client_id, &sub_key)
                    .await;

                if removed {
                    info!("Client {} unsubscribed from {}", client_id, sub_key);
                    if let Some(ref m) = metrics {
                        m.record_subscription_removed(&unsub.view);
                    }
                    // The removal has just been recorded; forget one entry for
                    // this view so the disconnect cleanup does not record it a
                    // second time.
                    if let Some(idx) = active_subscriptions
                        .iter()
                        .position(|view| *view == unsub.view)
                    {
                        active_subscriptions.swap_remove(idx);
                    }
                }
                continue;
            }
            Ok(ClientMessage::Ping) => {
                debug!("Received ping from client {}", client_id);
                continue;
            }
            // Not a `ClientMessage`: fall back to a bare `Subscription` payload.
            Err(_) => match serde_json::from_str::<Subscription>(text) {
                Ok(subscription) => subscription,
                Err(_) => {
                    debug!(
                        "Received non-subscription message from client {}: {}",
                        client_id, text
                    );
                    continue;
                }
            },
        };

        let view_id = subscription.view.clone();
        let sub_key = subscription.sub_key();
        client_manager.update_subscription(client_id, subscription.clone());

        let cancel_token = CancellationToken::new();
        let is_new = client_manager
            .add_client_subscription(client_id, sub_key.clone(), cancel_token.clone())
            .await;

        if !is_new {
            debug!(
                "Client {} already subscribed to {}, ignoring duplicate",
                client_id, sub_key
            );
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_subscription_created(&view_id);
        }
        active_subscriptions.push(view_id);

        attach_client_to_bus(&ctx, subscription, cancel_token).await;
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}
324
325#[cfg(not(feature = "otel"))]
326async fn handle_connection(
327 stream: TcpStream,
328 client_manager: ClientManager,
329 bus_manager: BusManager,
330 entity_cache: EntityCache,
331 view_index: Arc<ViewIndex>,
332) -> Result<()> {
333 let ws_stream = accept_async(stream).await?;
334 let client_id = Uuid::new_v4();
335
336 info!("WebSocket connection established for client {}", client_id);
337
338 let (ws_sender, mut ws_receiver) = ws_stream.split();
339
340 client_manager.add_client(client_id, ws_sender);
341
342 let ctx = SubscriptionContext {
343 client_id,
344 client_manager: &client_manager,
345 bus_manager: &bus_manager,
346 entity_cache: &entity_cache,
347 view_index: &view_index,
348 };
349
350 loop {
351 tokio::select! {
352 ws_msg = ws_receiver.next() => {
353 match ws_msg {
354 Some(Ok(msg)) => {
355 if msg.is_close() {
356 info!("Client {} requested close", client_id);
357 break;
358 }
359
360 client_manager.update_client_last_seen(client_id);
361
362 if msg.is_text() {
363 if let Ok(text) = msg.to_text() {
364 debug!("Received text message from client {}: {}", client_id, text);
365
366 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
367 match client_msg {
368 ClientMessage::Subscribe(subscription) => {
369 let sub_key = subscription.sub_key();
370 client_manager.update_subscription(client_id, subscription.clone());
371
372 let cancel_token = CancellationToken::new();
373 let is_new = client_manager.add_client_subscription(
374 client_id,
375 sub_key.clone(),
376 cancel_token.clone(),
377 ).await;
378
379 if !is_new {
380 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
381 continue;
382 }
383
384 attach_client_to_bus(&ctx, subscription, cancel_token).await;
385 }
386 ClientMessage::Unsubscribe(unsub) => {
387 let sub_key = unsub.sub_key();
388 let removed = client_manager
389 .remove_client_subscription(client_id, &sub_key)
390 .await;
391
392 if removed {
393 info!("Client {} unsubscribed from {}", client_id, sub_key);
394 }
395 }
396 ClientMessage::Ping => {
397 debug!("Received ping from client {}", client_id);
398 }
399 }
400 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
401 let sub_key = subscription.sub_key();
402 client_manager.update_subscription(client_id, subscription.clone());
403
404 let cancel_token = CancellationToken::new();
405 let is_new = client_manager.add_client_subscription(
406 client_id,
407 sub_key.clone(),
408 cancel_token.clone(),
409 ).await;
410
411 if !is_new {
412 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
413 continue;
414 }
415
416 attach_client_to_bus(&ctx, subscription, cancel_token).await;
417 } else {
418 debug!("Received non-subscription message from client {}: {}", client_id, text);
419 }
420 }
421 }
422 }
423 Some(Err(e)) => {
424 warn!("WebSocket error for client {}: {}", client_id, e);
425 break;
426 }
427 None => {
428 debug!("WebSocket stream ended for client {}", client_id);
429 break;
430 }
431 }
432 }
433 }
434 }
435
436 client_manager
437 .cancel_all_client_subscriptions(client_id)
438 .await;
439 client_manager.remove_client(client_id);
440 info!("Client {} disconnected", client_id);
441
442 Ok(())
443}
444
445async fn send_snapshot_batches(
446 client_id: Uuid,
447 entities: &[SnapshotEntity],
448 mode: Mode,
449 view_id: &str,
450 client_manager: &ClientManager,
451 batch_config: &SnapshotBatchConfig,
452 #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
453) -> Result<()> {
454 let total = entities.len();
455 if total == 0 {
456 return Ok(());
457 }
458
459 let mut offset = 0;
460 let mut batch_num = 0;
461
462 while offset < total {
463 let batch_size = if batch_num == 0 {
464 batch_config.initial_batch_size
465 } else {
466 batch_config.subsequent_batch_size
467 };
468
469 let end = (offset + batch_size).min(total);
470 let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
471 let is_complete = end >= total;
472
473 let snapshot_frame = SnapshotFrame {
474 mode,
475 export: view_id.to_string(),
476 op: "snapshot",
477 data: batch_data,
478 complete: is_complete,
479 };
480
481 if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
482 let payload = maybe_compress(&json_payload);
483 if client_manager
484 .send_compressed_async(client_id, payload)
485 .await
486 .is_err()
487 {
488 return Err(anyhow::anyhow!("Failed to send snapshot batch"));
489 }
490 #[cfg(feature = "otel")]
491 if let Some(m) = metrics {
492 m.record_ws_message_sent();
493 }
494 }
495
496 offset = end;
497 batch_num += 1;
498 }
499
500 debug!(
501 "Sent {} snapshot batches ({} entities) for {} to client {}",
502 batch_num, total, view_id, client_id
503 );
504
505 Ok(())
506}
507
508fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
509 if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
510 return Some(SortConfig {
511 field: sort.field_path.clone(),
512 order: match sort.order {
513 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
514 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
515 },
516 });
517 }
518
519 if view_spec.mode == Mode::List {
520 return Some(SortConfig {
521 field: vec!["_seq".to_string()],
522 order: SortOrder::Desc,
523 });
524 }
525
526 None
527}
528
529fn send_subscribed_frame(
530 client_id: Uuid,
531 view_id: &str,
532 view_spec: &ViewSpec,
533 client_manager: &ClientManager,
534) -> Result<()> {
535 let sort_config = extract_sort_config(view_spec);
536 let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
537
538 let json_payload = serde_json::to_vec(&subscribed_frame)?;
539 let payload = Arc::new(Bytes::from(json_payload));
540 client_manager
541 .send_to_client(client_id, payload)
542 .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
543}
544
/// Starts streaming a view to a client (build with the `otel` feature).
///
/// Looks up the view, acknowledges the subscription, sends an initial snapshot
/// and spawns a task that forwards later updates until `cancel_token` is
/// cancelled, the bus closes, or sending to the client fails. Derived views
/// with a pipeline sort are delegated to
/// `attach_derived_view_subscription_otel`.
///
/// Returns early, without spawning, when the view is unknown, the
/// acknowledgement cannot be sent, or (list/append views) the snapshot cannot
/// be delivered.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;

    let view_spec = match ctx.view_index.get_view(view_id) {
        Some(spec) => spec.clone(),
        None => {
            warn!("Unknown view ID: {}", view_id);
            return;
        }
    };

    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
        warn!("Failed to send subscribed frame: {}", e);
        return;
    }

    let is_derived_with_sort = view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map(|p| p.sort.is_some())
            .unwrap_or(false);

    if is_derived_with_sort {
        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
        return;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");

            // Subscribe before reading the cache so no update published in
            // between is missed.
            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
                transform_large_u64_to_strings(&mut cached_entity);
                let snapshot_entities = vec![SnapshotEntity {
                    key: key.to_string(),
                    data: cached_entity,
                }];
                let batch_config = ctx.entity_cache.snapshot_config();
                // Best effort: a failed snapshot does not prevent live updates.
                let _ = send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await;
                // Mark the bus's current value as seen; the snapshot covers it.
                rx.borrow_and_update();
            } else if !rx.borrow().is_empty() {
                // Nothing cached: fall back to the value currently on the bus.
                let data = rx.borrow_and_update().clone();
                if ctx
                    .client_manager
                    .send_to_client(ctx.client_id, data)
                    .is_ok()
                {
                    // Count this frame like every other successful send.
                    if let Some(ref m) = ctx.metrics {
                        m.record_ws_message_sent();
                    }
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let key_clone = key.to_string();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("State subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.changed() => {
                                if result.is_err() {
                                    break;
                                }
                                let data = rx.borrow().clone();
                                if client_mgr.send_to_client(client_id, data).is_err() {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
            );
        }
        Mode::List | Mode::Append => {
            // Subscribe before reading the cache so no update published in
            // between is missed.
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let snapshots = ctx.entity_cache.get_all(view_id).await;
            let snapshot_entities: Vec<SnapshotEntity> = snapshots
                .into_iter()
                .filter(|(key, _)| subscription.matches_key(key))
                .map(|(key, mut data)| {
                    transform_large_u64_to_strings(&mut data);
                    SnapshotEntity { key, data }
                })
                .collect();

            if !snapshot_entities.is_empty() {
                let batch_config = ctx.entity_cache.snapshot_config();
                if send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await
                .is_err()
                {
                    return;
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let sub = subscription.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let mode = view_spec.mode;
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("List subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.recv() => {
                                match result {
                                    Ok(envelope) => {
                                        if sub.matches(&envelope.entity, &envelope.key) {
                                            if client_mgr
                                                .send_to_client(client_id, envelope.payload.clone())
                                                .is_err()
                                            {
                                                break;
                                            }
                                            if let Some(ref m) = metrics_clone {
                                                m.record_ws_message_sent();
                                            }
                                        }
                                    }
                                    // NOTE(review): any receive error ends the
                                    // subscription; if the bus can report a
                                    // recoverable "lagged" error this drops slow
                                    // clients silently. Confirm against the bus type.
                                    Err(_) => break,
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
            );
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );
}
716
/// Streams a sorted, windowed derived view to a client (build with `otel`).
///
/// The window is `skip`/`take` from the subscription, with `take` defaulting to
/// the pipeline's limit or 100. The initial window is sent as a snapshot. After
/// that, every event on the source view's list bus triggers a re-read of the
/// window: keys that left it are sent as `delete` frames and the current
/// entries as `upsert` frames. The forwarding task ends when `cancel_token` is
/// cancelled, the bus returns an error, or sending to the client fails.
///
/// Returns early, without spawning, when the view has no `source_view` or the
/// snapshot cannot be delivered.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);
    let is_single = take == 1;

    let source_view_id = match &view_spec.source_view {
        Some(s) => s.clone(),
        None => {
            warn!("Derived view {} has no source_view", view_id);
            return;
        }
    };

    // Subscribe to the source bus before reading the window so an update
    // published while the snapshot is built or sent is not lost.
    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, mut data)| {
                transform_large_u64_to_strings(&mut data);
                SnapshotEntity { key, data }
            })
            .collect();

        let batch_config = ctx.entity_cache.snapshot_config();
        if send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await
        .is_err()
        {
            return;
        }
    }

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let view_id_clone = view_id.clone();
    let view_id_span = view_id.clone();
    let sorted_caches_clone = sorted_caches;
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;

    tokio::spawn(
        async move {
            let mut current_window_keys = initial_keys;

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        debug!("Derived view subscription cancelled for client {}", client_id);
                        break;
                    }
                    result = rx.recv() => {
                        match result {
                            Ok(_envelope) => {
                                let new_window: Vec<(String, serde_json::Value)> = {
                                    let mut caches = sorted_caches_clone.write().await;
                                    if let Some(cache) = caches.get_mut(&view_id_clone) {
                                        cache.get_window(skip, take)
                                    } else {
                                        continue;
                                    }
                                };

                                let new_keys: HashSet<String> =
                                    new_window.iter().map(|(k, _)| k.clone()).collect();

                                // Builds and sends one frame. Returns `false` when the
                                // client can no longer be reached; a frame that fails
                                // to serialize is skipped.
                                let send_frame = |op: &'static str,
                                                  key: &str,
                                                  data: serde_json::Value|
                                 -> bool {
                                    let frame = Frame {
                                        mode: frame_mode,
                                        export: view_id_clone.clone(),
                                        op,
                                        key: key.to_string(),
                                        data,
                                        append: vec![],
                                    };
                                    match serde_json::to_vec(&frame) {
                                        Ok(json) => {
                                            let payload = Arc::new(Bytes::from(json));
                                            if client_mgr.send_to_client(client_id, payload).is_err() {
                                                return false;
                                            }
                                            if let Some(ref m) = metrics_clone {
                                                m.record_ws_message_sent();
                                            }
                                            true
                                        }
                                        Err(_) => true,
                                    }
                                };

                                // Always report keys that left the window, including
                                // when a single-item window became empty; otherwise the
                                // client would keep the stale entry forever.
                                for old_key in current_window_keys.difference(&new_keys) {
                                    if !send_frame("delete", old_key.as_str(), serde_json::Value::Null) {
                                        return;
                                    }
                                }

                                // A single-item subscription only ever upserts the head.
                                let upsert_limit = if is_single { 1 } else { new_window.len() };
                                for (key, data) in new_window.iter().take(upsert_limit) {
                                    let mut transformed_data = data.clone();
                                    transform_large_u64_to_strings(&mut transformed_data);
                                    if !send_frame("upsert", key.as_str(), transformed_data) {
                                        return;
                                    }
                                }

                                current_window_keys = new_keys;
                            }
                            Err(_) => break,
                        }
                    }
                }
            }
        }
        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
    );

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );
}
922
923#[cfg(not(feature = "otel"))]
924async fn attach_client_to_bus(
925 ctx: &SubscriptionContext<'_>,
926 subscription: Subscription,
927 cancel_token: CancellationToken,
928) {
929 let view_id = &subscription.view;
930
931 let view_spec = match ctx.view_index.get_view(view_id) {
932 Some(spec) => spec.clone(),
933 None => {
934 warn!("Unknown view ID: {}", view_id);
935 return;
936 }
937 };
938
939 if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
940 warn!("Failed to send subscribed frame: {}", e);
941 return;
942 }
943
944 let is_derived_with_sort = view_spec.is_derived()
945 && view_spec
946 .pipeline
947 .as_ref()
948 .map(|p| p.sort.is_some())
949 .unwrap_or(false);
950
951 if is_derived_with_sort {
952 attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
953 return;
954 }
955
956 match view_spec.mode {
957 Mode::State => {
958 let key = subscription.key.as_deref().unwrap_or("");
959
960 let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
961
962 if let Some(mut cached_entity) = ctx.entity_cache.get(view_id, key).await {
963 transform_large_u64_to_strings(&mut cached_entity);
964 let snapshot_entities = vec![SnapshotEntity {
965 key: key.to_string(),
966 data: cached_entity,
967 }];
968 let batch_config = ctx.entity_cache.snapshot_config();
969 let _ = send_snapshot_batches(
970 ctx.client_id,
971 &snapshot_entities,
972 view_spec.mode,
973 view_id,
974 ctx.client_manager,
975 &batch_config,
976 )
977 .await;
978 rx.borrow_and_update();
979 } else if !rx.borrow().is_empty() {
980 let data = rx.borrow_and_update().clone();
981 let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
982 }
983
984 let client_id = ctx.client_id;
985 let client_mgr = ctx.client_manager.clone();
986 let view_id_clone = view_id.clone();
987 let key_clone = key.to_string();
988 tokio::spawn(
989 async move {
990 loop {
991 tokio::select! {
992 _ = cancel_token.cancelled() => {
993 debug!("State subscription cancelled for client {}", client_id);
994 break;
995 }
996 result = rx.changed() => {
997 if result.is_err() {
998 break;
999 }
1000 let data = rx.borrow().clone();
1001 if client_mgr.send_to_client(client_id, data).is_err() {
1002 break;
1003 }
1004 }
1005 }
1006 }
1007 }
1008 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
1009 );
1010 }
1011 Mode::List | Mode::Append => {
1012 let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1013
1014 let snapshots = ctx.entity_cache.get_all(view_id).await;
1015 let snapshot_entities: Vec<SnapshotEntity> = snapshots
1016 .into_iter()
1017 .filter(|(key, _)| subscription.matches_key(key))
1018 .map(|(key, mut data)| {
1019 transform_large_u64_to_strings(&mut data);
1020 SnapshotEntity { key, data }
1021 })
1022 .collect();
1023
1024 if !snapshot_entities.is_empty() {
1025 let batch_config = ctx.entity_cache.snapshot_config();
1026 if send_snapshot_batches(
1027 ctx.client_id,
1028 &snapshot_entities,
1029 view_spec.mode,
1030 view_id,
1031 ctx.client_manager,
1032 &batch_config,
1033 )
1034 .await
1035 .is_err()
1036 {
1037 return;
1038 }
1039 }
1040
1041 let client_id = ctx.client_id;
1042 let client_mgr = ctx.client_manager.clone();
1043 let sub = subscription.clone();
1044 let view_id_clone = view_id.clone();
1045 let mode = view_spec.mode;
1046 tokio::spawn(
1047 async move {
1048 loop {
1049 tokio::select! {
1050 _ = cancel_token.cancelled() => {
1051 debug!("List subscription cancelled for client {}", client_id);
1052 break;
1053 }
1054 result = rx.recv() => {
1055 match result {
1056 Ok(envelope) => {
1057 if sub.matches(&envelope.entity, &envelope.key)
1058 && client_mgr
1059 .send_to_client(client_id, envelope.payload.clone())
1060 .is_err()
1061 {
1062 break;
1063 }
1064 }
1065 Err(_) => break,
1066 }
1067 }
1068 }
1069 }
1070 }
1071 .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1072 );
1073 }
1074 }
1075
1076 info!(
1077 "Client {} subscribed to {} (mode: {:?})",
1078 ctx.client_id, view_id, view_spec.mode
1079 );
1080}
1081
1082#[cfg(not(feature = "otel"))]
1083async fn attach_derived_view_subscription(
1084 ctx: &SubscriptionContext<'_>,
1085 subscription: Subscription,
1086 view_spec: ViewSpec,
1087 cancel_token: CancellationToken,
1088) {
1089 let view_id = &subscription.view;
1090 let pipeline_limit = view_spec
1091 .pipeline
1092 .as_ref()
1093 .and_then(|p| p.limit)
1094 .unwrap_or(100);
1095 let take = subscription.take.unwrap_or(pipeline_limit);
1096 let skip = subscription.skip.unwrap_or(0);
1097 let is_single = take == 1;
1098
1099 let source_view_id = match &view_spec.source_view {
1100 Some(s) => s.clone(),
1101 None => {
1102 warn!("Derived view {} has no source_view", view_id);
1103 return;
1104 }
1105 };
1106
1107 let sorted_caches = ctx.view_index.sorted_caches();
1108 let initial_window: Vec<(String, serde_json::Value)> = {
1109 let mut caches = sorted_caches.write().await;
1110 if let Some(cache) = caches.get_mut(view_id) {
1111 cache.get_window(skip, take)
1112 } else {
1113 warn!("No sorted cache for derived view {}", view_id);
1114 vec![]
1115 }
1116 };
1117
1118 let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1119
1120 if !initial_window.is_empty() {
1121 let snapshot_entities: Vec<SnapshotEntity> = initial_window
1122 .into_iter()
1123 .map(|(key, mut data)| {
1124 transform_large_u64_to_strings(&mut data);
1125 SnapshotEntity { key, data }
1126 })
1127 .collect();
1128
1129 let batch_config = ctx.entity_cache.snapshot_config();
1130 if send_snapshot_batches(
1131 ctx.client_id,
1132 &snapshot_entities,
1133 view_spec.mode,
1134 view_id,
1135 ctx.client_manager,
1136 &batch_config,
1137 )
1138 .await
1139 .is_err()
1140 {
1141 return;
1142 }
1143 }
1144
1145 let mut rx = ctx
1146 .bus_manager
1147 .get_or_create_list_bus(&source_view_id)
1148 .await;
1149
1150 let client_id = ctx.client_id;
1151 let client_mgr = ctx.client_manager.clone();
1152 let view_id_clone = view_id.clone();
1153 let view_id_span = view_id.clone();
1154 let sorted_caches_clone = sorted_caches;
1155 let frame_mode = view_spec.mode;
1156
1157 tokio::spawn(
1158 async move {
1159 let mut current_window_keys = initial_keys;
1160
1161 loop {
1162 tokio::select! {
1163 _ = cancel_token.cancelled() => {
1164 debug!("Derived view subscription cancelled for client {}", client_id);
1165 break;
1166 }
1167 result = rx.recv() => {
1168 match result {
1169 Ok(_envelope) => {
1170 let new_window: Vec<(String, serde_json::Value)> = {
1171 let mut caches = sorted_caches_clone.write().await;
1172 if let Some(cache) = caches.get_mut(&view_id_clone) {
1173 cache.get_window(skip, take)
1174 } else {
1175 continue;
1176 }
1177 };
1178
1179 let new_keys: HashSet<String> =
1180 new_window.iter().map(|(k, _)| k.clone()).collect();
1181
1182 if is_single {
1183 if let Some((new_key, data)) = new_window.first() {
1184 for old_key in current_window_keys.difference(&new_keys) {
1185 let delete_frame = Frame {
1186 mode: frame_mode,
1187 export: view_id_clone.clone(),
1188 op: "delete",
1189 key: old_key.clone(),
1190 data: serde_json::Value::Null,
1191 append: vec![],
1192 };
1193 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1194 let payload = Arc::new(Bytes::from(json));
1195 if client_mgr.send_to_client(client_id, payload).is_err() {
1196 return;
1197 }
1198 }
1199 }
1200
1201 let mut transformed_data = data.clone();
1202 transform_large_u64_to_strings(&mut transformed_data);
1203 let frame = Frame {
1204 mode: frame_mode,
1205 export: view_id_clone.clone(),
1206 op: "upsert",
1207 key: new_key.clone(),
1208 data: transformed_data,
1209 append: vec![],
1210 };
1211 if let Ok(json) = serde_json::to_vec(&frame) {
1212 let payload = Arc::new(Bytes::from(json));
1213 if client_mgr.send_to_client(client_id, payload).is_err() {
1214 return;
1215 }
1216 }
1217 }
1218 } else {
1219 for key in current_window_keys.difference(&new_keys) {
1220 let delete_frame = Frame {
1221 mode: frame_mode,
1222 export: view_id_clone.clone(),
1223 op: "delete",
1224 key: key.clone(),
1225 data: serde_json::Value::Null,
1226 append: vec![],
1227 };
1228 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1229 let payload = Arc::new(Bytes::from(json));
1230 if client_mgr.send_to_client(client_id, payload).is_err() {
1231 return;
1232 }
1233 }
1234 }
1235
1236 for (key, data) in &new_window {
1237 let mut transformed_data = data.clone();
1238 transform_large_u64_to_strings(&mut transformed_data);
1239 let frame = Frame {
1240 mode: frame_mode,
1241 export: view_id_clone.clone(),
1242 op: "upsert",
1243 key: key.clone(),
1244 data: transformed_data,
1245 append: vec![],
1246 };
1247 if let Ok(json) = serde_json::to_vec(&frame) {
1248 let payload = Arc::new(Bytes::from(json));
1249 if client_mgr.send_to_client(client_id, payload).is_err() {
1250 return;
1251 }
1252 }
1253 }
1254 }
1255
1256 current_window_keys = new_keys;
1257 }
1258 Err(_) => break,
1259 }
1260 }
1261 }
1262 }
1263 }
1264 .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1265 );
1266
1267 info!(
1268 "Client {} subscribed to derived view {} (take={}, skip={})",
1269 ctx.client_id, view_id, take, skip
1270 );
1271}