1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use super::readset::ReadSet;
7use super::session::SessionId;
8
9#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct SubscriptionId(pub Uuid);
12
13impl SubscriptionId {
14 pub fn new() -> Self {
16 Self(Uuid::new_v4())
17 }
18
19 pub fn from_uuid(id: Uuid) -> Self {
21 Self(id)
22 }
23
24 pub fn as_uuid(&self) -> Uuid {
26 self.0
27 }
28}
29
30impl Default for SubscriptionId {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl std::fmt::Display for SubscriptionId {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 write!(f, "{}", self.0)
39 }
40}
41
/// Numeric identifier of a query group.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryGroupId(pub u32);

impl std::fmt::Display for QueryGroupId {
    /// Renders the identifier as `qg-<number>`, for example `qg-7`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = *self;
        f.write_fmt(format_args!("qg-{}", raw))
    }
}
51
/// Numeric identifier of a subscriber.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(pub u32);

impl std::fmt::Display for SubscriberId {
    /// Renders the identifier as `sub-<number>`, for example `sub-3`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = *self;
        f.write_fmt(format_args!("sub-{}", raw))
    }
}
61
62#[derive(Debug, Clone, PartialEq, Eq, Hash)]
66pub struct AuthScope {
67 pub principal_id: Option<String>,
68 pub tenant_id: Option<String>,
69}
70
71impl AuthScope {
72 pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
73 Self {
74 principal_id: auth.principal_id(),
75 tenant_id: auth
76 .claim("tenant_id")
77 .and_then(|v| v.as_str())
78 .map(ToString::to_string),
79 }
80 }
81}
82
83pub struct QueryGroup {
87 pub id: QueryGroupId,
88 pub query_name: String,
89 pub args: Arc<serde_json::Value>,
90 pub auth_scope: AuthScope,
91 pub auth_context: crate::function::AuthContext,
92 pub table_deps: &'static [&'static str],
94 pub selected_cols: &'static [&'static str],
96 pub read_set: ReadSet,
97 pub last_result_hash: Option<String>,
99 pub subscribers: Vec<SubscriberId>,
101 pub created_at: DateTime<Utc>,
102 pub execution_count: u64,
103}
104
105impl QueryGroup {
106 pub fn compute_lookup_key(
108 query_name: &str,
109 args: &serde_json::Value,
110 auth_scope: &AuthScope,
111 ) -> u64 {
112 use std::collections::hash_map::DefaultHasher;
113 use std::hash::{Hash, Hasher};
114
115 let mut hasher = DefaultHasher::new();
116 query_name.hash(&mut hasher);
117 Self::hash_json_canonical(args, &mut hasher);
120 auth_scope.hash(&mut hasher);
121 hasher.finish()
122 }
123
124 fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
125 use std::hash::Hash;
126 match value {
127 serde_json::Value::Object(map) => {
128 let mut keys: Vec<&String> = map.keys().collect();
129 keys.sort();
130 for key in keys {
131 key.hash(hasher);
132 Self::hash_json_canonical(&map[key], hasher);
133 }
134 }
135 other => other.to_string().hash(hasher),
136 }
137 }
138
139 pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
141 self.read_set = read_set;
142 self.last_result_hash = Some(result_hash);
143 self.execution_count += 1;
144 }
145
146 pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
148 if !change.invalidates(&self.read_set) {
149 return false;
150 }
151
152 if !change.invalidates_columns(self.selected_cols) {
154 return false;
155 }
156
157 true
158 }
159}
160
161pub struct Subscriber {
163 pub id: SubscriberId,
164 pub session_id: SessionId,
165 pub client_sub_id: String,
167 pub group_id: QueryGroupId,
169 pub subscription_id: SubscriptionId,
171}
172
/// State of a subscription: a loading flag, the latest data, an error message and a
/// staleness flag.
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// `true` in the initial state; `false` once built through `with_data` or `with_error`.
    pub loading: bool,
    /// Payload supplied through `with_data`, otherwise `None`.
    pub data: Option<T>,
    /// Message supplied through `with_error`, otherwise `None`.
    pub error: Option<String>,
    /// Flag toggled by `mark_stale` / `clear_stale`.
    pub stale: bool,
}

impl<T> Default for SubscriptionState<T> {
    /// The default state is the loading state; no `T: Default` bound is required.
    fn default() -> Self {
        Self {
            stale: false,
            error: None,
            data: None,
            loading: true,
        }
    }
}

impl<T> SubscriptionState<T> {
    /// Returns the initial state: loading, no data, no error, not stale.
    pub fn loading() -> Self {
        Default::default()
    }

    /// Returns a settled state carrying `data`, with no error and not stale.
    pub fn with_data(data: T) -> Self {
        Self {
            loading: false,
            data: Some(data),
            // `error: None` and `stale: false` come from the initial state.
            ..Self::default()
        }
    }

    /// Returns a settled state carrying the error message, with no data and not stale.
    pub fn with_error(error: impl Into<String>) -> Self {
        Self {
            loading: false,
            error: Some(error.into()),
            // `data: None` and `stale: false` come from the initial state.
            ..Self::default()
        }
    }

    /// Sets the stale flag; all other fields are left untouched.
    pub fn mark_stale(&mut self) {
        self.set_stale(true);
    }

    /// Clears the stale flag; all other fields are left untouched.
    pub fn clear_stale(&mut self) {
        self.set_stale(false);
    }

    /// Single place where the stale flag is written.
    fn set_stale(&mut self, stale: bool) {
        self.stale = stale;
    }
}
233
234#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
236pub struct Delta<T> {
237 pub added: Vec<T>,
239 pub removed: Vec<String>,
241 pub updated: Vec<T>,
243}
244
245impl<T> Default for Delta<T> {
246 fn default() -> Self {
247 Self {
248 added: Vec::new(),
249 removed: Vec::new(),
250 updated: Vec::new(),
251 }
252 }
253}
254
255impl<T> Delta<T> {
256 pub fn empty() -> Self {
258 Self::default()
259 }
260
261 pub fn is_empty(&self) -> bool {
263 self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
264 }
265
266 pub fn change_count(&self) -> usize {
268 self.added.len() + self.removed.len() + self.updated.len()
269 }
270}
271
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Builds a scope for `principal` that has no tenant.
    fn scope_for(principal: &str) -> AuthScope {
        AuthScope {
            principal_id: Some(principal.to_string()),
            tenant_id: None,
        }
    }

    /// Computes the lookup key of the fixed `get_projects` query under `scope`, building
    /// the argument value from scratch on every call.
    fn projects_key(scope: &AuthScope) -> u64 {
        let args = serde_json::json!({"userId": "abc"});
        QueryGroup::compute_lookup_key("get_projects", &args, scope)
    }

    // Two freshly generated identifiers must differ.
    #[test]
    fn test_subscription_id_generation() {
        let (first, second) = (SubscriptionId::new(), SubscriptionId::new());
        assert_ne!(first, second);
    }

    // The default state is "loading" with nothing else set.
    #[test]
    fn test_subscription_state_default() {
        let SubscriptionState {
            loading,
            data,
            error,
            stale,
        } = SubscriptionState::<String>::default();
        assert!(loading);
        assert!(data.is_none());
        assert!(error.is_none());
        assert!(!stale);
    }

    // A state built from data is settled and carries exactly that data.
    #[test]
    fn test_subscription_state_with_data() {
        let payload = vec![1, 2, 3];
        let state = SubscriptionState::with_data(payload.clone());
        assert!(!state.loading);
        assert_eq!(state.data, Some(payload));
        assert!(state.error.is_none());
    }

    // Same inputs give the same key; a different principal gives a different key.
    #[test]
    fn test_query_group_lookup_key() {
        let scope = scope_for("user-1");
        let key1 = projects_key(&scope);
        let key2 = projects_key(&scope);
        assert_eq!(key1, key2);

        let other_scope = scope_for("user-2");
        let key3 = projects_key(&other_scope);
        assert_ne!(key1, key3);
    }

    // An empty delta reports no changes.
    #[test]
    fn test_delta_empty() {
        let delta = Delta::<String>::empty();
        assert!(delta.is_empty());
        assert_eq!(delta.change_count(), 0);
    }

    // One entry per list adds up to three changes.
    #[test]
    fn test_delta_with_changes() {
        let delta = Delta {
            added: vec![String::from("a")],
            removed: vec![String::from("b")],
            updated: vec![String::from("c")],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }
}