Skip to main content

forge_core/realtime/
subscription.rs

1use std::sync::Arc;
2
3use chrono::{DateTime, Utc};
4use uuid::Uuid;
5
6use super::readset::ReadSet;
7use super::session::SessionId;
8
9/// Unique subscription identifier.
10#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
11pub struct SubscriptionId(pub Uuid);
12
13impl SubscriptionId {
14    /// Generate a new random subscription ID.
15    pub fn new() -> Self {
16        Self(Uuid::new_v4())
17    }
18
19    /// Create from an existing UUID.
20    pub fn from_uuid(id: Uuid) -> Self {
21        Self(id)
22    }
23
24    /// Get the inner UUID.
25    pub fn as_uuid(&self) -> Uuid {
26        self.0
27    }
28}
29
30impl Default for SubscriptionId {
31    fn default() -> Self {
32        Self::new()
33    }
34}
35
36impl std::fmt::Display for SubscriptionId {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        write!(f, "{}", self.0)
39    }
40}
41
/// Compact identifier for query groups. u32 for cache-friendly storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryGroupId(pub u32);

impl std::fmt::Display for QueryGroupId {
    /// Renders the ID as `qg-<n>`, where `<n>` is the wrapped integer in decimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = *self;
        f.write_fmt(format_args!("qg-{}", raw))
    }
}
51
/// Compact identifier for subscribers within a slab.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(pub u32);

impl std::fmt::Display for SubscriberId {
    /// Renders the ID as `sub-<n>`, where `<n>` is the wrapped integer in decimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(raw) = *self;
        f.write_fmt(format_args!("sub-{}", raw))
    }
}
61
62/// Authentication scope for query group identity.
63/// Two subscriptions with the same query+args but different auth scopes
64/// must be in different groups (different users see different data).
65#[derive(Debug, Clone, PartialEq, Eq, Hash)]
66pub struct AuthScope {
67    pub principal_id: Option<String>,
68    pub tenant_id: Option<String>,
69}
70
71impl AuthScope {
72    pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
73        Self {
74            principal_id: auth.principal_id(),
75            tenant_id: auth
76                .claim("tenant_id")
77                .and_then(|v| v.as_str())
78                .map(ToString::to_string),
79        }
80    }
81}
82
83/// A query group is the primary unit of execution.
84/// Multiple subscribers watching the same query+args+auth_scope share a single group.
85/// On invalidation, the query executes once per group (not per subscriber).
86pub struct QueryGroup {
87    pub id: QueryGroupId,
88    pub query_name: String,
89    pub args: Arc<serde_json::Value>,
90    pub auth_scope: AuthScope,
91    pub auth_context: crate::function::AuthContext,
92    /// Compile-time table dependencies from FunctionInfo.
93    pub table_deps: &'static [&'static str],
94    /// Compile-time selected columns from FunctionInfo.
95    pub selected_cols: &'static [&'static str],
96    pub read_set: ReadSet,
97    /// Result hash for delta detection. Shared across all subscribers.
98    pub last_result_hash: Option<String>,
99    /// All subscribers in this group.
100    pub subscribers: Vec<SubscriberId>,
101    pub created_at: DateTime<Utc>,
102    pub execution_count: u64,
103}
104
105impl QueryGroup {
106    /// Compute the lookup key for dedup: hash of (query_name, args, auth_scope).
107    pub fn compute_lookup_key(
108        query_name: &str,
109        args: &serde_json::Value,
110        auth_scope: &AuthScope,
111    ) -> u64 {
112        use std::collections::hash_map::DefaultHasher;
113        use std::hash::{Hash, Hasher};
114
115        let mut hasher = DefaultHasher::new();
116        query_name.hash(&mut hasher);
117        // Canonical hashing: sort object keys so {"a":1,"b":2} and {"b":2,"a":1}
118        // produce the same hash.
119        Self::hash_json_canonical(args, &mut hasher);
120        auth_scope.hash(&mut hasher);
121        hasher.finish()
122    }
123
124    fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
125        use std::hash::Hash;
126        match value {
127            serde_json::Value::Object(map) => {
128                let mut keys: Vec<&String> = map.keys().collect();
129                keys.sort();
130                for key in keys {
131                    key.hash(hasher);
132                    Self::hash_json_canonical(&map[key], hasher);
133                }
134            }
135            other => other.to_string().hash(hasher),
136        }
137    }
138
139    /// Record a query execution result.
140    pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
141        self.read_set = read_set;
142        self.last_result_hash = Some(result_hash);
143        self.execution_count += 1;
144    }
145
146    /// Check if a change should invalidate this group.
147    pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
148        if !change.invalidates(&self.read_set) {
149            return false;
150        }
151
152        // Column-level filtering: skip if changed columns don't overlap with selected
153        if !change.invalidates_columns(self.selected_cols) {
154            return false;
155        }
156
157        true
158    }
159}
160
161/// Lightweight subscriber within a query group. ~48 bytes.
162pub struct Subscriber {
163    pub id: SubscriberId,
164    pub session_id: SessionId,
165    /// Client-facing subscription ID (what the client uses in subscribe/unsubscribe calls).
166    pub client_sub_id: String,
167    /// Back-reference to the query group this subscriber belongs to.
168    pub group_id: QueryGroupId,
169    /// Subscription ID used by the session server for tracking.
170    pub subscription_id: SubscriptionId,
171}
172
/// Subscription state from the client's perspective.
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// Whether the initial load is in progress.
    pub loading: bool,
    /// Current data.
    pub data: Option<T>,
    /// Error if any.
    pub error: Option<String>,
    /// Whether data may be stale (reconnecting).
    pub stale: bool,
}

impl<T> Default for SubscriptionState<T> {
    /// The default state is the loading state: no data, no error, not stale.
    fn default() -> Self {
        Self::loading()
    }
}

impl<T> SubscriptionState<T> {
    /// Create a loading state: `loading` is set, everything else is empty/false.
    pub fn loading() -> Self {
        Self {
            loading: true,
            data: None,
            error: None,
            stale: false,
        }
    }

    /// Create a state with data: not loading, no error, not stale.
    pub fn with_data(data: T) -> Self {
        Self {
            loading: false,
            data: Some(data),
            ..Self::loading()
        }
    }

    /// Create an error state: not loading, no data, not stale.
    pub fn with_error(error: impl Into<String>) -> Self {
        let message: String = error.into();
        Self {
            loading: false,
            error: Some(message),
            ..Self::loading()
        }
    }

    /// Mark as stale.
    pub fn mark_stale(&mut self) {
        self.set_stale(true);
    }

    /// Clear stale flag.
    pub fn clear_stale(&mut self) {
        self.set_stale(false);
    }

    /// Single write point for the `stale` flag.
    fn set_stale(&mut self, stale: bool) {
        self.stale = stale;
    }
}
233
234/// Delta format for subscription updates.
235#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
236pub struct Delta<T> {
237    /// New items added.
238    pub added: Vec<T>,
239    /// IDs of removed items.
240    pub removed: Vec<String>,
241    /// Updated items (partial).
242    pub updated: Vec<T>,
243}
244
245impl<T> Default for Delta<T> {
246    fn default() -> Self {
247        Self {
248            added: Vec::new(),
249            removed: Vec::new(),
250            updated: Vec::new(),
251        }
252    }
253}
254
255impl<T> Delta<T> {
256    /// Create an empty delta.
257    pub fn empty() -> Self {
258        Self::default()
259    }
260
261    /// Check if the delta is empty (no changes).
262    pub fn is_empty(&self) -> bool {
263        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
264    }
265
266    /// Total number of changes.
267    pub fn change_count(&self) -> usize {
268        self.added.len() + self.removed.len() + self.updated.len()
269    }
270}
271
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    /// Auth scope for `principal` with no tenant.
    fn scope_for(principal: &str) -> AuthScope {
        AuthScope {
            principal_id: Some(principal.to_string()),
            tenant_id: None,
        }
    }

    /// Lookup key for the fixed `get_projects` query under `scope`.
    fn projects_key(scope: &AuthScope) -> u64 {
        QueryGroup::compute_lookup_key(
            "get_projects",
            &serde_json::json!({"userId": "abc"}),
            scope,
        )
    }

    /// Two freshly generated IDs must differ.
    #[test]
    fn test_subscription_id_generation() {
        let first = SubscriptionId::new();
        let second = SubscriptionId::new();
        assert_ne!(first, second);
    }

    /// The default state is "loading" with nothing else set.
    #[test]
    fn test_subscription_state_default() {
        let SubscriptionState {
            loading,
            data,
            error,
            stale,
        } = SubscriptionState::<String>::default();

        assert!(loading);
        assert!(data.is_none());
        assert!(error.is_none());
        assert!(!stale);
    }

    /// `with_data` stores the payload and clears the loading flag.
    #[test]
    fn test_subscription_state_with_data() {
        let payload = vec![1, 2, 3];
        let state = SubscriptionState::with_data(payload.clone());

        assert!(!state.loading);
        assert_eq!(state.data, Some(payload));
        assert!(state.error.is_none());
    }

    /// Same inputs give the same key; a different auth scope gives a different key.
    #[test]
    fn test_query_group_lookup_key() {
        let scope = scope_for("user-1");
        let key1 = projects_key(&scope);
        let key2 = projects_key(&scope);
        assert_eq!(key1, key2);

        let other_scope = scope_for("user-2");
        let key3 = projects_key(&other_scope);
        assert_ne!(key1, key3);
    }

    /// An empty delta reports no changes.
    #[test]
    fn test_delta_empty() {
        let delta = Delta::<String>::empty();
        assert!(delta.is_empty());
        assert_eq!(delta.change_count(), 0);
    }

    /// One entry in each list counts as three changes.
    #[test]
    fn test_delta_with_changes() {
        let delta = Delta {
            added: vec![String::from("a")],
            removed: vec![String::from("b")],
            updated: vec![String::from("c")],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }
}