Skip to main content

forge_core/realtime/
subscription.rs

1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use super::readset::ReadSet;
7use super::session::SessionId;
8
9/// Unique subscription identifier.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct SubscriptionId(pub Uuid);
12
13impl SubscriptionId {
14    /// Generate a new random subscription ID.
15    pub fn new() -> Self {
16        Self(Uuid::new_v4())
17    }
18
19    /// Create from an existing UUID.
20    pub fn from_uuid(id: Uuid) -> Self {
21        Self(id)
22    }
23
24    /// Get the inner UUID.
25    pub fn as_uuid(&self) -> Uuid {
26        self.0
27    }
28}
29
30impl Default for SubscriptionId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl std::fmt::Display for SubscriptionId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
/// Compact handle for a query group.
///
/// Backed by a plain `u32` so that it stays cheap to copy and cache-friendly
/// to store.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct QueryGroupId(pub u32);
45
46impl std::fmt::Display for QueryGroupId {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        write!(f, "qg-{}", self.0)
49    }
50}
51
/// Compact handle for a subscriber within a slab.
///
/// Like [`QueryGroupId`], this wraps a plain `u32`.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct SubscriberId(pub u32);
55
56impl std::fmt::Display for SubscriberId {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        write!(f, "sub-{}", self.0)
59    }
60}
61
/// The authentication facet of a query group's identity.
///
/// Different users may see different data for the same query and arguments,
/// so two subscriptions that agree on query + args but differ in auth scope
/// must end up in different groups. The scope is hashed into the group lookup
/// key for that reason.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct AuthScope {
    /// Principal the subscription runs as, if there is one.
    pub principal_id: Option<String>,
    /// Tenant the principal belongs to, if known.
    pub tenant_id: Option<String>,
}
70
71impl AuthScope {
72    pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
73        Self {
74            principal_id: auth.principal_id(),
75            tenant_id: auth
76                .claim("tenant_id")
77                .and_then(|v| v.as_str())
78                .map(ToString::to_string),
79        }
80    }
81}
82
83/// A query group is the primary unit of execution.
84/// Multiple subscribers watching the same query+args+auth_scope share a single group.
85/// On invalidation, the query executes once per group (not per subscriber).
86pub struct QueryGroup {
87    pub id: QueryGroupId,
88    pub query_name: String,
89    pub args: Arc<serde_json::Value>,
90    pub auth_scope: AuthScope,
91    pub auth_context: crate::function::AuthContext,
92    /// Compile-time table dependencies from FunctionInfo.
93    pub table_deps: &'static [&'static str],
94    /// Compile-time selected columns from FunctionInfo.
95    pub selected_cols: &'static [&'static str],
96    pub read_set: ReadSet,
97    /// Result hash for delta detection. Shared across all subscribers.
98    pub last_result_hash: Option<String>,
99    /// All subscribers in this group.
100    pub subscribers: Vec<SubscriberId>,
101    pub created_at: DateTime<Utc>,
102    pub execution_count: u64,
103}
104
105impl QueryGroup {
106    /// Compute the lookup key for dedup: hash of (query_name, args, auth_scope).
107    pub fn compute_lookup_key(
108        query_name: &str,
109        args: &serde_json::Value,
110        auth_scope: &AuthScope,
111    ) -> u64 {
112        use std::collections::hash_map::DefaultHasher;
113        use std::hash::{Hash, Hasher};
114
115        let mut hasher = DefaultHasher::new();
116        query_name.hash(&mut hasher);
117        // Canonical hashing: sort object keys so {"a":1,"b":2} and {"b":2,"a":1}
118        // produce the same hash.
119        Self::hash_json_canonical(args, &mut hasher);
120        auth_scope.hash(&mut hasher);
121        hasher.finish()
122    }
123
124    fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
125        use std::hash::Hash;
126        match value {
127            serde_json::Value::Object(map) => {
128                let mut keys: Vec<&String> = map.keys().collect();
129                keys.sort();
130                for key in keys {
131                    key.hash(hasher);
132                    Self::hash_json_canonical(&map[key], hasher);
133                }
134            }
135            other => other.to_string().hash(hasher),
136        }
137    }
138
139    /// Record a query execution result.
140    pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
141        self.read_set = read_set;
142        self.last_result_hash = Some(result_hash);
143        self.execution_count += 1;
144    }
145
146    /// Check if a change should invalidate this group.
147    pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
148        if !change.invalidates(&self.read_set) {
149            return false;
150        }
151
152        // Column-level filtering: skip if changed columns don't overlap with selected
153        if !change.invalidates_columns(self.selected_cols) {
154            return false;
155        }
156
157        true
158    }
159}
160
161/// Lightweight subscriber within a query group. ~48 bytes.
162pub struct Subscriber {
163    pub id: SubscriberId,
164    pub session_id: SessionId,
165    /// Client-facing subscription ID (what the client uses in subscribe/unsubscribe calls).
166    pub client_sub_id: String,
167    /// Back-reference to the query group this subscriber belongs to.
168    pub group_id: QueryGroupId,
169    /// Legacy subscription ID for compatibility with the session server.
170    pub subscription_id: SubscriptionId,
171}
172
/// What a client knows about one of its subscriptions at a given moment.
///
/// `T` is the payload type delivered by the subscription.
#[derive(Clone, Debug)]
pub struct SubscriptionState<T> {
    /// `true` while the initial load is still in progress.
    pub loading: bool,
    /// The current data, if any has arrived.
    pub data: Option<T>,
    /// The error message, if there is one.
    pub error: Option<String>,
    /// `true` when the data may be out of date (e.g. while reconnecting).
    pub stale: bool,
}
185
186impl<T> Default for SubscriptionState<T> {
187    fn default() -> Self {
188        Self {
189            loading: true,
190            data: None,
191            error: None,
192            stale: false,
193        }
194    }
195}
196
197impl<T> SubscriptionState<T> {
198    /// Create a loading state.
199    pub fn loading() -> Self {
200        Self::default()
201    }
202
203    /// Create a state with data.
204    pub fn with_data(data: T) -> Self {
205        Self {
206            loading: false,
207            data: Some(data),
208            error: None,
209            stale: false,
210        }
211    }
212
213    /// Create an error state.
214    pub fn with_error(error: impl Into<String>) -> Self {
215        Self {
216            loading: false,
217            data: None,
218            error: Some(error.into()),
219            stale: false,
220        }
221    }
222
223    /// Mark as stale.
224    pub fn mark_stale(&mut self) {
225        self.stale = true;
226    }
227
228    /// Clear stale flag.
229    pub fn clear_stale(&mut self) {
230        self.stale = false;
231    }
232}
233
234/// Information about a server-side subscription (kept for backward compat).
235#[derive(Debug, Clone)]
236pub struct SubscriptionInfo {
237    /// Unique subscription ID.
238    pub id: SubscriptionId,
239    /// Session that owns this subscription.
240    pub session_id: SessionId,
241    /// Query function name.
242    pub query_name: String,
243    /// Query arguments (as JSON).
244    pub args: serde_json::Value,
245    /// Hash of query + args for deduplication.
246    pub query_hash: String,
247    /// Read set from last execution.
248    pub read_set: ReadSet,
249    /// Hash of last result for delta computation.
250    pub last_result_hash: Option<String>,
251    /// When the subscription was created.
252    pub created_at: DateTime<Utc>,
253    /// When the subscription was last executed.
254    pub last_executed_at: Option<DateTime<Utc>>,
255    /// Number of times the subscription has been re-executed.
256    pub execution_count: u64,
257    /// Estimated memory usage in bytes.
258    pub memory_bytes: usize,
259}
260
261impl SubscriptionInfo {
262    /// Create a new subscription info.
263    pub fn new(
264        session_id: SessionId,
265        query_name: impl Into<String>,
266        args: serde_json::Value,
267    ) -> Self {
268        let query_name = query_name.into();
269        let query_hash = compute_query_hash(&query_name, &args);
270
271        Self {
272            id: SubscriptionId::new(),
273            session_id,
274            query_name,
275            args,
276            query_hash,
277            read_set: ReadSet::new(),
278            last_result_hash: None,
279            created_at: Utc::now(),
280            last_executed_at: None,
281            execution_count: 0,
282            memory_bytes: 0,
283        }
284    }
285
286    /// Update after execution.
287    pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
288        self.read_set = read_set;
289        self.memory_bytes = self.read_set.memory_bytes() + self.query_name.len() + 128;
290        self.last_result_hash = Some(result_hash);
291        self.last_executed_at = Some(Utc::now());
292        self.execution_count += 1;
293    }
294
295    /// Check if a change should invalidate this subscription.
296    pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
297        change.invalidates(&self.read_set)
298    }
299}
300
301/// Compute a hash of query name + args for deduplication.
302fn compute_query_hash(query_name: &str, args: &serde_json::Value) -> String {
303    use std::collections::hash_map::DefaultHasher;
304    use std::hash::{Hash, Hasher};
305
306    let mut hasher = DefaultHasher::new();
307    query_name.hash(&mut hasher);
308    args.to_string().hash(&mut hasher);
309    format!("{:016x}", hasher.finish())
310}
311
312/// Delta format for subscription updates.
313#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
314pub struct Delta<T> {
315    /// New items added.
316    pub added: Vec<T>,
317    /// IDs of removed items.
318    pub removed: Vec<String>,
319    /// Updated items (partial).
320    pub updated: Vec<T>,
321}
322
323impl<T> Default for Delta<T> {
324    fn default() -> Self {
325        Self {
326            added: Vec::new(),
327            removed: Vec::new(),
328            updated: Vec::new(),
329        }
330    }
331}
332
333impl<T> Delta<T> {
334    /// Create an empty delta.
335    pub fn empty() -> Self {
336        Self::default()
337    }
338
339    /// Check if the delta is empty (no changes).
340    pub fn is_empty(&self) -> bool {
341        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
342    }
343
344    /// Total number of changes.
345    pub fn change_count(&self) -> usize {
346        self.added.len() + self.removed.len() + self.updated.len()
347    }
348}
349
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Two freshly generated IDs must not be equal.
    #[test]
    fn test_subscription_id_generation() {
        let first = SubscriptionId::new();
        let second = SubscriptionId::new();
        assert_ne!(first, second);
    }

    /// The default state is "loading" with nothing else set.
    #[test]
    fn test_subscription_state_default() {
        let SubscriptionState {
            loading,
            data,
            error,
            stale,
        } = SubscriptionState::<String>::default();

        assert!(loading);
        assert!(data.is_none());
        assert!(error.is_none());
        assert!(!stale);
    }

    /// `with_data` stores the payload and leaves the loading state.
    #[test]
    fn test_subscription_state_with_data() {
        let payload = vec![1, 2, 3];
        let state = SubscriptionState::with_data(payload.clone());

        assert!(!state.loading);
        assert_eq!(state.data, Some(payload));
        assert!(state.error.is_none());
    }

    /// A new info record keeps the query name, starts at zero executions and
    /// has a non-empty query hash.
    #[test]
    fn test_subscription_info_creation() {
        let args = serde_json::json!({"userId": "abc"});
        let info = SubscriptionInfo::new(SessionId::new(), "get_projects", args);

        assert_eq!(info.query_name, "get_projects");
        assert_eq!(info.execution_count, 0);
        assert!(!info.query_hash.is_empty());
    }

    /// Equal inputs hash equally; different args hash differently.
    #[test]
    fn test_query_hash_consistency() {
        let hash_for = |user_id: &str| {
            compute_query_hash("get_projects", &serde_json::json!({"userId": user_id}))
        };

        let abc_first = hash_for("abc");
        let abc_second = hash_for("abc");
        let xyz = hash_for("xyz");

        assert_eq!(abc_first, abc_second);
        assert_ne!(abc_first, xyz);
    }

    /// The lookup key is stable for equal inputs and changes with the
    /// auth scope.
    #[test]
    fn test_query_group_lookup_key() {
        let scope_of = |principal: &str| AuthScope {
            principal_id: Some(principal.to_string()),
            tenant_id: None,
        };
        let key_under = |scope: &AuthScope| {
            QueryGroup::compute_lookup_key(
                "get_projects",
                &serde_json::json!({"userId": "abc"}),
                scope,
            )
        };

        let user_one = scope_of("user-1");
        let first = key_under(&user_one);
        let repeat = key_under(&user_one);
        assert_eq!(first, repeat);

        let user_two = scope_of("user-2");
        let other = key_under(&user_two);
        assert_ne!(first, other);
    }

    /// An empty delta reports no changes.
    #[test]
    fn test_delta_empty() {
        let delta = Delta::<String>::empty();
        assert!(delta.is_empty());
        assert_eq!(delta.change_count(), 0);
    }

    /// Every list contributes to the change count.
    #[test]
    fn test_delta_with_changes() {
        let delta = Delta {
            added: vec![String::from("a")],
            removed: vec![String::from("b")],
            updated: vec![String::from("c")],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }
}