hyperstack_sdk/
subscription.rs1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
5#[serde(tag = "type")]
6pub enum ClientMessage {
7 #[serde(rename = "subscribe")]
8 Subscribe(Subscription),
9 #[serde(rename = "unsubscribe")]
10 Unsubscribe(Unsubscription),
11 #[serde(rename = "ping")]
12 Ping,
13 #[serde(rename = "refresh_auth")]
14 RefreshAuth { token: String },
15}
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
18#[serde(rename_all = "camelCase")]
19pub struct Subscription {
20 pub view: String,
21 #[serde(skip_serializing_if = "Option::is_none")]
22 pub key: Option<String>,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub partition: Option<String>,
25 #[serde(skip_serializing_if = "Option::is_none")]
26 pub filters: Option<HashMap<String, String>>,
27 #[serde(skip_serializing_if = "Option::is_none")]
28 pub take: Option<u32>,
29 #[serde(skip_serializing_if = "Option::is_none")]
30 pub skip: Option<u32>,
31 #[serde(skip_serializing_if = "Option::is_none")]
33 pub with_snapshot: Option<bool>,
34 #[serde(skip_serializing_if = "Option::is_none")]
36 pub after: Option<String>,
37 #[serde(skip_serializing_if = "Option::is_none")]
39 pub snapshot_limit: Option<usize>,
40}
41
42#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
43pub struct Unsubscription {
44 pub view: String,
45 #[serde(skip_serializing_if = "Option::is_none")]
46 pub key: Option<String>,
47}
48
49impl Unsubscription {
50 pub fn new(view: impl Into<String>) -> Self {
51 Self {
52 view: view.into(),
53 key: None,
54 }
55 }
56
57 pub fn with_key(mut self, key: impl Into<String>) -> Self {
58 self.key = Some(key.into());
59 self
60 }
61
62 pub fn sub_key(&self) -> String {
63 format!("{}:{}", self.view, self.key.as_deref().unwrap_or("*"),)
64 }
65}
66
67impl From<&Subscription> for Unsubscription {
68 fn from(sub: &Subscription) -> Self {
69 Self {
70 view: sub.view.clone(),
71 key: sub.key.clone(),
72 }
73 }
74}
75
76impl Subscription {
77 pub fn new(view: impl Into<String>) -> Self {
78 Self {
79 view: view.into(),
80 key: None,
81 partition: None,
82 filters: None,
83 take: None,
84 skip: None,
85 with_snapshot: None,
86 after: None,
87 snapshot_limit: None,
88 }
89 }
90
91 pub fn with_key(mut self, key: impl Into<String>) -> Self {
92 self.key = Some(key.into());
93 self
94 }
95
96 pub fn with_filters(mut self, filters: HashMap<String, String>) -> Self {
97 self.filters = Some(filters);
98 self
99 }
100
101 pub fn with_take(mut self, take: u32) -> Self {
102 self.take = Some(take);
103 self
104 }
105
106 pub fn with_skip(mut self, skip: u32) -> Self {
107 self.skip = Some(skip);
108 self
109 }
110
111 pub fn with_snapshot(mut self, with_snapshot: bool) -> Self {
113 self.with_snapshot = Some(with_snapshot);
114 self
115 }
116
117 pub fn after(mut self, cursor: impl Into<String>) -> Self {
119 self.after = Some(cursor.into());
120 self
121 }
122
123 pub fn with_snapshot_limit(mut self, limit: usize) -> Self {
125 self.snapshot_limit = Some(limit);
126 self
127 }
128
129 pub fn sub_key(&self) -> String {
130 let filters_str = self
131 .filters
132 .as_ref()
133 .map(|f| serde_json::to_string(f).unwrap_or_default())
134 .unwrap_or_default();
135 format!(
136 "{}:{}:{}:{}",
137 self.view,
138 self.key.as_deref().unwrap_or("*"),
139 self.partition.as_deref().unwrap_or(""),
140 filters_str
141 )
142 }
143}
144
145#[derive(Debug, Default)]
146pub struct SubscriptionRegistry {
147 subscriptions: HashMap<String, Subscription>,
148}
149
150impl SubscriptionRegistry {
151 pub fn new() -> Self {
152 Self::default()
153 }
154
155 pub fn add(&mut self, sub: Subscription) {
156 let key = sub.sub_key();
157 self.subscriptions.insert(key, sub);
158 }
159
160 pub fn remove(&mut self, sub: &Subscription) {
161 let key = sub.sub_key();
162 self.subscriptions.remove(&key);
163 }
164
165 pub fn contains(&self, sub: &Subscription) -> bool {
166 let key = sub.sub_key();
167 self.subscriptions.contains_key(&key)
168 }
169
170 pub fn all(&self) -> Vec<Subscription> {
171 self.subscriptions.values().cloned().collect()
172 }
173
174 #[allow(dead_code)]
175 pub fn clear(&mut self) {
176 self.subscriptions.clear();
177 }
178}