hyperstack_sdk/
subscription.rs1use serde::{Deserialize, Serialize};
2use std::collections::HashMap;
3
4#[derive(Debug, Clone, Serialize, Deserialize)]
5#[serde(tag = "type", rename_all = "lowercase")]
6pub enum ClientMessage {
7 Subscribe(Subscription),
8 Unsubscribe(Unsubscription),
9 Ping,
10}
11
12#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
13pub struct Subscription {
14 pub view: String,
15 #[serde(skip_serializing_if = "Option::is_none")]
16 pub key: Option<String>,
17 #[serde(skip_serializing_if = "Option::is_none")]
18 pub partition: Option<String>,
19 #[serde(skip_serializing_if = "Option::is_none")]
20 pub filters: Option<HashMap<String, String>>,
21 #[serde(skip_serializing_if = "Option::is_none")]
22 pub take: Option<u32>,
23 #[serde(skip_serializing_if = "Option::is_none")]
24 pub skip: Option<u32>,
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
28pub struct Unsubscription {
29 pub view: String,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub key: Option<String>,
32}
33
34impl Unsubscription {
35 pub fn new(view: impl Into<String>) -> Self {
36 Self {
37 view: view.into(),
38 key: None,
39 }
40 }
41
42 pub fn with_key(mut self, key: impl Into<String>) -> Self {
43 self.key = Some(key.into());
44 self
45 }
46
47 pub fn sub_key(&self) -> String {
48 format!("{}:{}", self.view, self.key.as_deref().unwrap_or("*"),)
49 }
50}
51
52impl From<&Subscription> for Unsubscription {
53 fn from(sub: &Subscription) -> Self {
54 Self {
55 view: sub.view.clone(),
56 key: sub.key.clone(),
57 }
58 }
59}
60
61impl Subscription {
62 pub fn new(view: impl Into<String>) -> Self {
63 Self {
64 view: view.into(),
65 key: None,
66 partition: None,
67 filters: None,
68 take: None,
69 skip: None,
70 }
71 }
72
73 pub fn with_key(mut self, key: impl Into<String>) -> Self {
74 self.key = Some(key.into());
75 self
76 }
77
78 pub fn with_filters(mut self, filters: HashMap<String, String>) -> Self {
79 self.filters = Some(filters);
80 self
81 }
82
83 pub fn with_take(mut self, take: u32) -> Self {
84 self.take = Some(take);
85 self
86 }
87
88 pub fn with_skip(mut self, skip: u32) -> Self {
89 self.skip = Some(skip);
90 self
91 }
92
93 pub fn sub_key(&self) -> String {
94 let filters_str = self
95 .filters
96 .as_ref()
97 .map(|f| serde_json::to_string(f).unwrap_or_default())
98 .unwrap_or_default();
99 format!(
100 "{}:{}:{}:{}",
101 self.view,
102 self.key.as_deref().unwrap_or("*"),
103 self.partition.as_deref().unwrap_or(""),
104 filters_str
105 )
106 }
107}
108
109#[derive(Debug, Default)]
110pub struct SubscriptionRegistry {
111 subscriptions: HashMap<String, Subscription>,
112}
113
114impl SubscriptionRegistry {
115 pub fn new() -> Self {
116 Self::default()
117 }
118
119 pub fn add(&mut self, sub: Subscription) {
120 let key = sub.sub_key();
121 self.subscriptions.insert(key, sub);
122 }
123
124 pub fn remove(&mut self, sub: &Subscription) {
125 let key = sub.sub_key();
126 self.subscriptions.remove(&key);
127 }
128
129 pub fn contains(&self, sub: &Subscription) -> bool {
130 let key = sub.sub_key();
131 self.subscriptions.contains_key(&key)
132 }
133
134 pub fn all(&self) -> Vec<Subscription> {
135 self.subscriptions.values().cloned().collect()
136 }
137
138 #[allow(dead_code)]
139 pub fn clear(&mut self) {
140 self.subscriptions.clear();
141 }
142}