Skip to main content

forge_core/realtime/
subscription.rs

1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use super::readset::ReadSet;
7use super::session::SessionId;
8
9/// Unique subscription identifier.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct SubscriptionId(pub Uuid);
12
13impl SubscriptionId {
14    /// Generate a new random subscription ID.
15    pub fn new() -> Self {
16        Self(Uuid::new_v4())
17    }
18
19    /// Create from an existing UUID.
20    pub fn from_uuid(id: Uuid) -> Self {
21        Self(id)
22    }
23
24    /// Get the inner UUID.
25    pub fn as_uuid(&self) -> Uuid {
26        self.0
27    }
28}
29
30impl Default for SubscriptionId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl std::fmt::Display for SubscriptionId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
/// Opaque dedup key shared by subscriptions that have the same query, args
/// and auth scope.
///
/// The wrapped value is currently a 64-bit FNV-1a digest computed from a fixed
/// basis. Callers should treat it as opaque, which leaves room to replace the
/// hashing algorithm later without touching any call site.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SubscriptionKey(pub u64);
48
/// Compact identifier for a query group; a `u32` keeps storage cache-friendly.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct QueryGroupId(pub u32);

impl std::fmt::Display for QueryGroupId {
    /// Renders the id as `qg-<number>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let QueryGroupId(raw) = self;
        f.write_fmt(format_args!("qg-{}", raw))
    }
}
58
/// Compact identifier for a subscriber stored in a slab.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SubscriberId(pub u32);

impl std::fmt::Display for SubscriberId {
    /// Renders the id as `sub-<number>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let SubscriberId(raw) = self;
        f.write_fmt(format_args!("sub-{}", raw))
    }
}
68
/// Authentication scope that forms part of a query group's identity.
///
/// Two subscriptions with the same query and args but different auth scopes
/// must land in different groups, because different users may see different
/// data. The scope also carries a hash of the caller's sorted roles so that
/// users with different role sets never share a group.
///
/// Equality and hashing are derived rather than hand-written so that they
/// always cover every field: a field added later cannot be silently left out
/// of the comparison. The derived `Hash` feeds the fields to the hasher in
/// declaration order (`principal_id`, `tenant_id`, `role_hash`), so the field
/// order below is part of the lookup-key byte stream and should not be
/// rearranged casually.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AuthScope {
    /// Identifier of the authenticated principal, if any.
    pub principal_id: Option<String>,
    /// Tenant the principal belongs to, if any.
    pub tenant_id: Option<String>,
    /// Hash of the sorted roles for this auth context.
    pub role_hash: u64,
}
99
100impl AuthScope {
101    pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
102        use std::hash::{Hash, Hasher};
103
104        let mut roles: Vec<&str> = auth.roles().iter().map(|s| s.as_str()).collect();
105        roles.sort_unstable();
106        let mut hasher = FnvHasher::new();
107        roles.hash(&mut hasher);
108        let role_hash = hasher.finish();
109
110        Self {
111            principal_id: auth.principal_id(),
112            tenant_id: auth
113                .claim("tenant_id")
114                .and_then(|v| v.as_str())
115                .map(ToString::to_string),
116            role_hash,
117        }
118    }
119}
120
121/// Coalesces subscriptions sharing the same query+args+auth_scope.
122/// On invalidation the query executes once per group, not per subscriber.
123pub struct QueryGroup {
124    pub id: QueryGroupId,
125    pub query_name: String,
126    pub args: Arc<serde_json::Value>,
127    pub auth_scope: AuthScope,
128    /// Cached at subscribe time; not refreshed mid-lifetime. The reactor skips
129    /// re-execution for groups with expired tokens; session cleanup evicts them.
130    pub auth_context: crate::function::AuthContext,
131    pub table_deps: &'static [&'static str],
132    pub selected_cols: &'static [&'static str],
133    pub read_set: ReadSet,
134    /// Shared across all subscribers for delta detection.
135    pub last_result_hash: Option<String>,
136    /// Sent to new subscribers joining an existing group.
137    pub last_result: Option<Arc<serde_json::Value>>,
138    pub subscribers: Vec<SubscriberId>,
139    pub created_at: DateTime<Utc>,
140    pub execution_count: u64,
141}
142
/// Minimal 64-bit FNV-1a hasher with a fixed offset basis.
///
/// The standard library's hashers are unsuitable for keys that must stay
/// comparable across processes: `RandomState` seeds are randomised per
/// instance and the algorithm behind `DefaultHasher` is unspecified and may
/// change between Rust releases. FNV-1a with its published constants avoids
/// both problems.
///
/// The integer `write_*` methods are overridden to feed fixed-width
/// little-endian bytes. The trait's default implementations use native-endian
/// bytes and a pointer-width `usize`, which would make the digest depend on
/// the CPU architecture. On 64-bit little-endian targets the output is
/// identical to those defaults.
struct FnvHasher(u64);

impl FnvHasher {
    /// FNV-1a 64-bit offset basis (14695981039346656037).
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    /// FNV 64-bit prime (1099511628211).
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    /// Create a hasher positioned at the standard offset basis.
    fn new() -> Self {
        Self(Self::OFFSET_BASIS)
    }
}

impl std::hash::Hasher for FnvHasher {
    /// Fold `bytes` into the state: XOR each byte in, then multiply by the prime.
    fn write(&mut self, bytes: &[u8]) {
        self.0 = bytes.iter().fold(self.0, |state, &byte| {
            (state ^ u64::from(byte)).wrapping_mul(Self::PRIME)
        });
    }

    // The signed `write_i*` defaults forward to the unsigned methods below, so
    // overriding the unsigned family is enough to fix the byte order for both.

    fn write_u16(&mut self, i: u16) {
        self.write(&i.to_le_bytes());
    }

    fn write_u32(&mut self, i: u32) {
        self.write(&i.to_le_bytes());
    }

    fn write_u64(&mut self, i: u64) {
        self.write(&i.to_le_bytes());
    }

    fn write_u128(&mut self, i: u128) {
        self.write(&i.to_le_bytes());
    }

    /// Always hashed as 64 bits so 32-bit and 64-bit targets agree.
    fn write_usize(&mut self, i: usize) {
        // Lossless: `usize` is at most 64 bits wide on supported targets.
        self.write_u64(i as u64);
    }

    /// Sign-extended to 64 bits for the same reason as `write_usize`.
    fn write_isize(&mut self, i: isize) {
        self.write_i64(i as i64);
    }

    /// Return the current digest; the hasher can keep accepting input afterwards.
    fn finish(&self) -> u64 {
        self.0
    }
}
165
166impl QueryGroup {
167    /// Compute the dedup key for (query_name, args, auth_scope).
168    ///
169    /// Uses FNV-1a so the result is stable across process restarts — unlike
170    /// `DefaultHasher` which randomises its seed per process.
171    pub fn compute_lookup_key(
172        query_name: &str,
173        args: &serde_json::Value,
174        auth_scope: &AuthScope,
175    ) -> SubscriptionKey {
176        use std::hash::{Hash, Hasher};
177
178        let mut hasher = FnvHasher::new();
179        query_name.hash(&mut hasher);
180        Self::hash_json_canonical(args, &mut hasher);
181        auth_scope.hash(&mut hasher);
182        SubscriptionKey(hasher.finish())
183    }
184
185    fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
186        use std::hash::Hash;
187        match value {
188            serde_json::Value::Object(map) => {
189                let mut keys: Vec<&String> = map.keys().collect();
190                keys.sort();
191                for key in keys {
192                    key.hash(hasher);
193                    Self::hash_json_canonical(&map[key], hasher);
194                }
195            }
196            other => other.to_string().hash(hasher),
197        }
198    }
199
200    /// Record a query execution result.
201    pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
202        self.read_set = read_set;
203        self.last_result_hash = Some(result_hash);
204        self.execution_count += 1;
205    }
206
207    /// Record execution with the result data cached for new subscribers.
208    pub fn record_execution_with_data(
209        &mut self,
210        read_set: ReadSet,
211        result_hash: String,
212        data: Arc<serde_json::Value>,
213    ) {
214        self.read_set = read_set;
215        self.last_result_hash = Some(result_hash);
216        self.last_result = Some(data);
217        self.execution_count += 1;
218    }
219
220    /// Check if a change should invalidate this group.
221    /// Uses the runtime read set when populated, otherwise falls back to the
222    /// compile-time table dependencies from macro extraction.
223    pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
224        let table_matches = if self.read_set.tables.is_empty() {
225            self.table_deps.iter().any(|t| *t == change.table)
226        } else {
227            change.invalidates(&self.read_set)
228        };
229
230        if !table_matches {
231            return false;
232        }
233
234        if !change.invalidates_columns(self.selected_cols) {
235            return false;
236        }
237
238        true
239    }
240}
241
242/// Lightweight subscriber within a query group. ~48 bytes.
243pub struct Subscriber {
244    pub id: SubscriberId,
245    pub session_id: SessionId,
246    /// Client-facing subscription ID (what the client uses in subscribe/unsubscribe calls).
247    pub client_sub_id: String,
248    /// Back-reference to the query group this subscriber belongs to.
249    pub group_id: QueryGroupId,
250    /// Subscription ID used by the session server for tracking.
251    pub subscription_id: SubscriptionId,
252}
253
/// A subscription's state as seen by the client.
#[derive(Clone, Debug)]
pub struct SubscriptionState<T> {
    /// `true` while the initial load is still in progress.
    pub loading: bool,
    /// Most recent data, if any has arrived.
    pub data: Option<T>,
    /// Error message, if the subscription failed.
    pub error: Option<String>,
    /// `true` when the data may be out of date (for example while reconnecting).
    pub stale: bool,
}

// Written by hand because the default has `loading: true` and must not
// require `T: Default`.
impl<T> Default for SubscriptionState<T> {
    /// The default is the "loading" state: no data, no error, not stale.
    fn default() -> Self {
        SubscriptionState {
            loading: true,
            data: None,
            error: None,
            stale: false,
        }
    }
}

impl<T> SubscriptionState<T> {
    /// State for a subscription whose first result has not arrived yet.
    pub fn loading() -> Self {
        <Self as Default>::default()
    }

    /// State for a subscription that has successfully loaded `data`.
    pub fn with_data(data: T) -> Self {
        SubscriptionState {
            loading: false,
            data: Some(data),
            ..Self::default()
        }
    }

    /// State for a subscription that failed with `error`.
    pub fn with_error(error: impl Into<String>) -> Self {
        let message: String = error.into();
        SubscriptionState {
            loading: false,
            error: Some(message),
            ..Self::default()
        }
    }

    /// Flag the current data as possibly out of date.
    pub fn mark_stale(&mut self) {
        self.stale = true;
    }

    /// Remove the stale flag.
    pub fn clear_stale(&mut self) {
        self.stale = false;
    }
}
314
315/// Delta format for subscription updates.
316#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
317pub struct Delta<T> {
318    /// New items added.
319    pub added: Vec<T>,
320    /// IDs of removed items.
321    pub removed: Vec<String>,
322    /// Updated items (partial).
323    pub updated: Vec<T>,
324}
325
326impl<T> Default for Delta<T> {
327    fn default() -> Self {
328        Self {
329            added: Vec::new(),
330            removed: Vec::new(),
331            updated: Vec::new(),
332        }
333    }
334}
335
336impl<T> Delta<T> {
337    /// Create an empty delta.
338    pub fn empty() -> Self {
339        Self::default()
340    }
341
342    /// Check if the delta is empty (no changes).
343    pub fn is_empty(&self) -> bool {
344        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
345    }
346
347    /// Total number of changes.
348    pub fn change_count(&self) -> usize {
349        self.added.len() + self.removed.len() + self.updated.len()
350    }
351}
352
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Scope with the given principal, no tenant and a zero role hash.
    fn scope_for(principal: Option<&str>) -> AuthScope {
        AuthScope {
            principal_id: principal.map(String::from),
            tenant_id: None,
            role_hash: 0,
        }
    }

    #[test]
    fn test_subscription_id_generation() {
        let first = SubscriptionId::new();
        let second = SubscriptionId::new();
        assert_ne!(first, second);
    }

    #[test]
    fn test_subscription_state_default() {
        let initial: SubscriptionState<String> = SubscriptionState::default();
        assert!(initial.loading);
        assert!(initial.data.is_none());
        assert!(initial.error.is_none());
        assert!(!initial.stale);
    }

    #[test]
    fn test_subscription_state_with_data() {
        let loaded = SubscriptionState::with_data(vec![1, 2, 3]);
        assert!(!loaded.loading);
        assert_eq!(loaded.data, Some(vec![1, 2, 3]));
        assert!(loaded.error.is_none());
    }

    #[test]
    fn lookup_key_stable_across_calls() {
        let args = serde_json::json!({"userId": "abc"});

        // Same inputs, computed twice, must agree.
        let user_one = scope_for(Some("user-1"));
        let first = QueryGroup::compute_lookup_key("get_projects", &args, &user_one);
        let second = QueryGroup::compute_lookup_key("get_projects", &args, &user_one);
        assert_eq!(first, second);

        // A different principal must land on a different key.
        let user_two = scope_for(Some("user-2"));
        let third = QueryGroup::compute_lookup_key("get_projects", &args, &user_two);
        assert_ne!(first, third);
    }

    #[test]
    fn lookup_key_is_deterministic() {
        let scope = scope_for(Some("u1"));
        let args = serde_json::json!({"id": "42"});

        let key = QueryGroup::compute_lookup_key("get_items", &args, &scope);
        let expected = QueryGroup::compute_lookup_key("get_items", &args, &scope);
        assert_eq!(key, expected);
        // Sanity check that hashing actually ran: a zero digest is not what
        // this input produces.
        assert_ne!(key.0, 0);
    }

    #[test]
    fn lookup_key_canonical_json_order_invariant() {
        let scope = scope_for(None);
        let key_ab =
            QueryGroup::compute_lookup_key("q", &serde_json::json!({"a": 1, "b": 2}), &scope);
        let key_ba =
            QueryGroup::compute_lookup_key("q", &serde_json::json!({"b": 2, "a": 1}), &scope);
        assert_eq!(key_ab, key_ba);
    }

    #[test]
    fn test_delta_empty() {
        let nothing: Delta<String> = Delta::empty();
        assert!(nothing.is_empty());
        assert_eq!(nothing.change_count(), 0);
    }

    #[test]
    fn test_delta_with_changes() {
        let changes = Delta {
            added: vec!["a".to_string()],
            removed: vec!["b".to_string()],
            updated: vec!["c".to_string()],
        };

        assert!(!changes.is_empty());
        assert_eq!(changes.change_count(), 3);
    }
}