Skip to main content

helios_subscriptions/channels/
websocket.rs

1//! WebSocket channel dispatcher.
2//!
3//! Delivers notification bundles by broadcasting to all connected WebSocket
4//! clients for a subscription. The actual WebSocket connections are managed
5//! by the [`WebSocketManager`] and the REST layer's upgrade handler.
6
7use std::sync::Arc;
8
9use async_trait::async_trait;
10use tracing::debug;
11
12use super::ws_manager::WebSocketManager;
13use super::{ChannelDispatcher, DispatchResult};
14use crate::error::SubscriptionError;
15use crate::manager::ActiveSubscription;
16
17/// WebSocket channel implementation.
18///
19/// Unlike rest-hook (which pushes to an external endpoint), this channel
20/// broadcasts to clients that have connected to the server's WebSocket
21/// endpoint and bound to a subscription via a binding token.
22pub struct WebSocketChannel {
23    manager: Arc<WebSocketManager>,
24}
25
26impl WebSocketChannel {
27    /// Creates a new WebSocket channel backed by the given manager.
28    pub fn new(manager: Arc<WebSocketManager>) -> Self {
29        Self { manager }
30    }
31
32    /// Returns a reference to the underlying WebSocket manager.
33    pub fn manager(&self) -> &Arc<WebSocketManager> {
34        &self.manager
35    }
36}
37
38#[async_trait]
39impl ChannelDispatcher for WebSocketChannel {
40    async fn dispatch(
41        &self,
42        subscription: &ActiveSubscription,
43        notification_bundle: &serde_json::Value,
44    ) -> Result<DispatchResult, SubscriptionError> {
45        let count = self.manager.send_to_subscription(
46            &subscription.tenant_id,
47            &subscription.id,
48            notification_bundle,
49        );
50
51        debug!(
52            subscription_id = %subscription.id,
53            clients = count,
54            "WebSocket notification dispatched"
55        );
56
57        // WebSocket dispatch is best-effort. Even with zero connected clients
58        // we return Success — treating it as an error would incorrectly trigger
59        // the error/off status transitions in the engine.
60        Ok(DispatchResult::Success)
61    }
62
63    async fn handshake(
64        &self,
65        subscription: &ActiveSubscription,
66        _handshake_bundle: &serde_json::Value,
67    ) -> Result<DispatchResult, SubscriptionError> {
68        // WebSocket handshake is a no-op at subscription activation time.
69        // The actual handshake notification is sent when a client connects
70        // via the WebSocket upgrade handler and binds with a token.
71        debug!(
72            subscription_id = %subscription.id,
73            "WebSocket handshake (no-op at activation)"
74        );
75        Ok(DispatchResult::Success)
76    }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manager::{ChannelConfig, ChannelType, PayloadContent, SubscriptionStatusCode};
    use helios_fhir::FhirVersion;
    use serde_json::json;

    /// Tenant used by every fixture in this module.
    const TENANT: &str = "test";
    /// Id of the fixture subscription.
    const SUBSCRIPTION_ID: &str = "sub-ws-1";

    /// Builds an active subscription fixture configured for the WebSocket channel.
    fn ws_subscription() -> ActiveSubscription {
        let channel = ChannelConfig {
            channel_type: ChannelType::Websocket,
            endpoint: None,
            payload_mime_type: Some(String::from("application/fhir+json")),
            payload_content: PayloadContent::FullResource,
            headers: Vec::new(),
            heartbeat_period: None,
            timeout: None,
            max_count: None,
        };

        ActiveSubscription {
            id: SUBSCRIPTION_ID.to_owned(),
            topic_url: String::from("http://example.org/topic/test"),
            status: SubscriptionStatusCode::Active,
            channel,
            filters: Vec::new(),
            fhir_version: FhirVersion::default(),
            events_since_start: 0,
            consecutive_failures: 0,
            tenant_id: TENANT.to_owned(),
        }
    }

    /// Returns an empty history bundle used as the notification payload.
    fn test_bundle() -> serde_json::Value {
        json!({"resourceType": "Bundle", "type": "history", "entry": []})
    }

    /// Creates a channel backed by a fresh manager with no registered clients.
    fn channel_without_clients() -> WebSocketChannel {
        WebSocketChannel::new(Arc::new(WebSocketManager::new()))
    }

    #[tokio::test]
    async fn test_dispatch_no_clients_returns_success() {
        let channel = channel_without_clients();
        let subscription = ws_subscription();
        let bundle = test_bundle();

        let outcome = channel.dispatch(&subscription, &bundle).await.unwrap();
        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn test_dispatch_delivers_to_clients() {
        let manager = Arc::new(WebSocketManager::new());
        // Keep the client id bound for the whole test, as the receiver is.
        let (_client_id, mut receiver) = manager.register_client(TENANT, SUBSCRIPTION_ID);
        let channel = WebSocketChannel::new(manager);

        let expected = test_bundle();
        let subscription = ws_subscription();

        let outcome = channel.dispatch(&subscription, &expected).await.unwrap();
        assert!(matches!(outcome, DispatchResult::Success));

        let delivered = receiver.recv().await.unwrap();
        assert_eq!(delivered, expected);
    }

    #[tokio::test]
    async fn test_handshake_returns_success() {
        let channel = channel_without_clients();
        let subscription = ws_subscription();
        let bundle = test_bundle();

        let outcome = channel.handshake(&subscription, &bundle).await.unwrap();
        assert!(matches!(outcome, DispatchResult::Success));
    }
}