Skip to main content

hyperstack_server/websocket/
subscription.rs

1use serde::{Deserialize, Serialize};
2
3use crate::websocket::auth::AuthDeny;
4
5/// Client message types for subscription management
6#[derive(Debug, Clone, Serialize, Deserialize)]
7#[serde(tag = "type", rename_all = "snake_case")]
8pub enum ClientMessage {
9    /// Subscribe to a view
10    Subscribe(Subscription),
11    /// Unsubscribe from a view
12    Unsubscribe(Unsubscription),
13    /// Keep-alive ping (no response needed)
14    Ping,
15    /// Refresh authentication token without reconnecting
16    RefreshAuth(RefreshAuthRequest),
17}
18
19/// Request to refresh authentication token
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct RefreshAuthRequest {
22    pub token: String,
23}
24
25/// Response to a refresh auth request
26#[derive(Debug, Clone, Serialize, Deserialize)]
27#[serde(rename_all = "camelCase")]
28pub struct RefreshAuthResponse {
29    pub success: bool,
30    #[serde(skip_serializing_if = "Option::is_none")]
31    pub error: Option<String>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    pub expires_at: Option<u64>,
34}
35
36/// Server-sent socket issue payload for auth and quota failures after connect.
37#[derive(Debug, Clone, Serialize, Deserialize)]
38pub struct SocketIssueMessage {
39    #[serde(rename = "type")]
40    pub kind: String,
41    pub error: String,
42    pub message: String,
43    pub code: String,
44    pub retryable: bool,
45    #[serde(skip_serializing_if = "Option::is_none")]
46    pub retry_after: Option<u64>,
47    #[serde(skip_serializing_if = "Option::is_none")]
48    pub suggested_action: Option<String>,
49    #[serde(skip_serializing_if = "Option::is_none")]
50    pub docs_url: Option<String>,
51    pub fatal: bool,
52}
53
54impl SocketIssueMessage {
55    pub fn from_auth_deny(deny: &AuthDeny, fatal: bool) -> Self {
56        let response = deny.to_error_response();
57        Self {
58            kind: "error".to_string(),
59            error: response.error,
60            message: response.message,
61            code: response.code,
62            retryable: response.retryable,
63            retry_after: response.retry_after,
64            suggested_action: response.suggested_action,
65            docs_url: response.docs_url,
66            fatal,
67        }
68    }
69}
70
71/// Client subscription to a specific view
72#[derive(Debug, Clone, Serialize, Deserialize)]
73#[serde(rename_all = "camelCase")]
74pub struct Subscription {
75    pub view: String,
76    #[serde(skip_serializing_if = "Option::is_none")]
77    pub key: Option<String>,
78    #[serde(skip_serializing_if = "Option::is_none")]
79    pub partition: Option<String>,
80    /// Number of items to return (for windowed subscriptions)
81    #[serde(skip_serializing_if = "Option::is_none")]
82    pub take: Option<usize>,
83    /// Number of items to skip (for windowed subscriptions)
84    #[serde(skip_serializing_if = "Option::is_none")]
85    pub skip: Option<usize>,
86    /// Whether to include initial snapshot (defaults to true for backward compatibility)
87    #[serde(skip_serializing_if = "Option::is_none")]
88    pub with_snapshot: Option<bool>,
89    /// Cursor for resuming from a specific point (_seq value).
90    /// Note: Ignored for State mode subscriptions (single entity, no pagination).
91    /// Note: Not supported for derived views (windowed aggregations with sort). Derived views
92    /// always emit `seq: None` in live update frames, so cursor-based reconnection is unavailable.
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub after: Option<String>,
95    /// Maximum number of entities to include in snapshot (pagination hint).
96    /// Note: Ignored for State mode subscriptions (single entity).
97    #[serde(skip_serializing_if = "Option::is_none")]
98    pub snapshot_limit: Option<usize>,
99}
100
101/// Client unsubscription request
102#[derive(Debug, Clone, Serialize, Deserialize)]
103pub struct Unsubscription {
104    pub view: String,
105    #[serde(skip_serializing_if = "Option::is_none")]
106    pub key: Option<String>,
107}
108
109impl Unsubscription {
110    /// Generate the subscription key used for tracking
111    pub fn sub_key(&self) -> String {
112        match &self.key {
113            Some(k) => format!("{}:{}", self.view, k),
114            None => format!("{}:*", self.view),
115        }
116    }
117}
118
119impl Subscription {
120    pub fn matches_view(&self, view_id: &str) -> bool {
121        self.view == view_id
122    }
123
124    pub fn matches_key(&self, key: &str) -> bool {
125        self.key.as_ref().is_none_or(|k| k == key)
126    }
127
128    pub fn matches(&self, view_id: &str, key: &str) -> bool {
129        self.matches_view(view_id) && self.matches_key(key)
130    }
131
132    pub fn sub_key(&self) -> String {
133        match &self.key {
134            Some(k) => format!("{}:{}", self.view, k),
135            None => format!("{}:*", self.view),
136        }
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::websocket::auth::{AuthDeny, AuthErrorCode};
    use serde_json::json;

    const LIST_VIEW: &str = "SettlementGame/list";
    const STATE_VIEW: &str = "SettlementGame/state";

    /// Builds a subscription to `LIST_VIEW` with every optional field other
    /// than `key` left unset.
    fn list_subscription(key: Option<&str>) -> Subscription {
        Subscription {
            view: LIST_VIEW.to_string(),
            key: key.map(str::to_string),
            partition: None,
            take: None,
            skip: None,
            with_snapshot: None,
            after: None,
            snapshot_limit: None,
        }
    }

    /// Decodes `value` as a `ClientMessage`, panicking on malformed input.
    fn parse_message(value: serde_json::Value) -> ClientMessage {
        serde_json::from_value(value).unwrap()
    }

    /// Decodes `value` as a `ClientMessage` and unwraps its `Subscribe` payload.
    fn parse_subscribe(value: serde_json::Value) -> Subscription {
        let ClientMessage::Subscribe(sub) = parse_message(value) else {
            panic!("Expected Subscribe");
        };
        sub
    }

    #[test]
    fn test_subscription_parse() {
        let payload = json!({
            "view": "SettlementGame/list",
            "key": "835"
        });

        let sub: Subscription = serde_json::from_value(payload).unwrap();
        assert_eq!(sub.view, LIST_VIEW);
        assert_eq!(sub.key.as_deref(), Some("835"));
    }

    #[test]
    fn test_subscription_no_key() {
        let payload = json!({ "view": "SettlementGame/list" });

        let sub: Subscription = serde_json::from_value(payload).unwrap();
        assert_eq!(sub.view, LIST_VIEW);
        assert!(sub.key.is_none());
    }

    #[test]
    fn test_subscription_matches() {
        let sub = list_subscription(Some("835"));

        assert!(sub.matches(LIST_VIEW, "835"));
        assert!(!sub.matches(LIST_VIEW, "836"));
        assert!(!sub.matches(STATE_VIEW, "835"));
    }

    #[test]
    fn test_subscription_matches_all_keys() {
        let sub = list_subscription(None);

        assert!(sub.matches(LIST_VIEW, "835"));
        assert!(sub.matches(LIST_VIEW, "836"));
        assert!(!sub.matches(STATE_VIEW, "835"));
    }

    #[test]
    fn test_client_message_subscribe_parse() {
        let sub = parse_subscribe(json!({
            "type": "subscribe",
            "view": "SettlementGame/list",
            "key": "835"
        }));

        assert_eq!(sub.view, LIST_VIEW);
        assert_eq!(sub.key.as_deref(), Some("835"));
    }

    #[test]
    fn test_client_message_unsubscribe_parse() {
        let msg = parse_message(json!({
            "type": "unsubscribe",
            "view": "SettlementGame/list",
            "key": "835"
        }));

        let ClientMessage::Unsubscribe(unsub) = msg else {
            panic!("Expected Unsubscribe");
        };
        assert_eq!(unsub.view, LIST_VIEW);
        assert_eq!(unsub.key.as_deref(), Some("835"));
    }

    #[test]
    fn test_client_message_ping_parse() {
        let msg = parse_message(json!({ "type": "ping" }));
        assert!(matches!(msg, ClientMessage::Ping));
    }

    #[test]
    fn test_client_message_refresh_auth_parse() {
        let msg = parse_message(json!({
            "type": "refresh_auth",
            "token": "new_token_here"
        }));

        let ClientMessage::RefreshAuth(req) = msg else {
            panic!("Expected RefreshAuth");
        };
        assert_eq!(req.token, "new_token_here");
    }

    #[test]
    fn test_legacy_subscription_parse_as_subscribe() {
        // No `type` tag here: the payload is decoded directly as a `Subscription`.
        let payload = json!({
            "view": "SettlementGame/list",
            "key": "835"
        });

        let sub: Subscription = serde_json::from_value(payload).unwrap();
        assert_eq!(sub.view, LIST_VIEW);
        assert_eq!(sub.key.as_deref(), Some("835"));
    }

    #[test]
    fn test_sub_key_with_key() {
        let sub = list_subscription(Some("835"));
        assert_eq!(sub.sub_key(), "SettlementGame/list:835");
    }

    #[test]
    fn test_sub_key_without_key() {
        let sub = list_subscription(None);
        assert_eq!(sub.sub_key(), "SettlementGame/list:*");
    }

    #[test]
    fn test_unsubscription_sub_key() {
        let keyed = Unsubscription {
            view: LIST_VIEW.to_string(),
            key: Some("835".to_string()),
        };
        assert_eq!(keyed.sub_key(), "SettlementGame/list:835");

        let unkeyed = Unsubscription {
            view: LIST_VIEW.to_string(),
            key: None,
        };
        assert_eq!(unkeyed.sub_key(), "SettlementGame/list:*");
    }

    #[test]
    fn test_subscription_with_take_skip() {
        let sub = parse_subscribe(json!({
            "type": "subscribe",
            "view": "OreRound/latest",
            "take": 10,
            "skip": 0
        }));

        assert_eq!(sub.view, "OreRound/latest");
        assert_eq!(sub.take, Some(10));
        assert_eq!(sub.skip, Some(0));
    }

    #[test]
    fn test_subscription_with_snapshot() {
        let sub = parse_subscribe(json!({
            "type": "subscribe",
            "view": "SettlementGame/list",
            "withSnapshot": true
        }));

        assert_eq!(sub.with_snapshot, Some(true));
    }

    #[test]
    fn test_subscription_with_partition() {
        let sub = parse_subscribe(json!({
            "type": "subscribe",
            "view": "SettlementGame/list",
            "partition": "mainnet"
        }));

        assert_eq!(sub.partition.as_deref(), Some("mainnet"));
    }

    #[test]
    fn test_subscription_with_after() {
        let sub = parse_subscribe(json!({
            "type": "subscribe",
            "view": "SettlementGame/list",
            "after": "12345"
        }));

        assert_eq!(sub.after.as_deref(), Some("12345"));
    }

    #[test]
    fn test_subscription_with_snapshot_limit() {
        let sub = parse_subscribe(json!({
            "type": "subscribe",
            "view": "SettlementGame/list",
            "snapshotLimit": 100
        }));

        assert_eq!(sub.snapshot_limit, Some(100));
    }

    #[test]
    fn test_socket_issue_message_from_auth_deny() {
        let deny = AuthDeny::new(
            AuthErrorCode::SubscriptionLimitExceeded,
            "subscription limit exceeded",
        )
        .with_suggested_action("unsubscribe first");

        let issue = SocketIssueMessage::from_auth_deny(&deny, false);

        assert_eq!(issue.kind, "error");
        assert_eq!(issue.code, "subscription-limit-exceeded");
        assert_eq!(issue.suggested_action.as_deref(), Some("unsubscribe first"));
        assert!(!issue.fatal);
    }
}