Skip to main content

hyperstack_sdk/
subscription.rs

1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3
/// Message envelope serialized with an internal `type` tag.
///
/// Each variant is written as a single object whose `type` field holds the
/// variant's renamed tag (`subscribe`, `unsubscribe`, `ping`, `refresh_auth`);
/// the variant's own fields are flattened alongside that tag.
///
/// NOTE(review): presumably these are the client-to-server messages of the
/// protocol — confirm against the connection code that sends them.
/// NOTE(review): the derived `Debug` output includes the `RefreshAuth` token
/// verbatim; be careful when logging values of this type.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum ClientMessage {
    /// Carries a [`Subscription`]; tagged `"subscribe"`.
    #[serde(rename = "subscribe")]
    Subscribe(Subscription),
    /// Carries an [`Unsubscription`]; tagged `"unsubscribe"`.
    #[serde(rename = "unsubscribe")]
    Unsubscribe(Unsubscription),
    /// Payload-free message; tagged `"ping"`.
    #[serde(rename = "ping")]
    Ping,
    /// Carries a `token` string; tagged `"refresh_auth"`.
    #[serde(rename = "refresh_auth")]
    RefreshAuth { token: String },
}
16
/// Parameters of a subscription to a view.
///
/// Field names are serialized in camelCase (so `with_snapshot` becomes
/// `withSnapshot` and `snapshot_limit` becomes `snapshotLimit`). Every
/// optional field is left out of the serialized form when it is `None`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Subscription {
    /// Name of the view being subscribed to (always serialized).
    pub view: String,
    /// Optional key within the view; `sub_key` treats `None` as `*`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub key: Option<String>,
    /// Optional partition; participates in `sub_key`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub partition: Option<String>,
    /// Optional string-to-string filters; participates in `sub_key`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub filters: Option<HashMap<String, String>>,
    /// Optional `take` value (presumably a page size — confirm against the server).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub take: Option<u32>,
    /// Optional `skip` value (presumably a page offset — confirm against the server).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub skip: Option<u32>,
    /// Whether to include initial snapshot (defaults to true for backward compatibility)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub with_snapshot: Option<bool>,
    /// Cursor for resuming from a specific point (_seq value)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub after: Option<String>,
    /// Maximum number of entities to include in snapshot (pagination hint)
    #[serde(skip_serializing_if = "Option::is_none")]
    pub snapshot_limit: Option<usize>,
}
41
/// Payload identifying what to unsubscribe from: a view and an optional key.
///
/// Unlike `Subscription`, this carries no partition or filters.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct Unsubscription {
    /// Name of the view to unsubscribe from (always serialized).
    pub view: String,
    /// Optional key; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub key: Option<String>,
}
48
49impl Unsubscription {
50    pub fn new(view: impl Into<String>) -> Self {
51        Self {
52            view: view.into(),
53            key: None,
54        }
55    }
56
57    pub fn with_key(mut self, key: impl Into<String>) -> Self {
58        self.key = Some(key.into());
59        self
60    }
61
62    pub fn sub_key(&self) -> String {
63        format!("{}:{}", self.view, self.key.as_deref().unwrap_or("*"),)
64    }
65}
66
67impl From<&Subscription> for Unsubscription {
68    fn from(sub: &Subscription) -> Self {
69        Self {
70            view: sub.view.clone(),
71            key: sub.key.clone(),
72        }
73    }
74}
75
76impl Subscription {
77    pub fn new(view: impl Into<String>) -> Self {
78        Self {
79            view: view.into(),
80            key: None,
81            partition: None,
82            filters: None,
83            take: None,
84            skip: None,
85            with_snapshot: None,
86            after: None,
87            snapshot_limit: None,
88        }
89    }
90
91    pub fn with_key(mut self, key: impl Into<String>) -> Self {
92        self.key = Some(key.into());
93        self
94    }
95
96    pub fn with_filters(mut self, filters: HashMap<String, String>) -> Self {
97        self.filters = Some(filters);
98        self
99    }
100
101    pub fn with_take(mut self, take: u32) -> Self {
102        self.take = Some(take);
103        self
104    }
105
106    pub fn with_skip(mut self, skip: u32) -> Self {
107        self.skip = Some(skip);
108        self
109    }
110
111    /// Set whether to include the initial snapshot (defaults to true)
112    pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
113        self.with_snapshot = Some(with_snapshot);
114        self
115    }
116
117    /// Set the cursor to resume from (for reconnecting and getting only newer data)
118    pub fn after(mut self, cursor: impl Into<String>) -> Self {
119        self.after = Some(cursor.into());
120        self
121    }
122
123    /// Set the maximum number of entities to include in the snapshot
124    pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
125        self.snapshot_limit = Some(limit);
126        self
127    }
128
129    pub fn sub_key(&self) -> String {
130        let filters_str = self
131            .filters
132            .as_ref()
133            .map(|f| serde_json::to_string(f).unwrap_or_default())
134            .unwrap_or_default();
135        format!(
136            "{}:{}:{}:{}",
137            self.view,
138            self.key.as_deref().unwrap_or("*"),
139            self.partition.as_deref().unwrap_or(""),
140            filters_str
141        )
142    }
143}
144
/// In-memory collection of subscriptions indexed by their `sub_key()` string.
///
/// Because the index is the `sub_key()` value, two subscriptions that produce
/// the same key occupy the same slot.
#[derive(Debug, Default)]
pub struct SubscriptionRegistry {
    /// Stored subscriptions, keyed by `Subscription::sub_key()`.
    subscriptions: HashMap<String, Subscription>,
}
149
150impl SubscriptionRegistry {
151    pub fn new() -> Self {
152        Self::default()
153    }
154
155    pub fn add(&mut self, sub: Subscription) {
156        let key = sub.sub_key();
157        self.subscriptions.insert(key, sub);
158    }
159
160    pub fn remove(&mut self, sub: &Subscription) {
161        let key = sub.sub_key();
162        self.subscriptions.remove(&key);
163    }
164
165    pub fn contains(&self, sub: &Subscription) -> bool {
166        let key = sub.sub_key();
167        self.subscriptions.contains_key(&key)
168    }
169
170    pub fn all(&self) -> Vec<Subscription> {
171        self.subscriptions.values().cloned().collect()
172    }
173
174    #[allow(dead_code)]
175    pub fn clear(&mut self) {
176        self.subscriptions.clear();
177    }
178}