1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7 Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig, SortOrder, SubscribedFrame,
8};
9use crate::websocket::subscription::{ClientMessage, Subscription};
10use anyhow::Result;
11use bytes::Bytes;
12use futures_util::StreamExt;
13use std::collections::HashSet;
14use std::net::SocketAddr;
15use std::sync::Arc;
16#[cfg(feature = "otel")]
17use std::time::Instant;
18
19use tokio::net::{TcpListener, TcpStream};
20use tokio_tungstenite::accept_async;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, info_span, warn, Instrument};
23use uuid::Uuid;
24
25#[cfg(feature = "otel")]
26use crate::metrics::Metrics;
27
/// Borrowed handles that the subscription-attachment helpers need for one client connection.
///
/// Built once per connection in `handle_connection` and passed by reference to
/// `attach_client_to_bus` and the derived-view helpers.
struct SubscriptionContext<'a> {
    /// Identifier generated for this connection.
    client_id: Uuid,
    /// Registry used to send frames to the client and to track its subscriptions.
    client_manager: &'a ClientManager,
    /// Source of the per-view state/list buses that live updates arrive on.
    bus_manager: &'a BusManager,
    /// Cache read to build the initial snapshot of a new subscription.
    entity_cache: &'a EntityCache,
    /// Lookup of view specifications and of the sorted caches used by derived views.
    view_index: &'a ViewIndex,
    /// Optional metrics sink; the field only exists when the `otel` feature is enabled.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
37
/// WebSocket front end: accepts TCP connections, upgrades them and streams view data
/// to each connected client.
pub struct WebSocketServer {
    /// Address the TCP listener binds to in `start`.
    bind_addr: SocketAddr,
    /// Registry of connected clients; cloned into every connection task.
    client_manager: ClientManager,
    /// Bus registry; cloned into every connection task.
    bus_manager: BusManager,
    /// Entity cache; cloned into every connection task.
    entity_cache: EntityCache,
    /// Shared view index; the `Arc` is cloned into every connection task.
    view_index: Arc<ViewIndex>,
    /// Connection limit: new sockets are dropped once the client count reaches this value.
    max_clients: usize,
    /// Optional metrics sink; the field only exists when the `otel` feature is enabled.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
48
49impl WebSocketServer {
50 #[cfg(feature = "otel")]
51 pub fn new(
52 bind_addr: SocketAddr,
53 bus_manager: BusManager,
54 entity_cache: EntityCache,
55 view_index: Arc<ViewIndex>,
56 metrics: Option<Arc<Metrics>>,
57 ) -> Self {
58 Self {
59 bind_addr,
60 client_manager: ClientManager::new(),
61 bus_manager,
62 entity_cache,
63 view_index,
64 max_clients: 10000,
65 metrics,
66 }
67 }
68
69 #[cfg(not(feature = "otel"))]
70 pub fn new(
71 bind_addr: SocketAddr,
72 bus_manager: BusManager,
73 entity_cache: EntityCache,
74 view_index: Arc<ViewIndex>,
75 ) -> Self {
76 Self {
77 bind_addr,
78 client_manager: ClientManager::new(),
79 bus_manager,
80 entity_cache,
81 view_index,
82 max_clients: 10000,
83 }
84 }
85
86 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
87 self.max_clients = max_clients;
88 self
89 }
90
91 pub async fn start(self) -> Result<()> {
92 info!(
93 "Starting WebSocket server on {} (max_clients: {})",
94 self.bind_addr, self.max_clients
95 );
96
97 let listener = TcpListener::bind(&self.bind_addr).await?;
98 info!("WebSocket server listening on {}", self.bind_addr);
99
100 self.client_manager.start_cleanup_task();
101
102 loop {
103 match listener.accept().await {
104 Ok((stream, addr)) => {
105 let client_count = self.client_manager.client_count();
106 if client_count >= self.max_clients {
107 warn!(
108 "Rejecting connection from {} - max clients ({}) reached",
109 addr, self.max_clients
110 );
111 drop(stream);
112 continue;
113 }
114
115 #[cfg(feature = "otel")]
116 if let Some(ref metrics) = self.metrics {
117 metrics.record_ws_connection();
118 }
119
120 info!(
121 "New WebSocket connection from {} ({}/{} clients)",
122 addr,
123 client_count + 1,
124 self.max_clients
125 );
126 let client_manager = self.client_manager.clone();
127 let bus_manager = self.bus_manager.clone();
128 let entity_cache = self.entity_cache.clone();
129 let view_index = self.view_index.clone();
130 #[cfg(feature = "otel")]
131 let metrics = self.metrics.clone();
132
133 tokio::spawn(
134 async move {
135 #[cfg(feature = "otel")]
136 let result = handle_connection(
137 stream,
138 client_manager,
139 bus_manager,
140 entity_cache,
141 view_index,
142 metrics,
143 )
144 .await;
145 #[cfg(not(feature = "otel"))]
146 let result = handle_connection(
147 stream,
148 client_manager,
149 bus_manager,
150 entity_cache,
151 view_index,
152 )
153 .await;
154
155 if let Err(e) = result {
156 error!("WebSocket connection error: {}", e);
157 }
158 }
159 .instrument(info_span!("ws.connection", %addr)),
160 );
161 }
162 Err(e) => {
163 error!("Failed to accept connection: {}", e);
164 }
165 }
166 }
167 }
168}
169
/// Drives one client connection (metrics-enabled build).
///
/// Performs the WebSocket handshake, registers the client, then processes text messages
/// until the peer closes, errors or the stream ends. A message is first parsed as a
/// [`ClientMessage`]; if that fails it is retried as a bare [`Subscription`]. On exit all
/// of the client's subscriptions are cancelled, the client is removed, and the
/// disconnection plus every still-active subscription is reported to `metrics`.
///
/// # Errors
///
/// Returns an error only when the WebSocket handshake fails.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        metrics: metrics.clone(),
    };

    // One entry per live subscription (by view id) so that the matching
    // "subscription removed" metric can be emitted when the connection ends.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        // Both the wrapped `ClientMessage::Subscribe` form and a bare `Subscription`
        // end up here as `Some(subscription)`; everything else is handled in place.
        let subscribe_request = match serde_json::from_str::<ClientMessage>(text) {
            Ok(ClientMessage::Subscribe(subscription)) => Some(subscription),
            Ok(ClientMessage::Unsubscribe(unsub)) => {
                let sub_key = unsub.sub_key();
                let removed = client_manager
                    .remove_client_subscription(client_id, &sub_key)
                    .await;

                if removed {
                    info!("Client {} unsubscribed from {}", client_id, sub_key);
                    if let Some(ref m) = metrics {
                        m.record_subscription_removed(&unsub.view);
                    }
                    // The removal has just been reported; forget one entry for this
                    // view so it is not reported a second time on disconnect.
                    if let Some(pos) = active_subscriptions
                        .iter()
                        .position(|view| *view == unsub.view)
                    {
                        active_subscriptions.swap_remove(pos);
                    }
                }
                None
            }
            Ok(ClientMessage::Ping) => {
                debug!("Received ping from client {}", client_id);
                None
            }
            Err(_) => match serde_json::from_str::<Subscription>(text) {
                Ok(subscription) => Some(subscription),
                Err(_) => {
                    debug!(
                        "Received non-subscription message from client {}: {}",
                        client_id, text
                    );
                    None
                }
            },
        };

        if let Some(subscription) = subscribe_request {
            let view_id = subscription.view.clone();
            let sub_key = subscription.sub_key();
            client_manager.update_subscription(client_id, subscription.clone());

            let cancel_token = CancellationToken::new();
            let is_new = client_manager
                .add_client_subscription(client_id, sub_key.clone(), cancel_token.clone())
                .await;

            if !is_new {
                debug!(
                    "Client {} already subscribed to {}, ignoring duplicate",
                    client_id, sub_key
                );
                continue;
            }

            if let Some(ref m) = metrics {
                m.record_subscription_created(&view_id);
            }
            active_subscriptions.push(view_id);

            attach_client_to_bus(&ctx, subscription, cancel_token).await;
        }
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}
323
324#[cfg(not(feature = "otel"))]
325async fn handle_connection(
326 stream: TcpStream,
327 client_manager: ClientManager,
328 bus_manager: BusManager,
329 entity_cache: EntityCache,
330 view_index: Arc<ViewIndex>,
331) -> Result<()> {
332 let ws_stream = accept_async(stream).await?;
333 let client_id = Uuid::new_v4();
334
335 info!("WebSocket connection established for client {}", client_id);
336
337 let (ws_sender, mut ws_receiver) = ws_stream.split();
338
339 client_manager.add_client(client_id, ws_sender);
340
341 let ctx = SubscriptionContext {
342 client_id,
343 client_manager: &client_manager,
344 bus_manager: &bus_manager,
345 entity_cache: &entity_cache,
346 view_index: &view_index,
347 };
348
349 loop {
350 tokio::select! {
351 ws_msg = ws_receiver.next() => {
352 match ws_msg {
353 Some(Ok(msg)) => {
354 if msg.is_close() {
355 info!("Client {} requested close", client_id);
356 break;
357 }
358
359 client_manager.update_client_last_seen(client_id);
360
361 if msg.is_text() {
362 if let Ok(text) = msg.to_text() {
363 debug!("Received text message from client {}: {}", client_id, text);
364
365 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
366 match client_msg {
367 ClientMessage::Subscribe(subscription) => {
368 let sub_key = subscription.sub_key();
369 client_manager.update_subscription(client_id, subscription.clone());
370
371 let cancel_token = CancellationToken::new();
372 let is_new = client_manager.add_client_subscription(
373 client_id,
374 sub_key.clone(),
375 cancel_token.clone(),
376 ).await;
377
378 if !is_new {
379 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
380 continue;
381 }
382
383 attach_client_to_bus(&ctx, subscription, cancel_token).await;
384 }
385 ClientMessage::Unsubscribe(unsub) => {
386 let sub_key = unsub.sub_key();
387 let removed = client_manager
388 .remove_client_subscription(client_id, &sub_key)
389 .await;
390
391 if removed {
392 info!("Client {} unsubscribed from {}", client_id, sub_key);
393 }
394 }
395 ClientMessage::Ping => {
396 debug!("Received ping from client {}", client_id);
397 }
398 }
399 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
400 let sub_key = subscription.sub_key();
401 client_manager.update_subscription(client_id, subscription.clone());
402
403 let cancel_token = CancellationToken::new();
404 let is_new = client_manager.add_client_subscription(
405 client_id,
406 sub_key.clone(),
407 cancel_token.clone(),
408 ).await;
409
410 if !is_new {
411 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
412 continue;
413 }
414
415 attach_client_to_bus(&ctx, subscription, cancel_token).await;
416 } else {
417 debug!("Received non-subscription message from client {}: {}", client_id, text);
418 }
419 }
420 }
421 }
422 Some(Err(e)) => {
423 warn!("WebSocket error for client {}: {}", client_id, e);
424 break;
425 }
426 None => {
427 debug!("WebSocket stream ended for client {}", client_id);
428 break;
429 }
430 }
431 }
432 }
433 }
434
435 client_manager
436 .cancel_all_client_subscriptions(client_id)
437 .await;
438 client_manager.remove_client(client_id);
439 info!("Client {} disconnected", client_id);
440
441 Ok(())
442}
443
444async fn send_snapshot_batches(
445 client_id: Uuid,
446 entities: &[SnapshotEntity],
447 mode: Mode,
448 view_id: &str,
449 client_manager: &ClientManager,
450 batch_config: &SnapshotBatchConfig,
451 #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
452) -> Result<()> {
453 let total = entities.len();
454 if total == 0 {
455 return Ok(());
456 }
457
458 let mut offset = 0;
459 let mut batch_num = 0;
460
461 while offset < total {
462 let batch_size = if batch_num == 0 {
463 batch_config.initial_batch_size
464 } else {
465 batch_config.subsequent_batch_size
466 };
467
468 let end = (offset + batch_size).min(total);
469 let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
470 let is_complete = end >= total;
471
472 let snapshot_frame = SnapshotFrame {
473 mode,
474 export: view_id.to_string(),
475 op: "snapshot",
476 data: batch_data,
477 complete: is_complete,
478 };
479
480 if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
481 let payload = maybe_compress(&json_payload);
482 if client_manager
483 .send_compressed_async(client_id, payload)
484 .await
485 .is_err()
486 {
487 return Err(anyhow::anyhow!("Failed to send snapshot batch"));
488 }
489 #[cfg(feature = "otel")]
490 if let Some(m) = metrics {
491 m.record_ws_message_sent();
492 }
493 }
494
495 offset = end;
496 batch_num += 1;
497 }
498
499 debug!(
500 "Sent {} snapshot batches ({} entities) for {} to client {}",
501 batch_num, total, view_id, client_id
502 );
503
504 Ok(())
505}
506
507fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
508 if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
509 return Some(SortConfig {
510 field: sort.field_path.clone(),
511 order: match sort.order {
512 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
513 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
514 },
515 });
516 }
517
518 if view_spec.mode == Mode::List {
519 return Some(SortConfig {
520 field: vec!["_seq".to_string()],
521 order: SortOrder::Desc,
522 });
523 }
524
525 None
526}
527
528fn send_subscribed_frame(
529 client_id: Uuid,
530 view_id: &str,
531 view_spec: &ViewSpec,
532 client_manager: &ClientManager,
533) -> Result<()> {
534 let sort_config = extract_sort_config(view_spec);
535 let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
536
537 let json_payload = serde_json::to_vec(&subscribed_frame)?;
538 let payload = Arc::new(Bytes::from(json_payload));
539 client_manager
540 .send_to_client(client_id, payload)
541 .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
542}
543
/// Wires a freshly registered subscription to its data source (metrics-enabled build).
///
/// Steps, in order:
/// 1. look up the view and send the `subscribed` acknowledgement;
/// 2. hand derived views that have a pipeline sort to
///    `attach_derived_view_subscription_otel`;
/// 3. otherwise subscribe to the view's bus, send the initial snapshot from the entity
///    cache, and spawn a task that forwards later updates until `cancel_token` fires,
///    the bus closes or the client can no longer be reached.
///
/// If setup is abandoned before a forwarding task exists (unknown view, failed
/// acknowledgement, failed list snapshot) the subscription entry is removed from the
/// client manager again, using the same call the unsubscribe path uses, so that a later
/// retry is not discarded as a duplicate.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;
    let sub_key = subscription.sub_key();

    // Clone the spec in its own statement so no lookup temporary is alive across an await.
    let known_spec = ctx.view_index.get_view(view_id).map(|spec| spec.clone());
    let view_spec = match known_spec {
        Some(spec) => spec,
        None => {
            warn!("Unknown view ID: {}", view_id);
            let _ = ctx
                .client_manager
                .remove_client_subscription(ctx.client_id, &sub_key)
                .await;
            return;
        }
    };

    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
        warn!("Failed to send subscribed frame: {}", e);
        let _ = ctx
            .client_manager
            .remove_client_subscription(ctx.client_id, &sub_key)
            .await;
        return;
    }

    let is_derived_with_sort = view_spec.is_derived()
        && view_spec
            .pipeline
            .as_ref()
            .map(|p| p.sort.is_some())
            .unwrap_or(false);

    if is_derived_with_sort {
        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
        return;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");

            // Subscribe before reading the cache so no update falls between the two.
            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            if let Some(cached_entity) = ctx.entity_cache.get(view_id, key).await {
                let snapshot_entities = vec![SnapshotEntity {
                    key: key.to_string(),
                    data: cached_entity,
                }];
                let batch_config = ctx.entity_cache.snapshot_config();
                // Best effort: a failed send is detected by the forwarding task below.
                let _ = send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await;
                // Mark the value currently on the bus as seen.
                rx.borrow_and_update();
            } else if !rx.borrow().is_empty() {
                // Nothing cached: fall back to whatever the bus currently holds.
                let data = rx.borrow_and_update().clone();
                let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let key_clone = key.to_string();
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("State subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.changed() => {
                                if result.is_err() {
                                    break;
                                }
                                let data = rx.borrow().clone();
                                if client_mgr.send_to_client(client_id, data).is_err() {
                                    break;
                                }
                                if let Some(ref m) = metrics_clone {
                                    m.record_ws_message_sent();
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
            );
        }
        Mode::List | Mode::Append => {
            // Subscribe before reading the cache so no update falls between the two.
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let snapshots = ctx.entity_cache.get_all(view_id).await;
            let snapshot_entities: Vec<SnapshotEntity> = snapshots
                .into_iter()
                .filter(|(key, _)| subscription.matches_key(key))
                .map(|(key, data)| SnapshotEntity { key, data })
                .collect();

            if !snapshot_entities.is_empty() {
                let batch_config = ctx.entity_cache.snapshot_config();
                if send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await
                .is_err()
                {
                    let _ = ctx
                        .client_manager
                        .remove_client_subscription(ctx.client_id, &sub_key)
                        .await;
                    return;
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let sub = subscription.clone();
            let metrics_clone = ctx.metrics.clone();
            let view_id_clone = view_id.clone();
            let mode = view_spec.mode;
            tokio::spawn(
                async move {
                    loop {
                        tokio::select! {
                            _ = cancel_token.cancelled() => {
                                debug!("List subscription cancelled for client {}", client_id);
                                break;
                            }
                            result = rx.recv() => {
                                match result {
                                    Ok(envelope) => {
                                        // Only forward envelopes this subscription asked for.
                                        if sub.matches(&envelope.entity, &envelope.key) {
                                            if client_mgr
                                                .send_to_client(client_id, envelope.payload.clone())
                                                .is_err()
                                            {
                                                break;
                                            }
                                            if let Some(ref m) = metrics_clone {
                                                m.record_ws_message_sent();
                                            }
                                        }
                                    }
                                    Err(_) => break,
                                }
                            }
                        }
                    }
                }
                .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
            );
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );
}
711
/// Attaches a subscription to a derived, sorted view (metrics-enabled build).
///
/// The client receives a window (`skip`, `take`) of the view's sorted cache: first as a
/// snapshot, then as `delete` / `upsert` frames recomputed every time the source view's
/// list bus delivers a message. `take` defaults to the pipeline limit (or 100) and
/// `skip` to 0.
///
/// The bus receiver is created before the initial window is read so that no update can
/// fall between the snapshot and the start of the forwarding task. Keys that leave the
/// window always produce a `delete` frame, including when the window becomes empty.
/// If setup is abandoned before the forwarding task is spawned, the subscription entry
/// is removed from the client manager again so that a retry is not treated as a
/// duplicate.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) {
    /// Serializes `frame` and queues it for the client, recording the send in `metrics`.
    /// Returns `false` only when the client can no longer be reached; a frame that
    /// fails to serialize is skipped.
    fn push_frame(
        client_mgr: &ClientManager,
        client_id: Uuid,
        metrics: Option<&Arc<Metrics>>,
        frame: &Frame,
    ) -> bool {
        let json = match serde_json::to_vec(frame) {
            Ok(json) => json,
            Err(_) => return true,
        };
        if client_mgr
            .send_to_client(client_id, Arc::new(Bytes::from(json)))
            .is_err()
        {
            return false;
        }
        if let Some(m) = metrics {
            m.record_ws_message_sent();
        }
        true
    }

    let view_id = &subscription.view;
    let sub_key = subscription.sub_key();
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);

    let source_view_id = match view_spec.source_view.clone() {
        Some(source) => source,
        None => {
            warn!("Derived view {} has no source_view", view_id);
            let _ = ctx
                .client_manager
                .remove_client_subscription(ctx.client_id, &sub_key)
                .await;
            return;
        }
    };

    // Subscribe first: bus messages are only used as "recompute the window" triggers,
    // so receiving one that predates the snapshot is harmless, while missing one would
    // leave the client stale until the next update.
    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, data)| SnapshotEntity { key, data })
            .collect();

        let batch_config = ctx.entity_cache.snapshot_config();
        if send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await
        .is_err()
        {
            let _ = ctx
                .client_manager
                .remove_client_subscription(ctx.client_id, &sub_key)
                .await;
            return;
        }
    }

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let view_id_clone = view_id.clone();
    let view_id_span = view_id.clone();
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;

    tokio::spawn(
        async move {
            // Keys the client currently holds; used to work out which ones to delete.
            let mut current_window_keys = initial_keys;

            loop {
                tokio::select! {
                    _ = cancel_token.cancelled() => {
                        debug!("Derived view subscription cancelled for client {}", client_id);
                        break;
                    }
                    result = rx.recv() => {
                        if result.is_err() {
                            break;
                        }

                        let new_window: Vec<(String, serde_json::Value)> = {
                            let mut caches = sorted_caches.write().await;
                            if let Some(cache) = caches.get_mut(&view_id_clone) {
                                cache.get_window(skip, take)
                            } else {
                                continue;
                            }
                        };

                        let new_keys: HashSet<String> =
                            new_window.iter().map(|(k, _)| k.clone()).collect();

                        // Deletes are sent unconditionally, so a window that has
                        // become empty still clears the client's copy.
                        for old_key in current_window_keys.difference(&new_keys) {
                            let delete_frame = Frame {
                                mode: frame_mode,
                                export: view_id_clone.clone(),
                                op: "delete",
                                key: old_key.clone(),
                                data: serde_json::Value::Null,
                                append: vec![],
                            };
                            if !push_frame(&client_mgr, client_id, metrics_clone.as_ref(), &delete_frame) {
                                return;
                            }
                        }

                        for (key, data) in &new_window {
                            let upsert_frame = Frame {
                                mode: frame_mode,
                                export: view_id_clone.clone(),
                                op: "upsert",
                                key: key.clone(),
                                data: data.clone(),
                                append: vec![],
                            };
                            if !push_frame(&client_mgr, client_id, metrics_clone.as_ref(), &upsert_frame) {
                                return;
                            }
                        }

                        current_window_keys = new_keys;
                    }
                }
            }
        }
        .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
    );

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );
}
910
911#[cfg(not(feature = "otel"))]
912async fn attach_client_to_bus(
913 ctx: &SubscriptionContext<'_>,
914 subscription: Subscription,
915 cancel_token: CancellationToken,
916) {
917 let view_id = &subscription.view;
918
919 let view_spec = match ctx.view_index.get_view(view_id) {
920 Some(spec) => spec.clone(),
921 None => {
922 warn!("Unknown view ID: {}", view_id);
923 return;
924 }
925 };
926
927 if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
928 warn!("Failed to send subscribed frame: {}", e);
929 return;
930 }
931
932 let is_derived_with_sort = view_spec.is_derived()
933 && view_spec
934 .pipeline
935 .as_ref()
936 .map(|p| p.sort.is_some())
937 .unwrap_or(false);
938
939 if is_derived_with_sort {
940 attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
941 return;
942 }
943
944 match view_spec.mode {
945 Mode::State => {
946 let key = subscription.key.as_deref().unwrap_or("");
947
948 let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
949
950 if let Some(cached_entity) = ctx.entity_cache.get(view_id, key).await {
951 let snapshot_entities = vec![SnapshotEntity {
952 key: key.to_string(),
953 data: cached_entity,
954 }];
955 let batch_config = ctx.entity_cache.snapshot_config();
956 let _ = send_snapshot_batches(
957 ctx.client_id,
958 &snapshot_entities,
959 view_spec.mode,
960 view_id,
961 ctx.client_manager,
962 &batch_config,
963 )
964 .await;
965 rx.borrow_and_update();
966 } else if !rx.borrow().is_empty() {
967 let data = rx.borrow_and_update().clone();
968 let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
969 }
970
971 let client_id = ctx.client_id;
972 let client_mgr = ctx.client_manager.clone();
973 let view_id_clone = view_id.clone();
974 let key_clone = key.to_string();
975 tokio::spawn(
976 async move {
977 loop {
978 tokio::select! {
979 _ = cancel_token.cancelled() => {
980 debug!("State subscription cancelled for client {}", client_id);
981 break;
982 }
983 result = rx.changed() => {
984 if result.is_err() {
985 break;
986 }
987 let data = rx.borrow().clone();
988 if client_mgr.send_to_client(client_id, data).is_err() {
989 break;
990 }
991 }
992 }
993 }
994 }
995 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
996 );
997 }
998 Mode::List | Mode::Append => {
999 let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
1000
1001 let snapshots = ctx.entity_cache.get_all(view_id).await;
1002 let snapshot_entities: Vec<SnapshotEntity> = snapshots
1003 .into_iter()
1004 .filter(|(key, _)| subscription.matches_key(key))
1005 .map(|(key, data)| SnapshotEntity { key, data })
1006 .collect();
1007
1008 if !snapshot_entities.is_empty() {
1009 let batch_config = ctx.entity_cache.snapshot_config();
1010 if send_snapshot_batches(
1011 ctx.client_id,
1012 &snapshot_entities,
1013 view_spec.mode,
1014 view_id,
1015 ctx.client_manager,
1016 &batch_config,
1017 )
1018 .await
1019 .is_err()
1020 {
1021 return;
1022 }
1023 }
1024
1025 let client_id = ctx.client_id;
1026 let client_mgr = ctx.client_manager.clone();
1027 let sub = subscription.clone();
1028 let view_id_clone = view_id.clone();
1029 let mode = view_spec.mode;
1030 tokio::spawn(
1031 async move {
1032 loop {
1033 tokio::select! {
1034 _ = cancel_token.cancelled() => {
1035 debug!("List subscription cancelled for client {}", client_id);
1036 break;
1037 }
1038 result = rx.recv() => {
1039 match result {
1040 Ok(envelope) => {
1041 if sub.matches(&envelope.entity, &envelope.key)
1042 && client_mgr
1043 .send_to_client(client_id, envelope.payload.clone())
1044 .is_err()
1045 {
1046 break;
1047 }
1048 }
1049 Err(_) => break,
1050 }
1051 }
1052 }
1053 }
1054 }
1055 .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1056 );
1057 }
1058 }
1059
1060 info!(
1061 "Client {} subscribed to {} (mode: {:?})",
1062 ctx.client_id, view_id, view_spec.mode
1063 );
1064}
1065
1066#[cfg(not(feature = "otel"))]
1067async fn attach_derived_view_subscription(
1068 ctx: &SubscriptionContext<'_>,
1069 subscription: Subscription,
1070 view_spec: ViewSpec,
1071 cancel_token: CancellationToken,
1072) {
1073 let view_id = &subscription.view;
1074 let pipeline_limit = view_spec
1075 .pipeline
1076 .as_ref()
1077 .and_then(|p| p.limit)
1078 .unwrap_or(100);
1079 let take = subscription.take.unwrap_or(pipeline_limit);
1080 let skip = subscription.skip.unwrap_or(0);
1081 let is_single = take == 1;
1082
1083 let source_view_id = match &view_spec.source_view {
1084 Some(s) => s.clone(),
1085 None => {
1086 warn!("Derived view {} has no source_view", view_id);
1087 return;
1088 }
1089 };
1090
1091 let sorted_caches = ctx.view_index.sorted_caches();
1092 let initial_window: Vec<(String, serde_json::Value)> = {
1093 let mut caches = sorted_caches.write().await;
1094 if let Some(cache) = caches.get_mut(view_id) {
1095 cache.get_window(skip, take)
1096 } else {
1097 warn!("No sorted cache for derived view {}", view_id);
1098 vec![]
1099 }
1100 };
1101
1102 let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1103
1104 if !initial_window.is_empty() {
1105 let snapshot_entities: Vec<SnapshotEntity> = initial_window
1106 .into_iter()
1107 .map(|(key, data)| SnapshotEntity { key, data })
1108 .collect();
1109
1110 let batch_config = ctx.entity_cache.snapshot_config();
1111 if send_snapshot_batches(
1112 ctx.client_id,
1113 &snapshot_entities,
1114 view_spec.mode,
1115 view_id,
1116 ctx.client_manager,
1117 &batch_config,
1118 )
1119 .await
1120 .is_err()
1121 {
1122 return;
1123 }
1124 }
1125
1126 let mut rx = ctx
1127 .bus_manager
1128 .get_or_create_list_bus(&source_view_id)
1129 .await;
1130
1131 let client_id = ctx.client_id;
1132 let client_mgr = ctx.client_manager.clone();
1133 let view_id_clone = view_id.clone();
1134 let view_id_span = view_id.clone();
1135 let sorted_caches_clone = sorted_caches;
1136 let frame_mode = view_spec.mode;
1137
1138 tokio::spawn(
1139 async move {
1140 let mut current_window_keys = initial_keys;
1141
1142 loop {
1143 tokio::select! {
1144 _ = cancel_token.cancelled() => {
1145 debug!("Derived view subscription cancelled for client {}", client_id);
1146 break;
1147 }
1148 result = rx.recv() => {
1149 match result {
1150 Ok(_envelope) => {
1151 let new_window: Vec<(String, serde_json::Value)> = {
1152 let mut caches = sorted_caches_clone.write().await;
1153 if let Some(cache) = caches.get_mut(&view_id_clone) {
1154 cache.get_window(skip, take)
1155 } else {
1156 continue;
1157 }
1158 };
1159
1160 let new_keys: HashSet<String> =
1161 new_window.iter().map(|(k, _)| k.clone()).collect();
1162
1163 if is_single {
1164 if let Some((new_key, data)) = new_window.first() {
1165 for old_key in current_window_keys.difference(&new_keys) {
1166 let delete_frame = Frame {
1167 mode: frame_mode,
1168 export: view_id_clone.clone(),
1169 op: "delete",
1170 key: old_key.clone(),
1171 data: serde_json::Value::Null,
1172 append: vec![],
1173 };
1174 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1175 let payload = Arc::new(Bytes::from(json));
1176 if client_mgr.send_to_client(client_id, payload).is_err() {
1177 return;
1178 }
1179 }
1180 }
1181
1182 let frame = Frame {
1183 mode: frame_mode,
1184 export: view_id_clone.clone(),
1185 op: "upsert",
1186 key: new_key.clone(),
1187 data: data.clone(),
1188 append: vec![],
1189 };
1190 if let Ok(json) = serde_json::to_vec(&frame) {
1191 let payload = Arc::new(Bytes::from(json));
1192 if client_mgr.send_to_client(client_id, payload).is_err() {
1193 return;
1194 }
1195 }
1196 }
1197 } else {
1198 for key in current_window_keys.difference(&new_keys) {
1199 let delete_frame = Frame {
1200 mode: frame_mode,
1201 export: view_id_clone.clone(),
1202 op: "delete",
1203 key: key.clone(),
1204 data: serde_json::Value::Null,
1205 append: vec![],
1206 };
1207 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1208 let payload = Arc::new(Bytes::from(json));
1209 if client_mgr.send_to_client(client_id, payload).is_err() {
1210 return;
1211 }
1212 }
1213 }
1214
1215 for (key, data) in &new_window {
1216 let frame = Frame {
1217 mode: frame_mode,
1218 export: view_id_clone.clone(),
1219 op: "upsert",
1220 key: key.clone(),
1221 data: data.clone(),
1222 append: vec![],
1223 };
1224 if let Ok(json) = serde_json::to_vec(&frame) {
1225 let payload = Arc::new(Bytes::from(json));
1226 if client_mgr.send_to_client(client_id, payload).is_err() {
1227 return;
1228 }
1229 }
1230 }
1231 }
1232
1233 current_window_keys = new_keys;
1234 }
1235 Err(_) => break,
1236 }
1237 }
1238 }
1239 }
1240 }
1241 .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1242 );
1243
1244 info!(
1245 "Client {} subscribed to derived view {} (take={}, skip={})",
1246 ctx.client_id, view_id, take, skip
1247 );
1248}