1use chrono::{DateTime, Utc};
2use uuid::Uuid;
3
4use super::readset::ReadSet;
5use super::session::SessionId;
6
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
9pub struct SubscriptionId(pub Uuid);
10
11impl SubscriptionId {
12 pub fn new() -> Self {
14 Self(Uuid::new_v4())
15 }
16
17 pub fn from_uuid(id: Uuid) -> Self {
19 Self(id)
20 }
21
22 pub fn as_uuid(&self) -> Uuid {
24 self.0
25 }
26}
27
28impl Default for SubscriptionId {
29 fn default() -> Self {
30 Self::new()
31 }
32}
33
34impl std::fmt::Display for SubscriptionId {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 write!(f, "{}", self.0)
37 }
38}
39
/// Snapshot of a subscription's result as seen by a consumer.
///
/// The four fields are independent and public; the constructors on this type
/// build the common combinations (loading, data available, failed).
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// `true` in the initial/default state, `false` once data or an error is set
    /// through the provided constructors.
    pub loading: bool,
    /// The payload, when one is available.
    pub data: Option<T>,
    /// The error message, when one was recorded.
    pub error: Option<String>,
    /// Staleness flag, toggled by `mark_stale` / `clear_stale`.
    /// NOTE(review): presumably means "data may be outdated" — confirm with callers.
    pub stale: bool,
}
52
53impl<T> Default for SubscriptionState<T> {
54 fn default() -> Self {
55 Self {
56 loading: true,
57 data: None,
58 error: None,
59 stale: false,
60 }
61 }
62}
63
64impl<T> SubscriptionState<T> {
65 pub fn loading() -> Self {
67 Self::default()
68 }
69
70 pub fn with_data(data: T) -> Self {
72 Self {
73 loading: false,
74 data: Some(data),
75 error: None,
76 stale: false,
77 }
78 }
79
80 pub fn with_error(error: impl Into<String>) -> Self {
82 Self {
83 loading: false,
84 data: None,
85 error: Some(error.into()),
86 stale: false,
87 }
88 }
89
90 pub fn mark_stale(&mut self) {
92 self.stale = true;
93 }
94
95 pub fn clear_stale(&mut self) {
97 self.stale = false;
98 }
99}
100
101#[derive(Debug, Clone)]
103pub struct SubscriptionInfo {
104 pub id: SubscriptionId,
106 pub session_id: SessionId,
108 pub query_name: String,
110 pub args: serde_json::Value,
112 pub query_hash: String,
114 pub read_set: ReadSet,
116 pub last_result_hash: Option<String>,
118 pub created_at: DateTime<Utc>,
120 pub last_executed_at: Option<DateTime<Utc>>,
122 pub execution_count: u64,
124 pub memory_bytes: usize,
126}
127
128impl SubscriptionInfo {
129 pub fn new(
131 session_id: SessionId,
132 query_name: impl Into<String>,
133 args: serde_json::Value,
134 ) -> Self {
135 let query_name = query_name.into();
136 let query_hash = compute_query_hash(&query_name, &args);
137
138 Self {
139 id: SubscriptionId::new(),
140 session_id,
141 query_name,
142 args,
143 query_hash,
144 read_set: ReadSet::new(),
145 last_result_hash: None,
146 created_at: Utc::now(),
147 last_executed_at: None,
148 execution_count: 0,
149 memory_bytes: 0,
150 }
151 }
152
153 pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
155 self.read_set = read_set;
156 self.memory_bytes = self.read_set.memory_bytes() + self.query_name.len() + 128;
157 self.last_result_hash = Some(result_hash);
158 self.last_executed_at = Some(Utc::now());
159 self.execution_count += 1;
160 }
161
162 pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
164 change.invalidates(&self.read_set)
165 }
166}
167
168fn compute_query_hash(query_name: &str, args: &serde_json::Value) -> String {
170 use std::collections::hash_map::DefaultHasher;
171 use std::hash::{Hash, Hasher};
172
173 let mut hasher = DefaultHasher::new();
174 query_name.hash(&mut hasher);
175 args.to_string().hash(&mut hasher);
176 format!("{:016x}", hasher.finish())
177}
178
179#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
181pub struct Delta<T> {
182 pub added: Vec<T>,
184 pub removed: Vec<String>,
186 pub updated: Vec<T>,
188}
189
190impl<T> Default for Delta<T> {
191 fn default() -> Self {
192 Self {
193 added: Vec::new(),
194 removed: Vec::new(),
195 updated: Vec::new(),
196 }
197 }
198}
199
200impl<T> Delta<T> {
201 pub fn empty() -> Self {
203 Self::default()
204 }
205
206 pub fn is_empty(&self) -> bool {
208 self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
209 }
210
211 pub fn change_count(&self) -> usize {
213 self.added.len() + self.removed.len() + self.updated.len()
214 }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Two independently generated ids must not compare equal.
    #[test]
    fn test_subscription_id_generation() {
        let first = SubscriptionId::new();
        let second = SubscriptionId::new();
        assert_ne!(first, second);
    }

    /// The default state is "loading" with nothing else set.
    #[test]
    fn test_subscription_state_default() {
        let initial: SubscriptionState<String> = Default::default();
        assert!(initial.loading);
        assert_eq!(initial.data, None);
        assert_eq!(initial.error, None);
        assert!(!initial.stale);
    }

    /// `with_data` yields a settled state carrying the payload and no error.
    #[test]
    fn test_subscription_state_with_data() {
        let payload = vec![1, 2, 3];
        let settled = SubscriptionState::with_data(payload.clone());
        assert!(!settled.loading);
        assert_eq!(settled.data, Some(payload));
        assert_eq!(settled.error, None);
    }

    /// A new record keeps the query name, has no executions and a non-empty hash.
    #[test]
    fn test_subscription_info_creation() {
        let args = serde_json::json!({"userId": "abc"});
        let info = SubscriptionInfo::new(SessionId::new(), "get_projects", args);

        assert_eq!(info.query_name, "get_projects");
        assert_eq!(info.execution_count, 0);
        assert!(!info.query_hash.is_empty());
    }

    /// Equal inputs hash equally; different arguments hash differently.
    #[test]
    fn test_query_hash_consistency() {
        let hash_for = |user: &str| {
            compute_query_hash("get_projects", &serde_json::json!({ "userId": user }))
        };

        let first = hash_for("abc");
        let repeat = hash_for("abc");
        let other = hash_for("xyz");

        assert_eq!(first, repeat);
        assert_ne!(first, other);
    }

    /// An empty delta reports itself as empty with a zero change count.
    #[test]
    fn test_delta_empty() {
        let nothing: Delta<String> = Delta::empty();
        assert!(nothing.is_empty());
        assert_eq!(nothing.change_count(), 0);
    }

    /// One entry in each list gives a non-empty delta with three changes.
    #[test]
    fn test_delta_with_changes() {
        let changes = Delta {
            added: vec![String::from("a")],
            removed: vec![String::from("b")],
            updated: vec![String::from("c")],
        };

        assert!(!changes.is_empty());
        assert_eq!(changes.change_count(), 3);
    }
}