1use crate::bus::BusManager;
2use crate::cache::{EntityCache, SnapshotBatchConfig};
3use crate::compression::maybe_compress;
4use crate::view::{ViewIndex, ViewSpec};
5use crate::websocket::client_manager::ClientManager;
6use crate::websocket::frame::{
7 Frame, Mode, SnapshotEntity, SnapshotFrame, SortConfig, SortOrder, SubscribedFrame,
8};
9use crate::websocket::subscription::{ClientMessage, Subscription};
10use anyhow::Result;
11use bytes::Bytes;
12use futures_util::StreamExt;
13use std::collections::HashSet;
14use std::net::SocketAddr;
15use std::sync::Arc;
16#[cfg(feature = "otel")]
17use std::time::Instant;
18
19use tokio::net::{TcpListener, TcpStream};
20use tokio_tungstenite::accept_async;
21use tokio_util::sync::CancellationToken;
22use tracing::{debug, error, info, info_span, warn, Instrument};
23use uuid::Uuid;
24
25#[cfg(feature = "otel")]
26use crate::metrics::Metrics;
27
/// Borrowed per-connection state handed to the subscription helpers.
///
/// Groups the connection's client id with references to the shared managers
/// so helpers such as `attach_client_to_bus` take a single context argument.
struct SubscriptionContext<'a> {
    /// Identifier generated for this WebSocket connection.
    client_id: Uuid,
    /// Used to send frames to the client and is cloned into forwarding tasks.
    client_manager: &'a ClientManager,
    /// Hands out the per-view (and per-key) update receivers.
    bus_manager: &'a BusManager,
    /// Consulted for the initial snapshot and its batching configuration.
    entity_cache: &'a EntityCache,
    /// Resolves view ids to view specs and exposes the sorted caches used by
    /// derived views.
    view_index: &'a ViewIndex,
    /// Optional metrics recorder; the field only exists with the `otel` feature.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
37
/// WebSocket front end that accepts TCP connections and serves view
/// subscriptions to each connected client.
pub struct WebSocketServer {
    /// Address the TCP listener binds to in `start`.
    bind_addr: SocketAddr,
    /// Registry of connected clients; cloned into every connection task.
    client_manager: ClientManager,
    /// Source of live update receivers; cloned into every connection task.
    bus_manager: BusManager,
    /// Cache used to build initial snapshots; cloned into every connection task.
    entity_cache: EntityCache,
    /// Shared index of view specifications.
    view_index: Arc<ViewIndex>,
    /// New connections are rejected once the client count reaches this value.
    max_clients: usize,
    /// Optional metrics recorder; the field only exists with the `otel` feature.
    #[cfg(feature = "otel")]
    metrics: Option<Arc<Metrics>>,
}
48
49impl WebSocketServer {
50 #[cfg(feature = "otel")]
51 pub fn new(
52 bind_addr: SocketAddr,
53 bus_manager: BusManager,
54 entity_cache: EntityCache,
55 view_index: Arc<ViewIndex>,
56 metrics: Option<Arc<Metrics>>,
57 ) -> Self {
58 Self {
59 bind_addr,
60 client_manager: ClientManager::new(),
61 bus_manager,
62 entity_cache,
63 view_index,
64 max_clients: 10000,
65 metrics,
66 }
67 }
68
69 #[cfg(not(feature = "otel"))]
70 pub fn new(
71 bind_addr: SocketAddr,
72 bus_manager: BusManager,
73 entity_cache: EntityCache,
74 view_index: Arc<ViewIndex>,
75 ) -> Self {
76 Self {
77 bind_addr,
78 client_manager: ClientManager::new(),
79 bus_manager,
80 entity_cache,
81 view_index,
82 max_clients: 10000,
83 }
84 }
85
86 pub fn with_max_clients(mut self, max_clients: usize) -> Self {
87 self.max_clients = max_clients;
88 self
89 }
90
91 pub async fn start(self) -> Result<()> {
92 info!(
93 "Starting WebSocket server on {} (max_clients: {})",
94 self.bind_addr, self.max_clients
95 );
96
97 let listener = TcpListener::bind(&self.bind_addr).await?;
98 info!("WebSocket server listening on {}", self.bind_addr);
99
100 self.client_manager.start_cleanup_task();
101
102 loop {
103 match listener.accept().await {
104 Ok((stream, addr)) => {
105 let client_count = self.client_manager.client_count();
106 if client_count >= self.max_clients {
107 warn!(
108 "Rejecting connection from {} - max clients ({}) reached",
109 addr, self.max_clients
110 );
111 drop(stream);
112 continue;
113 }
114
115 #[cfg(feature = "otel")]
116 if let Some(ref metrics) = self.metrics {
117 metrics.record_ws_connection();
118 }
119
120 info!(
121 "New WebSocket connection from {} ({}/{} clients)",
122 addr,
123 client_count + 1,
124 self.max_clients
125 );
126 let client_manager = self.client_manager.clone();
127 let bus_manager = self.bus_manager.clone();
128 let entity_cache = self.entity_cache.clone();
129 let view_index = self.view_index.clone();
130 #[cfg(feature = "otel")]
131 let metrics = self.metrics.clone();
132
133 tokio::spawn(
134 async move {
135 #[cfg(feature = "otel")]
136 let result = handle_connection(
137 stream,
138 client_manager,
139 bus_manager,
140 entity_cache,
141 view_index,
142 metrics,
143 )
144 .await;
145 #[cfg(not(feature = "otel"))]
146 let result = handle_connection(
147 stream,
148 client_manager,
149 bus_manager,
150 entity_cache,
151 view_index,
152 )
153 .await;
154
155 if let Err(e) = result {
156 error!("WebSocket connection error: {}", e);
157 }
158 }
159 .instrument(info_span!("ws.connection", %addr)),
160 );
161 }
162 Err(e) => {
163 error!("Failed to accept connection: {}", e);
164 }
165 }
166 }
167 }
168}
169
/// Drives one client connection (build with the `otel` feature).
///
/// Performs the WebSocket handshake, registers the client, then processes
/// incoming messages until the client closes, the stream ends or a transport
/// error occurs. Text messages are parsed first as `ClientMessage` and, if that
/// fails, as a bare `Subscription`. On exit all of the client's subscriptions
/// are cancelled, the client is removed and connection metrics are recorded.
///
/// # Errors
///
/// Returns an error only when the WebSocket handshake fails.
#[cfg(feature = "otel")]
async fn handle_connection(
    stream: TcpStream,
    client_manager: ClientManager,
    bus_manager: BusManager,
    entity_cache: EntityCache,
    view_index: Arc<ViewIndex>,
    metrics: Option<Arc<Metrics>>,
) -> Result<()> {
    let ws_stream = accept_async(stream).await?;
    let client_id = Uuid::new_v4();
    let connection_start = Instant::now();

    info!("WebSocket connection established for client {}", client_id);

    let (ws_sender, mut ws_receiver) = ws_stream.split();

    client_manager.add_client(client_id, ws_sender);

    let ctx = SubscriptionContext {
        client_id,
        client_manager: &client_manager,
        bus_manager: &bus_manager,
        entity_cache: &entity_cache,
        view_index: &view_index,
        metrics: metrics.clone(),
    };

    // View ids whose "subscription created" metric has not yet been balanced
    // by a "subscription removed"; whatever is left is reported at disconnect.
    let mut active_subscriptions: Vec<String> = Vec::new();

    loop {
        let msg = match ws_receiver.next().await {
            Some(Ok(msg)) => msg,
            Some(Err(e)) => {
                warn!("WebSocket error for client {}: {}", client_id, e);
                break;
            }
            None => {
                debug!("WebSocket stream ended for client {}", client_id);
                break;
            }
        };

        if msg.is_close() {
            info!("Client {} requested close", client_id);
            break;
        }

        client_manager.update_client_last_seen(client_id);

        if !msg.is_text() {
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_ws_message_received();
        }

        let text = match msg.to_text() {
            Ok(text) => text,
            Err(_) => continue,
        };
        debug!("Received text message from client {}: {}", client_id, text);

        // Resolve the message to a subscription request; every other outcome is
        // handled here and moves on to the next message.
        let subscription = match serde_json::from_str::<ClientMessage>(text) {
            Ok(ClientMessage::Subscribe(subscription)) => subscription,
            Ok(ClientMessage::Unsubscribe(unsub)) => {
                let sub_key = unsub.sub_key();
                let removed = client_manager
                    .remove_client_subscription(client_id, &sub_key)
                    .await;

                if removed {
                    info!("Client {} unsubscribed from {}", client_id, sub_key);
                    if let Some(ref m) = metrics {
                        m.record_subscription_removed(&unsub.view);
                    }
                    // The removal has just been recorded, so stop tracking one
                    // entry for this view; otherwise the disconnect path below
                    // would record the same removal a second time.
                    if let Some(pos) = active_subscriptions
                        .iter()
                        .position(|view| *view == unsub.view)
                    {
                        active_subscriptions.swap_remove(pos);
                    }
                }
                continue;
            }
            Ok(ClientMessage::Ping) => {
                debug!("Received ping from client {}", client_id);
                continue;
            }
            // Not a `ClientMessage`: fall back to a bare `Subscription` payload.
            Err(_) => match serde_json::from_str::<Subscription>(text) {
                Ok(subscription) => subscription,
                Err(_) => {
                    debug!(
                        "Received non-subscription message from client {}: {}",
                        client_id, text
                    );
                    continue;
                }
            },
        };

        let view_id = subscription.view.clone();
        let sub_key = subscription.sub_key();
        client_manager.update_subscription(client_id, subscription.clone());

        let cancel_token = CancellationToken::new();
        let is_new = client_manager
            .add_client_subscription(client_id, sub_key.clone(), cancel_token.clone())
            .await;

        if !is_new {
            debug!(
                "Client {} already subscribed to {}, ignoring duplicate",
                client_id, sub_key
            );
            continue;
        }

        if let Some(ref m) = metrics {
            m.record_subscription_created(&view_id);
        }
        active_subscriptions.push(view_id);

        attach_client_to_bus(&ctx, subscription, cancel_token).await;
    }

    client_manager
        .cancel_all_client_subscriptions(client_id)
        .await;
    client_manager.remove_client(client_id);

    if let Some(ref m) = metrics {
        let duration_secs = connection_start.elapsed().as_secs_f64();
        m.record_ws_disconnection(duration_secs);

        for view_id in active_subscriptions {
            m.record_subscription_removed(&view_id);
        }
    }

    info!("Client {} disconnected", client_id);

    Ok(())
}
323
324#[cfg(not(feature = "otel"))]
325async fn handle_connection(
326 stream: TcpStream,
327 client_manager: ClientManager,
328 bus_manager: BusManager,
329 entity_cache: EntityCache,
330 view_index: Arc<ViewIndex>,
331) -> Result<()> {
332 let ws_stream = accept_async(stream).await?;
333 let client_id = Uuid::new_v4();
334
335 info!("WebSocket connection established for client {}", client_id);
336
337 let (ws_sender, mut ws_receiver) = ws_stream.split();
338
339 client_manager.add_client(client_id, ws_sender);
340
341 let ctx = SubscriptionContext {
342 client_id,
343 client_manager: &client_manager,
344 bus_manager: &bus_manager,
345 entity_cache: &entity_cache,
346 view_index: &view_index,
347 };
348
349 loop {
350 tokio::select! {
351 ws_msg = ws_receiver.next() => {
352 match ws_msg {
353 Some(Ok(msg)) => {
354 if msg.is_close() {
355 info!("Client {} requested close", client_id);
356 break;
357 }
358
359 client_manager.update_client_last_seen(client_id);
360
361 if msg.is_text() {
362 if let Ok(text) = msg.to_text() {
363 debug!("Received text message from client {}: {}", client_id, text);
364
365 if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(text) {
366 match client_msg {
367 ClientMessage::Subscribe(subscription) => {
368 let sub_key = subscription.sub_key();
369 client_manager.update_subscription(client_id, subscription.clone());
370
371 let cancel_token = CancellationToken::new();
372 let is_new = client_manager.add_client_subscription(
373 client_id,
374 sub_key.clone(),
375 cancel_token.clone(),
376 ).await;
377
378 if !is_new {
379 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
380 continue;
381 }
382
383 attach_client_to_bus(&ctx, subscription, cancel_token).await;
384 }
385 ClientMessage::Unsubscribe(unsub) => {
386 let sub_key = unsub.sub_key();
387 let removed = client_manager
388 .remove_client_subscription(client_id, &sub_key)
389 .await;
390
391 if removed {
392 info!("Client {} unsubscribed from {}", client_id, sub_key);
393 }
394 }
395 ClientMessage::Ping => {
396 debug!("Received ping from client {}", client_id);
397 }
398 }
399 } else if let Ok(subscription) = serde_json::from_str::<Subscription>(text) {
400 let sub_key = subscription.sub_key();
401 client_manager.update_subscription(client_id, subscription.clone());
402
403 let cancel_token = CancellationToken::new();
404 let is_new = client_manager.add_client_subscription(
405 client_id,
406 sub_key.clone(),
407 cancel_token.clone(),
408 ).await;
409
410 if !is_new {
411 debug!("Client {} already subscribed to {}, ignoring duplicate", client_id, sub_key);
412 continue;
413 }
414
415 attach_client_to_bus(&ctx, subscription, cancel_token).await;
416 } else {
417 debug!("Received non-subscription message from client {}: {}", client_id, text);
418 }
419 }
420 }
421 }
422 Some(Err(e)) => {
423 warn!("WebSocket error for client {}: {}", client_id, e);
424 break;
425 }
426 None => {
427 debug!("WebSocket stream ended for client {}", client_id);
428 break;
429 }
430 }
431 }
432 }
433 }
434
435 client_manager
436 .cancel_all_client_subscriptions(client_id)
437 .await;
438 client_manager.remove_client(client_id);
439 info!("Client {} disconnected", client_id);
440
441 Ok(())
442}
443
444async fn send_snapshot_batches(
445 client_id: Uuid,
446 entities: &[SnapshotEntity],
447 mode: Mode,
448 view_id: &str,
449 client_manager: &ClientManager,
450 batch_config: &SnapshotBatchConfig,
451 #[cfg(feature = "otel")] metrics: Option<&Arc<Metrics>>,
452) -> Result<()> {
453 let total = entities.len();
454 if total == 0 {
455 return Ok(());
456 }
457
458 let mut offset = 0;
459 let mut batch_num = 0;
460
461 while offset < total {
462 let batch_size = if batch_num == 0 {
463 batch_config.initial_batch_size
464 } else {
465 batch_config.subsequent_batch_size
466 };
467
468 let end = (offset + batch_size).min(total);
469 let batch_data: Vec<SnapshotEntity> = entities[offset..end].to_vec();
470 let is_complete = end >= total;
471
472 let snapshot_frame = SnapshotFrame {
473 mode,
474 export: view_id.to_string(),
475 op: "snapshot",
476 data: batch_data,
477 complete: is_complete,
478 };
479
480 if let Ok(json_payload) = serde_json::to_vec(&snapshot_frame) {
481 let payload = maybe_compress(&json_payload);
482 if client_manager
483 .send_compressed_async(client_id, payload)
484 .await
485 .is_err()
486 {
487 return Err(anyhow::anyhow!("Failed to send snapshot batch"));
488 }
489 #[cfg(feature = "otel")]
490 if let Some(m) = metrics {
491 m.record_ws_message_sent();
492 }
493 }
494
495 offset = end;
496 batch_num += 1;
497 }
498
499 debug!(
500 "Sent {} snapshot batches ({} entities) for {} to client {}",
501 batch_num, total, view_id, client_id
502 );
503
504 Ok(())
505}
506
507fn extract_sort_config(view_spec: &ViewSpec) -> Option<SortConfig> {
508 if let Some(sort) = view_spec.pipeline.as_ref().and_then(|p| p.sort.as_ref()) {
509 return Some(SortConfig {
510 field: sort.field_path.clone(),
511 order: match sort.order {
512 crate::materialized_view::SortOrder::Asc => SortOrder::Asc,
513 crate::materialized_view::SortOrder::Desc => SortOrder::Desc,
514 },
515 });
516 }
517
518 if view_spec.mode == Mode::List {
519 return Some(SortConfig {
520 field: vec!["_seq".to_string()],
521 order: SortOrder::Desc,
522 });
523 }
524
525 None
526}
527
528fn send_subscribed_frame(
529 client_id: Uuid,
530 view_id: &str,
531 view_spec: &ViewSpec,
532 client_manager: &ClientManager,
533) -> Result<()> {
534 let sort_config = extract_sort_config(view_spec);
535 let subscribed_frame = SubscribedFrame::new(view_id.to_string(), view_spec.mode, sort_config);
536
537 let json_payload = serde_json::to_vec(&subscribed_frame)?;
538 let payload = Arc::new(Bytes::from(json_payload));
539 client_manager
540 .send_to_client(client_id, payload)
541 .map_err(|e| anyhow::anyhow!("Failed to send subscribed frame: {:?}", e))
542}
543
/// Wires a client's subscription to the matching update bus (`otel` build).
///
/// Steps, in order:
/// 1. Resolve the view; unknown views are logged and ignored.
/// 2. Send the "subscribed" acknowledgement frame.
/// 3. Derived views that declare a pipeline sort are delegated to
///    `attach_derived_view_subscription_otel`.
/// 4. Otherwise obtain the receiver for the view's mode, send the initial
///    state, and spawn a task that forwards updates until the cancel token
///    fires, the receiver reports an error, or a send to the client fails.
#[cfg(feature = "otel")]
async fn attach_client_to_bus(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;

    let view_spec = if let Some(spec) = ctx.view_index.get_view(view_id) {
        spec.clone()
    } else {
        warn!("Unknown view ID: {}", view_id);
        return;
    };

    if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
        warn!("Failed to send subscribed frame: {}", e);
        return;
    }

    let has_pipeline_sort = view_spec
        .pipeline
        .as_ref()
        .map_or(false, |p| p.sort.is_some());

    if view_spec.is_derived() && has_pipeline_sort {
        attach_derived_view_subscription_otel(ctx, subscription, view_spec, cancel_token).await;
        return;
    }

    match view_spec.mode {
        Mode::State => {
            let key = subscription.key.as_deref().unwrap_or("");

            // Obtain the receiver before reading the cache, as the original
            // ordering does.
            let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;

            match ctx.entity_cache.get(view_id, key).await {
                Some(cached_entity) => {
                    let snapshot = [SnapshotEntity {
                        key: key.to_string(),
                        data: cached_entity,
                    }];
                    let batch_config = ctx.entity_cache.snapshot_config();
                    // Best effort: a failed snapshot does not abort the subscription.
                    let _ = send_snapshot_batches(
                        ctx.client_id,
                        &snapshot,
                        view_spec.mode,
                        view_id,
                        ctx.client_manager,
                        &batch_config,
                        ctx.metrics.as_ref(),
                    )
                    .await;
                    // Presumably marks the bus's current value as seen so the
                    // task below only reacts to later changes.
                    rx.borrow_and_update();
                }
                None => {
                    // Nothing cached: fall back to whatever the bus currently holds.
                    if !rx.borrow().is_empty() {
                        let data = rx.borrow_and_update().clone();
                        let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
                    }
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let task_metrics = ctx.metrics.clone();
            let span = info_span!("ws.subscribe.state", %client_id, view = %view_id, key = %key);

            let forward = async move {
                loop {
                    tokio::select! {
                        _ = cancel_token.cancelled() => {
                            debug!("State subscription cancelled for client {}", client_id);
                            break;
                        }
                        changed = rx.changed() => {
                            if changed.is_err() {
                                break;
                            }
                            let data = rx.borrow().clone();
                            if client_mgr.send_to_client(client_id, data).is_err() {
                                break;
                            }
                            if let Some(m) = task_metrics.as_ref() {
                                m.record_ws_message_sent();
                            }
                        }
                    }
                }
            };
            tokio::spawn(forward.instrument(span));
        }
        Mode::List | Mode::Append => {
            // Obtain the receiver before reading the cache, as the original
            // ordering does.
            let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;

            let snapshot_entities: Vec<SnapshotEntity> = ctx
                .entity_cache
                .get_all(view_id)
                .await
                .into_iter()
                .filter(|(key, _)| subscription.matches_key(key))
                .map(|(key, data)| SnapshotEntity { key, data })
                .collect();

            if !snapshot_entities.is_empty() {
                let batch_config = ctx.entity_cache.snapshot_config();
                let sent = send_snapshot_batches(
                    ctx.client_id,
                    &snapshot_entities,
                    view_spec.mode,
                    view_id,
                    ctx.client_manager,
                    &batch_config,
                    ctx.metrics.as_ref(),
                )
                .await;
                if sent.is_err() {
                    return;
                }
            }

            let client_id = ctx.client_id;
            let client_mgr = ctx.client_manager.clone();
            let sub = subscription.clone();
            let task_metrics = ctx.metrics.clone();
            let mode = view_spec.mode;
            let span = info_span!("ws.subscribe.list", %client_id, view = %view_id, mode = ?mode);

            let forward = async move {
                loop {
                    tokio::select! {
                        _ = cancel_token.cancelled() => {
                            debug!("List subscription cancelled for client {}", client_id);
                            break;
                        }
                        received = rx.recv() => {
                            let envelope = match received {
                                Ok(envelope) => envelope,
                                Err(_) => break,
                            };
                            // Updates the subscription does not match are dropped.
                            if !sub.matches(&envelope.entity, &envelope.key) {
                                continue;
                            }
                            if client_mgr
                                .send_to_client(client_id, envelope.payload.clone())
                                .is_err()
                            {
                                break;
                            }
                            if let Some(m) = task_metrics.as_ref() {
                                m.record_ws_message_sent();
                            }
                        }
                    }
                }
            };
            tokio::spawn(forward.instrument(span));
        }
    }

    info!(
        "Client {} subscribed to {} (mode: {:?})",
        ctx.client_id, view_id, view_spec.mode
    );
}
711
/// Subscribes a client to a sorted derived view (`otel` build).
///
/// The client sees a window of the view's sorted cache: `skip` entries are
/// skipped and `take` entries are returned. `take` defaults to the pipeline
/// limit, or 100 when the pipeline has none; `skip` defaults to 0.
///
/// The initial window is sent as a snapshot. A spawned task then recomputes the
/// window whenever the *source* view's bus delivers a message. It sends a
/// `delete` frame for every key that left the window, followed by `upsert`
/// frames for the current entries (only the first entry when `take == 1`).
///
/// The task ends when the cancel token fires, the receiver reports an error,
/// or a send to the client fails.
#[cfg(feature = "otel")]
async fn attach_derived_view_subscription_otel(
    ctx: &SubscriptionContext<'_>,
    subscription: Subscription,
    view_spec: ViewSpec,
    cancel_token: CancellationToken,
) {
    let view_id = &subscription.view;
    let pipeline_limit = view_spec
        .pipeline
        .as_ref()
        .and_then(|p| p.limit)
        .unwrap_or(100);
    let take = subscription.take.unwrap_or(pipeline_limit);
    let skip = subscription.skip.unwrap_or(0);
    let is_single = take == 1;

    let source_view_id = match &view_spec.source_view {
        Some(s) => s.clone(),
        None => {
            warn!("Derived view {} has no source_view", view_id);
            return;
        }
    };

    let sorted_caches = ctx.view_index.sorted_caches();
    let initial_window: Vec<(String, serde_json::Value)> = {
        let mut caches = sorted_caches.write().await;
        if let Some(cache) = caches.get_mut(view_id) {
            cache.get_window(skip, take)
        } else {
            warn!("No sorted cache for derived view {}", view_id);
            vec![]
        }
    };

    let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();

    if !initial_window.is_empty() {
        let snapshot_entities: Vec<SnapshotEntity> = initial_window
            .into_iter()
            .map(|(key, data)| SnapshotEntity { key, data })
            .collect();

        let batch_config = ctx.entity_cache.snapshot_config();
        if send_snapshot_batches(
            ctx.client_id,
            &snapshot_entities,
            view_spec.mode,
            view_id,
            ctx.client_manager,
            &batch_config,
            ctx.metrics.as_ref(),
        )
        .await
        .is_err()
        {
            return;
        }
    }

    // Updates are driven by the source view's bus, not the derived view's own.
    let mut rx = ctx
        .bus_manager
        .get_or_create_list_bus(&source_view_id)
        .await;

    let client_id = ctx.client_id;
    let client_mgr = ctx.client_manager.clone();
    let view_id_clone = view_id.clone();
    let sorted_caches_clone = sorted_caches;
    let metrics_clone = ctx.metrics.clone();
    let frame_mode = view_spec.mode;
    let span = info_span!("ws.subscribe.derived", %client_id, view = %view_id);

    let forward = async move {
        let mut current_window_keys = initial_keys;

        // Builds, serializes and sends one frame. Returns `false` once the
        // client can no longer be reached; a frame that fails to serialize is
        // skipped and reported as `true`, so the task carries on.
        let send_frame = |op: &'static str, key: &str, data: serde_json::Value| -> bool {
            let frame = Frame {
                mode: frame_mode,
                export: view_id_clone.clone(),
                op,
                key: key.to_string(),
                data,
                append: vec![],
            };
            if let Ok(json) = serde_json::to_vec(&frame) {
                let payload = Arc::new(Bytes::from(json));
                if client_mgr.send_to_client(client_id, payload).is_err() {
                    return false;
                }
                if let Some(m) = metrics_clone.as_ref() {
                    m.record_ws_message_sent();
                }
            }
            true
        };

        loop {
            tokio::select! {
                _ = cancel_token.cancelled() => {
                    debug!("Derived view subscription cancelled for client {}", client_id);
                    break;
                }
                result = rx.recv() => {
                    // The message only triggers a refresh; its content is unused.
                    if result.is_err() {
                        break;
                    }

                    let new_window: Vec<(String, serde_json::Value)> = {
                        let mut caches = sorted_caches_clone.write().await;
                        match caches.get_mut(&view_id_clone) {
                            Some(cache) => cache.get_window(skip, take),
                            None => continue,
                        }
                    };

                    let new_keys: HashSet<String> =
                        new_window.iter().map(|(k, _)| k.clone()).collect();

                    // Always announce keys that left the window, including when
                    // a single-item window becomes empty; otherwise the client
                    // would keep showing the stale entry.
                    for stale_key in current_window_keys.difference(&new_keys) {
                        if !send_frame("delete", stale_key, serde_json::Value::Null) {
                            return;
                        }
                    }

                    // A single-item subscription only ever upserts the head.
                    let upsert_count = if is_single { 1 } else { new_window.len() };
                    for (key, data) in new_window.iter().take(upsert_count) {
                        if !send_frame("upsert", key, data.clone()) {
                            return;
                        }
                    }

                    current_window_keys = new_keys;
                }
            }
        }
    };

    tokio::spawn(forward.instrument(span));

    info!(
        "Client {} subscribed to derived view {} (take={}, skip={})",
        ctx.client_id, view_id, take, skip
    );
}
909
910#[cfg(not(feature = "otel"))]
911async fn attach_client_to_bus(
912 ctx: &SubscriptionContext<'_>,
913 subscription: Subscription,
914 cancel_token: CancellationToken,
915) {
916 let view_id = &subscription.view;
917
918 let view_spec = match ctx.view_index.get_view(view_id) {
919 Some(spec) => spec.clone(),
920 None => {
921 warn!("Unknown view ID: {}", view_id);
922 return;
923 }
924 };
925
926 if let Err(e) = send_subscribed_frame(ctx.client_id, view_id, &view_spec, ctx.client_manager) {
927 warn!("Failed to send subscribed frame: {}", e);
928 return;
929 }
930
931 let is_derived_with_sort = view_spec.is_derived()
932 && view_spec
933 .pipeline
934 .as_ref()
935 .map(|p| p.sort.is_some())
936 .unwrap_or(false);
937
938 if is_derived_with_sort {
939 attach_derived_view_subscription(ctx, subscription, view_spec, cancel_token).await;
940 return;
941 }
942
943 match view_spec.mode {
944 Mode::State => {
945 let key = subscription.key.as_deref().unwrap_or("");
946
947 let mut rx = ctx.bus_manager.get_or_create_state_bus(view_id, key).await;
948
949 if let Some(cached_entity) = ctx.entity_cache.get(view_id, key).await {
950 let snapshot_entities = vec![SnapshotEntity {
951 key: key.to_string(),
952 data: cached_entity,
953 }];
954 let batch_config = ctx.entity_cache.snapshot_config();
955 let _ = send_snapshot_batches(
956 ctx.client_id,
957 &snapshot_entities,
958 view_spec.mode,
959 view_id,
960 ctx.client_manager,
961 &batch_config,
962 )
963 .await;
964 rx.borrow_and_update();
965 } else if !rx.borrow().is_empty() {
966 let data = rx.borrow_and_update().clone();
967 let _ = ctx.client_manager.send_to_client(ctx.client_id, data);
968 }
969
970 let client_id = ctx.client_id;
971 let client_mgr = ctx.client_manager.clone();
972 let view_id_clone = view_id.clone();
973 let key_clone = key.to_string();
974 tokio::spawn(
975 async move {
976 loop {
977 tokio::select! {
978 _ = cancel_token.cancelled() => {
979 debug!("State subscription cancelled for client {}", client_id);
980 break;
981 }
982 result = rx.changed() => {
983 if result.is_err() {
984 break;
985 }
986 let data = rx.borrow().clone();
987 if client_mgr.send_to_client(client_id, data).is_err() {
988 break;
989 }
990 }
991 }
992 }
993 }
994 .instrument(info_span!("ws.subscribe.state", %client_id, view = %view_id_clone, key = %key_clone)),
995 );
996 }
997 Mode::List | Mode::Append => {
998 let mut rx = ctx.bus_manager.get_or_create_list_bus(view_id).await;
999
1000 let snapshots = ctx.entity_cache.get_all(view_id).await;
1001 let snapshot_entities: Vec<SnapshotEntity> = snapshots
1002 .into_iter()
1003 .filter(|(key, _)| subscription.matches_key(key))
1004 .map(|(key, data)| SnapshotEntity { key, data })
1005 .collect();
1006
1007 if !snapshot_entities.is_empty() {
1008 let batch_config = ctx.entity_cache.snapshot_config();
1009 if send_snapshot_batches(
1010 ctx.client_id,
1011 &snapshot_entities,
1012 view_spec.mode,
1013 view_id,
1014 ctx.client_manager,
1015 &batch_config,
1016 )
1017 .await
1018 .is_err()
1019 {
1020 return;
1021 }
1022 }
1023
1024 let client_id = ctx.client_id;
1025 let client_mgr = ctx.client_manager.clone();
1026 let sub = subscription.clone();
1027 let view_id_clone = view_id.clone();
1028 let mode = view_spec.mode;
1029 tokio::spawn(
1030 async move {
1031 loop {
1032 tokio::select! {
1033 _ = cancel_token.cancelled() => {
1034 debug!("List subscription cancelled for client {}", client_id);
1035 break;
1036 }
1037 result = rx.recv() => {
1038 match result {
1039 Ok(envelope) => {
1040 if sub.matches(&envelope.entity, &envelope.key)
1041 && client_mgr
1042 .send_to_client(client_id, envelope.payload.clone())
1043 .is_err()
1044 {
1045 break;
1046 }
1047 }
1048 Err(_) => break,
1049 }
1050 }
1051 }
1052 }
1053 }
1054 .instrument(info_span!("ws.subscribe.list", %client_id, view = %view_id_clone, mode = ?mode)),
1055 );
1056 }
1057 }
1058
1059 info!(
1060 "Client {} subscribed to {} (mode: {:?})",
1061 ctx.client_id, view_id, view_spec.mode
1062 );
1063}
1064
1065#[cfg(not(feature = "otel"))]
1066async fn attach_derived_view_subscription(
1067 ctx: &SubscriptionContext<'_>,
1068 subscription: Subscription,
1069 view_spec: ViewSpec,
1070 cancel_token: CancellationToken,
1071) {
1072 let view_id = &subscription.view;
1073 let pipeline_limit = view_spec
1074 .pipeline
1075 .as_ref()
1076 .and_then(|p| p.limit)
1077 .unwrap_or(100);
1078 let take = subscription.take.unwrap_or(pipeline_limit);
1079 let skip = subscription.skip.unwrap_or(0);
1080 let is_single = take == 1;
1081
1082 let source_view_id = match &view_spec.source_view {
1083 Some(s) => s.clone(),
1084 None => {
1085 warn!("Derived view {} has no source_view", view_id);
1086 return;
1087 }
1088 };
1089
1090 let sorted_caches = ctx.view_index.sorted_caches();
1091 let initial_window: Vec<(String, serde_json::Value)> = {
1092 let mut caches = sorted_caches.write().await;
1093 if let Some(cache) = caches.get_mut(view_id) {
1094 cache.get_window(skip, take)
1095 } else {
1096 warn!("No sorted cache for derived view {}", view_id);
1097 vec![]
1098 }
1099 };
1100
1101 let initial_keys: HashSet<String> = initial_window.iter().map(|(k, _)| k.clone()).collect();
1102
1103 if !initial_window.is_empty() {
1104 let snapshot_entities: Vec<SnapshotEntity> = initial_window
1105 .into_iter()
1106 .map(|(key, data)| SnapshotEntity { key, data })
1107 .collect();
1108
1109 let batch_config = ctx.entity_cache.snapshot_config();
1110 if send_snapshot_batches(
1111 ctx.client_id,
1112 &snapshot_entities,
1113 view_spec.mode,
1114 view_id,
1115 ctx.client_manager,
1116 &batch_config,
1117 )
1118 .await
1119 .is_err()
1120 {
1121 return;
1122 }
1123 }
1124
1125 let mut rx = ctx
1126 .bus_manager
1127 .get_or_create_list_bus(&source_view_id)
1128 .await;
1129
1130 let client_id = ctx.client_id;
1131 let client_mgr = ctx.client_manager.clone();
1132 let view_id_clone = view_id.clone();
1133 let view_id_span = view_id.clone();
1134 let sorted_caches_clone = sorted_caches;
1135 let frame_mode = view_spec.mode;
1136
1137 tokio::spawn(
1138 async move {
1139 let mut current_window_keys = initial_keys;
1140
1141 loop {
1142 tokio::select! {
1143 _ = cancel_token.cancelled() => {
1144 debug!("Derived view subscription cancelled for client {}", client_id);
1145 break;
1146 }
1147 result = rx.recv() => {
1148 match result {
1149 Ok(_envelope) => {
1150 let new_window: Vec<(String, serde_json::Value)> = {
1151 let mut caches = sorted_caches_clone.write().await;
1152 if let Some(cache) = caches.get_mut(&view_id_clone) {
1153 cache.get_window(skip, take)
1154 } else {
1155 continue;
1156 }
1157 };
1158
1159 let new_keys: HashSet<String> =
1160 new_window.iter().map(|(k, _)| k.clone()).collect();
1161
1162 if is_single {
1163 if let Some((new_key, data)) = new_window.first() {
1164 for old_key in current_window_keys.difference(&new_keys) {
1165 let delete_frame = Frame {
1166 mode: frame_mode,
1167 export: view_id_clone.clone(),
1168 op: "delete",
1169 key: old_key.clone(),
1170 data: serde_json::Value::Null,
1171 append: vec![],
1172 };
1173 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1174 let payload = Arc::new(Bytes::from(json));
1175 if client_mgr.send_to_client(client_id, payload).is_err() {
1176 return;
1177 }
1178 }
1179 }
1180
1181 let frame = Frame {
1182 mode: frame_mode,
1183 export: view_id_clone.clone(),
1184 op: "upsert",
1185 key: new_key.clone(),
1186 data: data.clone(),
1187 append: vec![],
1188 };
1189 if let Ok(json) = serde_json::to_vec(&frame) {
1190 let payload = Arc::new(Bytes::from(json));
1191 if client_mgr.send_to_client(client_id, payload).is_err() {
1192 return;
1193 }
1194 }
1195 }
1196 } else {
1197 for key in current_window_keys.difference(&new_keys) {
1198 let delete_frame = Frame {
1199 mode: frame_mode,
1200 export: view_id_clone.clone(),
1201 op: "delete",
1202 key: key.clone(),
1203 data: serde_json::Value::Null,
1204 append: vec![],
1205 };
1206 if let Ok(json) = serde_json::to_vec(&delete_frame) {
1207 let payload = Arc::new(Bytes::from(json));
1208 if client_mgr.send_to_client(client_id, payload).is_err() {
1209 return;
1210 }
1211 }
1212 }
1213
1214 for (key, data) in &new_window {
1215 let frame = Frame {
1216 mode: frame_mode,
1217 export: view_id_clone.clone(),
1218 op: "upsert",
1219 key: key.clone(),
1220 data: data.clone(),
1221 append: vec![],
1222 };
1223 if let Ok(json) = serde_json::to_vec(&frame) {
1224 let payload = Arc::new(Bytes::from(json));
1225 if client_mgr.send_to_client(client_id, payload).is_err() {
1226 return;
1227 }
1228 }
1229 }
1230 }
1231
1232 current_window_keys = new_keys;
1233 }
1234 Err(_) => break,
1235 }
1236 }
1237 }
1238 }
1239 }
1240 .instrument(info_span!("ws.subscribe.derived", %client_id, view = %view_id_span)),
1241 );
1242
1243 info!(
1244 "Client {} subscribed to derived view {} (take={}, skip={})",
1245 ctx.client_id, view_id, take, skip
1246 );
1247}