tower_worker/periodic/
layer.rs

1use crate::WorkerLayer;
2use futures_util::future::poll_fn;
3use std::{future::Future, time::Duration};
4use tokio::time::interval;
5use tower_layer::Layer;
6use tower_service::Service;
7
8/// Spawns a worker task with a clone of the inner service that periodically
9/// makes a request to the inner service.
10/// 
11/// The default Tokio executor is used to run the given service, which means
12/// that this layer can only be used on the Tokio runtime.
13///
14/// See the module documentation for more details.
15pub struct PeriodicLayer<T> {
16    make_request: T,
17    period: Duration,
18}
19
20impl<T> PeriodicLayer<T> {
21    /// Creates a new [`PeriodicLayer`] with the provided `make_request` closure
22    /// and `period`.
23    /// 
24    /// `make_request` returns a request to be called on the inner service.
25    /// `period` gives with interval with which to send the request from `make_request`.
26    pub fn new(make_request: T, period: Duration) -> Self {
27        PeriodicLayer {
28            make_request,
29            period,
30        }
31    }
32}
33
34impl<S, T, F, Request> Layer<S> for PeriodicLayer<T>
35where
36    S: Service<Request, Future = F> + Clone + Send + 'static,
37    T: Fn() -> Request + Clone + Send + Sync + 'static,
38    F: Future<Output = Result<S::Response, S::Error>> + Send + 'static,
39    Request: Send,
40{
41    type Service = S;
42
43    fn layer(&self, inner: S) -> Self::Service {
44        let make_request = self.make_request.clone();
45        let period = self.period;
46        let make_worker = |mut service: S| {
47            let make_request = make_request.clone();
48            let period = period;
49
50            async move {
51                let mut interval = interval(period);
52
53                loop {
54                    let _ = interval.tick().await;
55
56                    if poll_fn(|cx| service.poll_ready(cx)).await.is_err() {
57                        break;
58                    };
59
60                    if service.call(make_request()).await.is_err() {
61                        break;
62                    }
63                }
64            }
65        };
66        let worker_layer = WorkerLayer::new(make_worker);
67
68        Layer::<S>::layer(&worker_layer, inner)
69    }
70}