1use crate::{Error, Result};
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9
10#[cfg(feature = "native")]
11use tokio::sync::{Mutex as TokioMutex, RwLock as TokioRwLock};
12
13#[cfg(feature = "wasm")]
14use parking_lot::{Mutex as ParkingMutex, RwLock as ParkingRwLock};
15
16#[cfg(feature = "native")]
18pub type AsyncMutex<T> = TokioMutex<T>;
19
20#[cfg(feature = "wasm")]
21pub type AsyncMutex<T> = ParkingMutex<T>;
22
23#[cfg(feature = "native")]
25pub type AsyncRwLock<T> = TokioRwLock<T>;
26
27#[cfg(feature = "wasm")]
28pub type AsyncRwLock<T> = ParkingRwLock<T>;
29
30pub struct Debouncer {
32 last_execution: Arc<AsyncMutex<Option<Instant>>>,
33 delay: Duration,
34}
35
36impl Debouncer {
37 pub fn new(delay: Duration) -> Self {
39 Self {
40 last_execution: Arc::new(AsyncMutex::new(None)),
41 delay,
42 }
43 }
44
45 #[cfg(feature = "native")]
47 pub async fn execute<F, Fut, T>(&self, f: F) -> Result<Option<T>>
48 where
49 F: FnOnce() -> Fut,
50 Fut: std::future::Future<Output = Result<T>>,
51 {
52 let now = Instant::now();
53 let mut last_exec = self.last_execution.lock().await;
54
55 if let Some(last) = *last_exec {
56 if now.duration_since(last) < self.delay {
57 return Ok(None);
58 }
59 }
60
61 *last_exec = Some(now);
62 drop(last_exec);
63
64 f().await.map(Some)
65 }
66
67 #[cfg(feature = "wasm")]
69 pub async fn execute<F, Fut, T>(&self, f: F) -> Result<Option<T>>
70 where
71 F: FnOnce() -> Fut,
72 Fut: std::future::Future<Output = Result<T>>,
73 {
74 let now = Instant::now();
75 let mut last_exec = self.last_execution.lock();
76
77 if let Some(last) = *last_exec {
78 if now.duration_since(last) < self.delay {
79 return Ok(None);
80 }
81 }
82
83 *last_exec = Some(now);
84 drop(last_exec);
85
86 f().await.map(Some)
87 }
88
89 #[cfg(feature = "native")]
91 pub async fn should_execute(&self) -> bool {
92 let now = Instant::now();
93 let last_exec = self.last_execution.lock().await;
94
95 if let Some(last) = *last_exec {
96 now.duration_since(last) >= self.delay
97 } else {
98 true
99 }
100 }
101
102 #[cfg(feature = "wasm")]
104 pub async fn should_execute(&self) -> bool {
105 let now = Instant::now();
106 let last_exec = self.last_execution.lock();
107
108 if let Some(last) = *last_exec {
109 now.duration_since(last) >= self.delay
110 } else {
111 true
112 }
113 }
114}
115
116pub struct RateLimiter {
118 tokens: Arc<AsyncMutex<f64>>,
119 max_tokens: f64,
120 refill_rate: f64, last_refill: Arc<AsyncMutex<Instant>>,
122}
123
124impl RateLimiter {
125 pub fn new(max_tokens: f64, refill_rate: f64) -> Self {
127 Self {
128 tokens: Arc::new(AsyncMutex::new(max_tokens)),
129 max_tokens,
130 refill_rate,
131 last_refill: Arc::new(AsyncMutex::new(Instant::now())),
132 }
133 }
134
135 #[cfg(feature = "native")]
137 pub async fn try_acquire(&self, tokens: f64) -> bool {
138 self.refill_tokens().await;
139
140 let mut current_tokens = self.tokens.lock().await;
141 if *current_tokens >= tokens {
142 *current_tokens -= tokens;
143 true
144 } else {
145 false
146 }
147 }
148
149 #[cfg(feature = "wasm")]
151 pub async fn try_acquire(&self, tokens: f64) -> bool {
152 self.refill_tokens().await;
153
154 let mut current_tokens = self.tokens.lock();
155 if *current_tokens >= tokens {
156 *current_tokens -= tokens;
157 true
158 } else {
159 false
160 }
161 }
162
163 #[cfg(feature = "native")]
165 pub async fn acquire(&self, tokens: f64) -> Result<()> {
166 loop {
167 if self.try_acquire(tokens).await {
168 return Ok(());
169 }
170
171 let wait_time = Duration::from_secs_f64(tokens / self.refill_rate);
173 tokio::time::sleep(wait_time).await;
174 }
175 }
176
177 #[cfg(feature = "wasm")]
179 pub async fn acquire(&self, tokens: f64) -> Result<()> {
180 loop {
181 if self.try_acquire(tokens).await {
182 return Ok(());
183 }
184
185 wasm_bindgen_futures::JsFuture::from(
187 js_sys::Promise::resolve(&wasm_bindgen::JsValue::UNDEFINED)
188 ).await.map_err(|_| Error::Other(anyhow::anyhow!("JS Promise failed")))?;
189 }
190 }
191
192 #[cfg(feature = "native")]
194 async fn refill_tokens(&self) {
195 let now = Instant::now();
196 let mut last_refill = self.last_refill.lock().await;
197 let elapsed = now.duration_since(*last_refill).as_secs_f64();
198
199 if elapsed > 0.0 {
200 let mut tokens = self.tokens.lock().await;
201 let new_tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
202 *tokens = new_tokens;
203 *last_refill = now;
204 }
205 }
206
207 #[cfg(feature = "wasm")]
209 async fn refill_tokens(&self) {
210 let now = Instant::now();
211 let mut last_refill = self.last_refill.lock();
212 let elapsed = now.duration_since(*last_refill).as_secs_f64();
213
214 if elapsed > 0.0 {
215 let mut tokens = self.tokens.lock();
216 let new_tokens = (*tokens + elapsed * self.refill_rate).min(self.max_tokens);
217 *tokens = new_tokens;
218 *last_refill = now;
219 }
220 }
221
222 #[cfg(feature = "native")]
224 pub async fn tokens(&self) -> f64 {
225 self.refill_tokens().await;
226 *self.tokens.lock().await
227 }
228
229 #[cfg(feature = "wasm")]
231 pub async fn tokens(&self) -> f64 {
232 self.refill_tokens().await;
233 *self.tokens.lock()
234 }
235}
236
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
239pub enum CircuitState {
240 Closed,
241 Open,
242 HalfOpen,
243}
244
245pub struct CircuitBreaker {
246 state: Arc<AsyncMutex<CircuitState>>,
247 failure_count: Arc<AsyncMutex<u32>>,
248 success_count: Arc<AsyncMutex<u32>>,
249 failure_threshold: u32,
250 success_threshold: u32,
251 timeout: Duration,
252 last_failure: Arc<AsyncMutex<Option<Instant>>>,
253}
254
255impl CircuitBreaker {
256 pub fn new(failure_threshold: u32, success_threshold: u32, timeout: Duration) -> Self {
258 Self {
259 state: Arc::new(AsyncMutex::new(CircuitState::Closed)),
260 failure_count: Arc::new(AsyncMutex::new(0)),
261 success_count: Arc::new(AsyncMutex::new(0)),
262 failure_threshold,
263 success_threshold,
264 timeout,
265 last_failure: Arc::new(AsyncMutex::new(None)),
266 }
267 }
268
269 #[cfg(feature = "native")]
271 pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
272 where
273 F: FnOnce() -> Fut,
274 Fut: std::future::Future<Output = Result<T>>,
275 {
276 let state = *self.state.lock().await;
278 match state {
279 CircuitState::Open => {
280 let last_failure = *self.last_failure.lock().await;
281 if let Some(failure_time) = last_failure {
282 if Instant::now().duration_since(failure_time) >= self.timeout {
283 *self.state.lock().await = CircuitState::HalfOpen;
285 *self.success_count.lock().await = 0;
286 } else {
287 return Err(Error::Other(anyhow::anyhow!("Circuit breaker is open")));
288 }
289 }
290 }
291 CircuitState::HalfOpen => {
292 }
294 CircuitState::Closed => {
295 }
297 }
298
299 match f().await {
301 Ok(result) => {
302 self.on_success().await;
303 Ok(result)
304 }
305 Err(error) => {
306 self.on_failure().await;
307 Err(error)
308 }
309 }
310 }
311
312 #[cfg(feature = "wasm")]
314 pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
315 where
316 F: FnOnce() -> Fut,
317 Fut: std::future::Future<Output = Result<T>>,
318 {
319 let state = *self.state.lock();
321 match state {
322 CircuitState::Open => {
323 let last_failure = *self.last_failure.lock();
324 if let Some(failure_time) = last_failure {
325 if Instant::now().duration_since(failure_time) >= self.timeout {
326 *self.state.lock() = CircuitState::HalfOpen;
328 *self.success_count.lock() = 0;
329 } else {
330 return Err(Error::Other(anyhow::anyhow!("Circuit breaker is open")));
331 }
332 }
333 }
334 CircuitState::HalfOpen => {
335 }
337 CircuitState::Closed => {
338 }
340 }
341
342 match f().await {
344 Ok(result) => {
345 self.on_success().await;
346 Ok(result)
347 }
348 Err(error) => {
349 self.on_failure().await;
350 Err(error)
351 }
352 }
353 }
354
355 #[cfg(feature = "native")]
357 async fn on_success(&self) {
358 let state = *self.state.lock().await;
359 match state {
360 CircuitState::HalfOpen => {
361 let mut success_count = self.success_count.lock().await;
362 *success_count += 1;
363 if *success_count >= self.success_threshold {
364 *self.state.lock().await = CircuitState::Closed;
365 *self.failure_count.lock().await = 0;
366 }
367 }
368 CircuitState::Closed => {
369 *self.failure_count.lock().await = 0;
370 }
371 _ => {}
372 }
373 }
374
375 #[cfg(feature = "wasm")]
377 async fn on_success(&self) {
378 let state = *self.state.lock();
379 match state {
380 CircuitState::HalfOpen => {
381 let mut success_count = self.success_count.lock();
382 *success_count += 1;
383 if *success_count >= self.success_threshold {
384 *self.state.lock() = CircuitState::Closed;
385 *self.failure_count.lock() = 0;
386 }
387 }
388 CircuitState::Closed => {
389 *self.failure_count.lock() = 0;
390 }
391 _ => {}
392 }
393 }
394
395 #[cfg(feature = "native")]
397 async fn on_failure(&self) {
398 let mut failure_count = self.failure_count.lock().await;
399 *failure_count += 1;
400 *self.last_failure.lock().await = Some(Instant::now());
401
402 if *failure_count >= self.failure_threshold {
403 *self.state.lock().await = CircuitState::Open;
404 }
405 }
406
407 #[cfg(feature = "wasm")]
409 async fn on_failure(&self) {
410 let mut failure_count = self.failure_count.lock();
411 *failure_count += 1;
412 *self.last_failure.lock() = Some(Instant::now());
413
414 if *failure_count >= self.failure_threshold {
415 *self.state.lock() = CircuitState::Open;
416 }
417 }
418
419 #[cfg(feature = "native")]
421 pub async fn state(&self) -> CircuitState {
422 *self.state.lock().await
423 }
424
425 #[cfg(feature = "wasm")]
427 pub async fn state(&self) -> CircuitState {
428 *self.state.lock()
429 }
430}
431
432pub struct TimeoutWrapper {
434 timeout: Duration,
435}
436
437impl TimeoutWrapper {
438 pub fn new(timeout: Duration) -> Self {
440 Self { timeout }
441 }
442
443 #[cfg(feature = "native")]
445 pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
446 where
447 F: FnOnce() -> Fut,
448 Fut: std::future::Future<Output = Result<T>>,
449 {
450 match tokio::time::timeout(self.timeout, f()).await {
451 Ok(result) => result,
452 Err(_) => Err(Error::Other(anyhow::anyhow!("Operation timed out"))),
453 }
454 }
455
456 #[cfg(feature = "wasm")]
458 pub async fn execute<F, Fut, T>(&self, f: F) -> Result<T>
459 where
460 F: FnOnce() -> Fut,
461 Fut: std::future::Future<Output = Result<T>>,
462 {
463 f().await
465 }
466}
467
468#[cfg(test)]
469mod tests {
470 use super::*;
471
472 #[cfg(feature = "native")]
473 #[tokio::test]
474 async fn test_debouncer() {
475 let debouncer = Debouncer::new(Duration::from_millis(100));
476 let mut counter = 0;
477
478 let result = debouncer.execute(|| async {
480 counter += 1;
481 Ok(counter)
482 }).await.unwrap();
483 assert_eq!(result, Some(1));
484
485 let result = debouncer.execute(|| async {
487 counter += 1;
488 Ok(counter)
489 }).await.unwrap();
490 assert_eq!(result, None);
491
492 tokio::time::sleep(Duration::from_millis(150)).await;
494
495 let result = debouncer.execute(|| async {
497 counter += 1;
498 Ok(counter)
499 }).await.unwrap();
500 assert_eq!(result, Some(2));
501 }
502
503 #[cfg(feature = "native")]
504 #[tokio::test]
505 async fn test_rate_limiter() {
506 let limiter = RateLimiter::new(2.0, 1.0); assert!(limiter.try_acquire(1.0).await);
510 assert!(limiter.try_acquire(1.0).await);
511
512 assert!(!limiter.try_acquire(1.0).await);
514
515 tokio::time::sleep(Duration::from_secs(1)).await;
517
518 assert!(limiter.try_acquire(1.0).await);
520 }
521
522 #[cfg(feature = "native")]
523 #[tokio::test]
524 async fn test_circuit_breaker() {
525 let breaker = CircuitBreaker::new(2, 1, Duration::from_millis(100));
526
527 assert_eq!(breaker.state().await, CircuitState::Closed);
529
530 let _result = breaker.execute(|| async {
532 Err::<(), _>(Error::Other(anyhow::anyhow!("Test error")))
533 }).await;
534 let _result = breaker.execute(|| async {
535 Err::<(), _>(Error::Other(anyhow::anyhow!("Test error")))
536 }).await;
537
538 assert_eq!(breaker.state().await, CircuitState::Open);
540
541 tokio::time::sleep(Duration::from_millis(150)).await;
543
544 let _result = breaker.execute(|| async { Ok(()) }).await;
546 assert_eq!(breaker.state().await, CircuitState::Closed);
547 }
548}