forge_core/realtime/
subscription.rs

1use chrono::{DateTime, Utc};
2use uuid::Uuid;
3
4use super::readset::ReadSet;
5use super::session::SessionId;
6
7/// Unique subscription identifier.
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
9pub struct SubscriptionId(pub Uuid);
10
11impl SubscriptionId {
12    /// Generate a new random subscription ID.
13    pub fn new() -> Self {
14        Self(Uuid::new_v4())
15    }
16
17    /// Create from an existing UUID.
18    pub fn from_uuid(id: Uuid) -> Self {
19        Self(id)
20    }
21
22    /// Get the inner UUID.
23    pub fn as_uuid(&self) -> Uuid {
24        self.0
25    }
26}
27
28impl Default for SubscriptionId {
29    fn default() -> Self {
30        Self::new()
31    }
32}
33
34impl std::fmt::Display for SubscriptionId {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        write!(f, "{}", self.0)
37    }
38}
39
/// Client-side view of a subscription's current state.
#[derive(Debug, Clone)]
pub struct SubscriptionState<T> {
    /// `true` while the initial load is still in progress.
    pub loading: bool,
    /// The current data, if any.
    pub data: Option<T>,
    /// The error message, if any.
    pub error: Option<String>,
    /// `true` when the data may be stale (reconnecting).
    pub stale: bool,
}

impl<T> Default for SubscriptionState<T> {
    /// The default state is "loading": no data, no error, not stale.
    ///
    /// Implemented by hand, so no `T: Default` bound is required.
    fn default() -> Self {
        SubscriptionState {
            data: None,
            error: None,
            loading: true,
            stale: false,
        }
    }
}

impl<T> SubscriptionState<T> {
    /// Build the loading state (identical to [`Default::default`]).
    pub fn loading() -> Self {
        <Self as Default>::default()
    }

    /// Build a loaded state that holds `data`, with no error and not stale.
    pub fn with_data(data: T) -> Self {
        let mut state = Self::settled();
        state.data = Some(data);
        state
    }

    /// Build a failed state that carries `error`, with no data and not stale.
    pub fn with_error(error: impl Into<String>) -> Self {
        let mut state = Self::settled();
        state.error = Some(error.into());
        state
    }

    /// Raise the stale flag.
    pub fn mark_stale(&mut self) {
        self.stale = true;
    }

    /// Lower the stale flag.
    pub fn clear_stale(&mut self) {
        self.stale = false;
    }

    /// Blank, non-loading state shared by the data and error constructors.
    fn settled() -> Self {
        Self {
            loading: false,
            ..Self::default()
        }
    }
}
100
101/// Information about a server-side subscription.
102#[derive(Debug, Clone)]
103pub struct SubscriptionInfo {
104    /// Unique subscription ID.
105    pub id: SubscriptionId,
106    /// Session that owns this subscription.
107    pub session_id: SessionId,
108    /// Query function name.
109    pub query_name: String,
110    /// Query arguments (as JSON).
111    pub args: serde_json::Value,
112    /// Hash of query + args for deduplication.
113    pub query_hash: String,
114    /// Read set from last execution.
115    pub read_set: ReadSet,
116    /// Hash of last result for delta computation.
117    pub last_result_hash: Option<String>,
118    /// When the subscription was created.
119    pub created_at: DateTime<Utc>,
120    /// When the subscription was last executed.
121    pub last_executed_at: Option<DateTime<Utc>>,
122    /// Number of times the subscription has been re-executed.
123    pub execution_count: u64,
124    /// Estimated memory usage in bytes.
125    pub memory_bytes: usize,
126}
127
128impl SubscriptionInfo {
129    /// Create a new subscription info.
130    pub fn new(
131        session_id: SessionId,
132        query_name: impl Into<String>,
133        args: serde_json::Value,
134    ) -> Self {
135        let query_name = query_name.into();
136        let query_hash = compute_query_hash(&query_name, &args);
137
138        Self {
139            id: SubscriptionId::new(),
140            session_id,
141            query_name,
142            args,
143            query_hash,
144            read_set: ReadSet::new(),
145            last_result_hash: None,
146            created_at: Utc::now(),
147            last_executed_at: None,
148            execution_count: 0,
149            memory_bytes: 0,
150        }
151    }
152
153    /// Update after execution.
154    pub fn record_execution(&mut self, read_set: ReadSet, result_hash: String) {
155        self.read_set = read_set;
156        self.memory_bytes = self.read_set.memory_bytes() + self.query_name.len() + 128;
157        self.last_result_hash = Some(result_hash);
158        self.last_executed_at = Some(Utc::now());
159        self.execution_count += 1;
160    }
161
162    /// Check if a change should invalidate this subscription.
163    pub fn should_invalidate(&self, change: &super::readset::Change) -> bool {
164        change.invalidates(&self.read_set)
165    }
166}
167
168/// Compute a hash of query name + args for deduplication.
169fn compute_query_hash(query_name: &str, args: &serde_json::Value) -> String {
170    use std::collections::hash_map::DefaultHasher;
171    use std::hash::{Hash, Hasher};
172
173    let mut hasher = DefaultHasher::new();
174    query_name.hash(&mut hasher);
175    args.to_string().hash(&mut hasher);
176    format!("{:016x}", hasher.finish())
177}
178
179/// Delta format for subscription updates.
180#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
181pub struct Delta<T> {
182    /// New items added.
183    pub added: Vec<T>,
184    /// IDs of removed items.
185    pub removed: Vec<String>,
186    /// Updated items (partial).
187    pub updated: Vec<T>,
188}
189
190impl<T> Default for Delta<T> {
191    fn default() -> Self {
192        Self {
193            added: Vec::new(),
194            removed: Vec::new(),
195            updated: Vec::new(),
196        }
197    }
198}
199
200impl<T> Delta<T> {
201    /// Create an empty delta.
202    pub fn empty() -> Self {
203        Self::default()
204    }
205
206    /// Check if the delta is empty (no changes).
207    pub fn is_empty(&self) -> bool {
208        self.added.is_empty() && self.removed.is_empty() && self.updated.is_empty()
209    }
210
211    /// Total number of changes.
212    pub fn change_count(&self) -> usize {
213        self.added.len() + self.removed.len() + self.updated.len()
214    }
215}
216
#[cfg(test)]
mod tests {
    use super::*;

    /// Two freshly generated IDs must not compare equal.
    #[test]
    fn test_subscription_id_generation() {
        let first = SubscriptionId::new();
        let second = SubscriptionId::new();
        assert_ne!(first, second);
    }

    /// The default state is loading, with no data, no error and not stale.
    #[test]
    fn test_subscription_state_default() {
        let initial: SubscriptionState<String> = SubscriptionState::default();
        assert!(initial.loading);
        assert!(initial.data.is_none());
        assert!(initial.error.is_none());
        assert!(!initial.stale);
    }

    /// `with_data` stores the payload and leaves the loading flag off.
    #[test]
    fn test_subscription_state_with_data() {
        let payload = vec![1, 2, 3];
        let loaded = SubscriptionState::with_data(payload.clone());
        assert!(!loaded.loading);
        assert_eq!(loaded.data, Some(payload));
        assert!(loaded.error.is_none());
    }

    /// A new record keeps the query name, starts at zero executions and has a
    /// non-empty query hash.
    #[test]
    fn test_subscription_info_creation() {
        let args = serde_json::json!({"userId": "abc"});
        let owner = SessionId::new();
        let info = SubscriptionInfo::new(owner, "get_projects", args);

        assert_eq!(info.query_name, "get_projects");
        assert_eq!(info.execution_count, 0);
        assert!(!info.query_hash.is_empty());
    }

    /// Identical inputs hash identically; different arguments hash differently.
    #[test]
    fn test_query_hash_consistency() {
        let hash_for = |user: &str| {
            compute_query_hash("get_projects", &serde_json::json!({"userId": user}))
        };

        let abc_first = hash_for("abc");
        let abc_second = hash_for("abc");
        let xyz = hash_for("xyz");

        assert_eq!(abc_first, abc_second);
        assert_ne!(abc_first, xyz);
    }

    /// An empty delta reports zero changes.
    #[test]
    fn test_delta_empty() {
        let nothing: Delta<String> = Delta::empty();
        assert!(nothing.is_empty());
        assert_eq!(nothing.change_count(), 0);
    }

    /// One entry in each list counts as three changes.
    #[test]
    fn test_delta_with_changes() {
        let owned = |s: &str| s.to_string();
        let delta = Delta {
            added: vec![owned("a")],
            removed: vec![owned("b")],
            updated: vec![owned("c")],
        };

        assert!(!delta.is_empty());
        assert_eq!(delta.change_count(), 3);
    }
}