leptos_query_rs/optimistic/
mod.rs

1use crate::types::{QueryKey, QueryKeyPattern};
2use crate::retry::QueryError;
3use serde::{Deserialize, Serialize};
4use serde::de::DeserializeOwned;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use parking_lot::RwLock;
9
10/// Optimistic update configuration
11#[derive(Clone, Debug, Serialize, Deserialize)]
12pub struct OptimisticConfig {
13    /// Whether optimistic updates are enabled
14    pub enabled: bool,
15    /// How long to keep optimistic data before reverting
16    pub rollback_timeout: Duration,
17    /// Whether to show loading states during optimistic updates
18    pub show_loading: bool,
19    /// Whether to merge optimistic data with existing cache
20    pub merge_strategy: MergeStrategy,
21}
22
23impl Default for OptimisticConfig {
24    fn default() -> Self {
25        Self {
26            enabled: true,
27            rollback_timeout: Duration::from_secs(30), // 30 seconds
28            show_loading: false,
29            merge_strategy: MergeStrategy::Replace,
30        }
31    }
32}
33
34/// Strategy for merging optimistic data with existing cache
35#[derive(Clone, Debug, Serialize, Deserialize)]
36pub enum MergeStrategy {
37    /// Replace existing data completely
38    Replace,
39    /// Merge fields, keeping existing values for missing fields
40    Merge,
41    /// Deep merge nested objects
42    DeepMerge,
43    /// Custom merge function (not serializable)
44    Custom,
45}
46
47/// Optimistic update operation
48#[derive(Clone, Debug)]
49pub struct OptimisticUpdate<T> {
50    /// Unique ID for this update
51    pub id: String,
52    /// The query key being updated
53    pub key: QueryKey,
54    /// The optimistic data
55    pub data: T,
56    /// When the update was applied
57    pub applied_at: Instant,
58    /// Whether this update has been confirmed
59    pub confirmed: bool,
60    /// Whether this update has been rolled back
61    pub rolled_back: bool,
62    /// Rollback data (original state)
63    pub rollback_data: Option<T>,
64}
65
66impl<T> OptimisticUpdate<T> {
67    /// Create a new optimistic update
68    pub fn new(key: QueryKey, data: T, rollback_data: Option<T>) -> Self {
69        Self {
70            id: uuid::Uuid::new_v4().to_string(),
71            key,
72            data,
73            applied_at: Instant::now(),
74            confirmed: false,
75            rolled_back: false,
76            rollback_data,
77        }
78    }
79
80    /// Check if this update has expired
81    pub fn is_expired(&self, timeout: Duration) -> bool {
82        self.applied_at.elapsed() > timeout
83    }
84
85    /// Mark this update as confirmed
86    pub fn confirm(&mut self) {
87        self.confirmed = true;
88    }
89
90    /// Mark this update as rolled back
91    pub fn rollback(&mut self) {
92        self.rolled_back = true;
93    }
94}
95
96/// Optimistic update manager
97pub struct OptimisticManager<T> {
98    /// Configuration for optimistic updates
99    config: OptimisticConfig,
100    /// Active optimistic updates
101    updates: Arc<RwLock<HashMap<String, OptimisticUpdate<T>>>>,
102    /// Update history for debugging
103    history: Arc<RwLock<Vec<OptimisticUpdate<T>>>>,
104}
105
106impl<T: Clone + Serialize + DeserializeOwned> OptimisticManager<T> {
107    /// Create a new optimistic update manager
108    pub fn new(config: OptimisticConfig) -> Self {
109        Self {
110            config,
111            updates: Arc::new(RwLock::new(HashMap::new())),
112            history: Arc::new(RwLock::new(Vec::new())),
113        }
114    }
115
116    /// Apply an optimistic update
117    pub fn apply_update(&self, key: &QueryKey, data: T, rollback_data: Option<T>) -> String {
118        let update = OptimisticUpdate::new(key.clone(), data, rollback_data);
119        let id = update.id.clone();
120        
121        let mut updates = self.updates.write();
122        updates.insert(id.clone(), update);
123        
124        // Add to history
125        let mut history = self.history.write();
126        history.push(updates.get(&id).unwrap().clone());
127        
128        // Keep only last 100 updates in history
129        if history.len() > 100 {
130            history.remove(0);
131        }
132        
133        id
134    }
135
136    /// Get optimistic data for a key
137    pub fn get_optimistic_data(&self, key: &QueryKey) -> Option<T> {
138        let updates = self.updates.read();
139        
140        // Find the most recent unconfirmed update for this key
141        updates
142            .values()
143            .filter(|update| update.key == *key && !update.confirmed && !update.rolled_back)
144            .max_by_key(|update| update.applied_at)
145            .map(|update| update.data.clone())
146    }
147
148    /// Confirm an optimistic update
149    pub fn confirm_update(&self, update_id: &str) -> Result<(), QueryError> {
150        let mut updates = self.updates.write();
151        
152        if let Some(update) = updates.get_mut(update_id) {
153            update.confirm();
154            Ok(())
155        } else {
156            Err(QueryError::GenericError("Update not found".to_string()))
157        }
158    }
159
160    /// Rollback an optimistic update
161    pub fn rollback_update(&self, update_id: &str) -> Result<Option<T>, QueryError> {
162        let mut updates = self.updates.write();
163        
164        if let Some(update) = updates.get_mut(update_id) {
165            update.rollback();
166            Ok(update.rollback_data.clone())
167        } else {
168            Err(QueryError::GenericError("Update not found".to_string()))
169        }
170    }
171
172    /// Rollback all updates for a specific key
173    pub fn rollback_key(&self, key: &QueryKey) -> Vec<T> {
174        let mut updates = self.updates.write();
175        let mut rollback_data = Vec::new();
176        
177        for update in updates.values_mut() {
178            if update.key == *key && !update.rolled_back {
179                update.rollback();
180                if let Some(data) = &update.rollback_data {
181                    rollback_data.push(data.clone());
182                }
183            }
184        }
185        
186        rollback_data
187    }
188
189    /// Rollback all updates matching a pattern
190    pub fn rollback_pattern(&self, pattern: &QueryKeyPattern) -> Vec<T> {
191        let mut updates = self.updates.write();
192        let mut rollback_data = Vec::new();
193        
194        for update in updates.values_mut() {
195            if update.key.matches_pattern(pattern) && !update.rolled_back {
196                update.rollback();
197                if let Some(data) = &update.rollback_data {
198                    rollback_data.push(data.clone());
199                }
200            }
201        }
202        
203        rollback_data
204    }
205
206    /// Clean up expired updates
207    pub fn cleanup_expired(&self) -> usize {
208        let mut updates = self.updates.write();
209        let initial_count = updates.len();
210        
211        updates.retain(|_, update| !update.is_expired(self.config.rollback_timeout));
212        
213        initial_count - updates.len()
214    }
215
216    /// Get all active updates
217    pub fn get_active_updates(&self) -> Vec<OptimisticUpdate<T>> {
218        let updates = self.updates.read();
219        updates
220            .values()
221            .filter(|update| !update.confirmed && !update.rolled_back)
222            .cloned()
223            .collect()
224    }
225
226    /// Get update statistics
227    pub fn get_stats(&self) -> OptimisticStats {
228        let updates = self.updates.read();
229        let history = self.history.read();
230        
231        let active = updates.values().filter(|u| !u.confirmed && !u.rolled_back).count();
232        let confirmed = updates.values().filter(|u| u.confirmed).count();
233        let rolled_back = updates.values().filter(|u| u.rolled_back).count();
234        let expired = updates.values().filter(|u| u.is_expired(self.config.rollback_timeout)).count();
235        
236        OptimisticStats {
237            active_updates: active,
238            confirmed_updates: confirmed,
239            rolled_back_updates: rolled_back,
240            expired_updates: expired,
241            total_history: history.len(),
242        }
243    }
244
245    /// Clear all updates
246    pub fn clear_all(&self) {
247        let mut updates = self.updates.write();
248        updates.clear();
249    }
250
251    /// Clear history
252    pub fn clear_history(&self) {
253        let mut history = self.history.write();
254        history.clear();
255    }
256}
257
258/// Statistics for optimistic updates
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct OptimisticStats {
261    /// Number of active (unconfirmed, unrolled) updates
262    pub active_updates: usize,
263    /// Number of confirmed updates
264    pub confirmed_updates: usize,
265    /// Number of rolled back updates
266    pub rolled_back_updates: usize,
267    /// Number of expired updates
268    pub expired_updates: usize,
269    /// Total number of updates in history
270    pub total_history: usize,
271}
272
273/// Optimistic mutation result
274pub struct OptimisticMutationResult<T> {
275    /// The optimistic update ID
276    pub update_id: String,
277    /// Whether the update was applied optimistically
278    pub applied: bool,
279    /// The optimistic data
280    pub optimistic_data: Option<T>,
281    /// Rollback function
282    pub rollback: Box<dyn Fn() -> Result<(), QueryError> + Send + Sync>,
283}
284
285impl<T> OptimisticMutationResult<T> {
286    /// Create a new optimistic mutation result
287    pub fn new(
288        update_id: String,
289        applied: bool,
290        optimistic_data: Option<T>,
291        rollback: Box<dyn Fn() -> Result<(), QueryError> + Send + Sync>,
292    ) -> Self {
293        Self {
294            update_id,
295            applied,
296            optimistic_data,
297            rollback,
298        }
299    }
300
301    /// Rollback this optimistic update
302    pub fn rollback(&self) -> Result<(), QueryError> {
303        (self.rollback)()
304    }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::QueryKey;

    /// Builds a `String` manager from the given configuration.
    fn manager_with(config: OptimisticConfig) -> OptimisticManager<String> {
        OptimisticManager::new(config)
    }

    #[test]
    fn test_optimistic_update_creation() {
        let manager = manager_with(OptimisticConfig::default());
        let key = QueryKey::from("test");
        let optimistic = String::from("optimistic data");
        let original = String::from("original data");

        let update_id = manager.apply_update(&key, optimistic.clone(), Some(original));

        // The freshly applied value is what the manager serves for the key.
        assert_eq!(manager.get_optimistic_data(&key), Some(optimistic));

        // A tracked update can be confirmed ...
        assert!(manager.confirm_update(&update_id).is_ok());

        // ... and rolling it back afterwards still succeeds.
        assert!(manager.rollback_update(&update_id).is_ok());
    }

    #[test]
    fn test_optimistic_update_rollback() {
        let manager = manager_with(OptimisticConfig::default());
        let key = QueryKey::from("test");
        let before = String::from("original data");
        let after = String::from("optimistic data");

        let update_id = manager.apply_update(&key, after.clone(), Some(before.clone()));

        // While pending, the optimistic value is visible.
        assert_eq!(manager.get_optimistic_data(&key), Some(after));

        // Rolling back hands the original value back to the caller.
        let outcome = manager.rollback_update(&update_id);
        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap(), Some(before));

        // A rolled-back update no longer supplies optimistic data.
        assert_eq!(manager.get_optimistic_data(&key), None);
    }

    #[test]
    fn test_optimistic_update_expiration() {
        // A very short timeout so the update expires almost immediately.
        let config = OptimisticConfig {
            rollback_timeout: Duration::from_millis(10),
            ..OptimisticConfig::default()
        };
        let manager = manager_with(config);
        let key = QueryKey::from("test");

        manager.apply_update(&key, String::from("test data"), None);

        // Give the update time to pass its timeout.
        std::thread::sleep(Duration::from_millis(20));

        // Exactly the one expired update is removed.
        assert_eq!(manager.cleanup_expired(), 1);

        // Nothing is left to serve for the key.
        assert_eq!(manager.get_optimistic_data(&key), None);
    }
}