tower_worker/periodic/
layer.rs1use crate::WorkerLayer;
2use futures_util::future::poll_fn;
3use std::{future::Future, time::Duration};
4use tokio::time::interval;
5use tower_layer::Layer;
6use tower_service::Service;
7
/// Layer configuration for issuing a request to a service on a fixed period.
///
/// `T` is the request factory; the `Layer` implementation for this type
/// requires it to be a `Fn() -> Request` closure (or function).
#[derive(Clone, Debug)]
pub struct PeriodicLayer<T> {
    /// Factory invoked once per tick to build the request sent to the service.
    make_request: T,
    /// Time between two consecutive ticks of the periodic worker.
    period: Duration,
}
19
20impl<T> PeriodicLayer<T> {
21 pub fn new(make_request: T, period: Duration) -> Self {
27 PeriodicLayer {
28 make_request,
29 period,
30 }
31 }
32}
33
34impl<S, T, F, Request> Layer<S> for PeriodicLayer<T>
35where
36 S: Service<Request, Future = F> + Clone + Send + 'static,
37 T: Fn() -> Request + Clone + Send + Sync + 'static,
38 F: Future<Output = Result<S::Response, S::Error>> + Send + 'static,
39 Request: Send,
40{
41 type Service = S;
42
43 fn layer(&self, inner: S) -> Self::Service {
44 let make_request = self.make_request.clone();
45 let period = self.period;
46 let make_worker = |mut service: S| {
47 let make_request = make_request.clone();
48 let period = period;
49
50 async move {
51 let mut interval = interval(period);
52
53 loop {
54 let _ = interval.tick().await;
55
56 if poll_fn(|cx| service.poll_ready(cx)).await.is_err() {
57 break;
58 };
59
60 if service.call(make_request()).await.is_err() {
61 break;
62 }
63 }
64 }
65 };
66 let worker_layer = WorkerLayer::new(make_worker);
67
68 Layer::<S>::layer(&worker_layer, inner)
69 }
70}