code_mesh_core/
sync.rs

1//! Synchronization primitives for Code Mesh Core
2//!
3//! This module provides cross-platform synchronization primitives that work
4//! consistently across native and WASM environments.
5
6use crate::{Error, Result};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10#[cfg(feature = "native")]
11use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
12
13#[cfg(feature = "wasm")]
14use parking_lot::{Mutex as ParkingMutex, RwLock as ParkingRwLock};
15
/// Cross-platform mutex alias used by the primitives in this module.
///
/// With the `native` feature this is `tokio::sync::Mutex`: `lock()` returns a
/// future that must be `.await`ed before the guard is available.
#[cfg(feature = "native")]
pub type AsyncMutex<T> = TokioMutex<T>;

/// With the `wasm` feature this is `parking_lot::Mutex`: `lock()` returns the
/// guard directly (no `.await`), so despite the alias name it is a blocking lock.
///
/// NOTE(review): `native` and `wasm` are independent `cfg` gates; enabling both
/// would define this alias twice. Confirm the features are mutually exclusive.
#[cfg(feature = "wasm")]
pub type AsyncMutex<T> = ParkingMutex<T>;
22
/// Cross-platform read-write lock alias.
///
/// With the `native` feature this is `tokio::sync::RwLock`, an awaitable lock.
#[cfg(feature = "native")]
pub type AsyncRwLock<T> = TokioRwLock<T>;

/// With the `wasm` feature this is `parking_lot::RwLock`, a blocking lock whose
/// guards are returned directly rather than through a future.
///
/// NOTE(review): `native` and `wasm` are independent `cfg` gates; enabling both
/// would define this alias twice. Confirm the features are mutually exclusive.
#[cfg(feature = "wasm")]
pub type AsyncRwLock<T> = ParkingRwLock<T>;
29
30/// Debouncer for rate-limiting operations
31pub struct Debouncer {
32    last_execution: Arc<AsyncMutex<Option<Instant>>>,
33    delay: Duration,
34}
35
36impl Debouncer {
37    /// Create a new debouncer with the specified delay
38    pub fn new(delay: Duration) -> Self {
39        Self {
40            last_execution: Arc::new(AsyncMutex::new(None)),
41            delay,
42        }
43    }
44
45    /// Execute a function with debouncing
46    #[cfg(feature = "native")]
47    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<Option<T>>
48    where
49        F: FnOnce() -> Fut,
50        Fut: std::future::Future<Output = Result<T>>,
51    {
52        let now = Instant::now();
53        let mut last_exec = self.last_execution.lock().await;
54        
55        if let Some(last) = *last_exec {
56            if now.duration_since(last) < self.delay {
57                return Ok(None);
58            }
59        }
60        
61        *last_exec = Some(now);
62        drop(last_exec);
63        
64        f().await.map(Some)
65    }
66
67    /// Execute a function with debouncing (WASM version)
68    #[cfg(feature = "wasm")]
69    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<Option<T>>
70    where
71        F: FnOnce() -> Fut,
72        Fut: std::future::Future<Output = Result<T>>,
73    {
74        let now = Instant::now();
75        let mut last_exec = self.last_execution.lock();
76        
77        if let Some(last) = *last_exec {
78            if now.duration_since(last) < self.delay {
79                return Ok(None);
80            }
81        }
82        
83        *last_exec = Some(now);
84        drop(last_exec);
85        
86        f().await.map(Some)
87    }
88
89    /// Check if enough time has passed since the last execution
90    #[cfg(feature = "native")]
91    pub async fn should_execute(&self) -> bool {
92        let now = Instant::now();
93        let last_exec = self.last_execution.lock().await;
94        
95        if let Some(last) = *last_exec {
96            now.duration_since(last) >= self.delay
97        } else {
98            true
99        }
100    }
101
102    /// Check if enough time has passed since the last execution (WASM version)
103    #[cfg(feature = "wasm")]
104    pub async fn should_execute(&self) -> bool {
105        let now = Instant::now();
106        let last_exec = self.last_execution.lock();
107        
108        if let Some(last) = *last_exec {
109            now.duration_since(last) >= self.delay
110        } else {
111            true
112        }
113    }
114}
115
116/// Rate limiter for controlling operation frequency
117pub struct RateLimiter {
118    tokens: Arc<AsyncMutex<f64>>,
119    max_tokens: f64,
120    refill_rate: f64, // tokens per second
121    last_refill: Arc<AsyncMutex<Instant>>,
122}
123
124impl RateLimiter {
125    /// Create a new rate limiter
126    pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
127        Self {
128            tokens: Arc::new(AsyncMutex::new(max_tokens)),
129            max_tokens,
130            refill_rate,
131            last_refill: Arc::new(AsyncMutex::new(Instant::now())),
132        }
133    }
134
135    /// Try to acquire a token (non-blocking)
136    #[cfg(feature = "native")]
137    pub async fn try_acquire(&self, tokens: f64) -> bool {
138        self.refill_tokens().await;
139        
140        let mut current_tokens = self.tokens.lock().await;
141        if *current_tokens >= tokens {
142            *current_tokens -= tokens;
143            true
144        } else {
145            false
146        }
147    }
148
149    /// Try to acquire a token (non-blocking, WASM version)
150    #[cfg(feature = "wasm")]
151    pub async fn try_acquire(&self, tokens: f64) -> bool {
152        self.refill_tokens().await;
153        
154        let mut current_tokens = self.tokens.lock();
155        if *current_tokens >= tokens {
156            *current_tokens -= tokens;
157            true
158        } else {
159            false
160        }
161    }
162
163    /// Acquire a token (blocking until available)
164    #[cfg(feature = "native")]
165    pub async fn acquire(&self, tokens: f64) -> Result<()> {
166        loop {
167            if self.try_acquire(tokens).await {
168                return Ok(());
169            }
170            
171            // Calculate wait time
172            let wait_time = Duration::from_secs_f64(tokens / self.refill_rate);
173            tokio::time::sleep(wait_time).await;
174        }
175    }
176
177    /// Acquire a token (blocking until available, WASM version)
178    #[cfg(feature = "wasm")]
179    pub async fn acquire(&self, tokens: f64) -> Result<()> {
180        loop {
181            if self.try_acquire(tokens).await {
182                return Ok(());
183            }
184            
185            // In WASM, we can't really sleep, so we yield
186            wasm_bindgen_futures::JsFuture::from(
187                js_sys::Promise::resolve(&wasm_bindgen::JsValue::UNDEFINED)
188            ).await.map_err(|_| Error::Other(anyhow::anyhow!("JS Promise failed")))?;
189        }
190    }
191
192    /// Refill tokens based on elapsed time
193    #[cfg(feature = "native")]
194    async fn refill_tokens(&self) {
195        let now = Instant::now();
196        let mut last_refill = self.last_refill.lock().await;
197        let elapsed = now.duration_since(*last_refill).as_secs_f64();
198        
199        if elapsed > 0.0 {
200            let mut tokens = self.tokens.lock().await;
201            let new_tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
202            *tokens = new_tokens;
203            *last_refill = now;
204        }
205    }
206
207    /// Refill tokens based on elapsed time (WASM version)
208    #[cfg(feature = "wasm")]
209    async fn refill_tokens(&self) {
210        let now = Instant::now();
211        let mut last_refill = self.last_refill.lock();
212        let elapsed = now.duration_since(*last_refill).as_secs_f64();
213        
214        if elapsed > 0.0 {
215            let mut tokens = self.tokens.lock();
216            let new_tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
217            *tokens = new_tokens;
218            *last_refill = now;
219        }
220    }
221
222    /// Get current token count
223    #[cfg(feature = "native")]
224    pub async fn tokens(&self) -> f64 {
225        self.refill_tokens().await;
226        *self.tokens.lock().await
227    }
228
229    /// Get current token count (WASM version)
230    #[cfg(feature = "wasm")]
231    pub async fn tokens(&self) -> f64 {
232        self.refill_tokens().await;
233        *self.tokens.lock()
234    }
235}
236
/// State of a `CircuitBreaker`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitState {
    /// Normal operation: calls are executed and failures are counted.
    Closed,
    /// Calls are rejected without being executed until the breaker's timeout
    /// has elapsed since the last recorded failure.
    Open,
    /// Trial state entered after the timeout: calls are executed again and
    /// successes are counted towards closing the circuit.
    HalfOpen,
}
244
245pub struct CircuitBreaker {
246    state: Arc<AsyncMutex<CircuitState>>,
247    failure_count: Arc<AsyncMutex<u32>>,
248    success_count: Arc<AsyncMutex<u32>>,
249    failure_threshold: u32,
250    success_threshold: u32,
251    timeout: Duration,
252    last_failure: Arc<AsyncMutex<Option<Instant>>>,
253}
254
255impl CircuitBreaker {
256    /// Create a new circuit breaker
257    pub fn new(failure_threshold: u32, success_threshold: u32, timeout: Duration) -> Self {
258        Self {
259            state: Arc::new(AsyncMutex::new(CircuitState::Closed)),
260            failure_count: Arc::new(AsyncMutex::new(0)),
261            success_count: Arc::new(AsyncMutex::new(0)),
262            failure_threshold,
263            success_threshold,
264            timeout,
265            last_failure: Arc::new(AsyncMutex::new(None)),
266        }
267    }
268
269    /// Execute a function with circuit breaker protection
270    #[cfg(feature = "native")]
271    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
272    where
273        F: FnOnce() -> Fut,
274        Fut: std::future::Future<Output = Result<T>>,
275    {
276        // Check if circuit is open
277        let state = *self.state.lock().await;
278        match state {
279            CircuitState::Open => {
280                let last_failure = *self.last_failure.lock().await;
281                if let Some(failure_time) = last_failure {
282                    if Instant::now().duration_since(failure_time) >= self.timeout {
283                        // Transition to half-open
284                        *self.state.lock().await = CircuitState::HalfOpen;
285                        *self.success_count.lock().await = 0;
286                    } else {
287                        return Err(Error::Other(anyhow::anyhow!("Circuit breaker is open")));
288                    }
289                }
290            }
291            CircuitState::HalfOpen => {
292                // Allow limited requests
293            }
294            CircuitState::Closed => {
295                // Normal operation
296            }
297        }
298
299        // Execute the function
300        match f().await {
301            Ok(result) => {
302                self.on_success().await;
303                Ok(result)
304            }
305            Err(error) => {
306                self.on_failure().await;
307                Err(error)
308            }
309        }
310    }
311
312    /// Execute a function with circuit breaker protection (WASM version)
313    #[cfg(feature = "wasm")]
314    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
315    where
316        F: FnOnce() -> Fut,
317        Fut: std::future::Future<Output = Result<T>>,
318    {
319        // Check if circuit is open
320        let state = *self.state.lock();
321        match state {
322            CircuitState::Open => {
323                let last_failure = *self.last_failure.lock();
324                if let Some(failure_time) = last_failure {
325                    if Instant::now().duration_since(failure_time) >= self.timeout {
326                        // Transition to half-open
327                        *self.state.lock() = CircuitState::HalfOpen;
328                        *self.success_count.lock() = 0;
329                    } else {
330                        return Err(Error::Other(anyhow::anyhow!("Circuit breaker is open")));
331                    }
332                }
333            }
334            CircuitState::HalfOpen => {
335                // Allow limited requests
336            }
337            CircuitState::Closed => {
338                // Normal operation
339            }
340        }
341
342        // Execute the function
343        match f().await {
344            Ok(result) => {
345                self.on_success().await;
346                Ok(result)
347            }
348            Err(error) => {
349                self.on_failure().await;
350                Err(error)
351            }
352        }
353    }
354
355    /// Handle successful execution
356    #[cfg(feature = "native")]
357    async fn on_success(&self) {
358        let state = *self.state.lock().await;
359        match state {
360            CircuitState::HalfOpen => {
361                let mut success_count = self.success_count.lock().await;
362                *success_count += 1;
363                if *success_count >= self.success_threshold {
364                    *self.state.lock().await = CircuitState::Closed;
365                    *self.failure_count.lock().await = 0;
366                }
367            }
368            CircuitState::Closed => {
369                *self.failure_count.lock().await = 0;
370            }
371            _ => {}
372        }
373    }
374
375    /// Handle successful execution (WASM version)
376    #[cfg(feature = "wasm")]
377    async fn on_success(&self) {
378        let state = *self.state.lock();
379        match state {
380            CircuitState::HalfOpen => {
381                let mut success_count = self.success_count.lock();
382                *success_count += 1;
383                if *success_count >= self.success_threshold {
384                    *self.state.lock() = CircuitState::Closed;
385                    *self.failure_count.lock() = 0;
386                }
387            }
388            CircuitState::Closed => {
389                *self.failure_count.lock() = 0;
390            }
391            _ => {}
392        }
393    }
394
395    /// Handle failed execution
396    #[cfg(feature = "native")]
397    async fn on_failure(&self) {
398        let mut failure_count = self.failure_count.lock().await;
399        *failure_count += 1;
400        *self.last_failure.lock().await = Some(Instant::now());
401
402        if *failure_count >= self.failure_threshold {
403            *self.state.lock().await = CircuitState::Open;
404        }
405    }
406
407    /// Handle failed execution (WASM version)
408    #[cfg(feature = "wasm")]
409    async fn on_failure(&self) {
410        let mut failure_count = self.failure_count.lock();
411        *failure_count += 1;
412        *self.last_failure.lock() = Some(Instant::now());
413
414        if *failure_count >= self.failure_threshold {
415            *self.state.lock() = CircuitState::Open;
416        }
417    }
418
419    /// Get current circuit state
420    #[cfg(feature = "native")]
421    pub async fn state(&self) -> CircuitState {
422        *self.state.lock().await
423    }
424
425    /// Get current circuit state (WASM version)
426    #[cfg(feature = "wasm")]
427    pub async fn state(&self) -> CircuitState {
428        *self.state.lock()
429    }
430}
431
/// Wraps asynchronous operations with a fixed time limit.
pub struct TimeoutWrapper {
    /// Maximum time an operation may take (only enforced by the native variant).
    timeout: Duration,
}

impl TimeoutWrapper {
    /// Build a wrapper that limits operations to `timeout`.
    pub fn new(timeout: Duration) -> Self {
        TimeoutWrapper { timeout }
    }

    /// Run `f` and fail with an "Operation timed out" error if its future does
    /// not complete within the configured timeout; otherwise the operation's
    /// own result is returned unchanged.
    #[cfg(feature = "native")]
    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let outcome = tokio::time::timeout(self.timeout, f()).await;
        // The outer error only signals that the deadline elapsed.
        outcome.unwrap_or_else(|_| Err(Error::Other(anyhow::anyhow!("Operation timed out"))))
    }

    /// Run `f` (WASM version - simplified).
    ///
    /// No time limit is applied here: the operation is simply awaited to
    /// completion and the configured timeout is not consulted.
    #[cfg(feature = "wasm")]
    pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
    where
        F: FnOnce() -> Fut,
        Fut: std::future::Future<Output = Result<T>>,
    {
        let operation = f();
        operation.await
    }
}
467
// Unit tests for the native (tokio) variants of the primitives. They use real
// sleeps, so they depend on wall-clock timing.
#[cfg(test)]
mod tests {
    use super::*;

    /// The debouncer runs the first call, suppresses an immediate second call,
    /// and runs again once the delay has passed.
    #[cfg(feature = "native")]
    #[tokio::test]
    async fn test_debouncer() {
        let debouncer = Debouncer::new(Duration::from_millis(100));
        let mut counter = 0;

        // First call should execute
        let result = debouncer.execute(|| async {
            counter += 1;
            Ok(counter)
        }).await.unwrap();
        assert_eq!(result, Some(1));

        // Immediate second call should be debounced (closure is never run,
        // so `counter` stays at 1)
        let result = debouncer.execute(|| async {
            counter += 1;
            Ok(counter)
        }).await.unwrap();
        assert_eq!(result, None);

        // Wait for debounce period
        tokio::time::sleep(Duration::from_millis(150)).await;

        // Now it should execute again
        let result = debouncer.execute(|| async {
            counter += 1;
            Ok(counter)
        }).await.unwrap();
        assert_eq!(result, Some(2));
    }

    /// The rate limiter hands out its initial burst, refuses once empty, and
    /// grants a token again after one refill interval.
    #[cfg(feature = "native")]
    #[tokio::test]
    async fn test_rate_limiter() {
        let limiter = RateLimiter::new(2.0, 1.0); // 2 tokens, refill 1 per second

        // Should be able to acquire 2 tokens immediately
        assert!(limiter.try_acquire(1.0).await);
        assert!(limiter.try_acquire(1.0).await);
        
        // Third token should fail
        assert!(!limiter.try_acquire(1.0).await);

        // Wait for refill
        tokio::time::sleep(Duration::from_secs(1)).await;
        
        // Should be able to acquire one more token
        assert!(limiter.try_acquire(1.0).await);
    }

    /// The circuit breaker opens after two failures and closes again after a
    /// successful call made once the timeout has passed.
    #[cfg(feature = "native")]
    #[tokio::test]
    async fn test_circuit_breaker() {
        let breaker = CircuitBreaker::new(2, 1, Duration::from_millis(100));
        
        // Should start closed
        assert_eq!(breaker.state().await, CircuitState::Closed);

        // Fail twice to open the circuit
        let _result = breaker.execute(|| async { 
            Err::<(), _>(Error::Other(anyhow::anyhow!("Test error")))
        }).await;
        let _result = breaker.execute(|| async { 
            Err::<(), _>(Error::Other(anyhow::anyhow!("Test error")))
        }).await;

        // Circuit should now be open
        assert_eq!(breaker.state().await, CircuitState::Open);

        // Wait for timeout
        tokio::time::sleep(Duration::from_millis(150)).await;

        // The next call moves the breaker to half-open; because the success
        // threshold is 1, that single success closes the circuit straight away
        let _result = breaker.execute(|| async { Ok(()) }).await;
        assert_eq!(breaker.state().await, CircuitState::Closed);
    }
}