1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use super::readset::ReadSet;
7use super::session::SessionId;
8
/// Identifier for a subscription: a newtype over [`Uuid`].
///
/// The wrapped value is public, so it can also be read directly via `.0`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub Uuid);
12
13impl SubscriptionId {
14 pub fn new() -> Self {
16 Self(Uuid::new_v4())
17 }
18
19 pub fn from_uuid(id: Uuid) -> Self {
21 Self(id)
22 }
23
24 pub fn as_uuid(&self) -> Uuid {
26 self.0
27 }
28}
29
30impl Default for SubscriptionId {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl std::fmt::Display for SubscriptionId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
/// 64-bit lookup key for a subscription.
///
/// `QueryGroup::compute_lookup_key` produces these by hashing a query name,
/// its arguments and an [`AuthScope`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionKey(pub u64);
48
/// Numeric identifier of a query group.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryGroupId(pub u32);

impl std::fmt::Display for QueryGroupId {
    /// Renders the id as `qg-<number>`, e.g. `qg-7`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("qg-{}", raw))
    }
}
58
/// Numeric identifier of a subscriber.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(pub u32);

impl std::fmt::Display for SubscriberId {
    /// Renders the id as `sub-<number>`, e.g. `sub-7`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = self;
        f.write_fmt(format_args!("sub-{}", raw))
    }
}
68
/// Identity information that distinguishes otherwise identical queries.
///
/// Equality and hashing cover all three fields, in declaration order. They
/// are derived rather than hand-written so they cannot drift apart when a
/// field is added or removed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AuthScope {
    /// Principal the scope belongs to, if any.
    pub principal_id: Option<String>,
    /// Tenant the scope belongs to, if any.
    pub tenant_id: Option<String>,
    /// Hash derived from the principal's role names (see
    /// [`AuthScope::from_auth`]).
    pub role_hash: u64,
}
99
100impl AuthScope {
101 pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
102 use std::hash::{Hash, Hasher};
103
104 let mut roles: Vec<&str> = auth.roles().iter().map(|s| s.as_str()).collect();
105 roles.sort_unstable();
106 let mut hasher = FnvHasher::new();
107 roles.hash(&mut hasher);
108 let role_hash = hasher.finish();
109
110 Self {
111 principal_id: auth.principal_id(),
112 tenant_id: auth
113 .claim("tenant_id")
114 .and_then(|v| v.as_str())
115 .map(ToString::to_string),
116 role_hash,
117 }
118 }
119}
120
/// State tracked for one query (name, arguments, auth scope) together with
/// the subscribers attached to it.
pub struct QueryGroup {
    /// Identifier of this group.
    pub id: QueryGroupId,
    /// Name of the query.
    pub query_name: String,
    /// Arguments of the query as JSON.
    pub args: Arc<serde_json::Value>,
    /// Auth scope of the query.
    pub auth_scope: AuthScope,
    /// Full auth context kept alongside the scope.
    // NOTE(review): presumably used to re-run the query; confirm with callers.
    pub auth_context: crate::function::AuthContext,
    /// Table names the query depends on; `should_invalidate` falls back to
    /// these while `read_set.tables` is empty.
    pub table_deps: &'static [&'static str],
    /// Columns passed to `Change::invalidates_columns` in `should_invalidate`.
    pub selected_cols: &'static [&'static str],
    /// Read set stored by the most recent `record_execution*` call.
    pub read_set: ReadSet,
    /// Result hash stored by the most recent `record_execution*` call.
    pub last_result_hash: Option<String>,
    /// Result data stored by the most recent `record_execution_with_data` call.
    pub last_result: Option<Arc<serde_json::Value>>,
    /// Subscribers attached to this group.
    pub subscribers: Vec<SubscriberId>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Incremented by one on every `record_execution*` call.
    pub execution_count: u64,
}
142
/// Minimal 64-bit FNV-1a hasher.
///
/// The output depends only on the bytes written, so it is deterministic
/// across processes.
struct FnvHasher(u64);

impl FnvHasher {
    /// 64-bit FNV offset basis (decimal 14695981039346656037).
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    /// 64-bit FNV prime (decimal 1099511628211).
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Creates a hasher initialised with the offset basis.
    fn new() -> Self {
        FnvHasher(Self::OFFSET_BASIS)
    }
}

impl std::hash::Hasher for FnvHasher {
    /// Mixes each byte into the state: XOR first, then a wrapping multiply
    /// by the FNV prime.
    fn write(&mut self, bytes: &[u8]) {
        self.0 = bytes.iter().fold(self.0, |state, &byte| {
            (state ^ u64::from(byte)).wrapping_mul(Self::PRIME)
        });
    }

    /// Returns the current state; the hasher can keep accepting input.
    fn finish(&self) -> u64 {
        self.0
    }
}
165
166impl QueryGroup {
167 pub fn compute_lookup_key(
172 query_name: &str,
173 args: &serde_json::Value,
174 auth_scope: &AuthScope,
175 ) -> SubscriptionKey {
176 use std::hash::{Hash, Hasher};
177
178 let mut hasher = FnvHasher::new();
179 query_name.hash(&mut hasher);
180 Self::hash_json_canonical(args, &mut hasher);
181 auth_scope.hash(&mut hasher);
182 SubscriptionKey(hasher.finish())
183 }
184
185 fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
186 use std::hash::Hash;
187 match value {
188 serde_json::Value::Object(map) => {
189 let mut keys: Vec<&String> = map.keys().collect();
190 keys.sort();
191 for key in keys {
192 key.hash(hasher);
193 Self::hash_json_canonical(&map[key], hasher);
194 }
195 }
196 other => other.to_string().hash(hasher),
197 }
198 }
199
200 pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
202 self.read_set = read_set;
203 self.last_result_hash = Some(result_hash);
204 self.execution_count += 1;
205 }
206
207 pub fn record_execution_with_data(
209 &mut self,
210 read_set: ReadSet,
211 result_hash: String,
212 data: Arc<serde_json::Value>,
213 ) {
214 self.read_set = read_set;
215 self.last_result_hash = Some(result_hash);
216 self.last_result = Some(data);
217 self.execution_count += 1;
218 }
219
220 pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
224 let table_matches = if self.read_set.tables.is_empty() {
225 self.table_deps.iter().any(|t| *t == change.table)
226 } else {
227 change.invalidates(&self.read_set)
228 };
229
230 if !table_matches {
231 return false;
232 }
233
234 if !change.invalidates_columns(self.selected_cols) {
235 return false;
236 }
237
238 true
239 }
240}
241
/// One subscriber record, tying a session and its subscription to a query
/// group.
pub struct Subscriber {
    /// Identifier of this subscriber record.
    pub id: SubscriberId,
    /// Session the subscriber belongs to.
    pub session_id: SessionId,
    /// Subscription id as a string.
    // NOTE(review): presumably the id chosen by the client; confirm with callers.
    pub client_sub_id: String,
    /// Query group this subscriber is attached to.
    pub group_id: QueryGroupId,
    /// Subscription identifier associated with this subscriber.
    pub subscription_id: SubscriptionId,
}
253
/// Snapshot of a subscription's state: loading flag, latest data, latest
/// error and a staleness flag.
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// `true` until data or an error has been supplied.
    pub loading: bool,
    /// Latest data, if any.
    pub data: Option<T>,
    /// Latest error message, if any.
    pub error: Option<String>,
    /// Set by `mark_stale`, reset by `clear_stale`.
    pub stale: bool,
}

impl<T> Default for SubscriptionState<T> {
    /// The default state is the loading state.
    fn default() -> Self {
        Self::loading()
    }
}

impl<T> SubscriptionState<T> {
    /// State with no data and no error that is still loading.
    pub fn loading() -> Self {
        SubscriptionState {
            data: None,
            error: None,
            loading: true,
            stale: false,
        }
    }

    /// Loaded state holding `data`.
    pub fn with_data(data: T) -> Self {
        SubscriptionState {
            data: Some(data),
            loading: false,
            ..Self::loading()
        }
    }

    /// Loaded state holding an error message and no data.
    pub fn with_error(error: impl Into<String>) -> Self {
        SubscriptionState {
            error: Some(error.into()),
            loading: false,
            ..Self::loading()
        }
    }

    /// Flags the state as stale.
    pub fn mark_stale(&mut self) {
        self.set_stale(true);
    }

    /// Removes the stale flag.
    pub fn clear_stale(&mut self) {
        self.set_stale(false);
    }

    /// Single place where the stale flag is written.
    fn set_stale(&mut self, stale: bool) {
        self.stale = stale;
    }
}
314
/// A set of changes split into added, removed and updated entries.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Delta<T> {
    /// Entries that were added.
    pub added: Vec<T>,
    /// Removed entries, referenced by string.
    // NOTE(review): presumably entry ids; confirm with the producer.
    pub removed: Vec<String>,
    /// Entries that were updated.
    pub updated: Vec<T>,
}
325
326impl<T> Default for Delta<T> {
327 fn default() -> Self {
328 Self {
329 added: Vec::new(),
330 removed: Vec::new(),
331 updated: Vec::new(),
332 }
333 }
334}
335
336impl<T> Delta<T> {
337 pub fn empty() -> Self {
339 Self::default()
340 }
341
342 pub fn is_empty(&self) -> bool {
344 self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
345 }
346
347 pub fn change_count(&self) -> usize {
349 self.added.len() + self.removed.len() + self.updated.len()
350 }
351}
352
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Scope with the given principal, no tenant and a zero role hash.
    fn scope_for(principal: Option<&str>) -> AuthScope {
        AuthScope {
            principal_id: principal.map(|p| p.to_string()),
            tenant_id: None,
            role_hash: 0,
        }
    }

    #[test]
    fn test_subscription_id_generation() {
        let first = SubscriptionId::new();
        let second = SubscriptionId::new();
        assert_ne!(first, second);
    }

    #[test]
    fn test_subscription_state_default() {
        let state: SubscriptionState<String> = SubscriptionState::default();
        assert!(state.loading);
        assert!(state.data.is_none());
        assert!(state.error.is_none());
        assert!(!state.stale);
    }

    #[test]
    fn test_subscription_state_with_data() {
        let state = SubscriptionState::with_data(vec![1, 2, 3]);
        assert!(!state.loading);
        assert_eq!(state.data, Some(vec![1, 2, 3]));
        assert!(state.error.is_none());
    }

    #[test]
    fn lookup_key_stable_across_calls() {
        let args = serde_json::json!({"userId": "abc"});

        // Same inputs, computed twice, must agree.
        let scope = scope_for(Some("user-1"));
        let key1 = QueryGroup::compute_lookup_key("get_projects", &args, &scope);
        let key2 = QueryGroup::compute_lookup_key("get_projects", &args, &scope);
        assert_eq!(key1, key2);

        // A different principal must yield a different key.
        let other_scope = scope_for(Some("user-2"));
        let key3 = QueryGroup::compute_lookup_key("get_projects", &args, &other_scope);
        assert_ne!(key1, key3);
    }

    #[test]
    fn lookup_key_is_deterministic() {
        let scope = scope_for(Some("u1"));
        let args = serde_json::json!({"id": "42"});

        let key = QueryGroup::compute_lookup_key("get_items", &args, &scope);
        let expected = QueryGroup::compute_lookup_key("get_items", &args, &scope);
        assert_eq!(key, expected);
        assert_ne!(key.0, 0);
    }

    #[test]
    fn lookup_key_canonical_json_order_invariant() {
        let scope = scope_for(None);

        let key_ab =
            QueryGroup::compute_lookup_key("q", &serde_json::json!({"a": 1, "b": 2}), &scope);
        let key_ba =
            QueryGroup::compute_lookup_key("q", &serde_json::json!({"b": 2, "a": 1}), &scope);
        assert_eq!(key_ab, key_ba);
    }

    #[test]
    fn test_delta_empty() {
        let delta: Delta<String> = Delta::empty();
        assert!(delta.is_empty());
        assert_eq!(delta.change_count(), 0);
    }

    #[test]
    fn test_delta_with_changes() {
        let delta = Delta {
            added: vec![String::from("a")],
            removed: vec![String::from("b")],
            updated: vec![String::from("c")],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }
}