leptos_query_rs/optimistic/
mod.rs1use crate::types::{QueryKey, QueryKeyPattern};
2use crate::retry::QueryError;
3use serde::{Deserialize, Serialize};
4use serde::de::DeserializeOwned;
5use std::collections::HashMap;
6use std::sync::Arc;
7use std::time::{Duration, Instant};
8use parking_lot::RwLock;
9
10#[derive(Clone, Debug, Serialize, Deserialize)]
12pub struct OptimisticConfig {
13 pub enabled: bool,
15 pub rollback_timeout: Duration,
17 pub show_loading: bool,
19 pub merge_strategy: MergeStrategy,
21}
22
23impl Default for OptimisticConfig {
24 fn default() -> Self {
25 Self {
26 enabled: true,
27 rollback_timeout: Duration::from_secs(30), show_loading: false,
29 merge_strategy: MergeStrategy::Replace,
30 }
31 }
32}
33
34#[derive(Clone, Debug, Serialize, Deserialize)]
36pub enum MergeStrategy {
37 Replace,
39 Merge,
41 DeepMerge,
43 Custom,
45}
46
47#[derive(Clone, Debug)]
49pub struct OptimisticUpdate<T> {
50 pub id: String,
52 pub key: QueryKey,
54 pub data: T,
56 pub applied_at: Instant,
58 pub confirmed: bool,
60 pub rolled_back: bool,
62 pub rollback_data: Option<T>,
64}
65
66impl<T> OptimisticUpdate<T> {
67 pub fn new(key: QueryKey, data: T, rollback_data: Option<T>) -> Self {
69 Self {
70 id: uuid::Uuid::new_v4().to_string(),
71 key,
72 data,
73 applied_at: Instant::now(),
74 confirmed: false,
75 rolled_back: false,
76 rollback_data,
77 }
78 }
79
80 pub fn is_expired(&self, timeout: Duration) -> bool {
82 self.applied_at.elapsed() > timeout
83 }
84
85 pub fn confirm(&mut self) {
87 self.confirmed = true;
88 }
89
90 pub fn rollback(&mut self) {
92 self.rolled_back = true;
93 }
94}
95
96pub struct OptimisticManager<T> {
98 config: OptimisticConfig,
100 updates: Arc<RwLock<HashMap<String, OptimisticUpdate<T>>>>,
102 history: Arc<RwLock<Vec<OptimisticUpdate<T>>>>,
104}
105
106impl<T: Clone + Serialize + DeserializeOwned> OptimisticManager<T> {
107 pub fn new(config: OptimisticConfig) -> Self {
109 Self {
110 config,
111 updates: Arc::new(RwLock::new(HashMap::new())),
112 history: Arc::new(RwLock::new(Vec::new())),
113 }
114 }
115
116 pub fn apply_update(&self, key: &QueryKey, data: T, rollback_data: Option<T>) -> String {
118 let update = OptimisticUpdate::new(key.clone(), data, rollback_data);
119 let id = update.id.clone();
120
121 let mut updates = self.updates.write();
122 updates.insert(id.clone(), update);
123
124 let mut history = self.history.write();
126 history.push(updates.get(&id).unwrap().clone());
127
128 if history.len() > 100 {
130 history.remove(0);
131 }
132
133 id
134 }
135
136 pub fn get_optimistic_data(&self, key: &QueryKey) -> Option<T> {
138 let updates = self.updates.read();
139
140 updates
142 .values()
143 .filter(|update| update.key == *key && !update.confirmed && !update.rolled_back)
144 .max_by_key(|update| update.applied_at)
145 .map(|update| update.data.clone())
146 }
147
148 pub fn confirm_update(&self, update_id: &str) -> Result<(), QueryError> {
150 let mut updates = self.updates.write();
151
152 if let Some(update) = updates.get_mut(update_id) {
153 update.confirm();
154 Ok(())
155 } else {
156 Err(QueryError::GenericError("Update not found".to_string()))
157 }
158 }
159
160 pub fn rollback_update(&self, update_id: &str) -> Result<Option<T>, QueryError> {
162 let mut updates = self.updates.write();
163
164 if let Some(update) = updates.get_mut(update_id) {
165 update.rollback();
166 Ok(update.rollback_data.clone())
167 } else {
168 Err(QueryError::GenericError("Update not found".to_string()))
169 }
170 }
171
172 pub fn rollback_key(&self, key: &QueryKey) -> Vec<T> {
174 let mut updates = self.updates.write();
175 let mut rollback_data = Vec::new();
176
177 for update in updates.values_mut() {
178 if update.key == *key && !update.rolled_back {
179 update.rollback();
180 if let Some(data) = &update.rollback_data {
181 rollback_data.push(data.clone());
182 }
183 }
184 }
185
186 rollback_data
187 }
188
189 pub fn rollback_pattern(&self, pattern: &QueryKeyPattern) -> Vec<T> {
191 let mut updates = self.updates.write();
192 let mut rollback_data = Vec::new();
193
194 for update in updates.values_mut() {
195 if update.key.matches_pattern(pattern) && !update.rolled_back {
196 update.rollback();
197 if let Some(data) = &update.rollback_data {
198 rollback_data.push(data.clone());
199 }
200 }
201 }
202
203 rollback_data
204 }
205
206 pub fn cleanup_expired(&self) -> usize {
208 let mut updates = self.updates.write();
209 let initial_count = updates.len();
210
211 updates.retain(|_, update| !update.is_expired(self.config.rollback_timeout));
212
213 initial_count - updates.len()
214 }
215
216 pub fn get_active_updates(&self) -> Vec<OptimisticUpdate<T>> {
218 let updates = self.updates.read();
219 updates
220 .values()
221 .filter(|update| !update.confirmed && !update.rolled_back)
222 .cloned()
223 .collect()
224 }
225
226 pub fn get_stats(&self) -> OptimisticStats {
228 let updates = self.updates.read();
229 let history = self.history.read();
230
231 let active = updates.values().filter(|u| !u.confirmed && !u.rolled_back).count();
232 let confirmed = updates.values().filter(|u| u.confirmed).count();
233 let rolled_back = updates.values().filter(|u| u.rolled_back).count();
234 let expired = updates.values().filter(|u| u.is_expired(self.config.rollback_timeout)).count();
235
236 OptimisticStats {
237 active_updates: active,
238 confirmed_updates: confirmed,
239 rolled_back_updates: rolled_back,
240 expired_updates: expired,
241 total_history: history.len(),
242 }
243 }
244
245 pub fn clear_all(&self) {
247 let mut updates = self.updates.write();
248 updates.clear();
249 }
250
251 pub fn clear_history(&self) {
253 let mut history = self.history.write();
254 history.clear();
255 }
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct OptimisticStats {
261 pub active_updates: usize,
263 pub confirmed_updates: usize,
265 pub rolled_back_updates: usize,
267 pub expired_updates: usize,
269 pub total_history: usize,
271}
272
273pub struct OptimisticMutationResult<T> {
275 pub update_id: String,
277 pub applied: bool,
279 pub optimistic_data: Option<T>,
281 pub rollback: Box<dyn Fn() -> Result<(), QueryError> + Send + Sync>,
283}
284
285impl<T> OptimisticMutationResult<T> {
286 pub fn new(
288 update_id: String,
289 applied: bool,
290 optimistic_data: Option<T>,
291 rollback: Box<dyn Fn() -> Result<(), QueryError> + Send + Sync>,
292 ) -> Self {
293 Self {
294 update_id,
295 applied,
296 optimistic_data,
297 rollback,
298 }
299 }
300
301 pub fn rollback(&self) -> Result<(), QueryError> {
303 (self.rollback)()
304 }
305}
306
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::QueryKey;

    /// Builds a `String` manager with the default configuration.
    fn default_manager() -> OptimisticManager<String> {
        OptimisticManager::new(OptimisticConfig::default())
    }

    #[test]
    fn test_optimistic_update_creation() {
        let manager = default_manager();

        let key = QueryKey::from("test");
        let data = "optimistic data".to_string();
        let rollback = "original data".to_string();

        let update_id = manager.apply_update(&key, data.clone(), Some(rollback.clone()));

        // A freshly applied update is visible as optimistic data.
        assert_eq!(manager.get_optimistic_data(&key), Some(data));

        // Confirming succeeds and hides the update from optimistic reads.
        assert!(manager.confirm_update(&update_id).is_ok());
        assert_eq!(manager.get_optimistic_data(&key), None);

        // Rolling back a confirmed update is still accepted and yields the
        // stored rollback value.
        let rollback_data = manager.rollback_update(&update_id);
        assert!(rollback_data.is_ok());
        assert_eq!(rollback_data.unwrap(), Some(rollback));
    }

    #[test]
    fn test_optimistic_update_rollback() {
        let manager = default_manager();

        let key = QueryKey::from("test");
        let original_data = "original data".to_string();
        let optimistic_data = "optimistic data".to_string();

        let update_id =
            manager.apply_update(&key, optimistic_data.clone(), Some(original_data.clone()));

        assert_eq!(manager.get_optimistic_data(&key), Some(optimistic_data));

        let rollback_result = manager.rollback_update(&update_id);
        assert!(rollback_result.is_ok());
        assert_eq!(rollback_result.unwrap(), Some(original_data));

        // A rolled-back update no longer supplies optimistic data.
        assert_eq!(manager.get_optimistic_data(&key), None);
    }

    #[test]
    fn test_optimistic_update_expiration() {
        let config = OptimisticConfig {
            rollback_timeout: Duration::from_millis(10),
            ..OptimisticConfig::default()
        };
        let manager = OptimisticManager::<String>::new(config);

        let key = QueryKey::from("test");
        let data = "test data".to_string();

        manager.apply_update(&key, data, None);

        // Sleep past the 10 ms timeout so the update counts as expired.
        std::thread::sleep(Duration::from_millis(20));

        let cleaned = manager.cleanup_expired();
        assert_eq!(cleaned, 1);

        assert_eq!(manager.get_optimistic_data(&key), None);
    }

    #[test]
    fn test_unknown_update_id_is_rejected() {
        let manager = default_manager();

        assert!(manager.confirm_update("missing").is_err());
        assert!(manager.rollback_update("missing").is_err());
    }
}