1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7 Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig, SortOrder, SubscribedFrame,
8};
9use crate::websocket::subscription::{ClientMessage, Subscription};
10use anyhow::Result;
11use bytes::Bytes;
12use futures_util::StreamExt;
13use std::collections::HashSet;
14use std::net::SocketAddr;
15use std::sync::Arc;
16#[cfg(feature = "otel")]
17use std::time::Instant;
18
19use tokio::net::{TcpListener, TcpStream};
20use tokio_tungstenite::accept_async;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, info_span, warn, Instrument};
23use uuid::Uuid;
24
25#[cfg(feature = "otel")]
26use crate::metrics::Metrics;
27
28struct SubscriptionContext<'a> {
29 client_id: Uuid,
30 client_manager: &'a ClientManager,
31 bus_manager: &'a BusManager,
32 entity_cache: &'a EntityCache,
33 view_index: &'a ViewIndex,
34 #[cfg(feature = "otel")]
35 metrics: Option<Arc<Metrics>>,
36}
37
38pub struct WebSocketServer {
39 bind_addr: SocketAddr,
40 client_manager: ClientManager,
41 bus_manager: BusManager,
42 entity_cache: EntityCache,
43 view_index: Arc<ViewIndex>,
44 max_clients: usize,
45 #[cfg(feature = "otel")]
46 metrics: Option<Arc<Metrics>>,
47}
48
49impl WebSocketServer {
50 #[cfg(feature = "otel")]
51 pub fn new(
52 bind_addr: SocketAddr,
53 bus_manager: BusManager,
54 entity_cache: EntityCache,
55 view_index: Arc<ViewIndex>,
56 metrics: Option<Arc<Metrics>>,
57 ) -> Self {
58 Self {
59 bind_addr,
60 client_manager: ClientManager::new(),
61 bus_manager,
62 entity_cache,
63 view_index,
64 max_clients: 10000,
65 metrics,
66 }
67 }
68
69 #[cfg(not(feature = "otel"))]
70 pub fn new(
71 bind_addr: SocketAddr,
72 bus_manager: BusManager,
73 entity_cache: EntityCache,
74 view_index: Arc<ViewIndex>,
75 ) -> Self {
76 Self {
77 bind_addr,
78 client_manager: ClientManager::new(),
79 bus_manager,
80 entity_cache,
81 view_index,
82 max_clients: 10000,
83 }
84 }
85
86 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
87 self.max_clients = max_clients;
88 self
89 }
90
91 pub async fn start(self) -> Result<()> {
92 info!(
93 "Starting WebSocket server on {} (max_clients: {})",
94 self.bind_addr, self.max_clients
95 );
96
97 let listener = TcpListener::bind(&self.bind_addr).await?;
98 info!("WebSocket server listening on {}", self.bind_addr);
99
100 self.client_manager.start_cleanup_task();
101
102 loop {
103 match listener.accept().await {
104 Ok((stream, addr)) => {
105 let client_count = self.client_manager.client_count();
106 if client_count >= self.max_clients {
107 warn!(
108 "Rejecting connection from {} - max clients ({}) reached",
109 addr, self.max_clients
110 );
111 drop(stream);
112 continue;
113 }
114
115 #[cfg(feature = "otel")]
116 if let Some(ref metrics) = self.metrics {
117 metrics.record_ws_connection();
118 }
119
120 info!(
121 "New WebSocket connection from {} ({}/{} clients)",
122 addr,
123 client_count + 1,
124 self.max_clients
125 );
126 let client_manager = self.client_manager.clone();
127 let bus_manager = self.bus_manager.clone();
128 let entity_cache = self.entity_cache.clone();
129 let view_index = self.view_index.clone();
130 #[cfg(feature = "otel")]
131 let metrics = self.metrics.clone();
132
133 tokio::spawn(
134 async move {
135 #[cfg(feature = "otel")]
136 let result = handle_connection(
137 stream,
138 client_manager,
139 bus_manager,
140 entity_cache,
141 view_index,
142 metrics,
143 )
144 .await;
145 #[cfg(not(feature = "otel"))]
146 let result = handle_connection(
147 stream,
148 client_manager,
149 bus_manager,
150 entity_cache,
151 view_index,
152 )
153 .await;
154
155 if let Err(e) = result {
156 error!("WebSocket connection error: {}", e);
157 }
158 }
159 .instrument(info_span!("ws.connection", %addr)),
160 );
161 }
162 Err(e) => {
163 error!("Failed to accept connection: {}", e);
164 }
165 }
166 }
167 }
168}
169
/// Serves one client connection (`otel` build).
///
/// Completes the WebSocket handshake, registers the client, then processes
/// incoming messages until the client closes, the stream ends or a transport
/// error occurs. Text messages are parsed as [`ClientMessage`] first and as a
/// bare [`Subscription`] second; anything else is only logged. On exit all of
/// the client's subscriptions are cancelled, the client is removed and the
/// disconnect metrics are recorded.
///
/// # Errors
///
/// Returns an error if the WebSocket handshake fails.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        metrics: metrics.clone(),
    };

    // View ids of subscriptions that are still open; each one gets a
    // `subscription_removed` metric when the connection goes away.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        match serde_json::from_str::<ClientMessage>(text) {
            Ok(ClientMessage::Subscribe(subscription)) => {
                register_subscription_otel(&ctx, subscription, &mut active_subscriptions).await;
            }
            Ok(ClientMessage::Unsubscribe(unsub)) => {
                let sub_key = unsub.sub_key();
                let removed = client_manager
                    .remove_client_subscription(client_id, &sub_key)
                    .await;

                if removed {
                    info!("Client {} unsubscribed from {}", client_id, sub_key);
                    if let Some(ref m) = metrics {
                        m.record_subscription_removed(&unsub.view);
                    }

                    // The removal has just been recorded; forget one matching
                    // entry so the disconnect cleanup below does not record
                    // the same subscription a second time.
                    let removed_view: &str = &unsub.view;
                    if let Some(idx) = active_subscriptions
                        .iter()
                        .position(|view| view.as_str() == removed_view)
                    {
                        active_subscriptions.swap_remove(idx);
                    }
                }
            }
            Ok(ClientMessage::Ping) => {
                debug!("Received ping from client {}", client_id);
            }
            // Not a `ClientMessage`: fall back to a bare `Subscription` payload.
            Err(_) => match serde_json::from_str::<Subscription>(text) {
                Ok(subscription) => {
                    register_subscription_otel(&ctx, subscription, &mut active_subscriptions)
                        .await;
                }
                Err(_) => {
                    debug!(
                        "Received non-subscription message from client {}: {}",
                        client_id, text
                    );
                }
            },
        }
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}

/// Registers `subscription` for the context's client and, unless the same
/// subscription key is already registered, records the creation metric, tracks
/// the view in `active_subscriptions` and attaches the client to its bus.
#[cfg(feature = "otel")]
async fn register_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    active_subscriptions: &mut Vec<String>,
) {
    let view_id = subscription.view.clone();
    let sub_key = subscription.sub_key();
    ctx.client_manager
        .update_subscription(ctx.client_id, subscription.clone());

    let cancel_token = CancellationToken::new();
    let is_new = ctx
        .client_manager
        .add_client_subscription(ctx.client_id, sub_key.clone(), cancel_token.clone())
        .await;

    if !is_new {
        debug!(
            "Client {} already subscribed to {}, ignoring duplicate",
            ctx.client_id, sub_key
        );
        return;
    }

    if let Some(ref m) = ctx.metrics {
        m.record_subscription_created(&view_id);
    }
    active_subscriptions.push(view_id);

    attach_client_to_bus(ctx, subscription, cancel_token).await;
}
323
324#[cfg(not(feature = "otel"))]
325async fn handle_connection(
326 stream: TcpStream,
327 client_manager: ClientManager,
328 bus_manager: BusManager,
329 entity_cache: EntityCache,
330 view_index: Arc<ViewIndex>,
331) -> Result<()> {
332 let ws_stream = accept_async(stream).await?;
333 let client_id = Uuid::new_v4();
334
335 info!("WebSocket connection established for client {}", client_id);
336
337 let (ws_sender, mut ws_receiver) = ws_stream.split();
338
339 client_manager.add_client(client_id, ws_sender);
340
341 let ctx = SubscriptionContext {
342 client_id,
343 client_manager: &client_manager,
344 bus_manager: &bus_manager,
345 entity_cache: &entity_cache,
346 view_index: &view_index,
347 };
348
349 loop {
350 tokio::select! {
351 ws_msg = ws_receiver.next() => {
352 match ws_msg {
353 Some(Ok(msg)) => {
354 if msg.is_close() {
355 info!("Client {} requested close", client_id);
356 break;
357 }
358
359 client_manager.update_client_last_seen(client_id);
360
361 if msg.is_text() {
362 if let Ok(text) = msg.to_text() {
363 debug!("Received text message from client {}: {}", client_id, text);
364
365 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
366 match client_msg {
367 ClientMessage::Subscribe(subscription) => {
368 let sub_key = subscription.sub_key();
369 client_manager.update_subscription(client_id, subscription.clone());
370
371 let cancel_token = CancellationToken::new();
372 let is_new = client_manager.add_client_subscription(
373 client_id,
374 sub_key.clone(),
375 cancel_token.clone(),
376 ).await;
377
378 if !is_new {
379 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
380 continue;
381 }
382
383 attach_client_to_bus(&ctx, subscription, cancel_token).await;
384 }
385 ClientMessage::Unsubscribe(unsub) => {
386 let sub_key = unsub.sub_key();
387 let removed = client_manager
388 .remove_client_subscription(client_id, &sub_key)
389 .await;
390
391 if removed {
392 info!("Client {} unsubscribed from {}", client_id, sub_key);
393 }
394 }
395 ClientMessage::Ping => {
396 debug!("Received ping from client {}", client_id);
397 }
398 }
399 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
400 let sub_key = subscription.sub_key();
401 client_manager.update_subscription(client_id, subscription.clone());
402
403 let cancel_token = CancellationToken::new();
404 let is_new = client_manager.add_client_subscription(
405 client_id,
406 sub_key.clone(),
407 cancel_token.clone(),
408 ).await;
409
410 if !is_new {
411 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
412 continue;
413 }
414
415 attach_client_to_bus(&ctx, subscription, cancel_token).await;
416 } else {
417 debug!("Received non-subscription message from client {}: {}", client_id, text);
418 }
419 }
420 }
421 }
422 Some(Err(e)) => {
423 warn!("WebSocket error for client {}: {}", client_id, e);
424 break;
425 }
426 None => {
427 debug!("WebSocket stream ended for client {}", client_id);
428 break;
429 }
430 }
431 }
432 }
433 }
434
435 client_manager
436 .cancel_all_client_subscriptions(client_id)
437 .await;
438 client_manager.remove_client(client_id);
439 info!("Client {} disconnected", client_id);
440
441 Ok(())
442}
443
444async fn send_snapshot_batches(
445 client_id: Uuid,
446 entities: &[SnapshotEntity],
447 mode: Mode,
448 view_id: &str,
449 client_manager: &ClientManager,
450 batch_config: &SnapshotBatchConfig,
451 #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
452) -> Result<()> {
453 let total = entities.len();
454 if total == 0 {
455 return Ok(());
456 }
457
458 let mut offset = 0;
459 let mut batch_num = 0;
460
461 while offset < total {
462 let batch_size = if batch_num == 0 {
463 batch_config.initial_batch_size
464 } else {
465 batch_config.subsequent_batch_size
466 };
467
468 let end = (offset + batch_size).min(total);
469 let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
470 let is_complete = end >= total;
471
472 let snapshot_frame = SnapshotFrame {
473 mode,
474 export: view_id.to_string(),
475 op: "snapshot",
476 data: batch_data,
477 complete: is_complete,
478 };
479
480 if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
481 let payload = maybe_compress(&json_payload);
482 if client_manager
483 .send_compressed_async(client_id, payload)
484 .await
485 .is_err()
486 {
487 return Err(anyhow::anyhow!("Failed to send snapshot batch"));
488 }
489 #[cfg(feature = "otel")]
490 if let Some(m) = metrics {
491 m.record_ws_message_sent();
492 }
493 }
494
495 offset = end;
496 batch_num += 1;
497 }
498
499 debug!(
500 "Sent {} snapshot batches ({} entities) for {} to client {}",
501 batch_num, total, view_id, client_id
502 );
503
504 Ok(())
505}
506
507fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
508 if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
509 return Some(SortConfig {
510 field: sort.field_path.clone(),
511 order: match sort.order {
512 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
513 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
514 },
515 });
516 }
517
518 if view_spec.mode == Mode::List {
519 return Some(SortConfig {
520 field: vec!["_seq".to_string()],
521 order: SortOrder::Desc,
522 });
523 }
524
525 None
526}
527
528fn send_subscribed_frame(
529 client_id: Uuid,
530 view_id: &str,
531 view_spec: &ViewSpec,
532 client_manager: &ClientManager,
533) -> Result<()> {
534 let sort_config = extract_sort_config(view_spec);
535 let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
536
537 let json_payload = serde_json::to_vec(&subscribed_frame)?;
538 let payload = Arc::new(Bytes::from(json_payload));
539 client_manager
540 .send_to_client(client_id, payload)
541 .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
542}
543
/// Attaches a client to the bus backing `subscription.view` (`otel` build).
///
/// Looks up the view, sends the subscribed acknowledgement and then:
/// - hands derived views that have a pipeline sort to
///   `attach_derived_view_subscription_otel`;
/// - for `Mode::State`, sends the current bus value when it is non-empty and
///   spawns a task that forwards every later change;
/// - for `Mode::List` / `Mode::Append`, subscribes to the list bus, sends the
///   cached entities matching the subscription as snapshot batches and spawns
///   a task that forwards matching envelopes.
///
/// Spawned tasks stop when `cancel_token` is cancelled, when the bus reports
/// an error or when sending to the client fails. Returns without attaching if
/// the view is unknown or the acknowledgement / snapshot cannot be sent.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;

    let view_spec = match ctx.view_index.get_view(view_id) {
        Some(spec) => spec.clone(),
        None => {
            warn!("Unknown view ID: {}", view_id);
            return;
        }
    };

    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
        warn!("Failed to send subscribed frame: {}", e);
        return;
    }

    let is_derived_with_sort = view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map(|p| p.sort.is_some())
            .unwrap_or(false);

    if is_derived_with_sort {
        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
        return;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");
            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            if !rx.borrow().is_empty() {
                let data = rx.borrow().clone();
                // Count the message only when it was actually handed to the
                // client, mirroring the forwarding loop below.
                if ctx
                    .client_manager
                    .send_to_client(ctx.client_id, data)
                    .is_ok()
                {
                    if let Some(ref m) = ctx.metrics {
                        m.record_ws_message_sent();
                    }
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let key_clone = key.to_string();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("State subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.changed() => {
                                if result.is_err() {
                                    break;
                                }
                                let data = rx.borrow().clone();
                                if client_mgr.send_to_client(client_id, data).is_err() {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
            );
        }
        Mode::List | Mode::Append => {
            // Subscribe before reading the cache so that updates published
            // while the snapshot is being sent are queued on `rx`.
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let snapshots = ctx.entity_cache.get_all(view_id).await;
            let snapshot_entities: Vec<SnapshotEntity> = snapshots
                .into_iter()
                .filter(|(key, _)| subscription.matches_key(key))
                .map(|(key, data)| SnapshotEntity { key, data })
                .collect();

            if !snapshot_entities.is_empty() {
                let batch_config = ctx.entity_cache.snapshot_config();
                if send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await
                .is_err()
                {
                    return;
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let sub = subscription.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let mode = view_spec.mode;
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("List subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.recv() => {
                                // NOTE(review): any receive error ends the
                                // task; if this bus can report lag, a slow
                                // client would silently lose its subscription.
                                let envelope = match result {
                                    Ok(envelope) => envelope,
                                    Err(_) => break,
                                };
                                if !sub.matches(&envelope.entity, &envelope.key) {
                                    continue;
                                }
                                if client_mgr
                                    .send_to_client(client_id, envelope.payload.clone())
                                    .is_err()
                                {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
            );
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );
}
695
/// Attaches a client to a derived, sorted view (`otel` build).
///
/// The client sees a window of the view's sorted cache: `skip` entries are
/// skipped and `take` entries returned, where `take` defaults to the pipeline
/// limit (or 100 when the pipeline has none). The initial window is sent as
/// snapshot batches. A spawned task then re-reads the window on every event
/// of the source view's list bus, sends `delete` frames for keys that left
/// the window and `upsert` frames for the entries now in it.
///
/// The task stops when `cancel_token` is cancelled, when the bus reports an
/// error or when sending to the client fails. Returns without attaching if
/// the view has no `source_view` or the initial snapshot cannot be sent.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);
    let is_single = take == 1;

    let source_view_id = match &view_spec.source_view {
        Some(s) => s.clone(),
        None => {
            warn!("Derived view {} has no source_view", view_id);
            return;
        }
    };

    // Subscribe to the source bus BEFORE reading the initial window, as the
    // List/Append path does, so that events published while the snapshot is
    // being built and sent are queued instead of being missed.
    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, data)| SnapshotEntity { key, data })
            .collect();

        let batch_config = ctx.entity_cache.snapshot_config();
        if send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await
        .is_err()
        {
            return;
        }
    }

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let view_id_clone = view_id.clone();
    let view_id_span = view_id.clone();
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;

    tokio::spawn(
        async move {
            let mut current_window_keys = initial_keys;

            // Builds, serializes and sends one frame. Returns `false` only
            // when the send to the client failed (the task must stop); a
            // frame that cannot be serialized is skipped.
            let send_frame = |op: &'static str, key: &str, data: serde_json::Value| -> bool {
                let frame = Frame {
                    mode: frame_mode,
                    export: view_id_clone.clone(),
                    op,
                    key: key.to_string(),
                    data,
                    append: vec![],
                };
                let json = match serde_json::to_vec(&frame) {
                    Ok(json) => json,
                    Err(_) => return true,
                };
                if client_mgr
                    .send_to_client(client_id, Arc::new(Bytes::from(json)))
                    .is_err()
                {
                    return false;
                }
                if let Some(ref m) = metrics_clone {
                    m.record_ws_message_sent();
                }
                true
            };

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        debug!("Derived view subscription cancelled for client {}", client_id);
                        break;
                    }
                    result = rx.recv() => {
                        // The envelope itself is not inspected: any event on
                        // the source view triggers a re-read of the window.
                        if result.is_err() {
                            break;
                        }

                        let new_window: Vec<(String, serde_json::Value)> = {
                            let mut caches = sorted_caches.write().await;
                            if let Some(cache) = caches.get_mut(&view_id_clone) {
                                cache.get_window(skip, take)
                            } else {
                                continue;
                            }
                        };

                        let new_keys: HashSet<String> =
                            new_window.iter().map(|(k, _)| k.clone()).collect();

                        if is_single {
                            // NOTE(review): when the single-entry window
                            // becomes empty nothing is sent, not even a delete
                            // for the previous key - confirm this is intended.
                            if let Some((new_key, data)) = new_window.first() {
                                for old_key in current_window_keys.difference(&new_keys) {
                                    if !send_frame("delete", old_key, serde_json::Value::Null) {
                                        return;
                                    }
                                }
                                if !send_frame("upsert", new_key, data.clone()) {
                                    return;
                                }
                            }
                        } else {
                            for key in current_window_keys.difference(&new_keys) {
                                if !send_frame("delete", key, serde_json::Value::Null) {
                                    return;
                                }
                            }
                            for (key, data) in &new_window {
                                if !send_frame("upsert", key, data.clone()) {
                                    return;
                                }
                            }
                        }

                        current_window_keys = new_keys;
                    }
                }
            }
        }
        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
    );

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );
}
893
894#[cfg(not(feature = "otel"))]
895async fn attach_client_to_bus(
896 ctx: &SubscriptionContext<'_>,
897 subscription: Subscription,
898 cancel_token: CancellationToken,
899) {
900 let view_id = &subscription.view;
901
902 let view_spec = match ctx.view_index.get_view(view_id) {
903 Some(spec) => spec.clone(),
904 None => {
905 warn!("Unknown view ID: {}", view_id);
906 return;
907 }
908 };
909
910 if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
911 warn!("Failed to send subscribed frame: {}", e);
912 return;
913 }
914
915 let is_derived_with_sort = view_spec.is_derived()
916 && view_spec
917 .pipeline
918 .as_ref()
919 .map(|p| p.sort.is_some())
920 .unwrap_or(false);
921
922 if is_derived_with_sort {
923 attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
924 return;
925 }
926
927 match view_spec.mode {
928 Mode::State => {
929 let key = subscription.key.as_deref().unwrap_or("");
930 let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
931
932 if !rx.borrow().is_empty() {
933 let data = rx.borrow().clone();
934 let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
935 }
936
937 let client_id = ctx.client_id;
938 let client_mgr = ctx.client_manager.clone();
939 let view_id_clone = view_id.clone();
940 let key_clone = key.to_string();
941 tokio::spawn(
942 async move {
943 loop {
944 tokio::select! {
945 _ = cancel_token.cancelled() => {
946 debug!("State subscription cancelled for client {}", client_id);
947 break;
948 }
949 result = rx.changed() => {
950 if result.is_err() {
951 break;
952 }
953 let data = rx.borrow().clone();
954 if client_mgr.send_to_client(client_id, data).is_err() {
955 break;
956 }
957 }
958 }
959 }
960 }
961 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
962 );
963 }
964 Mode::List | Mode::Append => {
965 let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
966
967 let snapshots = ctx.entity_cache.get_all(view_id).await;
968 let snapshot_entities: Vec<SnapshotEntity> = snapshots
969 .into_iter()
970 .filter(|(key, _)| subscription.matches_key(key))
971 .map(|(key, data)| SnapshotEntity { key, data })
972 .collect();
973
974 if !snapshot_entities.is_empty() {
975 let batch_config = ctx.entity_cache.snapshot_config();
976 if send_snapshot_batches(
977 ctx.client_id,
978 &snapshot_entities,
979 view_spec.mode,
980 view_id,
981 ctx.client_manager,
982 &batch_config,
983 )
984 .await
985 .is_err()
986 {
987 return;
988 }
989 }
990
991 let client_id = ctx.client_id;
992 let client_mgr = ctx.client_manager.clone();
993 let sub = subscription.clone();
994 let view_id_clone = view_id.clone();
995 let mode = view_spec.mode;
996 tokio::spawn(
997 async move {
998 loop {
999 tokio::select! {
1000 _ = cancel_token.cancelled() => {
1001 debug!("List subscription cancelled for client {}", client_id);
1002 break;
1003 }
1004 result = rx.recv() => {
1005 match result {
1006 Ok(envelope) => {
1007 if sub.matches(&envelope.entity, &envelope.key)
1008 && client_mgr
1009 .send_to_client(client_id, envelope.payload.clone())
1010 .is_err()
1011 {
1012 break;
1013 }
1014 }
1015 Err(_) => break,
1016 }
1017 }
1018 }
1019 }
1020 }
1021 .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1022 );
1023 }
1024 }
1025
1026 info!(
1027 "Client {} subscribed to {} (mode: {:?})",
1028 ctx.client_id, view_id, view_spec.mode
1029 );
1030}
1031
1032#[cfg(not(feature = "otel"))]
1033async fn attach_derived_view_subscription(
1034 ctx: &SubscriptionContext<'_>,
1035 subscription: Subscription,
1036 view_spec: ViewSpec,
1037 cancel_token: CancellationToken,
1038) {
1039 let view_id = &subscription.view;
1040 let pipeline_limit = view_spec
1041 .pipeline
1042 .as_ref()
1043 .and_then(|p| p.limit)
1044 .unwrap_or(100);
1045 let take = subscription.take.unwrap_or(pipeline_limit);
1046 let skip = subscription.skip.unwrap_or(0);
1047 let is_single = take == 1;
1048
1049 let source_view_id = match &view_spec.source_view {
1050 Some(s) => s.clone(),
1051 None => {
1052 warn!("Derived view {} has no source_view", view_id);
1053 return;
1054 }
1055 };
1056
1057 let sorted_caches = ctx.view_index.sorted_caches();
1058 let initial_window: Vec<(String, serde_json::Value)> = {
1059 let mut caches = sorted_caches.write().await;
1060 if let Some(cache) = caches.get_mut(view_id) {
1061 cache.get_window(skip, take)
1062 } else {
1063 warn!("No sorted cache for derived view {}", view_id);
1064 vec![]
1065 }
1066 };
1067
1068 let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1069
1070 if !initial_window.is_empty() {
1071 let snapshot_entities: Vec<SnapshotEntity> = initial_window
1072 .into_iter()
1073 .map(|(key, data)| SnapshotEntity { key, data })
1074 .collect();
1075
1076 let batch_config = ctx.entity_cache.snapshot_config();
1077 if send_snapshot_batches(
1078 ctx.client_id,
1079 &snapshot_entities,
1080 view_spec.mode,
1081 view_id,
1082 ctx.client_manager,
1083 &batch_config,
1084 )
1085 .await
1086 .is_err()
1087 {
1088 return;
1089 }
1090 }
1091
1092 let mut rx = ctx
1093 .bus_manager
1094 .get_or_create_list_bus(&source_view_id)
1095 .await;
1096
1097 let client_id = ctx.client_id;
1098 let client_mgr = ctx.client_manager.clone();
1099 let view_id_clone = view_id.clone();
1100 let view_id_span = view_id.clone();
1101 let sorted_caches_clone = sorted_caches;
1102 let frame_mode = view_spec.mode;
1103
1104 tokio::spawn(
1105 async move {
1106 let mut current_window_keys = initial_keys;
1107
1108 loop {
1109 tokio::select! {
1110 _ = cancel_token.cancelled() => {
1111 debug!("Derived view subscription cancelled for client {}", client_id);
1112 break;
1113 }
1114 result = rx.recv() => {
1115 match result {
1116 Ok(_envelope) => {
1117 let new_window: Vec<(String, serde_json::Value)> = {
1118 let mut caches = sorted_caches_clone.write().await;
1119 if let Some(cache) = caches.get_mut(&view_id_clone) {
1120 cache.get_window(skip, take)
1121 } else {
1122 continue;
1123 }
1124 };
1125
1126 let new_keys: HashSet<String> =
1127 new_window.iter().map(|(k, _)| k.clone()).collect();
1128
1129 if is_single {
1130 if let Some((new_key, data)) = new_window.first() {
1131 for old_key in current_window_keys.difference(&new_keys) {
1132 let delete_frame = Frame {
1133 mode: frame_mode,
1134 export: view_id_clone.clone(),
1135 op: "delete",
1136 key: old_key.clone(),
1137 data: serde_json::Value::Null,
1138 append: vec![],
1139 };
1140 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1141 let payload = Arc::new(Bytes::from(json));
1142 if client_mgr.send_to_client(client_id, payload).is_err() {
1143 return;
1144 }
1145 }
1146 }
1147
1148 let frame = Frame {
1149 mode: frame_mode,
1150 export: view_id_clone.clone(),
1151 op: "upsert",
1152 key: new_key.clone(),
1153 data: data.clone(),
1154 append: vec![],
1155 };
1156 if let Ok(json) = serde_json::to_vec(&frame) {
1157 let payload = Arc::new(Bytes::from(json));
1158 if client_mgr.send_to_client(client_id, payload).is_err() {
1159 return;
1160 }
1161 }
1162 }
1163 } else {
1164 for key in current_window_keys.difference(&new_keys) {
1165 let delete_frame = Frame {
1166 mode: frame_mode,
1167 export: view_id_clone.clone(),
1168 op: "delete",
1169 key: key.clone(),
1170 data: serde_json::Value::Null,
1171 append: vec![],
1172 };
1173 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1174 let payload = Arc::new(Bytes::from(json));
1175 if client_mgr.send_to_client(client_id, payload).is_err() {
1176 return;
1177 }
1178 }
1179 }
1180
1181 for (key, data) in &new_window {
1182 let frame = Frame {
1183 mode: frame_mode,
1184 export: view_id_clone.clone(),
1185 op: "upsert",
1186 key: key.clone(),
1187 data: data.clone(),
1188 append: vec![],
1189 };
1190 if let Ok(json) = serde_json::to_vec(&frame) {
1191 let payload = Arc::new(Bytes::from(json));
1192 if client_mgr.send_to_client(client_id, payload).is_err() {
1193 return;
1194 }
1195 }
1196 }
1197 }
1198
1199 current_window_keys = new_keys;
1200 }
1201 Err(_) => break,
1202 }
1203 }
1204 }
1205 }
1206 }
1207 .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1208 );
1209
1210 info!(
1211 "Client {} subscribed to derived view {} (take={}, skip={})",
1212 ctx.client_id, view_id, take, skip
1213 );
1214}