flow_guard/
integration.rs1use crate::error::FlowError;
6use crate::{FlowGuard, LimitStrategy};
7use futures_util::future::BoxFuture;
8use std::sync::Arc;
9use std::task::{Context, Poll};
10use tower::{Layer, Service};
11
12pub struct FlowGuardLayer<S: LimitStrategy> {
14 guard: Arc<FlowGuard<S>>,
15}
16
17impl<S: LimitStrategy> Clone for FlowGuardLayer<S> {
19 fn clone(&self) -> Self {
20 Self {
21 guard: self.guard.clone(),
22 }
23 }
24}
25
26impl<S: LimitStrategy + 'static> FlowGuardLayer<S> {
27 pub fn new(strategy: S) -> Self {
28 Self {
29 guard: Arc::new(FlowGuard::new(strategy)),
30 }
31 }
32}
33
34impl<S, L> Layer<S> for FlowGuardLayer<L>
35where
36 L: LimitStrategy + 'static,
37{
38 type Service = FlowGuardService<S, L>;
39
40 fn layer(&self, inner: S) -> Self::Service {
41 FlowGuardService {
42 inner,
43 guard: self.guard.clone(),
44 }
45 }
46}
47
48pub struct FlowGuardService<S, L: LimitStrategy> {
50 inner: S,
51 guard: Arc<FlowGuard<L>>,
52}
53
54impl<S: Clone, L: LimitStrategy> Clone for FlowGuardService<S, L> {
55 fn clone(&self) -> Self {
56 Self {
57 inner: self.inner.clone(),
58 guard: self.guard.clone(),
59 }
60 }
61}
62
63impl<S, L, Req> Service<Req> for FlowGuardService<S, L>
64where
65 S: Service<Req> + Clone + Send + 'static,
66 S::Future: Send + 'static,
67 L: LimitStrategy + 'static,
68 Req: Send + 'static,
69{
70 type Response = S::Response;
71 type Error = FlowError<S::Error>;
72 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
73
74 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
75 self.inner.poll_ready(cx).map_err(FlowError::AppError)
77 }
78
79 fn call(&mut self, req: Req) -> Self::Future {
80 let mut inner = self.inner.clone();
81 let guard = self.guard.clone();
82
83 Box::pin(async move {
84 guard.run(inner.call(req)).await
86 })
87 }
88}