forge-core 0.10.0

Core types and traits for the Forge framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
use std::sync::Arc;

use chrono::{DateTime, Utc};
use uuid::Uuid;

use super::readset::ReadSet;
use super::session::SessionId;

/// Unique subscription identifier.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionId(pub Uuid);

impl SubscriptionId {
    /// Generate a new random subscription ID.
    pub fn new() -> Self {
        Self(Uuid::new_v4())
    }

    /// Create from an existing UUID.
    pub fn from_uuid(id: Uuid) -> Self {
        Self(id)
    }

    /// Get the inner UUID.
    pub fn as_uuid(&self) -> Uuid {
        self.0
    }
}

impl Default for SubscriptionId {
    fn default() -> Self {
        Self::new()
    }
}

impl std::fmt::Display for SubscriptionId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Opaque dedup key for subscriptions sharing the same query+args+auth_scope.
///
/// Wraps a 64-bit FNV-1a hash (fixed seed, stable across processes) so the
/// algorithm can be swapped later without changing any call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriptionKey(pub u64);

/// Compact identifier for query groups. u32 for cache-friendly storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct QueryGroupId(pub u32);

impl std::fmt::Display for QueryGroupId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "qg-{}", self.0)
    }
}

/// Compact identifier for subscribers within a slab.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SubscriberId(pub u32);

impl std::fmt::Display for SubscriberId {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "sub-{}", self.0)
    }
}

/// Authentication scope for query group identity.
/// Two subscriptions with the same query+args but different auth scopes
/// must be in different groups (different users see different data).
/// Includes a hash of the sorted roles so users with different role sets
/// don't share a group.
#[derive(Debug, Clone)]
pub struct AuthScope {
    pub principal_id: Option<String>,
    pub tenant_id: Option<String>,
    /// Hash of the sorted roles for this auth context.
    pub role_hash: u64,
}

impl PartialEq for AuthScope {
    fn eq(&self, other: &Self) -> bool {
        self.principal_id == other.principal_id
            && self.tenant_id == other.tenant_id
            && self.role_hash == other.role_hash
    }
}

impl Eq for AuthScope {}

impl std::hash::Hash for AuthScope {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        self.principal_id.hash(state);
        self.tenant_id.hash(state);
        self.role_hash.hash(state);
    }
}

impl AuthScope {
    pub fn from_auth(auth: &crate::function::AuthContext) -> Self {
        use std::hash::{Hash, Hasher};

        let mut roles: Vec<&str> = auth.roles().iter().map(|s| s.as_str()).collect();
        roles.sort_unstable();
        let mut hasher = FnvHasher::new();
        roles.hash(&mut hasher);
        let role_hash = hasher.finish();

        Self {
            principal_id: auth.principal_id(),
            tenant_id: auth
                .claim("tenant_id")
                .and_then(|v| v.as_str())
                .map(ToString::to_string),
            role_hash,
        }
    }
}

/// Coalesces subscriptions sharing the same query+args+auth_scope.
/// On invalidation the query executes once per group, not per subscriber.
pub struct QueryGroup {
    pub id: QueryGroupId,
    pub query_name: String,
    pub args: Arc<serde_json::Value>,
    pub auth_scope: AuthScope,
    /// Cached at subscribe time; not refreshed mid-lifetime. The reactor skips
    /// re-execution for groups with expired tokens; session cleanup evicts them.
    pub auth_context: crate::function::AuthContext,
    pub table_deps: &'static [&'static str],
    pub selected_cols: &'static [&'static str],
    pub read_set: ReadSet,
    /// Shared across all subscribers for delta detection.
    pub last_result_hash: Option<String>,
    /// Sent to new subscribers joining an existing group.
    pub last_result: Option<Arc<serde_json::Value>>,
    pub subscribers: Vec<SubscriberId>,
    pub created_at: DateTime<Utc>,
    pub execution_count: u64,
}

/// `DefaultHasher` randomises its seed per process, so equal inputs hash to
/// different values after a restart. FNV-1a's fixed basis avoids that.
struct FnvHasher(u64);

impl FnvHasher {
    fn new() -> Self {
        Self(14695981039346656037)
    }
}

impl std::hash::Hasher for FnvHasher {
    fn write(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.0 ^= b as u64;
            self.0 = self.0.wrapping_mul(1099511628211);
        }
    }

    fn finish(&self) -> u64 {
        self.0
    }
}

impl QueryGroup {
    /// Compute the dedup key for (query_name, args, auth_scope).
    ///
    /// Uses FNV-1a so the result is stable across process restarts — unlike
    /// `DefaultHasher` which randomises its seed per process.
    pub fn compute_lookup_key(
        query_name: &str,
        args: &serde_json::Value,
        auth_scope: &AuthScope,
    ) -> SubscriptionKey {
        use std::hash::{Hash, Hasher};

        let mut hasher = FnvHasher::new();
        query_name.hash(&mut hasher);
        Self::hash_json_canonical(args, &mut hasher);
        auth_scope.hash(&mut hasher);
        SubscriptionKey(hasher.finish())
    }

    fn hash_json_canonical(value: &serde_json::Value, hasher: &mut impl std::hash::Hasher) {
        use std::hash::Hash;
        match value {
            serde_json::Value::Object(map) => {
                let mut keys: Vec<&String> = map.keys().collect();
                keys.sort();
                for key in keys {
                    key.hash(hasher);
                    Self::hash_json_canonical(&map[key], hasher);
                }
            }
            other => other.to_string().hash(hasher),
        }
    }

    /// Record a query execution result.
    pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
        self.read_set = read_set;
        self.last_result_hash = Some(result_hash);
        self.execution_count += 1;
    }

    /// Record execution with the result data cached for new subscribers.
    pub fn record_execution_with_data(
        &mut self,
        read_set: ReadSet,
        result_hash: String,
        data: Arc<serde_json::Value>,
    ) {
        self.read_set = read_set;
        self.last_result_hash = Some(result_hash);
        self.last_result = Some(data);
        self.execution_count += 1;
    }

    /// Check if a change should invalidate this group.
    /// Uses the runtime read set when populated, otherwise falls back to the
    /// compile-time table dependencies from macro extraction.
    pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
        let table_matches = if self.read_set.tables.is_empty() {
            self.table_deps.iter().any(|t| *t == change.table)
        } else {
            change.invalidates(&self.read_set)
        };

        if !table_matches {
            return false;
        }

        if !change.invalidates_columns(self.selected_cols) {
            return false;
        }

        true
    }
}

/// Lightweight subscriber within a query group. ~48 bytes.
pub struct Subscriber {
    pub id: SubscriberId,
    pub session_id: SessionId,
    /// Client-facing subscription ID (what the client uses in subscribe/unsubscribe calls).
    pub client_sub_id: String,
    /// Back-reference to the query group this subscriber belongs to.
    pub group_id: QueryGroupId,
    /// Subscription ID used by the session server for tracking.
    pub subscription_id: SubscriptionId,
}

/// Subscription state from the client's perspective.
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// Whether the initial load is in progress.
    pub loading: bool,
    /// Current data.
    pub data: Option<T>,
    /// Error if any.
    pub error: Option<String>,
    /// Whether data may be stale (reconnecting).
    pub stale: bool,
}

impl<T> Default for SubscriptionState<T> {
    fn default() -> Self {
        Self {
            loading: true,
            data: None,
            error: None,
            stale: false,
        }
    }
}

impl<T> SubscriptionState<T> {
    /// Create a loading state.
    pub fn loading() -> Self {
        Self::default()
    }

    /// Create a state with data.
    pub fn with_data(data: T) -> Self {
        Self {
            loading: false,
            data: Some(data),
            error: None,
            stale: false,
        }
    }

    /// Create an error state.
    pub fn with_error(error: impl Into<String>) -> Self {
        Self {
            loading: false,
            data: None,
            error: Some(error.into()),
            stale: false,
        }
    }

    /// Mark as stale.
    pub fn mark_stale(&mut self) {
        self.stale = true;
    }

    /// Clear stale flag.
    pub fn clear_stale(&mut self) {
        self.stale = false;
    }
}

/// Delta format for subscription updates.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Delta<T> {
    /// New items added.
    pub added: Vec<T>,
    /// IDs of removed items.
    pub removed: Vec<String>,
    /// Updated items (partial).
    pub updated: Vec<T>,
}

impl<T> Default for Delta<T> {
    fn default() -> Self {
        Self {
            added: Vec::new(),
            removed: Vec::new(),
            updated: Vec::new(),
        }
    }
}

impl<T> Delta<T> {
    /// Create an empty delta.
    pub fn empty() -> Self {
        Self::default()
    }

    /// Check if the delta is empty (no changes).
    pub fn is_empty(&self) -> bool {
        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
    }

    /// Total number of changes.
    pub fn change_count(&self) -> usize {
        self.added.len() + self.removed.len() + self.updated.len()
    }
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::indexing_slicing)]
mod tests {
    use super::*;

    #[test]
    fn test_subscription_id_generation() {
        let id1 = SubscriptionId::new();
        let id2 = SubscriptionId::new();
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_subscription_state_default() {
        let state: SubscriptionState<String> = SubscriptionState::default();
        assert!(state.loading);
        assert!(state.data.is_none());
        assert!(state.error.is_none());
        assert!(!state.stale);
    }

    #[test]
    fn test_subscription_state_with_data() {
        let state = SubscriptionState::with_data(vec![1, 2, 3]);
        assert!(!state.loading);
        assert_eq!(state.data, Some(vec![1, 2, 3]));
        assert!(state.error.is_none());
    }

    #[test]
    fn lookup_key_stable_across_calls() {
        let scope = AuthScope {
            principal_id: Some("user-1".to_string()),
            tenant_id: None,
            role_hash: 0,
        };
        let key1 = QueryGroup::compute_lookup_key(
            "get_projects",
            &serde_json::json!({"userId": "abc"}),
            &scope,
        );
        let key2 = QueryGroup::compute_lookup_key(
            "get_projects",
            &serde_json::json!({"userId": "abc"}),
            &scope,
        );
        assert_eq!(key1, key2);

        let other_scope = AuthScope {
            principal_id: Some("user-2".to_string()),
            tenant_id: None,
            role_hash: 0,
        };
        let key3 = QueryGroup::compute_lookup_key(
            "get_projects",
            &serde_json::json!({"userId": "abc"}),
            &other_scope,
        );
        assert_ne!(key1, key3);
    }

    #[test]
    fn lookup_key_is_deterministic() {
        let scope = AuthScope {
            principal_id: Some("u1".to_string()),
            tenant_id: None,
            role_hash: 0,
        };
        let key =
            QueryGroup::compute_lookup_key("get_items", &serde_json::json!({"id": "42"}), &scope);
        let expected =
            QueryGroup::compute_lookup_key("get_items", &serde_json::json!({"id": "42"}), &scope);
        assert_eq!(key, expected);
        // FNV-1a with a non-empty input can't produce 0 (sanity: hashing actually ran).
        assert_ne!(key.0, 0);
    }

    #[test]
    fn lookup_key_canonical_json_order_invariant() {
        let scope = AuthScope {
            principal_id: None,
            tenant_id: None,
            role_hash: 0,
        };
        let key_ab =
            QueryGroup::compute_lookup_key("q", &serde_json::json!({"a": 1, "b": 2}), &scope);
        let key_ba =
            QueryGroup::compute_lookup_key("q", &serde_json::json!({"b": 2, "a": 1}), &scope);
        assert_eq!(key_ab, key_ba);
    }

    #[test]
    fn test_delta_empty() {
        let delta: Delta<String> = Delta::empty();
        assert!(delta.is_empty());
        assert_eq!(delta.change_count(), 0);
    }

    #[test]
    fn test_delta_with_changes() {
        let delta = Delta {
            added: vec!["a".to_string()],
            removed: vec!["b".to_string()],
            updated: vec!["c".to_string()],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }
}