1use crate::error::FlowError;
5use crate::LimitStrategy;
6use std::sync::Arc;
7use std::time::Instant;
8
9use crate::semaphore::DynamicSemaphore;
10
11#[derive(Clone)]
12pub struct FlowGuard<S: LimitStrategy> {
13 strategy: Arc<S>,
14 semaphore: Arc<DynamicSemaphore>,
15}
16
17impl<S: LimitStrategy + 'static> FlowGuard<S> {
18 pub fn new(strategy: S) -> Self {
19 let initial_limit = strategy.current_limit();
20 Self {
21 strategy: Arc::new(strategy),
22 semaphore: Arc::new(DynamicSemaphore::new(initial_limit)),
23 }
24 }
25
26 pub async fn run<F, T, E>(&self, f: F) -> Result<T, FlowError<E>>
27 where
28 F: std::future::Future<Output = Result<T, E>>,
29 {
30 let _permit = self
32 .semaphore
33 .acquire()
34 .await
35 .map_err(|_| FlowError::Dropped)?;
36
37 let start = Instant::now();
38
39 let result = f.await;
41
42 let duration = start.elapsed();
43
44 match &result {
46 Ok(_) => self.strategy.on_success(duration),
47 Err(_) => self.strategy.on_error(),
48 }
49
50 let new_limit = self.strategy.current_limit();
52 self.semaphore.set_limit(new_limit);
53
54 result.map_err(FlowError::AppError)
56 }
57
58 pub fn current_limit(&self) -> usize {
60 self.strategy.current_limit()
61 }
62
63 pub fn available_permits(&self) -> usize {
64 self.semaphore.available_permits()
65 }
66}