helios_subscriptions/channels/
websocket.rs1use std::sync::Arc;
8
9use async_trait::async_trait;
10use tracing::debug;
11
12use super::ws_manager::WebSocketManager;
13use super::{ChannelDispatcher, DispatchResult};
14use crate::error::SubscriptionError;
15use crate::manager::ActiveSubscription;
16
17pub struct WebSocketChannel {
23 manager: Arc<WebSocketManager>,
24}
25
26impl WebSocketChannel {
27 pub fn new(manager: Arc<WebSocketManager>) -> Self {
29 Self { manager }
30 }
31
32 pub fn manager(&self) -> &Arc<WebSocketManager> {
34 &self.manager
35 }
36}
37
38#[async_trait]
39impl ChannelDispatcher for WebSocketChannel {
40 async fn dispatch(
41 &self,
42 subscription: &ActiveSubscription,
43 notification_bundle: &serde_json::Value,
44 ) -> Result<DispatchResult, SubscriptionError> {
45 let count = self.manager.send_to_subscription(
46 &subscription.tenant_id,
47 &subscription.id,
48 notification_bundle,
49 );
50
51 debug!(
52 subscription_id = %subscription.id,
53 clients = count,
54 "WebSocket notification dispatched"
55 );
56
57 Ok(DispatchResult::Success)
61 }
62
63 async fn handshake(
64 &self,
65 subscription: &ActiveSubscription,
66 _handshake_bundle: &serde_json::Value,
67 ) -> Result<DispatchResult, SubscriptionError> {
68 debug!(
72 subscription_id = %subscription.id,
73 "WebSocket handshake (no-op at activation)"
74 );
75 Ok(DispatchResult::Success)
76 }
77}
78
#[cfg(test)]
mod tests {
    use super::*;
    use crate::manager::{ChannelConfig, ChannelType, PayloadContent, SubscriptionStatusCode};
    use helios_fhir::FhirVersion;
    use serde_json::json;

    /// Fixture: an active WebSocket subscription `sub-ws-1` owned by tenant `test`.
    fn ws_subscription() -> ActiveSubscription {
        let channel = ChannelConfig {
            channel_type: ChannelType::Websocket,
            endpoint: None,
            payload_mime_type: Some(String::from("application/fhir+json")),
            payload_content: PayloadContent::FullResource,
            headers: Vec::new(),
            heartbeat_period: None,
            timeout: None,
            max_count: None,
        };

        ActiveSubscription {
            id: String::from("sub-ws-1"),
            topic_url: String::from("http://example.org/topic/test"),
            status: SubscriptionStatusCode::Active,
            channel,
            filters: Vec::new(),
            fhir_version: FhirVersion::default(),
            events_since_start: 0,
            consecutive_failures: 0,
            tenant_id: String::from("test"),
        }
    }

    /// Fixture: an empty history bundle used as the notification payload.
    fn test_bundle() -> serde_json::Value {
        json!({"resourceType": "Bundle", "type": "history", "entry": []})
    }

    /// Builds a channel over a brand-new manager with no registered clients.
    fn channel_without_clients() -> WebSocketChannel {
        WebSocketChannel::new(Arc::new(WebSocketManager::new()))
    }

    #[tokio::test]
    async fn test_dispatch_no_clients_returns_success() {
        let channel = channel_without_clients();
        let subscription = ws_subscription();
        let payload = test_bundle();

        let outcome = channel.dispatch(&subscription, &payload).await.unwrap();

        assert!(matches!(outcome, DispatchResult::Success));
    }

    #[tokio::test]
    async fn test_dispatch_delivers_to_clients() {
        let manager = Arc::new(WebSocketManager::new());
        // Register before handing the manager to the channel; the id stays bound
        // for the whole test so the registration value is not dropped early.
        let (_client_id, mut inbox) = manager.register_client("test", "sub-ws-1");
        let channel = WebSocketChannel::new(manager);

        let subscription = ws_subscription();
        let payload = test_bundle();

        let outcome = channel.dispatch(&subscription, &payload).await.unwrap();
        assert!(matches!(outcome, DispatchResult::Success));

        // The registered client must receive exactly the dispatched bundle.
        let delivered = inbox.recv().await.unwrap();
        assert_eq!(delivered, payload);
    }

    #[tokio::test]
    async fn test_handshake_returns_success() {
        let channel = channel_without_clients();
        let subscription = ws_subscription();
        let payload = test_bundle();

        let outcome = channel.handshake(&subscription, &payload).await.unwrap();

        assert!(matches!(outcome, DispatchResult::Success));
    }
}