1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use super::readset::ReadSet;
7use super::session::SessionId;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct SubscriptionId(pub Uuid);
12
13impl SubscriptionId {
14 pub fn new() -> Self {
16 Self(Uuid::new_v4())
17 }
18
19 pub fn from_uuid(id: Uuid) -> Self {
21 Self(id)
22 }
23
24 pub fn as_uuid(&self) -> Uuid {
26 self.0
27 }
28}
29
30impl Default for SubscriptionId {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl std::fmt::Display for SubscriptionId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
/// Numeric identifier of a query group.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryGroupId(pub u32);

impl std::fmt::Display for QueryGroupId {
    /// Renders the identifier as `qg-<number>`, for example `qg-7`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let QueryGroupId(raw) = *self;
        f.write_fmt(format_args!("qg-{}", raw))
    }
}
51
/// Numeric identifier of a subscriber.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(pub u32);

impl std::fmt::Display for SubscriberId {
    /// Renders the identifier as `sub-<number>`, for example `sub-12`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let SubscriberId(raw) = *self;
        f.write_fmt(format_args!("sub-{}", raw))
    }
}
61
62#[derive(Debug, Clone, PartialEq, Eq, Hash)]
66pub struct AuthScope {
67 pub principal_id: Option<String>,
68 pub tenant_id: Option<String>,
69}
70
71impl AuthScope {
72 pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
73 Self {
74 principal_id: auth.principal_id(),
75 tenant_id: auth
76 .claim("tenant_id")
77 .and_then(|v| v.as_str())
78 .map(ToString::to_string),
79 }
80 }
81}
82
/// A query (name, arguments, auth scope) together with its latest execution
/// state and the list of subscribers attached to it.
pub struct QueryGroup {
    /// Identifier of this group.
    pub id: QueryGroupId,
    /// Name of the query.
    pub query_name: String,
    /// JSON arguments of the query, held behind an `Arc`.
    pub args: Arc<serde_json::Value>,
    /// Principal/tenant identity of the group (the same type that
    /// `compute_lookup_key` hashes).
    pub auth_scope: AuthScope,
    /// Full auth context.
    /// NOTE(review): presumably kept for re-running the query; it is not read in this file.
    pub auth_context: crate::function::AuthContext,
    /// NOTE(review): presumably the tables the query depends on; not read in this file.
    pub table_deps: &'static [&'static str],
    /// Column list handed to `Change::invalidates_columns` by `should_invalidate`.
    pub selected_cols: &'static [&'static str],
    /// Read set stored by the most recent `record_execution` call.
    pub read_set: ReadSet,
    /// Result hash stored by the most recent `record_execution` call.
    pub last_result_hash: Option<String>,
    /// Ids of the subscribers attached to this group.
    pub subscribers: Vec<SubscriberId>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Incremented by one on every `record_execution` call.
    pub execution_count: u64,
}
104
105impl QueryGroup {
106 pub fn compute_lookup_key(
108 query_name: &str,
109 args: &serde_json::Value,
110 auth_scope: &AuthScope,
111 ) -> u64 {
112 use std::collections::hash_map::DefaultHasher;
113 use std::hash::{Hash, Hasher};
114
115 let mut hasher = DefaultHasher::new();
116 query_name.hash(&mut hasher);
117 Self::hash_json_canonical(args, &mut hasher);
120 auth_scope.hash(&mut hasher);
121 hasher.finish()
122 }
123
124 fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
125 use std::hash::Hash;
126 match value {
127 serde_json::Value::Object(map) => {
128 let mut keys: Vec<&String> = map.keys().collect();
129 keys.sort();
130 for key in keys {
131 key.hash(hasher);
132 Self::hash_json_canonical(&map[key], hasher);
133 }
134 }
135 other => other.to_string().hash(hasher),
136 }
137 }
138
139 pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
141 self.read_set = read_set;
142 self.last_result_hash = Some(result_hash);
143 self.execution_count += 1;
144 }
145
146 pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
148 if !change.invalidates(&self.read_set) {
149 return false;
150 }
151
152 if !change.invalidates_columns(self.selected_cols) {
154 return false;
155 }
156
157 true
158 }
159}
160
/// Links one subscription to the session that owns it and to the query group
/// it refers to.
pub struct Subscriber {
    /// Identifier of this subscriber.
    pub id: SubscriberId,
    /// Session this subscriber belongs to.
    pub session_id: SessionId,
    /// NOTE(review): presumably the subscription id chosen by the client; confirm against the caller.
    pub client_sub_id: String,
    /// Query group this subscriber refers to.
    pub group_id: QueryGroupId,
    /// Subscription identifier associated with this subscriber.
    pub subscription_id: SubscriptionId,
}
172
/// Snapshot of a subscription as a consumer sees it: loading flag, latest
/// data, latest error and a staleness marker.
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// `true` in the default/loading state; `false` once data or an error is set.
    pub loading: bool,
    /// Payload, when one is available.
    pub data: Option<T>,
    /// Error message, when the state was built with `with_error`.
    pub error: Option<String>,
    /// Toggled by `mark_stale` / `clear_stale`.
    pub stale: bool,
}

// Implemented by hand so that `T` does not have to implement `Default`.
impl<T> Default for SubscriptionState<T> {
    /// The initial state: loading, with no data, no error and not stale.
    fn default() -> Self {
        SubscriptionState {
            loading: true,
            data: None,
            error: None,
            stale: false,
        }
    }
}

impl<T> SubscriptionState<T> {
    /// Returns the initial loading state (identical to `Default::default()`).
    pub fn loading() -> Self {
        <Self as Default>::default()
    }

    /// Builds a finished state that carries `data` and no error.
    pub fn with_data(data: T) -> Self {
        Self {
            loading: false,
            data: Some(data),
            ..Self::default()
        }
    }

    /// Builds a finished state that carries an error message and no data.
    pub fn with_error(error: impl Into<String>) -> Self {
        let message: String = error.into();
        Self {
            loading: false,
            error: Some(message),
            ..Self::default()
        }
    }

    /// Sets the `stale` flag.
    pub fn mark_stale(&mut self) {
        self.stale = true;
    }

    /// Clears the `stale` flag.
    pub fn clear_stale(&mut self) {
        self.stale = false;
    }
}
233
/// Bookkeeping record for one subscription: which query it runs, for which
/// session, and what the latest execution produced.
#[derive(Debug, Clone)]
pub struct SubscriptionInfo {
    /// Identifier generated in `SubscriptionInfo::new`.
    pub id: SubscriptionId,
    /// Session the subscription belongs to.
    pub session_id: SessionId,
    /// Name of the subscribed query.
    pub query_name: String,
    /// JSON arguments of the query.
    pub args: serde_json::Value,
    /// 16-digit lowercase hex hash of the query name and serialized
    /// arguments, produced by `compute_query_hash`.
    pub query_hash: String,
    /// Read set stored by the most recent `record_execution` call; starts as
    /// `ReadSet::new()`.
    pub read_set: ReadSet,
    /// Result hash stored by the most recent `record_execution` call; `None`
    /// until the first one.
    pub last_result_hash: Option<String>,
    /// Creation timestamp (UTC), taken in `SubscriptionInfo::new`.
    pub created_at: DateTime<Utc>,
    /// Timestamp (UTC) of the most recent `record_execution` call; `None`
    /// until the first one.
    pub last_executed_at: Option<DateTime<Utc>>,
    /// Incremented by one on every `record_execution` call.
    pub execution_count: u64,
    /// Size estimate refreshed by `record_execution`; `0` until the first one.
    pub memory_bytes: usize,
}
260
261impl SubscriptionInfo {
262 pub fn new(
264 session_id: SessionId,
265 query_name: impl Into<String>,
266 args: serde_json::Value,
267 ) -> Self {
268 let query_name = query_name.into();
269 let query_hash = compute_query_hash(&query_name, &args);
270
271 Self {
272 id: SubscriptionId::new(),
273 session_id,
274 query_name,
275 args,
276 query_hash,
277 read_set: ReadSet::new(),
278 last_result_hash: None,
279 created_at: Utc::now(),
280 last_executed_at: None,
281 execution_count: 0,
282 memory_bytes: 0,
283 }
284 }
285
286 pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
288 self.read_set = read_set;
289 self.memory_bytes = self.read_set.memory_bytes() + self.query_name.len() + 128;
290 self.last_result_hash = Some(result_hash);
291 self.last_executed_at = Some(Utc::now());
292 self.execution_count += 1;
293 }
294
295 pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
297 change.invalidates(&self.read_set)
298 }
299}
300
301fn compute_query_hash(query_name: &str, args: &serde_json::Value) -> String {
303 use std::collections::hash_map::DefaultHasher;
304 use std::hash::{Hash, Hasher};
305
306 let mut hasher = DefaultHasher::new();
307 query_name.hash(&mut hasher);
308 args.to_string().hash(&mut hasher);
309 format!("{:016x}", hasher.finish())
310}
311
312#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
314pub struct Delta<T> {
315 pub added: Vec<T>,
317 pub removed: Vec<String>,
319 pub updated: Vec<T>,
321}
322
323impl<T> Default for Delta<T> {
324 fn default() -> Self {
325 Self {
326 added: Vec::new(),
327 removed: Vec::new(),
328 updated: Vec::new(),
329 }
330 }
331}
332
333impl<T> Delta<T> {
334 pub fn empty() -> Self {
336 Self::default()
337 }
338
339 pub fn is_empty(&self) -> bool {
341 self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
342 }
343
344 pub fn change_count(&self) -> usize {
346 self.added.len() + self.removed.len() + self.updated.len()
347 }
348}
349
// Unit tests for the identifier, state, hashing and delta types in this file.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    // Two freshly generated subscription ids must differ.
    #[test]
    fn test_subscription_id_generation() {
        let id1 = SubscriptionId::new();
        let id2 = SubscriptionId::new();
        assert_ne!(id1, id2);
    }

    // The default state is "loading": no data, no error, not stale.
    #[test]
    fn test_subscription_state_default() {
        let state: SubscriptionState<String> = SubscriptionState::default();
        assert!(state.loading);
        assert!(state.data.is_none());
        assert!(state.error.is_none());
        assert!(!state.stale);
    }

    // `with_data` yields a finished state carrying the payload and no error.
    #[test]
    fn test_subscription_state_with_data() {
        let state = SubscriptionState::with_data(vec![1, 2, 3]);
        assert!(!state.loading);
        assert_eq!(state.data, Some(vec![1, 2, 3]));
        assert!(state.error.is_none());
    }

    // A new subscription record keeps its query name, has never executed,
    // and has a non-empty query hash.
    #[test]
    fn test_subscription_info_creation() {
        let session_id = SessionId::new();
        let info = SubscriptionInfo::new(
            session_id,
            "get_projects",
            serde_json::json!({"userId": "abc"}),
        );

        assert_eq!(info.query_name, "get_projects");
        assert_eq!(info.execution_count, 0);
        assert!(!info.query_hash.is_empty());
    }

    // Equal (name, args) pairs hash equally; different args hash differently.
    #[test]
    fn test_query_hash_consistency() {
        let hash1 = compute_query_hash("get_projects", &serde_json::json!({"userId": "abc"}));
        let hash2 = compute_query_hash("get_projects", &serde_json::json!({"userId": "abc"}));
        let hash3 = compute_query_hash("get_projects", &serde_json::json!({"userId": "xyz"}));

        assert_eq!(hash1, hash2);
        assert_ne!(hash1, hash3);
    }

    // The lookup key is stable for identical inputs and changes when only the
    // principal in the auth scope changes.
    #[test]
    fn test_query_group_lookup_key() {
        let scope = AuthScope {
            principal_id: Some("user-1".to_string()),
            tenant_id: None,
        };
        let key1 = QueryGroup::compute_lookup_key(
            "get_projects",
            &serde_json::json!({"userId": "abc"}),
            &scope,
        );
        let key2 = QueryGroup::compute_lookup_key(
            "get_projects",
            &serde_json::json!({"userId": "abc"}),
            &scope,
        );
        assert_eq!(key1, key2);

        let other_scope = AuthScope {
            principal_id: Some("user-2".to_string()),
            tenant_id: None,
        };
        let key3 = QueryGroup::compute_lookup_key(
            "get_projects",
            &serde_json::json!({"userId": "abc"}),
            &other_scope,
        );
        assert_ne!(key1, key3);
    }

    // An empty delta reports itself as empty with zero changes.
    #[test]
    fn test_delta_empty() {
        let delta: Delta<String> = Delta::empty();
        assert!(delta.is_empty());
        assert_eq!(delta.change_count(), 0);
    }

    // One entry in each list: not empty, three changes in total.
    #[test]
    fn test_delta_with_changes() {
        let delta = Delta {
            added: vec!["a".to_string()],
            removed: vec!["b".to_string()],
            updated: vec!["c".to_string()],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }
}